
class safir.metrics.EventManager(*, app_name, base_topic_prefix, kafka_broker, kafka_admin_client, schema_manager, manage_kafka=False, disable=False, logger=None)

A tool for publishing application metrics events.

Events are published to Kafka as avro-serialized messages. The schemas for the messages are stored in a Confluent Schema Registry, and this EventManager will manage those schemas and throw an exception if the event payloads evolve in an incompatible way.

  • app_name (str) – The name of the application that is generating events

  • base_topic_prefix (str) – The Kafka topic prefix for the metrics events topic for this application

  • kafka_broker (KafkaBroker) – Does the Kafka publishing

  • kafka_admin_client (AIOKafkaAdminClient) – Ensures that Kafka is prepared for event publishing; that the topic exists, for example.

  • schema_manager (PydanticSchemaManager) – Handles all Confluent Schema Registry interactions

  • manage_kafka (bool, default: False) – If True, close the kafka_broker and kafka_admin_client when aclose is called. If your app’s only use of Kafka is to publish metrics events, then this should be True. If you have a FastStream app that already configures some of these clients, this should probably be False, and you should pass pre-configured clients in.

  • disable (bool, default: False) – If False, don’t actually do anything. No Kafka or Schema Registry interactions will happen. This is useful for running apps locally.

  • logger (BoundLogger | None, default: None) – A logger to use for internal logging


from safir.metrics import MetricsConfigurationWithKafka

config = MetricsConfigurationWithKafka(
manager = config.make_manager()

class MyEvent(EventPayload):
    foo: str

publisher = manager.create_publisher(MyEvent)
await manager.register_and_initialize()

await publisher.publish(MyEvent(foo="bar1"))
await publisher.publish(MyEvent(foo="bar2"))

await manager.aclose()

Methods Summary


Clean up the Kafka clients, if we're managing them.

create_publisher(name, payload_model)

Create an EventPublisher that can publish an event of type P.


Initialize Kafka clients (if this EventManager is managing them).

publish(*, payload, publisher, event_class, ...)

Serialize an event to Avro and publish it to Kafka.

Methods Documentation

async aclose()

Clean up the Kafka clients, if we’re managing them.

async create_publisher(name, payload_model)

Create an EventPublisher that can publish an event of type P.

All event publishers must be created with this method before register_and_initialize is called.

  • name (str) – The name of the event. This will be the name of the Avro schema and record, and the name of the event in the event storage backend.

  • payload_model (type[TypeVar(P, bound= EventPayload)]) – An EventPayload type. Instance of this type can be passed into the publish method of this publisher, and they will be published.


An object that can be used to publish events of type P

  • DuplicateEventError – Upon an attempt to register a publisher with the same name as one that was already registered.

  • EventManagerUnintializedError – Upon an attempt to create a publisher before calling initialize on this EventManager instance.

  • KafkaTopicError – If the topic for publishing events doesn’t exist, or we don’t have access to it.

async initialize()

Initialize Kafka clients (if this EventManager is managing them).

async publish(*, payload, publisher, event_class, schema_info)

Serialize an event to Avro and publish it to Kafka.

You shouldn’t call this method directly, it will usually be called from the publish method of an EventPublisher.


EventManagerUnintializedError – Upon an attempt to create a publisher before calling initialize on this EventManager instance.

