KafkaEventManager

class safir.metrics.KafkaEventManager(*, application, topic_prefix, kafka_broker, kafka_admin_client, schema_manager, manage_kafka_broker=False, logger=None)

Bases: EventManager

A tool for publishing application metrics events.

Events are published to Kafka as Avro-serialized messages. The schemas for the messages are stored in a Confluent Schema Registry. This EventManager manages those schemas and raises an exception if an event payload evolves in an incompatible way.
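To make "evolve in an incompatible way" concrete, here is a minimal sketch of one commonly used compatibility rule, Avro-style backward compatibility: data written with the old schema must still be readable with the new one. The names FieldSpec and can_read_old_data are hypothetical and are not part of Safir. The real check is performed by the schema registry, and the compatibility mode it enforces depends on its configuration. The sketch also ignores type promotion, unions, and aliases.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FieldSpec:
    """Hypothetical, simplified description of one schema field."""

    name: str
    type: str
    has_default: bool = False


def can_read_old_data(old: list[FieldSpec], new: list[FieldSpec]) -> bool:
    """Return True if records written with ``old`` can be read with ``new``.

    Simplified rule: a field added in ``new`` needs a default, and a field
    present in both schemas must keep the same type.
    """
    old_by_name = {spec.name: spec for spec in old}
    for spec in new:
        previous = old_by_name.get(spec.name)
        if previous is None:
            # New field: old records lack it, so a default is required.
            if not spec.has_default:
                return False
        elif previous.type != spec.type:
            # Same field name, different type: treated as incompatible here.
            return False
    return True


v1 = [FieldSpec("foo", "string")]
with_default = v1 + [FieldSpec("count", "int", has_default=True)]
without_default = v1 + [FieldSpec("count", "int")]
retyped = [FieldSpec("foo", "int")]
```

Under this rule, moving from v1 to with_default is accepted, while without_default and retyped are the kind of change that would be rejected.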

Parameters:
  • application (str) – Name of the application that is generating events.

  • topic_prefix (str) – Kafka topic prefix for the metrics events topic for this application.

  • kafka_broker (KafkaBroker) – Broker to use to publish events to Kafka.

  • kafka_admin_client (AIOKafkaAdminClient) – Kafka admin client used to check that Kafka is ready for event publishing, for example that the topic exists.

  • schema_manager (PydanticSchemaManager) – Client to the Confluent-compatible schema registry.

  • manage_kafka_broker (bool, default: False) – If True, start kafka_broker when initialize is called and close it when aclose is called. Set this to True if your app’s only use of Kafka is to publish metrics events. If you have a FastStream app that already configures a Kafka broker that you want to reuse for metrics, this should probably be False, and you should pass in your existing Kafka broker. In that case, you must start the broker before calling initialize and stop it after closing the event manager.

  • logger (BoundLogger | None, default: None) – Logger to use for internal logging.
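The ownership rule behind manage_kafka_broker can be illustrated with a toy model. This is a sketch under stated assumptions, not Safir's implementation: FakeBroker, SketchManager, and lifecycle_calls are hypothetical names, and the fake broker only records which lifecycle calls it receives.

```python
import asyncio


class FakeBroker:
    """Hypothetical stand-in for a Kafka broker; records lifecycle calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def start(self) -> None:
        self.calls.append("start")

    async def close(self) -> None:
        self.calls.append("close")


class SketchManager:
    """Toy model of the manage_kafka_broker rule (not Safir's code)."""

    def __init__(
        self, broker: FakeBroker, *, manage_kafka_broker: bool = False
    ) -> None:
        self._broker = broker
        self._manage = manage_kafka_broker

    async def initialize(self) -> None:
        # Only touch the broker if the manager owns its lifecycle.
        if self._manage:
            await self._broker.start()

    async def aclose(self) -> None:
        if self._manage:
            await self._broker.close()


async def lifecycle_calls(manage: bool) -> list[str]:
    """Run initialize/aclose and report what the manager did to the broker."""
    broker = FakeBroker()
    manager = SketchManager(broker, manage_kafka_broker=manage)
    await manager.initialize()
    await manager.aclose()
    return broker.calls
```

With manage=True the manager starts and closes the broker itself. With manage=False it never touches the broker, so the application has to start it before initialize and stop it after aclose.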

Examples

from pydantic import AnyUrl
# Assumes the security protocol enum is exported as safir.kafka.SecurityProtocol.
from safir.kafka import (
    KafkaConnectionSettings,
    SchemaManagerSettings,
    SecurityProtocol,
)
from safir.metrics import (
    EventsConfiguration,
    EventPayload,
    KafkaMetricsConfiguration,
)

config = KafkaMetricsConfiguration(
    events=EventsConfiguration(
        application="myapp",
        topic_prefix="what.ever",
    ),
    kafka=KafkaConnectionSettings(
        bootstrap_servers="someserver:1234",
        security_protocol=SecurityProtocol.PLAINTEXT,
    ),
    schema_manager=SchemaManagerSettings(
        registry_url=AnyUrl("https://some.registry")
    ),
)
manager = config.make_manager()


class MyEvent(EventPayload):
    foo: str


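# Assumes create_publisher is async and takes an event name plus the payload
# model, and that the manager must be initialized before publishers are created.
await manager.initialize()
publisher = await manager.create_publisher("myevent", MyEvent)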

await publisher.publish(MyEvent(foo="bar1"))
await publisher.publish(MyEvent(foo="bar2"))

await manager.aclose()

Methods Summary

aclose()

Clean up the Kafka clients if they are managed.

build_publisher_for_model(model)

Build a Kafka publisher for a specific enriched model.

initialize()

Initialize the Kafka clients if they are managed.

publish(event, publisher, schema_info)

Serialize an event to Avro and publish it to Kafka.

Methods Documentation

async aclose()

Clean up the Kafka clients if they are managed.

Return type:

None

async build_publisher_for_model(model)

Build a Kafka publisher for a specific enriched model.

Parameters:

model (type[TypeVar(P, bound=EventPayload)]) – Enriched and configured model representing the event that will be published.

Returns:

An appropriate event publisher implementation instance.

Return type:

EventPublisher

async initialize()

Initialize the Kafka clients if they are managed.

Return type:

None

async publish(event, publisher, schema_info)

Serialize an event to Avro and publish it to Kafka.

This method should generally not be called directly. It is called by the publish method of the event publisher (EventPublisher.publish).

Parameters:
  • event (TypeVar(P, bound=EventPayload)) – Fully-enhanced event to publish.

  • publisher (AsyncAPIDefaultPublisher) – FastStream publisher to use to publish the event.

  • schema_info (SchemaInfo | None) – Confluent schema registry information about the event type.

Raises:

EventManagerUnintializedError – Raised if the initialize method has not been called before calling this method.

Return type:

None
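The relationship between the two publish methods, and the initialization guard, can be sketched with a toy model. Everything here is hypothetical (SketchManager, SketchPublisher, SketchUninitializedError, and the "application" enrichment key) and only illustrates the call flow described above: application code calls the publisher, the publisher enriches the payload and delegates to the manager, and the manager refuses to publish before initialize has run.

```python
import asyncio
from dataclasses import dataclass, field


class SketchUninitializedError(Exception):
    """Hypothetical stand-in for the manager's 'not initialized' error."""


@dataclass
class SketchManager:
    """Toy manager: refuses to publish until initialize() has run."""

    sent: list[dict] = field(default_factory=list)
    _initialized: bool = False

    async def initialize(self) -> None:
        self._initialized = True

    async def publish(self, event: dict, publisher: "SketchPublisher") -> None:
        if not self._initialized:
            raise SketchUninitializedError("call initialize() first")
        # A real manager would serialize to Avro and send to Kafka here.
        self.sent.append(event)


@dataclass
class SketchPublisher:
    """Toy publisher: the object application code actually calls."""

    manager: SketchManager
    application: str

    async def publish(self, payload: dict) -> None:
        # Enrich the payload (here with just the application name),
        # then delegate to the manager.
        event = {"application": self.application, **payload}
        await self.manager.publish(event, self)


async def demo() -> tuple[bool, list[dict]]:
    manager = SketchManager()
    publisher = SketchPublisher(manager, application="myapp")
    try:
        await publisher.publish({"foo": "too-early"})
        raised = False
    except SketchUninitializedError:
        raised = True
    await manager.initialize()
    await publisher.publish({"foo": "bar1"})
    return raised, manager.sent
```

In this model the first publish attempt fails because the manager has not been initialized, and only the event published after initialize is recorded.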