Sending

To send a message, yield it from a channel handler:

from collections.abc import AsyncGenerator
from typing import Any

from asyncfast import AsyncFast

app = AsyncFast()


# Handler registered for the "input_channel" channel.
@app.channel("input_channel")
async def input_channel_handler() -> AsyncGenerator[dict[str, Any], None]:
    # Yield a dict-form message: the destination address, the payload as
    # bytes, and the headers as a list of (name, value) byte pairs.
    yield {"address": "output_channel", "payload": b"Hello", "headers": [(b"Id", b"1")]}
{
  "asyncapi": "3.0.0",
  "info": {
    "title": "AsyncFast",
    "version": "0.1.0"
  },
  "channels": {
    "InputChannelHandler": {
      "address": "input_channel",
      "messages": {
        "InputChannelHandlerMessage": {
          "$ref": "#/components/messages/InputChannelHandlerMessage"
        }
      }
    }
  },
  "operations": {
    "receiveInputChannelHandler": {
      "action": "receive",
      "channel": {
        "$ref": "#/channels/InputChannelHandler"
      }
    }
  },
  "components": {
    "messages": {
      "InputChannelHandlerMessage": {}
    }
  }
}

The most dynamic way to send a message is to yield a dict with the following keys:

address (str):

The address to send the message to

payload (bytes | None):

The payload of the message to send

headers (list[tuple[bytes, bytes]]):

The headers to send with the message

While this is flexible, the types of a message sent as a dict cannot be inferred for documentation generation. For the types to be documented, the Message class must be used.

Message

The Message class uses the class's annotations to discover the types of a message. The equivalent of the above example would be:

from collections.abc import AsyncGenerator
from dataclasses import dataclass
from typing import Annotated

from asyncfast import AsyncFast
from asyncfast import Header
from asyncfast import Message

app = AsyncFast()


# Message type whose destination address is "output_channel".
@dataclass
class OutputMessage(Message, address="output_channel"):
    # Marked as a header through the Annotated[..., Header()] annotation.
    id: Annotated[int, Header()]
    # The message payload, typed as a string.
    payload: str


# Handler registered for the "input_channel" channel.
@app.channel("input_channel")
async def input_channel_handler() -> AsyncGenerator[OutputMessage, None]:
    # Yield a typed Message instance instead of a dict so that the message
    # types can be inferred for documentation generation.
    yield OutputMessage(id=1, payload="Hello")
{
  "asyncapi": "3.0.0",
  "info": {
    "title": "AsyncFast",
    "version": "0.1.0"
  },
  "channels": {
    "InputChannelHandler": {
      "address": "input_channel",
      "messages": {
        "InputChannelHandlerMessage": {
          "$ref": "#/components/messages/InputChannelHandlerMessage"
        }
      }
    },
    "OutputMessage": {
      "address": "output_channel",
      "messages": {
        "OutputMessage": {
          "$ref": "#/components/messages/OutputMessage"
        }
      }
    }
  },
  "operations": {
    "receiveInputChannelHandler": {
      "action": "receive",
      "channel": {
        "$ref": "#/channels/InputChannelHandler"
      }
    },
    "sendOutputMessage": {
      "action": "send",
      "channel": {
        "$ref": "#/channels/OutputMessage"
      }
    }
  },
  "components": {
    "messages": {
      "InputChannelHandlerMessage": {},
      "OutputMessage": {
        "payload": {
          "type": "string"
        }
      }
    }
  }
}

This follows the same rules as setting up a receiver: for example, a message has a single payload, and headers are declared using annotated types.

Concurrent Message Sender

To send messages concurrently, use the MessageSender class. Declare it as a parameter of a handler, then send messages with the MessageSender.send() method:

from dataclasses import dataclass
from typing import Annotated

from asyncfast import AsyncFast
from asyncfast import Header
from asyncfast import Message
from asyncfast import MessageSender

app = AsyncFast()


# Message type whose destination address is "output_channel".
@dataclass
class OutputMessage(Message, address="output_channel"):
    # Marked as a header through the Annotated[..., Header()] annotation.
    id: Annotated[int, Header()]
    # The message payload, typed as a string.
    payload: str


# Handler registered for the "input_channel" channel; it takes a MessageSender
# parameterised with the message type it sends.
@app.channel("input_channel")
async def input_channel_handler(message_sender: MessageSender[OutputMessage]) -> None:
    # Send explicitly through the MessageSender rather than yielding.
    await message_sender.send(OutputMessage(id=1, payload="Hello"))
{
  "asyncapi": "3.0.0",
  "info": {
    "title": "AsyncFast",
    "version": "0.1.0"
  },
  "channels": {
    "InputChannelHandler": {
      "address": "input_channel",
      "messages": {
        "InputChannelHandlerMessage": {
          "$ref": "#/components/messages/InputChannelHandlerMessage"
        }
      }
    },
    "OutputMessage": {
      "address": "output_channel",
      "messages": {
        "OutputMessage": {
          "$ref": "#/components/messages/OutputMessage"
        }
      }
    }
  },
  "operations": {
    "receiveInputChannelHandler": {
      "action": "receive",
      "channel": {
        "$ref": "#/channels/InputChannelHandler"
      }
    },
    "sendOutputMessage": {
      "action": "send",
      "channel": {
        "$ref": "#/channels/OutputMessage"
      }
    }
  },
  "components": {
    "messages": {
      "InputChannelHandlerMessage": {},
      "OutputMessage": {
        "payload": {
          "type": "string"
        }
      }
    }
  }
}

As MessageSender.send() is a coroutine, multiple sends can be awaited concurrently, for example with asyncio.gather():

import asyncio
from dataclasses import dataclass
from typing import Annotated

from asyncfast import AsyncFast
from asyncfast import Header
from asyncfast import Message
from asyncfast import MessageSender

app = AsyncFast()


# Message type whose destination address is "output_channel".
@dataclass
class OutputMessage(Message, address="output_channel"):
    # Marked as a header through the Annotated[..., Header()] annotation.
    id: Annotated[int, Header()]
    # The message payload, typed as a string.
    payload: str


# Handler registered for the "input_channel" channel; it takes a MessageSender
# parameterised with the message type it sends.
@app.channel("input_channel")
async def input_channel_handler(message_sender: MessageSender[OutputMessage]) -> None:
    # Pass both send() awaitables to asyncio.gather so they run concurrently,
    # and wait here until both have finished.
    await asyncio.gather(
        message_sender.send(OutputMessage(id=1, payload="Hello1")),
        message_sender.send(OutputMessage(id=2, payload="Hello2")),
    )
{
  "asyncapi": "3.0.0",
  "info": {
    "title": "AsyncFast",
    "version": "0.1.0"
  },
  "channels": {
    "InputChannelHandler": {
      "address": "input_channel",
      "messages": {
        "InputChannelHandlerMessage": {
          "$ref": "#/components/messages/InputChannelHandlerMessage"
        }
      }
    },
    "OutputMessage": {
      "address": "output_channel",
      "messages": {
        "OutputMessage": {
          "$ref": "#/components/messages/OutputMessage"
        }
      }
    }
  },
  "operations": {
    "receiveInputChannelHandler": {
      "action": "receive",
      "channel": {
        "$ref": "#/channels/InputChannelHandler"
      }
    },
    "sendOutputMessage": {
      "action": "send",
      "channel": {
        "$ref": "#/channels/OutputMessage"
      }
    }
  },
  "components": {
    "messages": {
      "InputChannelHandlerMessage": {},
      "OutputMessage": {
        "payload": {
          "type": "string"
        }
      }
    }
  }
}