KafkaEventManager

class safir.metrics.KafkaEventManager(*, application, topic_prefix, kafka_broker, kafka_admin_client, schema_manager, manage_kafka=False, logger=None)

Bases: EventManager

A tool for publishing application metrics events.

Events are published to Kafka as Avro-serialized messages. The schemas for the messages are stored in a Confluent Schema Registry. This EventManager manages those schemas and raises an exception if the event payloads evolve in an incompatible way.
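What counts as an incompatible change depends on how the schema registry is configured, which this page does not state. As a rough illustration only, the sketch below assumes forward compatibility (a reader holding the old schema must be able to decode data written with the new one) and looks only at field presence and defaults, ignoring type changes. The function name and the dict-based schema description are hypothetical and are not part of Safir.

```python
def forward_compatible(
    old_fields: dict[str, bool], new_fields: dict[str, bool]
) -> bool:
    """Check a simplified forward-compatibility rule.

    Each dict maps a field name to whether that field has a default.
    A reader using the old schema can decode new data only if every
    old field is still written, or the old schema has a default for it.
    """
    for name, has_default in old_fields.items():
        if name not in new_fields and not has_default:
            return False
    return True


# Hypothetical payload schemas: only "foo", with no default.
old = {"foo": False}
with_added_field = {"foo": False, "bar": False}
with_removed_field = {"bar": False}

adding_is_ok = forward_compatible(old, with_added_field)
removing_is_ok = forward_compatible(old, with_removed_field)
```

Under this simplified rule, adding a field is accepted, while removing a field that has no default is rejected.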

Parameters:
  • application (str) – Name of the application that is generating events.

  • topic_prefix (str) – Kafka topic prefix for the metrics events topic for this application.

  • kafka_broker (KafkaBroker) – Broker to use to publish events to Kafka.

  • kafka_admin_client (AIOKafkaAdminClient) – Kafka admin client used to check that Kafka is prepared for event publishing, for example that the topic exists.

  • schema_manager (PydanticSchemaManager) – Client to the Confluent-compatible schema registry.

  • manage_kafka (bool, default: False) – If True, close the kafka_broker and kafka_admin_client when aclose is called. If your app’s only use of Kafka is to publish metrics events, then this should be True. If you have a FastStream app that already configures some of these clients, this should probably be False, and you should pass pre-configured clients in.

  • logger (BoundLogger | None, default: None) – Logger to use for internal logging.
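The ownership rule behind manage_kafka can be pictured with a small stand-alone sketch. It does not use Safir or Kafka: FakeClient and SketchManager are hypothetical stand-ins that only mimic the behavior described above, where aclose closes the clients only when the manager was told it owns them.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a Kafka broker or admin client."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class SketchManager:
    """Hypothetical manager that mimics the manage_kafka ownership rule."""

    def __init__(
        self, broker: FakeClient, admin: FakeClient, *, manage_kafka: bool = False
    ) -> None:
        self._broker = broker
        self._admin = admin
        self._manage_kafka = manage_kafka

    async def aclose(self) -> None:
        # Close the clients only if this manager owns them.
        if self._manage_kafka:
            await self._broker.close()
            await self._admin.close()


async def demo(manage_kafka: bool) -> tuple[bool, bool]:
    broker, admin = FakeClient(), FakeClient()
    manager = SketchManager(broker, admin, manage_kafka=manage_kafka)
    await manager.aclose()
    return broker.closed, admin.closed


owned = asyncio.run(demo(manage_kafka=True))
shared = asyncio.run(demo(manage_kafka=False))
```

With manage_kafka=False the clients stay open after aclose, so the application that created them remains responsible for closing them.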

Examples

from pydantic import AnyUrl

# Assumption: the security protocol enum is exported from safir.kafka
# under the name SecurityProtocol.
from safir.kafka import (
    KafkaConnectionSettings,
    SchemaManagerSettings,
    SecurityProtocol,
)
from safir.metrics import (
    EventsConfiguration,
    EventPayload,
    KafkaMetricsConfiguration,
)

config = KafkaMetricsConfiguration(
    events=EventsConfiguration(
        application="myapp",
        topic_prefix="what.ever",
    ),
    kafka=KafkaConnectionSettings(
        bootstrap_servers="someserver:1234",
        security_protocol=SecurityProtocol.PLAINTEXT,
    ),
    schema_manager=SchemaManagerSettings(
        registry_url=AnyUrl("https://some.registry")
    ),
)
manager = config.make_manager()


class MyEvent(EventPayload):
    foo: str


publisher = manager.create_publisher(MyEvent)
await manager.initialize()

await publisher.publish(MyEvent(foo="bar1"))
await publisher.publish(MyEvent(foo="bar2"))

await manager.aclose()
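The awaits in the example have to run inside a coroutine, and aclose should still run when publishing fails. One possible arrangement is sketched below. run_with_manager and RecordingManager are hypothetical names, not Safir APIs: the helper relies only on the initialize and aclose methods documented on this page, and the recording class stands in for a real manager so that the sketch runs without Kafka.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


async def run_with_manager(
    manager: Any, work: Callable[[Any], Awaitable[None]]
) -> None:
    """Initialize the manager, run the work, and always close the manager."""
    await manager.initialize()
    try:
        await work(manager)
    finally:
        # Runs on success and on failure, so managed clients are not leaked.
        await manager.aclose()


class RecordingManager:
    """Hypothetical stand-in that records lifecycle calls instead of using Kafka."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def initialize(self) -> None:
        self.calls.append("initialize")

    async def aclose(self) -> None:
        self.calls.append("aclose")


async def failing_work(manager: RecordingManager) -> None:
    manager.calls.append("work")
    raise RuntimeError("publish failed")


async def demo() -> list[str]:
    manager = RecordingManager()
    try:
        await run_with_manager(manager, failing_work)
    except RuntimeError:
        pass
    return manager.calls


calls = asyncio.run(demo())
```

In a real application the work callable would create publishers and publish events, and the manager passed in would come from config.make_manager().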

Methods Summary

aclose()

Clean up the Kafka clients if they are managed.

build_publisher_for_model(model)

Build a Kafka publisher for a specific enriched model.

initialize()

Initialize the Kafka clients if they are managed.

publish(event, publisher, schema_info)

Serialize an event to Avro and publish it to Kafka.

Methods Documentation

async aclose()

Clean up the Kafka clients if they are managed.

Return type:

None

async build_publisher_for_model(model)

Build a Kafka publisher for a specific enriched model.

Parameters:

model (type[TypeVar(P, bound=EventPayload)]) – Enriched and configured model representing the event that will be published.

Returns:

An appropriate event publisher implementation instance.

Return type:

EventPublisher

async initialize()

Initialize the Kafka clients if they are managed.

Return type:

None

async publish(event, publisher, schema_info)

Serialize an event to Avro and publish it to Kafka.

This method should generally not be called directly. It is normally called from EventPublisher.publish.

Parameters:
  • event (AvroBaseModel) – Fully-enhanced event to publish.

  • publisher (AsyncAPIDefaultPublisher) – FastStream publisher to use to publish the event.

  • schema_info (SchemaInfo | None) – Confluent schema registry information about the event type.

Raises:

EventManagerUnintializedError – Raised if the initialize method has not been called before calling this method.

Return type:

None
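The initialization requirement described under Raises can be illustrated with a minimal guard pattern. This is a sketch under stated assumptions, not Safir's implementation: UninitializedError is a hypothetical stand-in for EventManagerUnintializedError, and GuardedManager replaces Kafka publishing with an in-memory list.

```python
import asyncio


class UninitializedError(Exception):
    """Hypothetical stand-in for EventManagerUnintializedError."""


class GuardedManager:
    """Hypothetical manager that refuses to publish before initialize()."""

    def __init__(self) -> None:
        self._initialized = False
        self.sent: list[str] = []

    async def initialize(self) -> None:
        self._initialized = True

    async def publish(self, event: str) -> None:
        # Refuse to publish until initialize() has completed.
        if not self._initialized:
            raise UninitializedError("initialize() must be called first")
        self.sent.append(event)


async def demo() -> tuple[bool, list[str]]:
    manager = GuardedManager()
    try:
        await manager.publish("too-early")
        raised = False
    except UninitializedError:
        raised = True
    await manager.initialize()
    await manager.publish("ok")
    return raised, manager.sent


raised, sent = asyncio.run(demo())
```

The early publish is rejected and nothing is recorded for it; only the event published after initialize() is kept.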