KafkaEventManager¶
- class safir.metrics.KafkaEventManager(*, application, topic_prefix, kafka_broker, kafka_admin_client, schema_manager, manage_kafka=False, logger=None)¶
Bases:
EventManager
A tool for publishing application metrics events.
Events are published to Kafka as avro-serialized messages. The schemas for the messages are stored in a Confluent Schema Registry, and this EventManager will manage those schemas and throw an exception if the event payloads evolve in an incompatible way.
- Parameters:
application (
str
) – Name of the application that is generating events.topic_prefix (
str
) – Kafka topic prefix for the metrics events topic for this application.kafka_broker (
KafkaBroker
) – Broker to use to publish events to Kafka.kafka_admin_client (
AIOKafkaAdminClient
) – Admin client to Kafka used to check that it is prepared for event publishing. For example, it is used to check if the topic exists.schema_manager (
PydanticSchemaManager
) – Client to the Confluent-compatible schema registry.manage_kafka (
bool
, default:False
) – IfTrue
, close thekafka_broker
andkafka_admin_client
whenaclose
is called. If your app’s only use of Kafka is to publish metrics events, then this should beTrue
. If you have a FastStream app that already configures some of these clients, this should probably beFalse
, and you should pass pre-configured clients in.logger (
BoundLogger
|None
, default:None
) – Logger to use for internal logging.
Examples
from safir.kafka import KafkaConnectionSettings, SchemaManagerSettings from safir.metrics import ( EventsConfiguration, EventPayload, KafkaMetricsConfiguration, ) config = KafkaMetricsConfiguration( events=EventsConfiguration( application="myapp", topic_prefix="what.ever", ), kafka=KafkaConnectionSettings( bootstrap_servers="someserver:1234", security_protocol=KafkaSecurityProtocol.PLAINTEXT, ), schema_manager=SchemaRegistryConnectionSettings( registry_url=AnyUrl("https://some.registry") ), ) manager = config.make_manager() class MyEvent(EventPayload): foo: str publisher = manager.create_publisher(MyEvent) await manager.register_and_initialize() await publisher.publish(MyEvent(foo="bar1")) await publisher.publish(MyEvent(foo="bar2")) await manager.aclose()
Methods Summary
aclose
()Clean up the Kafka clients if they are managed.
build_publisher_for_model
(model)Build a Kafka publisher for a specific enriched model.
Initialize the Kafka clients if they are managed.
publish
(event, publisher, schema_info)Serialize an event to Avro and publish it to Kafka.
Methods Documentation
- async build_publisher_for_model(model)¶
Build a Kafka publisher for a specific enriched model.
- Parameters:
model (
type
[TypeVar
(P
, bound=EventPayload
)]) – Enriched and configured model representing the event that will be published.- Returns:
An appropriate event publisher implementation instance.
- Return type:
- async publish(event, publisher, schema_info)¶
Serialize an event to Avro and publish it to Kafka.
This method should generally not be called directly. It will be called from
publish
.- Parameters:
event (
AvroBaseModel
) – Fully-enhanced event to publish.publisher (
AsyncAPIDefaultPublisher
) – FastStream publisher to use to publish the event.schema_info (
SchemaInfo
|None
) – Confluent schema registry information about the event type.
- Raises:
EventManagerUnintializedError – Raised if the
initialize
method was not been called before calling this method.- Return type: