KafkaEventManager
- class safir.metrics.KafkaEventManager(*, application, topic_prefix, kafka_broker, kafka_admin_client, schema_manager, manage_kafka_broker=False, logger=None)
Bases: EventManager

A tool for publishing application metrics events.
Events are published to Kafka as Avro-serialized messages. The schemas for the messages are stored in a Confluent Schema Registry, and this EventManager will manage those schemas and raise an exception if the event payloads evolve in an incompatible way (a sketch of such an incompatible change follows the usage example below).
- Parameters:
  - application (str) – Name of the application that is generating events.
  - topic_prefix (str) – Kafka topic prefix for the metrics events topic for this application.
  - kafka_broker (KafkaBroker) – Broker to use to publish events to Kafka.
  - kafka_admin_client (AIOKafkaAdminClient) – Admin client to Kafka used to check that it is prepared for event publishing. For example, it is used to check if the topic exists.
  - schema_manager (PydanticSchemaManager) – Client to the Confluent-compatible schema registry.
  - manage_kafka_broker (bool, default: False) – If True, start the kafka_broker on initialize and close the kafka_broker when aclose is called. If your app's only use of Kafka is to publish metrics events, then this should be True. If you have a FastStream app that already configures a Kafka broker that you want to reuse for metrics, this should probably be False, and you should pass in your existing Kafka broker. In this case, you will need to start the broker before calling initialize and stop it after closing the event manager (see the sketch after this parameter list).
  - logger (BoundLogger | None, default: None) – Logger to use for internal logging.
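For example, when reusing a broker from an existing FastStream application (the manage_kafka_broker=False case described above), construction might look like the following sketch. The broker, admin_client, and schema_manager objects are assumed to have been created elsewhere; only the keyword arguments documented above are used, and the broker start/close calls are assumptions about the FastStream broker API rather than anything provided by this class.

from safir.metrics import KafkaEventManager

# broker, admin_client, and schema_manager are assumed to already exist:
# for example, a faststream.kafka.KafkaBroker shared with the rest of the
# app, an aiokafka AIOKafkaAdminClient, and a PydanticSchemaManager.
manager = KafkaEventManager(
    application="myapp",
    topic_prefix="what.ever",
    kafka_broker=broker,
    kafka_admin_client=admin_client,
    schema_manager=schema_manager,
    manage_kafka_broker=False,  # the application owns the broker lifecycle
)

# With manage_kafka_broker=False, start the broker before initialize() and
# stop it only after the event manager has been closed.
await broker.start()
await manager.initialize()
# ... create publishers and publish events ...
await manager.aclose()
await broker.close()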
Examples
from pydantic import AnyUrl

from safir.kafka import (
    KafkaConnectionSettings,
    KafkaSecurityProtocol,
    SchemaManagerSettings,
)
from safir.metrics import (
    EventsConfiguration,
    EventPayload,
    KafkaMetricsConfiguration,
)

config = KafkaMetricsConfiguration(
    events=EventsConfiguration(
        application="myapp",
        topic_prefix="what.ever",
    ),
    kafka=KafkaConnectionSettings(
        bootstrap_servers="someserver:1234",
        security_protocol=KafkaSecurityProtocol.PLAINTEXT,
    ),
    schema_manager=SchemaManagerSettings(
        registry_url=AnyUrl("https://some.registry")
    ),
)
manager = config.make_manager()


class MyEvent(EventPayload):
    foo: str


publisher = manager.create_publisher(MyEvent)
await manager.register_and_initialize()

await publisher.publish(MyEvent(foo="bar1"))
await publisher.publish(MyEvent(foo="bar2"))

await manager.aclose()
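As noted in the class description, incompatible changes to an already-registered payload are rejected. The following is a sketch of the kind of change meant there, reusing the MyEvent model from the example above; the exact compatibility rules are those enforced by the schema registry, not something this sketch defines.

# Originally registered payload:
class MyEvent(EventPayload):
    foo: str

# A later revision that changes foo's type from str to int is an
# incompatible Avro schema change; the event manager would raise an
# exception for it rather than silently registering a new schema.
class MyEvent(EventPayload):
    foo: int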
Methods Summary
- aclose() – Clean up the Kafka clients if they are managed.
- build_publisher_for_model(model) – Build a Kafka publisher for a specific enriched model.
- initialize() – Initialize the Kafka clients if they are managed.
- publish(event, publisher, schema_info) – Serialize an event to Avro and publish it to Kafka.
Methods Documentation
- async build_publisher_for_model(model)
Build a Kafka publisher for a specific enriched model.
- Parameters:
  - model (type[TypeVar(P, bound=EventPayload)]) – Enriched and configured model representing the event that will be published.
- Returns:
  An appropriate event publisher implementation instance.
- Return type:
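Application code normally obtains publishers through create_publisher, as in the class-level example above; a direct call is only the thin sketch below, where EnrichedEvent is a purely hypothetical stand-in for an already enriched and configured event model and manager is the KafkaEventManager from that example.

# EnrichedEvent is hypothetical; in normal use the event manager prepares
# the enriched model itself rather than application code supplying one.
publisher = await manager.build_publisher_for_model(EnrichedEvent)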
- async publish(event, publisher, schema_info)
Serialize an event to Avro and publish it to Kafka.
This method should generally not be called directly. It will be called from an event publisher's publish method (a sketch of the intended calling pattern follows this method's documentation).
- Parameters:
  - event (TypeVar(P, bound=EventPayload)) – Fully-enhanced event to publish.
  - publisher (AsyncAPIDefaultPublisher) – FastStream publisher to use to publish the event.
  - schema_info (SchemaInfo | None) – Confluent schema registry information about the event type.
- Raises:
  - EventManagerUnintializedError – Raised if the initialize method has not been called before calling this method.
- Return type:
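A minimal sketch of that intended calling pattern, reusing the manager and MyEvent model from the class-level example above; the important point is that the manager is initialized before any publishing happens, which is the case the exception documented above guards against.

# The normal path: create a publisher, initialize the manager, publish, and
# clean up. The EventManagerUnintializedError documented above guards
# against publishing before initialization.
publisher = manager.create_publisher(MyEvent)
await manager.register_and_initialize()
await publisher.publish(MyEvent(foo="bar"))
await manager.aclose()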