EventDependency#
- class safir.dependencies.metrics.EventDependency(event_maker)#
- Bases: - Generic- Provides EventManager-managed events for apps to publish. - Parameters:
- event_maker ( - TypeVar(- E, bound=- EventMaker)) – An instance of an implementation of- EventMaker
 - Examples myapp.dependencies.events_dependency#- from safir.metrics import ( EventMaker, EventManager, EventPayload, EventsDependency, MetricsConfigurationWithKafka, ) class Event1(EventPayload): foo: str bar: str class event2(EventPayload): baz: int buz: str class Events(EventMaker): async def initialize(self, manager: EventManager) -> None: self.event1 = await manager.create_publisher("event1", Event1) self.event2 = await manager.create_publisher("event2", Event2) dependency = EventsDependency(AppEvents()) myapp.main#- from myapp.dependencies.events_dependency import dependency as ed # In application init, maybe a lifecycle method in a FastAPI app manager = MetricsConfigurationWithKafka().make_manager() await manager.initialize() await events.initialize(manager) myapp.someservice#- from myapp.dependencies.events_dependency import dependency as ed events = ed() # In app code await events.event1.publish(Event1(foo="foo", bar="bar")) await events.event2.publish(Event2(baz=123, buz="buz")) - Attributes Summary - Return the instantiated event maker. - Methods Summary - __call__()- Return the initialized event maker for use as a FastAPI dependency. - initialize(manager)- Initialize the event maker and set the attribute. - Attributes Documentation - events#
- Return the instantiated event maker. - Raises:
- RuntimeError – If this - EventsDependencyhasn’t been intialized yet.
 
 - Methods Documentation - async __call__()#
- Return the initialized event maker for use as a FastAPI dependency. - Return type:
- TypeVar(- E, bound=- EventMaker)
 
 - async initialize(manager)#
- Initialize the event maker and set the attribute. - Parameters:
- manager ( - EventManager) – An- EventManagerto register and publish events.
- Return type: