Message Payload

In a decorated channel, any argument that is not a channel parameter or a header is treated as the payload.

Important

Only one payload argument is allowed. No attempt is made to merge multiple payload arguments; instead, an error is raised.

BaseModel

You can declare a data model as a class that inherits from BaseModel. This can then be used as the payload argument:

from asyncfast import AsyncFast
from pydantic import BaseModel

# Application instance; handlers are attached to it with ``app.channel``.
app = AsyncFast()


# Payload model: two string fields without defaults, so both are listed
# under "required" in the generated ``Payload`` schema.
class Payload(BaseModel):
    id: str
    name: str


# "channel" is the channel address. ``payload`` is neither a channel
# parameter nor a header, so it is treated as the message payload.
@app.channel("channel")
async def handle_channel(payload: Payload) -> None:
    print(payload)
{
  "asyncapi": "3.0.0",
  "info": {
    "title": "AsyncFast",
    "version": "0.1.0"
  },
  "channels": {
    "HandleChannel": {
      "address": "channel",
      "messages": {
        "HandleChannelMessage": {
          "$ref": "#/components/messages/HandleChannelMessage"
        }
      }
    }
  },
  "operations": {
    "receiveHandleChannel": {
      "action": "receive",
      "channel": {
        "$ref": "#/channels/HandleChannel"
      }
    }
  },
  "components": {
    "messages": {
      "HandleChannelMessage": {
        "payload": {
          "$ref": "#/components/schemas/Payload"
        }
      }
    },
    "schemas": {
      "Payload": {
        "properties": {
          "id": {
            "title": "Id",
            "type": "string"
          },
          "name": {
            "title": "Name",
            "type": "string"
          }
        },
        "required": [
          "id",
          "name"
        ],
        "title": "Payload",
        "type": "object"
      }
    }
  }
}

List

from asyncfast import AsyncFast
from pydantic import BaseModel

# Application instance; handlers are attached to it with ``app.channel``.
app = AsyncFast()


# Element model for the list payload: two required string fields.
class Item(BaseModel):
    id: str
    name: str


# The payload is annotated as ``list[Item]``, so the generated message
# payload is an "array" schema whose "items" reference the ``Item`` schema.
@app.channel("channel")
async def handle_channel(items: list[Item]) -> None:
    print(items)
{
  "asyncapi": "3.0.0",
  "info": {
    "title": "AsyncFast",
    "version": "0.1.0"
  },
  "channels": {
    "HandleChannel": {
      "address": "channel",
      "messages": {
        "HandleChannelMessage": {
          "$ref": "#/components/messages/HandleChannelMessage"
        }
      }
    }
  },
  "operations": {
    "receiveHandleChannel": {
      "action": "receive",
      "channel": {
        "$ref": "#/channels/HandleChannel"
      }
    }
  },
  "components": {
    "messages": {
      "HandleChannelMessage": {
        "payload": {
          "type": "array",
          "items": {
            "$ref": "#/components/schemas/Item"
          }
        }
      }
    },
    "schemas": {
      "Item": {
        "properties": {
          "id": {
            "title": "Id",
            "type": "string"
          },
          "name": {
            "title": "Name",
            "type": "string"
          }
        },
        "required": [
          "id",
          "name"
        ],
        "title": "Item",
        "type": "object"
      }
    }
  }
}

Dataclass

from dataclasses import dataclass

from asyncfast import AsyncFast

# Application instance; handlers are attached to it with ``app.channel``.
app = AsyncFast()


# A standard-library dataclass used as the payload type. ``skus`` is a list
# of strings, which appears as an "array" of "string" in the generated
# ``Order`` schema.
@dataclass
class Order:
    id: str
    skus: list[str]


# "order" is the channel address. ``order`` is the only argument that is not
# a channel parameter or header, so it is treated as the message payload.
@app.channel("order")
async def handle_order(order: Order) -> None:
    print(order)
{
  "asyncapi": "3.0.0",
  "info": {
    "title": "AsyncFast",
    "version": "0.1.0"
  },
  "channels": {
    "HandleOrder": {
      "address": "order",
      "messages": {
        "HandleOrderMessage": {
          "$ref": "#/components/messages/HandleOrderMessage"
        }
      }
    }
  },
  "operations": {
    "receiveHandleOrder": {
      "action": "receive",
      "channel": {
        "$ref": "#/channels/HandleOrder"
      }
    }
  },
  "components": {
    "messages": {
      "HandleOrderMessage": {
        "payload": {
          "$ref": "#/components/schemas/Order"
        }
      }
    },
    "schemas": {
      "Order": {
        "properties": {
          "id": {
            "title": "Id",
            "type": "string"
          },
          "skus": {
            "items": {
              "type": "string"
            },
            "title": "Skus",
            "type": "array"
          }
        },
        "required": [
          "id",
          "skus"
        ],
        "title": "Order",
        "type": "object"
      }
    }
  }
}

Built-in

from asyncfast import AsyncFast

# Application instance; handlers are attached to it with ``app.channel``.
app = AsyncFast()


# The payload is a plain ``int``: the generated message payload is the inline
# schema {"type": "integer"}, with no entry under "components/schemas".
@app.channel("channel")
async def handle_channel(payload: int) -> None:
    print(payload)
{
  "asyncapi": "3.0.0",
  "info": {
    "title": "AsyncFast",
    "version": "0.1.0"
  },
  "channels": {
    "HandleChannel": {
      "address": "channel",
      "messages": {
        "HandleChannelMessage": {
          "$ref": "#/components/messages/HandleChannelMessage"
        }
      }
    }
  },
  "operations": {
    "receiveHandleChannel": {
      "action": "receive",
      "channel": {
        "$ref": "#/channels/HandleChannel"
      }
    }
  },
  "components": {
    "messages": {
      "HandleChannelMessage": {
        "payload": {
          "type": "integer"
        }
      }
    }
  }
}