Sending¶
Sending can be achieved by yielding a message to send:
from collections.abc import AsyncGenerator
from typing import Any
from asyncfast import AsyncFast
app = AsyncFast()
@app.channel("input_channel")
async def input_channel_handler() -> AsyncGenerator[dict[str, Any], None]:
yield {"address": "output_channel", "payload": b"Hello", "headers": [(b"Id", b"1")]}
{
"asyncapi": "3.0.0",
"info": {
"title": "AsyncFast",
"version": "0.1.0"
},
"channels": {
"InputChannelHandler": {
"address": "input_channel",
"messages": {
"InputChannelHandlerMessage": {
"$ref": "#/components/messages/InputChannelHandlerMessage"
}
}
}
},
"operations": {
"receiveInputChannelHandler": {
"action": "receive",
"channel": {
"$ref": "#/channels/InputChannelHandler"
}
}
},
"components": {
"messages": {
"InputChannelHandlerMessage": {}
}
}
}
The most dynamic method is to use a dict, with the dict form being:
- address (
str): The address to send the message
- payload (
bytes | None): The payload of the message to send
- headers (
list[tuple[bytes, bytes]]): The headers to send with the message
While this is useful, the type cannot be inferred for documentation generation. To do this the class Message must be
used.
Message¶
The Message class uses the classes annotations to discover the types of a message. The equivalent of the above
example would be:
from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import Annotated
from asyncfast import AsyncFast
from asyncfast import Header
from asyncfast import Message
app = AsyncFast()
@dataclass
class OutputMessage(Message, address="output_channel"):
id: Annotated[int, Header()]
payload: str
@app.channel("input_channel")
async def input_channel_handler() -> AsyncGenerator[OutputMessage, None]:
yield OutputMessage(id=1, payload="Hello")
{
"asyncapi": "3.0.0",
"info": {
"title": "AsyncFast",
"version": "0.1.0"
},
"channels": {
"InputChannelHandler": {
"address": "input_channel",
"messages": {
"InputChannelHandlerMessage": {
"$ref": "#/components/messages/InputChannelHandlerMessage"
}
}
},
"OutputMessage": {
"address": "output_channel",
"messages": {
"OutputMessage": {
"$ref": "#/components/messages/OutputMessage"
}
}
}
},
"operations": {
"receiveInputChannelHandler": {
"action": "receive",
"channel": {
"$ref": "#/channels/InputChannelHandler"
}
},
"sendOutputMessage": {
"action": "send",
"channel": {
"$ref": "#/channels/OutputMessage"
}
}
},
"components": {
"messages": {
"InputChannelHandlerMessage": {},
"OutputMessage": {
"payload": {
"type": "string"
}
}
}
}
}
This follows the same rules as setting up a receiver. For example, one payload, and using annotated types for headers.
Concurrent Message Sender¶
If you want to send messages concurrently this is supported by the MessageSender class. This can be passed
into the arguments of a handler, which will allow you to send using the MessageSender.send() method:
from dataclasses import dataclass
from typing import Annotated
from asyncfast import AsyncFast
from asyncfast import Header
from asyncfast import Message
from asyncfast import MessageSender
app = AsyncFast()
@dataclass
class OutputMessage(Message, address="output_channel"):
id: Annotated[int, Header()]
payload: str
@app.channel("input_channel")
async def input_channel_handler(message_sender: MessageSender[OutputMessage]) -> None:
await message_sender.send(OutputMessage(id=1, payload="Hello"))
{
"asyncapi": "3.0.0",
"info": {
"title": "AsyncFast",
"version": "0.1.0"
},
"channels": {
"InputChannelHandler": {
"address": "input_channel",
"messages": {
"InputChannelHandlerMessage": {
"$ref": "#/components/messages/InputChannelHandlerMessage"
}
}
},
"OutputMessage": {
"address": "output_channel",
"messages": {
"OutputMessage": {
"$ref": "#/components/messages/OutputMessage"
}
}
}
},
"operations": {
"receiveInputChannelHandler": {
"action": "receive",
"channel": {
"$ref": "#/channels/InputChannelHandler"
}
},
"sendOutputMessage": {
"action": "send",
"channel": {
"$ref": "#/channels/OutputMessage"
}
}
},
"components": {
"messages": {
"InputChannelHandlerMessage": {},
"OutputMessage": {
"payload": {
"type": "string"
}
}
}
}
}
As MessageSender.send() is a coroutine you can await multiple sends:
import asyncio
from dataclasses import dataclass
from typing import Annotated
from asyncfast import AsyncFast
from asyncfast import Header
from asyncfast import Message
from asyncfast import MessageSender
app = AsyncFast()
@dataclass
class OutputMessage(Message, address="output_channel"):
id: Annotated[int, Header()]
payload: str
@app.channel("input_channel")
async def input_channel_handler(message_sender: MessageSender[OutputMessage]) -> None:
await asyncio.gather(
message_sender.send(OutputMessage(id=1, payload="Hello1")),
message_sender.send(OutputMessage(id=2, payload="Hello2")),
)
{
"asyncapi": "3.0.0",
"info": {
"title": "AsyncFast",
"version": "0.1.0"
},
"channels": {
"InputChannelHandler": {
"address": "input_channel",
"messages": {
"InputChannelHandlerMessage": {
"$ref": "#/components/messages/InputChannelHandlerMessage"
}
}
},
"OutputMessage": {
"address": "output_channel",
"messages": {
"OutputMessage": {
"$ref": "#/components/messages/OutputMessage"
}
}
}
},
"operations": {
"receiveInputChannelHandler": {
"action": "receive",
"channel": {
"$ref": "#/channels/InputChannelHandler"
}
},
"sendOutputMessage": {
"action": "send",
"channel": {
"$ref": "#/channels/OutputMessage"
}
}
},
"components": {
"messages": {
"InputChannelHandlerMessage": {},
"OutputMessage": {
"payload": {
"type": "string"
}
}
}
}
}