KafkaEventManager
- class safir.metrics.KafkaEventManager(*, application, topic_prefix, kafka_broker, kafka_admin_client, schema_manager, manage_kafka=False, logger=None)
Bases: EventManager
A tool for publishing application metrics events.
Events are published to Kafka as Avro-serialized messages. The schemas for the messages are stored in a Confluent Schema Registry. This EventManager manages those schemas and raises an exception if the event payloads evolve in an incompatible way.
- Parameters:
application (str) – Name of the application that is generating events.
topic_prefix (str) – Kafka topic prefix for the metrics events topic for this application.
kafka_broker (KafkaBroker) – Broker to use to publish events to Kafka.
kafka_admin_client (AIOKafkaAdminClient) – Admin client to Kafka used to check that it is prepared for event publishing. For example, it is used to check if the topic exists.
schema_manager (PydanticSchemaManager) – Client to the Confluent-compatible schema registry.
manage_kafka (bool, default: False) – If True, close the kafka_broker and kafka_admin_client when aclose is called. If your app's only use of Kafka is to publish metrics events, then this should be True. If you have a FastStream app that already configures some of these clients, this should probably be False, and you should pass pre-configured clients in.
logger (BoundLogger | None, default: None) – Logger to use for internal logging.
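The ownership rule behind manage_kafka can be illustrated with a minimal sketch. It does not use Safir: FakeClient and SketchManager are hypothetical stand-ins that mirror only the "close the clients on aclose if they are managed" behavior described above, not the actual implementation.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a Kafka broker or admin client."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class SketchManager:
    """Hypothetical manager that models only the manage_kafka rule."""

    def __init__(
        self, broker: FakeClient, admin: FakeClient, *, manage_kafka: bool = False
    ) -> None:
        self._broker = broker
        self._admin = admin
        self._manage_kafka = manage_kafka

    async def aclose(self) -> None:
        # Close the clients only when this manager owns them.
        if self._manage_kafka:
            await self._broker.close()
            await self._admin.close()


async def demo(manage_kafka: bool) -> tuple[bool, bool]:
    broker, admin = FakeClient(), FakeClient()
    manager = SketchManager(broker, admin, manage_kafka=manage_kafka)
    await manager.aclose()
    return broker.closed, admin.closed


# Clients owned by the manager are closed; shared clients are left open
# for the rest of the application to close.
owned = asyncio.run(demo(manage_kafka=True))
shared = asyncio.run(demo(manage_kafka=False))
```

With manage_kafka=False, whoever created the clients stays responsible for closing them, which is why that setting suits an application that already runs its own FastStream broker.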
Examples
    from pydantic import AnyUrl

    from safir.kafka import (
        KafkaConnectionSettings,
        KafkaSecurityProtocol,  # assumed to be exported by safir.kafka
        SchemaManagerSettings,
    )
    from safir.metrics import (
        EventsConfiguration,
        EventPayload,
        KafkaMetricsConfiguration,
    )

    config = KafkaMetricsConfiguration(
        events=EventsConfiguration(
            application="myapp",
            topic_prefix="what.ever",
        ),
        kafka=KafkaConnectionSettings(
            bootstrap_servers="someserver:1234",
            security_protocol=KafkaSecurityProtocol.PLAINTEXT,
        ),
        schema_manager=SchemaManagerSettings(
            registry_url=AnyUrl("https://some.registry")
        ),
    )
    manager = config.make_manager()


    class MyEvent(EventPayload):
        foo: str


    # Create publishers before initializing the manager.
    publisher = manager.create_publisher(MyEvent)
    await manager.register_and_initialize()

    await publisher.publish(MyEvent(foo="bar1"))
    await publisher.publish(MyEvent(foo="bar2"))

    # Release the Kafka clients when done.
    await manager.aclose()
Methods Summary
aclose() – Clean up the Kafka clients if they are managed.
build_publisher_for_model(model) – Build a Kafka publisher for a specific enriched model.
initialize() – Initialize the Kafka clients if they are managed.
publish(event, publisher, schema_info) – Serialize an event to Avro and publish it to Kafka.
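The methods summarized above imply a lifecycle: initialize the manager, publish events, then call aclose. The sketch below models that ordering, including the documented rule that publishing before initialization raises an error. It does not use Safir: SketchEventManager and UninitializedError are hypothetical stand-ins (the latter for EventManagerUnintializedError), and events are plain strings instead of Avro models.

```python
import asyncio


class UninitializedError(Exception):
    """Hypothetical stand-in for EventManagerUnintializedError."""


class SketchEventManager:
    """Hypothetical manager that models only the call-ordering rule."""

    def __init__(self) -> None:
        self._initialized = False
        self.closed = False
        self.sent: list[str] = []

    async def initialize(self) -> None:
        self._initialized = True

    async def publish(self, event: str) -> None:
        # Refuse to publish until initialize has been called.
        if not self._initialized:
            raise UninitializedError("initialize must be called before publish")
        self.sent.append(event)

    async def aclose(self) -> None:
        self.closed = True


async def run_lifecycle() -> tuple[bool, list[str], bool]:
    manager = SketchEventManager()

    # Publishing too early is rejected.
    try:
        await manager.publish("too early")
        rejected = False
    except UninitializedError:
        rejected = True

    await manager.initialize()
    try:
        await manager.publish("bar1")
    finally:
        # Always release resources, even if publishing fails.
        await manager.aclose()
    return rejected, manager.sent, manager.closed


rejected_early, sent, closed = asyncio.run(run_lifecycle())
```

Wrapping the publishing code in try/finally is one way to make sure aclose runs even when an event fails to publish.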
Methods Documentation
- async build_publisher_for_model(model)
Build a Kafka publisher for a specific enriched model.
- Parameters:
model (type[TypeVar(P, bound=EventPayload)]) – Enriched and configured model representing the event that will be published.
- Returns:
An appropriate event publisher implementation instance.
- Return type:
- async publish(event, publisher, schema_info)
Serialize an event to Avro and publish it to Kafka.
This method should generally not be called directly. It will be called from publish.
- Parameters:
event (AvroBaseModel) – Fully-enhanced event to publish.
publisher (AsyncAPIDefaultPublisher) – FastStream publisher to use to publish the event.
schema_info (SchemaInfo | None) – Confluent schema registry information about the event type.
- Raises:
EventManagerUnintializedError – Raised if the initialize method has not been called before calling this method.
- Return type: