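Header Parameters
Annotate a handler argument with Header() to read it from the message headers; in the generated AsyncAPI document below, the argument idempotency_key appears as the required header "idempotency-key".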
from typing import Annotated
from asyncfast import AsyncFast
from asyncfast import Header
# Application instance; the channel handler below is registered on it via @app.channel(...).
app = AsyncFast()
# Handler registered for the "order" channel address.
@app.channel("order")
async def handle_order(
    # Declared as a header parameter with no default value.
    idempotency_key: Annotated[str, Header()],
) -> None:
    # Echo the received header value to stdout.
    print(idempotency_key)
{
"asyncapi": "3.0.0",
"info": {
"title": "AsyncFast",
"version": "0.1.0"
},
"channels": {
"HandleOrder": {
"address": "order",
"messages": {
"HandleOrderMessage": {
"$ref": "#/components/messages/HandleOrderMessage"
}
}
}
},
"operations": {
"receiveHandleOrder": {
"action": "receive",
"channel": {
"$ref": "#/channels/HandleOrder"
}
}
},
"components": {
"messages": {
"HandleOrderMessage": {
"headers": {
"$ref": "#/components/schemas/HandleOrderHeaders"
}
}
},
"schemas": {
"HandleOrderHeaders": {
"properties": {
"idempotency-key": {
"title": "Idempotency-Key",
"type": "string"
}
},
"required": [
"idempotency-key"
],
"title": "HandleOrderHeaders",
"type": "object"
}
}
}
}
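Default
Giving a header parameter a default value makes it optional: in the generated schema below, "request-id" carries "default": 0 and is not listed under "required".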
from typing import Annotated
from asyncfast import AsyncFast
from asyncfast import Header
# Application instance; the channel handler below is registered on it via @app.channel(...).
app = AsyncFast()
# Handler registered for the "notification_channel" channel address.
@app.channel("notification_channel")
async def notification_channel_handler(
    # Integer header parameter; takes the value 0 when no value is supplied.
    request_id: Annotated[int, Header()] = 0,
) -> None:
    # Build the output line first, then write it to stdout.
    message = f"request_id: {request_id}"
    print(message)
{
"asyncapi": "3.0.0",
"info": {
"title": "AsyncFast",
"version": "0.1.0"
},
"channels": {
"NotificationChannelHandler": {
"address": "notification_channel",
"messages": {
"NotificationChannelHandlerMessage": {
"$ref": "#/components/messages/NotificationChannelHandlerMessage"
}
}
}
},
"operations": {
"receiveNotificationChannelHandler": {
"action": "receive",
"channel": {
"$ref": "#/channels/NotificationChannelHandler"
}
}
},
"components": {
"messages": {
"NotificationChannelHandlerMessage": {
"headers": {
"$ref": "#/components/schemas/NotificationChannelHandlerHeaders"
}
}
},
"schemas": {
"NotificationChannelHandlerHeaders": {
"properties": {
"request-id": {
"default": 0,
"title": "Request-Id",
"type": "integer"
}
},
"title": "NotificationChannelHandlerHeaders",
"type": "object"
}
}
}
}