EventManager

class safir.metrics.EventManager(*, app_name, base_topic_prefix, kafka_broker, kafka_admin_client, schema_manager, manage_kafka=False, disable=False, logger=None)

Bases: object

A tool for publishing application metrics events.

Events are published to Kafka as Avro-serialized messages. The schemas for the messages are stored in a Confluent Schema Registry. This EventManager manages those schemas and raises an exception if the event payloads evolve in an incompatible way.
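
As an illustration of what an incompatible evolution can look like, the sketch below applies one simplified rule: a field added without a default cannot be filled in when reading events written with the older schema. The helper name and the dict-based schema representation are hypothetical. The real check is performed by the Schema Registry under its configured compatibility mode, which may apply different rules.

```python
def added_fields_without_default(old_fields: dict, new_fields: dict) -> list[str]:
    """Return the names of fields that are new and declare no default.

    Hypothetical helper: each schema is modeled as a mapping from field
    name to a small spec dict, which is NOT how Avro schemas are stored.
    """
    return sorted(
        name
        for name, spec in new_fields.items()
        if name not in old_fields and "default" not in spec
    )


old = {"foo": {"type": "string"}}
# Adding a field with a default keeps older events readable.
ok_new = {"foo": {"type": "string"}, "bar": {"type": "int", "default": 0}}
# Adding a field without a default does not.
bad_new = {"foo": {"type": "string"}, "baz": {"type": "int"}}

problems_ok = added_fields_without_default(old, ok_new)
problems_bad = added_fields_without_default(old, bad_new)
```

An empty result means this particular rule found no problem; it does not prove that the registry would accept the schema.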

Parameters:
  • app_name (str) – The name of the application that is generating events

  • base_topic_prefix (str) – The Kafka topic prefix for the metrics events topic for this application

  • kafka_broker (KafkaBroker) – The broker used to publish events to Kafka

  • kafka_admin_client (AIOKafkaAdminClient) – Used to verify that Kafka is prepared for event publishing, for example that the topic exists

  • schema_manager (PydanticSchemaManager) – Handles all Confluent Schema Registry interactions

  • manage_kafka (bool, default: False) – If True, close the kafka_broker and kafka_admin_client when aclose is called. If your app’s only use of Kafka is to publish metrics events, then this should be True. If you have a FastStream app that already configures some of these clients, this should probably be False, and you should pass pre-configured clients in.

  • disable (bool, default: False) – If True, don’t actually do anything. No Kafka or Schema Registry interactions will happen. This is useful for running apps locally.

  • logger (BoundLogger | None, default: None) – A logger to use for internal logging
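
The ownership rule behind manage_kafka can be sketched with stand-in classes: the manager closes the clients only when it was told that it owns them. FakeClient and SketchManager are hypothetical and use no Safir or Kafka code; they only mirror the behavior described for the manage_kafka parameter.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a Kafka broker or admin client."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class SketchManager:
    """Hypothetical stand-in that models only the manage_kafka flag."""

    def __init__(self, broker, admin, *, manage_kafka: bool = False) -> None:
        self._broker = broker
        self._admin = admin
        self._manage_kafka = manage_kafka

    async def aclose(self) -> None:
        # Clients passed in by the application stay open unless this
        # manager was explicitly made responsible for them.
        if not self._manage_kafka:
            return
        await self._broker.close()
        await self._admin.close()


async def close_and_report(manage_kafka: bool) -> bool:
    """Close a manager and report whether both clients were closed."""
    broker, admin = FakeClient(), FakeClient()
    manager = SketchManager(broker, admin, manage_kafka=manage_kafka)
    await manager.aclose()
    return broker.closed and admin.closed


owned_closed = asyncio.run(close_and_report(True))
shared_closed = asyncio.run(close_and_report(False))
```

With manage_kafka=False, the application that created the clients remains responsible for closing them.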

Examples

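from pydantic import AnyUrl
from safir.metrics import EventPayload, MetricsConfigurationWithKafka

# MetricsConfiguration, KafkaConnectionSettings, KafkaSecurityProtocol,
# SchemaRegistryConnectionSettings, and SchemaManagerSettings must also be
# imported; their import paths are not shown in this example.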

config = MetricsConfigurationWithKafka(
    metrics_events=MetricsConfiguration(
        app_name="myapp",
        base_topic_prefix="what.ever",
    ),
    kafka=KafkaConnectionSettings(
        bootstrap_servers="someserver:1234",
        security_protocol=KafkaSecurityProtocol.PLAINTEXT,
    ),
    schema_registry=SchemaRegistryConnectionSettings(
        url=AnyUrl("https://some.registry")
    ),
    schema_manager=SchemaManagerSettings(),
)
manager = config.make_manager()
await manager.initialize()


class MyEvent(EventPayload):
    foo: str


# create_publisher is async and takes the event name and the payload type.
publisher = await manager.create_publisher("myevent", MyEvent)

await publisher.publish(MyEvent(foo="bar1"))
await publisher.publish(MyEvent(foo="bar2"))

await manager.aclose()

Methods Summary

aclose()

Clean up the Kafka clients, if we're managing them.

create_publisher(name, payload_model)

Create an EventPublisher that can publish an event of type P.

initialize()

Initialize Kafka clients (if this EventManager is managing them).

publish(*, payload, publisher, event_class, ...)

Serialize an event to Avro and publish it to Kafka.

Methods Documentation

async aclose()

Clean up the Kafka clients, if we’re managing them.

Return type:

None

async create_publisher(name, payload_model)

Create an EventPublisher that can publish an event of type P.

This EventManager must be initialized by calling initialize before any publishers are created.

Parameters:
  • name (str) – The name of the event. This will be the name of the Avro schema and record, and the name of the event in the event storage backend.

  • payload_model (type[TypeVar(P, bound= EventPayload)]) – An EventPayload type. Instances of this type can be passed to the publish method of the returned publisher, which will publish them.

Returns:

An object that can be used to publish events of type P

Return type:

EventPublisher

Raises:
  • DuplicateEventError – Upon an attempt to register a publisher with the same name as one that was already registered.

  • EventManagerUnintializedError – Upon an attempt to create a publisher before calling initialize on this EventManager instance.

  • KafkaTopicError – If the topic for publishing events doesn’t exist, or we don’t have access to it.
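
The first two error conditions above can be sketched with a stand-in registry. Everything here is hypothetical: the exception classes are local stand-ins that reuse the documented names, SketchRegistry is not the real EventManager, and the order in which the two checks run is an assumption.

```python
class DuplicateEventError(Exception):
    """Local stand-in for the documented exception of the same name."""


class EventManagerUnintializedError(Exception):
    """Local stand-in for the documented exception of the same name."""


class SketchRegistry:
    """Hypothetical stand-in that tracks publishers by event name."""

    def __init__(self) -> None:
        self._initialized = False
        self._publishers: dict[str, type] = {}

    def initialize(self) -> None:
        self._initialized = True

    def create_publisher(self, name: str, payload_model: type) -> str:
        # Assumed order: check initialization first, then uniqueness.
        if not self._initialized:
            raise EventManagerUnintializedError("call initialize first")
        if name in self._publishers:
            raise DuplicateEventError(f"{name} is already registered")
        self._publishers[name] = payload_model
        return name


def try_create(registry: SketchRegistry, name: str) -> str:
    """Return the name on success, or the raised exception's class name."""
    try:
        return registry.create_publisher(name, dict)
    except (DuplicateEventError, EventManagerUnintializedError) as exc:
        return type(exc).__name__


registry = SketchRegistry()
before_init = try_create(registry, "myevent")
registry.initialize()
first = try_create(registry, "myevent")
second = try_create(registry, "myevent")
```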

async initialize()

Initialize Kafka clients (if this EventManager is managing them).

Return type:

None

async publish(*, payload, publisher, event_class, schema_info)

Serialize an event to Avro and publish it to Kafka.

You shouldn’t call this method directly; it is normally called from the publish method of an EventPublisher.

Raises:

EventManagerUnintializedError – Upon an attempt to publish an event before calling initialize on this EventManager instance.

Return type:

AvroBaseModel
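
The delegation described for publish can be sketched as follows: application code calls the publisher, and the publisher forwards to the manager using keyword-only arguments. Both classes are hypothetical stand-ins. The event_name argument and the dict record are assumptions made for illustration and do not match the real signature or the real Avro serialization.

```python
import asyncio


class SketchManager:
    """Hypothetical stand-in that records events instead of using Kafka."""

    def __init__(self) -> None:
        self.sent: list[dict] = []

    async def publish(self, *, payload: dict, event_name: str) -> dict:
        # Stand-in for serialization and the Kafka send.
        record = {"event": event_name, **payload}
        self.sent.append(record)
        return record


class SketchPublisher:
    """Hypothetical stand-in for a publisher bound to one event name."""

    def __init__(self, manager: SketchManager, event_name: str) -> None:
        self._manager = manager
        self._event_name = event_name

    async def publish(self, payload: dict) -> dict:
        # The publisher supplies the per-event details so that callers
        # never need to call the manager's publish method themselves.
        return await self._manager.publish(
            payload=payload, event_name=self._event_name
        )


async def demo() -> list[dict]:
    manager = SketchManager()
    publisher = SketchPublisher(manager, "myevent")
    await publisher.publish({"foo": "bar1"})
    await publisher.publish({"foo": "bar2"})
    return manager.sent


sent = asyncio.run(demo())
```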