Using FastMessage
FastMessage is an extension package for MessageFlux.
It provides an easy way to create a PipelineHandlerBase and put it in a PipelineService.
Limitations
With FastMessage you can register only a single callback for each input device,
meaning the service expects a single type (schema) of message on each input device.
Multiple callbacks can send messages to the same output device.
Positional-only arguments are also not allowed on the callbacks.
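The positional-only restriction can be checked before a callback is registered. The sketch below is not part of FastMessage: `has_positional_only` is a hypothetical helper that relies only on the standard library's `inspect` module, and the two callbacks are made up for illustration.

```python
import inspect


def has_positional_only(func) -> bool:
    """Return True if func declares any positional-only parameter (those before a `/`)."""
    params = inspect.signature(func).parameters.values()
    return any(p.kind is inspect.Parameter.POSITIONAL_ONLY for p in params)


def ok_callback(x: int, y: str):
    pass  # every parameter can be passed by keyword


def bad_callback(x: int, /, y: str):
    pass  # x is positional-only, so it cannot be filled in by name


print(has_positional_only(ok_callback))
print(has_positional_only(bad_callback))
```

A check like this could run in a unit test over all callbacks, so that a stray `/` in a signature is caught early.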
Examples
Creating a FastMessage
from pydantic import BaseModel
from fastmessage import FastMessage

fm = FastMessage(default_output_device='output')


@fm.map()
def do_something(x: int, y: str):
    pass  # do something with x and y


@fm.map()
async def do_something_async(x: int, y: str):
    pass  # do something with x and y asynchronously


class SomeModel(BaseModel):
    x: int
    y: str


@fm.map()
def do_something_else(m: SomeModel, a: int):
    return "some_value"  # do something with m and a
Creating the service runner
from messageflux.iodevices.rabbitmq.rabbitmq_input_device import RabbitMQInputDeviceManager
from messageflux.iodevices.rabbitmq.rabbitmq_output_device import RabbitMQOutputDeviceManager
from fastmessage import FastMessage
fm = FastMessage()
# Code for creating FastMessage here...
input_device_manager = RabbitMQInputDeviceManager(hosts='my.rabbit.host',
                                                  user='username',
                                                  password='password')
output_device_manager = RabbitMQOutputDeviceManager(hosts='my.rabbit.host',
                                                    user='username',
                                                    password='password')
service = fm.create_service(input_device_manager=input_device_manager,
                            output_device_manager=output_device_manager)
service.start()
Root Model
You can use a callback with a single param named __root__ to mark it as the root type for the message.
In that case, the message is not keyed by the param name; the whole message is expected to match the root model's type.
from pydantic import BaseModel
from fastmessage import FastMessage
fm = FastMessage()
class SomeModel(BaseModel):
    x: int
    y: str


@fm.map(input_device='some_other_queue')
def process_somemodel(__root__: SomeModel):
    # this method will expect messages with the SomeModel schema ({"x":1, "y":"some string"})
    pass
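To make the difference concrete, the sketch below builds the two message shapes with the standard library's `json` module. It assumes messages are JSON objects keyed by parameter name, which is inferred from the description above rather than confirmed. The callback signatures appear only in comments, and the value of `a` is made up.

```python
import json

# Assumed payload for a regular callback such as
#   def do_something_else(m: SomeModel, a: int)
# where each top-level key is a parameter name.
named_payload = json.dumps({"m": {"x": 1, "y": "some string"}, "a": 2})

# Payload for a root callback such as
#   def process_somemodel(__root__: SomeModel)
# where the SomeModel fields sit at the top level and there is no "__root__" key.
root_payload = json.dumps({"x": 1, "y": "some string"})

print(named_payload)
print(root_payload)
```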
Special Param Types
There are special types that you can use to annotate the callback's arguments:
- InputDeviceName: arguments annotated with this type receive the name of the input device the message came from. This is useful for registering the same callback for several input devices.
- Message: arguments annotated with this type receive the raw message that came from the device.
- MessageBundle: arguments annotated with this type receive the complete MessageBundle (with device headers).
- MethodValidator: arguments annotated with this type receive an object that can help validate return values for other methods.
Notice that arguments annotated with these types MUST NOT have default values (since they always have values).
from fastmessage import FastMessage, InputDeviceName, MethodValidator
from messageflux.iodevices.base import Message, MessageBundle
fm = FastMessage()
@fm.map(input_device='some_queue')
def do_something(i: InputDeviceName, m: Message, mb: MessageBundle, x: int):
    # i will be 'some_queue'
    # m will be the message that arrived
    # mb will be the MessageBundle that arrived
    # x will be the value of the "x" field deserialized from the message
    pass  # do something


@fm.map()
def func1(mv: MethodValidator):
    yield mv.validate_and_return(func2, x=3, y="hello")  # this will succeed
    # this will raise MethodValidationError because the y param is required but missing
    yield mv.validate_and_return(func2, x=4)


@fm.map()
def func2(x: int, y: str):
    pass
Returning Multiple Results
You can make the function return multiple results, where each one is serialized as its own message to the output queue.
All you have to do is return a MultipleReturnValues (which is a List), and each item will be serialized as
its own output message.
You can also use yield to return results as a generator (each result will be sent to the output device before
continuing the iteration).
from fastmessage import FastMessage, MultipleReturnValues
fm = FastMessage(default_output_device="output_queue")
@fm.map()
def do_something(x: int):
    return MultipleReturnValues([1, 'b', 3])  # will create 3 output messages, one for each item


@fm.map()
def do_something_generator(x: int):  # this method does the same as the previous, but in a generator fashion
    yield 1
    yield 'b'
    yield 3
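The practical difference between the two styles is ordering: a list is fully computed before anything is sent, while a generator hands over each result before computing the next. The plain-Python sketch below illustrates this without FastMessage. `send_all` is a hypothetical stand-in for an output device, and the log only records the order of events.

```python
def produce_list(log):
    log.append("compute 1")
    log.append("compute 2")
    return [1, 2]  # both values exist before anything is sent


def produce_generator(log):
    log.append("compute 1")
    yield 1  # execution pauses here until the consumer asks for the next value
    log.append("compute 2")
    yield 2


def send_all(results, log):
    """Hypothetical output device: records each value as it is 'sent'."""
    for value in results:
        log.append(f"send {value}")


list_log = []
send_all(produce_list(list_log), list_log)

gen_log = []
send_all(produce_generator(gen_log), gen_log)

print(list_log)  # ['compute 1', 'compute 2', 'send 1', 'send 2']
print(gen_log)   # ['compute 1', 'send 1', 'compute 2', 'send 2']
```

The generator form is a reasonable choice when results are expensive to compute or when downstream consumers should start working as early as possible.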
Returning a Result to a Different Output Device
You can make the function return a result to a different output device than the one in the decorator.
You do this by using the CustomOutput class, giving it the value to send and the name of the output device to
send it to.
from fastmessage import FastMessage, CustomOutput
fm = FastMessage()
@fm.map(input_device='some_queue', output_device='default_output_device')
def do_something(x: int):
    # this will send the value 1 to 'other_output_device' instead of the default
    return CustomOutput(value=1, output_device='other_output_device')
Returning a Result to Another Method
You can make the function return a result to another mapped method, while validating its values BEFORE sending the output to the destination queue.
You do this by returning an OtherMethodOutput, which receives the callable as its first parameter and the
arguments as kwargs:
from fastmessage import FastMessage, OtherMethodOutput
fm = FastMessage()
@fm.map()
def func1():
    yield OtherMethodOutput(func2, x=3, y="hello")  # this will succeed
    # this will raise MethodValidationError because the y param is required but missing
    yield OtherMethodOutput(func2, x=4)


@fm.map()
def func2(x: int, y: str):
    pass
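The idea of validating kwargs against the target method before sending can be approximated with the standard library alone. The sketch below is not FastMessage's implementation: `check_kwargs` is a hypothetical helper built on `inspect.signature(...).bind`. It only checks that required parameters are present and does not check their types, and it raises `TypeError` rather than MethodValidationError.

```python
import inspect


def check_kwargs(func, **kwargs):
    """Raise TypeError if kwargs do not fit func's signature; otherwise return them."""
    inspect.signature(func).bind(**kwargs)
    return kwargs


def func2(x: int, y: str):
    pass


check_kwargs(func2, x=3, y="hello")  # fits the signature, no exception

try:
    check_kwargs(func2, x=4)  # y is required but missing
    missing_detected = False
except TypeError:
    missing_detected = True

print(missing_detected)
```

Failing at the point where the output is produced keeps the error close to the code that built the bad arguments, instead of surfacing later in the consumer of the destination queue.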