EventManager¶
- class safir.metrics.EventManager(*, app_name, base_topic_prefix, kafka_broker, kafka_admin_client, schema_manager, manage_kafka=False, disable=False, logger=None)¶
Bases:
object
A tool for publishing application metrics events.
Events are published to Kafka as avro-serialized messages. The schemas for the messages are stored in a Confluent Schema Registry, and this EventManager will manage those schemas and throw an exception if the event payloads evolve in an incompatible way.
- Parameters:
app_name (
str
) – The name of the application that is generating eventsbase_topic_prefix (
str
) – The Kafka topic prefix for the metrics events topic for this applicationkafka_broker (
KafkaBroker
) – Does the Kafka publishingkafka_admin_client (
AIOKafkaAdminClient
) – Ensures that Kafka is prepared for event publishing; that the topic exists, for example.schema_manager (
PydanticSchemaManager
) – Handles all Confluent Schema Registry interactionsmanage_kafka (
bool
, default:False
) – IfTrue
, close thekafka_broker
andkafka_admin_client
whenaclose
is called. If your app’s only use of Kafka is to publish metrics events, then this should beTrue
. If you have a FastStream app that already configures some of these clients, this should probably beFalse
, and you should pass pre-configured clients in.disable (
bool
, default:False
) – IfFalse
, don’t actually do anything. No Kafka or Schema Registry interactions will happen. This is useful for running apps locally.logger (
BoundLogger
|None
, default:None
) – A logger to use for internal logging
Examples
from safir.metrics import MetricsConfigurationWithKafka config = MetricsConfigurationWithKafka( metrics_events=MetricsConfiguration( app_name="myapp", base_topic_prefix="what.ever", ), kafka=KafkaConnectionSettings( bootstrap_servers="someserver:1234", security_protocol=KafkaSecurityProtocol.PLAINTEXT, ), schema_registry=SchemaRegistryConnectionSettings( url=AnyUrl("https://some.registry") ), schema_manager=SchemaManagerSettings(), ) manager = config.make_manager() class MyEvent(EventPayload): foo: str publisher = manager.create_publisher(MyEvent) await manager.register_and_initialize() await publisher.publish(MyEvent(foo="bar1")) await publisher.publish(MyEvent(foo="bar2")) await manager.aclose()
Methods Summary
aclose
()Clean up the Kafka clients, if we're managing them.
create_publisher
(name, payload_model)Create an EventPublisher that can publish an event of type
P
.Initialize Kafka clients (if this
EventManager
is managing them).publish
(*, payload, publisher, event_class, ...)Serialize an event to Avro and publish it to Kafka.
Methods Documentation
- async create_publisher(name, payload_model)¶
Create an EventPublisher that can publish an event of type
P
.All event publishers must be created with this method before
register_and_initialize
is called.- Parameters:
name (
str
) – The name of the event. This will be the name of the Avro schema and record, and the name of the event in the event storage backend.payload_model (
type
[TypeVar
(P
, bound=EventPayload
)]) – AnEventPayload
type. Instance of this type can be passed into thepublish
method of this publisher, and they will be published.
- Returns:
An object that can be used to publish events of type
P
- Return type:
- Raises:
DuplicateEventError – Upon an attempt to register a publisher with the same name as one that was already registered.
EventManagerUnintializedError – Upon an attempt to create a publisher before calling
initialize
on thisEventManager
instance.KafkaTopicError – If the topic for publishing events doesn’t exist, or we don’t have access to it.
- async initialize()¶
Initialize Kafka clients (if this
EventManager
is managing them).- Return type:
- async publish(*, payload, publisher, event_class, schema_info)¶
Serialize an event to Avro and publish it to Kafka.
You shouldn’t call this method directly, it will usually be called from the
publish
method of anEventPublisher
.- Raises:
EventManagerUnintializedError – Upon an attempt to create a publisher before calling
initialize
on thisEventManager
instance.- Parameters:
payload (
EventPayload
)publisher (
AsyncAPIDefaultPublisher
)event_class (
type
[AvroBaseModel
])schema_info (
SchemaInfo
|None
)
- Return type:
AvroBaseModel