Skip to content

Using FastMessage

FastMessage is an extension package for MessageFlux. It is an easy way to create a PipelineHandlerBase and put it in a PipelineService

Limitations

With FastMessage you can register only a single callback for each input device, meaning the service will expect a single type (schema) of messages for each input device.

Multiple callbacks can send messages to the same output device.

Also - Positional Only Arguments are not allowed on the callbacks

Examples

Creating a FastMessage

from pydantic import BaseModel

from fastmessage import FastMessage

fm = FastMessage(default_output_device='output')


@fm.map()
def do_something(x: int, y: str):
    pass  # do something with x and y


@fm.map()
async def do_something_async(x: int, y: str):
    pass  # do something with x and y asynchronously


class SomeModel(BaseModel):
    x: int
    y: str


@fm.map()
def do_something_else(m: SomeModel, a: int):
    return "some_value"  # do somthing with m and a

Creating the service runner

from messageflux.iodevices.rabbitmq.rabbitmq_input_device import RabbitMQInputDeviceManager
from messageflux.iodevices.rabbitmq.rabbitmq_output_device import RabbitMQOutputDeviceManager

from fastmessage import FastMessage

fm = FastMessage()

# Code for creating FastMessage here...


input_device_manager = RabbitMQInputDeviceManager(hosts='my.rabbit.host',
                                                  user='username',
                                                  password='password')

output_device_manager = RabbitMQOutputDeviceManager(hosts='my.rabbit.host',
                                                    user='username',
                                                    password='password')

service = fm.create_service(input_device_manager=input_device_manager,
                            output_device_manager=output_device_manager)

service.start()

Root Model

You can use a callback with a single param named __root__ in order to mark it as the root type for the message. in that case, the message will not expect the param name, and instead will expect the root model as type

from pydantic import BaseModel

from fastmessage import FastMessage

fm = FastMessage()


class SomeModel(BaseModel):
    x: int
    y: str


@fm.map(input_device='some_other_queue')
def process_somemodel(__root__: SomeModel):
    pass  # this method will expect messages with the SomeModel schema ({"x":1, "y":"some string"})  

Special Param Types

There are special types which you can annotate the arguments for the callback with.

  • InputDeviceName - arguments annotated with this type will receive the name of the input device the message came from. useful for registering the same callback for several input devices
  • Message - arguments annotated with this type will receive the raw message which came from the device
  • MessageBundle - arguments annotated with this type will receive the complete MessageBundle (with device headers)
  • MethodValidator - argument of this type, will receive an object that can help validate return values for other methods Notice that arguments annotated with these types MUST NOT have default values (Since they always have values).
from fastmessage import FastMessage, InputDeviceName, MethodValidator
from messageflux.iodevices.base import Message, MessageBundle

fm = FastMessage()


@fm.map(input_device='some_queue')
def do_something(i: InputDeviceName, m: Message, mb: MessageBundle, x: int):
    # i will be 'some_queue'
    # m will be the message that arrived
    # mb will be the MessageBundle that arrived
    # x will be the serialized value of the message
    pass  # do something


@fm.map()
def func1(mv: MethodValidator):
    yield mv.validate_and_return(func2, x=3, y="hello")  # this will succeed
    yield mv.validate_and_return(func2,
                                 x=4)  # this will raise MethodValidationError because y param is required but missing


@fm.map()
def func2(x: int, y: str):
    pass

Returning Multiple Results

You can make the function return multiple results, where each one is serialized as its own message to the output queue.

All you have to do, is return a MultipleReturnValues (which is a List), and each item will be serialized as its own output message

you can also use yield to return results as a generator (each result will be sent to output device before continuing the iteration)

from fastmessage import FastMessage, MultipleReturnValues

fm = FastMessage(default_output_device="output_queue")


@fm.map()
def do_something(x: int):
    return MultipleReturnValues([1, 'b', 3])  # will create 3 output messages, one for each item


@fm.map()
def do_something_generator(x: int):  # this method does the same as the previous, but in a generator fashion
    yield 1
    yield 'b'
    yield 3

Returning Result to a different output device

You can make the function return a result to a different output device then the one in the decorator

You do this by using the CustomOutput class, and giving it the value to send, and the output device name to send to

from fastmessage import FastMessage, CustomOutput

fm = FastMessage()


@fm.map(input_device='some_queue', output_device='default_output_device')
def do_something(x: int):
    return CustomOutput(value=1,
                        output_device='other_output_device')  # this will send the value 1 to 'other_output_device' instead of the default

Returning Result to another method

You can make the function return a result to another mapped method, while validating its values BEFORE sending the output to the destination queue

you do this by returning OtherMethodOutput class, that receives the callable as its first parameter, and the arguments as kwargs:

from fastmessage import FastMessage, OtherMethodOutput

fm = FastMessage()


@fm.map()
def func1():
    yield OtherMethodOutput(func2, x=3, y="hello")  # this will succeed
    yield OtherMethodOutput(func2, x=4)  # this will raise MethodValidationError because y param is required but missing


@fm.map()
def func2(x: int, y: str):
    pass