Header Parameters

from typing import Annotated

from asyncfast import AsyncFast
from asyncfast import Header

# Application instance; handlers are attached through its `channel` decorator.
app = AsyncFast()


# Handler registered for the channel whose address is "order".
@app.channel("order")
async def handle_order(
    # Declared as a message header via Header(). No default is given, and the
    # generated AsyncAPI schema lists it as the required string header
    # "idempotency-key" (the underscore appears as a hyphen in the header name).
    idempotency_key: Annotated[str, Header()],
) -> None:
    # Print the header value to stdout.
    print(idempotency_key)
{
  "asyncapi": "3.0.0",
  "info": {
    "title": "AsyncFast",
    "version": "0.1.0"
  },
  "channels": {
    "HandleOrder": {
      "address": "order",
      "messages": {
        "HandleOrderMessage": {
          "$ref": "#/components/messages/HandleOrderMessage"
        }
      }
    }
  },
  "operations": {
    "receiveHandleOrder": {
      "action": "receive",
      "channel": {
        "$ref": "#/channels/HandleOrder"
      }
    }
  },
  "components": {
    "messages": {
      "HandleOrderMessage": {
        "headers": {
          "$ref": "#/components/schemas/HandleOrderHeaders"
        }
      }
    },
    "schemas": {
      "HandleOrderHeaders": {
        "properties": {
          "idempotency-key": {
            "title": "Idempotency-Key",
            "type": "string"
          }
        },
        "required": [
          "idempotency-key"
        ],
        "title": "HandleOrderHeaders",
        "type": "object"
      }
    }
  }
}

Default

from typing import Annotated

from asyncfast import AsyncFast
from asyncfast import Header

# Application instance; handlers are attached through its `channel` decorator.
app = AsyncFast()


# Handler registered for the channel whose address is "notification_channel".
@app.channel("notification_channel")
async def notification_channel_handler(
    # Integer header with a default of 0. The generated AsyncAPI schema shows it
    # as "request-id" with "default": 0 and does not list it under "required".
    request_id: Annotated[int, Header()] = 0,
) -> None:
    # Print the header value (or the default) with a "request_id: " prefix.
    print(f"request_id: {request_id}")
{
  "asyncapi": "3.0.0",
  "info": {
    "title": "AsyncFast",
    "version": "0.1.0"
  },
  "channels": {
    "NotificationChannelHandler": {
      "address": "notification_channel",
      "messages": {
        "NotificationChannelHandlerMessage": {
          "$ref": "#/components/messages/NotificationChannelHandlerMessage"
        }
      }
    }
  },
  "operations": {
    "receiveNotificationChannelHandler": {
      "action": "receive",
      "channel": {
        "$ref": "#/channels/NotificationChannelHandler"
      }
    }
  },
  "components": {
    "messages": {
      "NotificationChannelHandlerMessage": {
        "headers": {
          "$ref": "#/components/schemas/NotificationChannelHandlerHeaders"
        }
      }
    },
    "schemas": {
      "NotificationChannelHandlerHeaders": {
        "properties": {
          "request-id": {
            "default": 0,
            "title": "Request-Id",
            "type": "integer"
          }
        },
        "title": "NotificationChannelHandlerHeaders",
        "type": "object"
      }
    }
  }
}