KafkaEventManager
- class safir.metrics.KafkaEventManager(*, application, topic_prefix, kafka_broker, kafka_admin_client, schema_manager, backoff_interval=None, manage_kafka_broker=False, raise_on_error=False, logger=None)
Bases:
EventManager
A tool for publishing application metrics events.
Events are published to Kafka as Avro-serialized messages. The schemas for the messages are stored in a Confluent Schema Registry. This EventManager manages those schemas and raises an exception if the event payloads evolve in an incompatible way.
- Parameters:
application (str) – Name of the application that is generating events.
topic_prefix (str) – Kafka topic prefix for the metrics events topic for this application.
kafka_broker (KafkaBroker) – Broker to use to publish events to Kafka.
kafka_admin_client (AIOKafkaAdminClient) – Admin client to Kafka used to check that it is prepared for event publishing. For example, it is used to check if the topic exists.
schema_manager (PydanticSchemaManager) – Client to the Confluent-compatible schema registry.
backoff_interval (timedelta | None, default: None) – The amount of time to wait until further operations are attempted after the KafkaEventManager is put into an error state.
manage_kafka_broker (bool, default: False) – If True, start the kafka_broker on initialize and close the kafka_broker when aclose is called. If your app's only use of Kafka is to publish metrics events, then this should be True. If you have a FastStream app that already configures a Kafka broker that you want to reuse for metrics, this should probably be False, and you should pass in your existing Kafka broker. In this case, you will need to start the broker before calling initialize and stop it after closing the event manager.
raise_on_error (bool, default: False) – True if we should raise an exception whenever there is an error with the metrics system dependencies, like Kafka or the Schema Manager. False if we should just log an error instead. This should be False for most production apps so that issues with the metrics infrastructure don't bring down the app.
logger (BoundLogger | None, default: None) – Logger to use for internal logging.
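The interplay between raise_on_error and backoff_interval can be pictured with a small standalone sketch. This is not Safir's implementation: ErrorGate, can_attempt, and record_failure are hypothetical names, and the sketch only assumes the behavior described above (a failure either raises or is logged, and further operations are skipped until the backoff interval has passed).

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional


class ErrorGate:
    """Hypothetical helper that tracks an error state with a backoff window."""

    def __init__(
        self, backoff_interval: timedelta, raise_on_error: bool = False
    ) -> None:
        self.backoff_interval = backoff_interval
        self.raise_on_error = raise_on_error
        self._retry_at: Optional[datetime] = None
        self.logged: List[str] = []

    def can_attempt(self, now: datetime) -> bool:
        """Return True unless we are still inside the backoff window."""
        return self._retry_at is None or now >= self._retry_at

    def record_failure(self, exc: Exception, now: datetime) -> None:
        """Enter the error state, then re-raise or log the failure."""
        self._retry_at = now + self.backoff_interval
        if self.raise_on_error:
            raise exc
        # Stand-in for logging an error instead of crashing the app.
        self.logged.append(f"metrics error: {exc}")


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
gate = ErrorGate(backoff_interval=timedelta(seconds=30))
gate.record_failure(RuntimeError("kafka down"), start)
```

With raise_on_error left at False, the failure above is only recorded, and can_attempt returns False until 30 seconds after the failure.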
Examples
from pydantic import AnyUrl

# Assumes the security protocol enum is exported by safir.kafka as
# SecurityProtocol.
from safir.kafka import (
    KafkaConnectionSettings,
    SchemaManagerSettings,
    SecurityProtocol,
)
from safir.metrics import (
    EventsConfiguration,
    EventPayload,
    KafkaMetricsConfiguration,
)

config = KafkaMetricsConfiguration(
    events=EventsConfiguration(
        application="myapp",
        topic_prefix="what.ever",
    ),
    kafka=KafkaConnectionSettings(
        bootstrap_servers="someserver:1234",
        security_protocol=SecurityProtocol.PLAINTEXT,
    ),
    schema_manager=SchemaManagerSettings(
        registry_url=AnyUrl("https://some.registry")
    ),
)
manager = config.make_manager()


class MyEvent(EventPayload):
    foo: str


publisher = manager.create_publisher(MyEvent)
await manager.initialize()

await publisher.publish(MyEvent(foo="bar1"))
await publisher.publish(MyEvent(foo="bar2"))

await manager.aclose()
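When manage_kafka_broker is False, the parameter description asks for a specific ordering: start the broker, initialize the manager, and on shutdown close the manager before stopping the broker. The sketch below illustrates only that ordering. FakeBroker and FakeManager are hypothetical stand-ins rather than FastStream or Safir classes, and the method names on FakeBroker are assumptions.

```python
import asyncio
from typing import List

calls: List[str] = []  # records the order of lifecycle calls


class FakeBroker:
    """Hypothetical stand-in for an externally managed Kafka broker."""

    async def start(self) -> None:
        calls.append("broker.start")

    async def stop(self) -> None:
        calls.append("broker.stop")


class FakeManager:
    """Hypothetical stand-in for an event manager that does not own the broker."""

    async def initialize(self) -> None:
        calls.append("manager.initialize")

    async def aclose(self) -> None:
        calls.append("manager.aclose")


async def run_app(broker: FakeBroker, manager: FakeManager) -> None:
    """Start the broker first and stop it last, even if the app fails."""
    await broker.start()
    try:
        await manager.initialize()
        try:
            calls.append("app.run")  # application work would happen here
        finally:
            await manager.aclose()
    finally:
        await broker.stop()


asyncio.run(run_app(FakeBroker(), FakeManager()))
```

Nesting the try/finally blocks keeps the shutdown order the reverse of the startup order, so the manager is always closed while the broker is still running.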
Methods Summary
aclose() – Clean up the Kafka clients if they are managed.
build_publisher_for_model(model) – Build a Kafka publisher for a specific enriched model.
initialize() – Initialize the Kafka clients if they are managed.
publish(event, publisher, schema_info) – Serialize an event to Avro and publish it to Kafka.
Methods Documentation
- async build_publisher_for_model(model)
Build a Kafka publisher for a specific enriched model.
- Parameters:
model (type[TypeVar(P, bound=EventPayload)]) – Enriched and configured model representing the event that will be published.
- Returns:
An appropriate event publisher implementation instance. If a publisher cannot be built because of problems such as the metrics infrastructure being unavailable, a FailedEventPublisher is returned instead, so that the failure does not crash the whole application.
- Return type:
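The fallback described under Returns follows a common pattern: try to build the real publisher, and hand back a publisher that silently drops events if setup fails. The following is a minimal sketch of that pattern under stated assumptions, not Safir's code. WorkingPublisher, NoopPublisher, and build_publisher are hypothetical names, and the infrastructure_ok flag stands in for a real connectivity check.

```python
import asyncio
from typing import List, Union


class WorkingPublisher:
    """Hypothetical publisher backed by working infrastructure."""

    def __init__(self) -> None:
        self.sent: List[dict] = []

    async def publish(self, payload: dict) -> None:
        self.sent.append(payload)


class NoopPublisher:
    """Hypothetical failed publisher: accepts events and drops them."""

    async def publish(self, payload: dict) -> None:
        return None


async def build_publisher(
    infrastructure_ok: bool,
) -> Union[WorkingPublisher, NoopPublisher]:
    """Return a working publisher, or a no-op one if setup fails."""
    try:
        if not infrastructure_ok:
            raise ConnectionError("schema registry unreachable")
        return WorkingPublisher()
    except ConnectionError:
        # Swallow the setup failure so the application keeps running.
        return NoopPublisher()


good = asyncio.run(build_publisher(True))
bad = asyncio.run(build_publisher(False))
asyncio.run(good.publish({"foo": "bar"}))
asyncio.run(bad.publish({"foo": "bar"}))  # dropped without raising
```

Because both classes expose the same publish coroutine, calling code does not need to know which one it received.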
- async publish(event, publisher, schema_info)
Serialize an event to Avro and publish it to Kafka.
This method should generally not be called directly. It will be called from the event publisher's publish method.
- Parameters:
event (EventPayload) – Fully-enhanced event to publish.
publisher (AsyncAPIDefaultPublisher) – FastStream publisher to use to publish the event.
schema_info (SchemaInfo | None) – Confluent schema registry information about the event type.
- Raises:
EventManagerUnintializedError – Raised if the initialize method has not been called before calling this method.
- Return type:
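The initialization requirement in the Raises entry can be illustrated with a small guard. This is a sketch of the general pattern only. SketchManager and UninitializedError are hypothetical names and do not reproduce Safir's classes or exception hierarchy.

```python
import asyncio
from typing import List


class UninitializedError(Exception):
    """Hypothetical error raised when publish is called too early."""


class SketchManager:
    """Hypothetical manager that refuses to publish before initialize."""

    def __init__(self) -> None:
        self._initialized = False
        self.published: List[dict] = []

    async def initialize(self) -> None:
        self._initialized = True

    async def publish(self, event: dict) -> None:
        if not self._initialized:
            raise UninitializedError(
                "initialize() must be called before publish()"
            )
        self.published.append(event)


async def demo() -> List[str]:
    """Publish once too early and once after initialization."""
    outcomes: List[str] = []
    manager = SketchManager()
    try:
        await manager.publish({"foo": "too early"})
    except UninitializedError:
        outcomes.append("rejected")
    await manager.initialize()
    await manager.publish({"foo": "bar"})
    outcomes.append("published")
    return outcomes


outcomes = asyncio.run(demo())
```

The first publish call is rejected because initialize has not run yet, and the second succeeds.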