EventDependency
- class safir.dependencies.metrics.EventDependency(event_maker)
Provides EventManager-managed events for apps to publish.
- Parameters:
event_maker (TypeVar(E, bound=EventMaker)) – An instance of an implementation of EventMaker.
Examples
# myapp/dependencies/events_dependency.py
from safir.metrics import (
    EventMaker,
    EventManager,
    EventPayload,
    EventsDependency,
    MetricsConfigurationWithKafka,
)


class Event1(EventPayload):
    foo: str
    bar: str


class Event2(EventPayload):
    baz: int
    buz: str


class Events(EventMaker):
    async def initialize(self, manager: EventManager) -> None:
        # Create one publisher per event type and keep it as an attribute.
        self.event1 = await manager.create_publisher("event1", Event1)
        self.event2 = await manager.create_publisher("event2", Event2)


dependency = EventsDependency(Events())
# In application init, maybe a lifecycle method in a FastAPI app
from safir.metrics import MetricsConfigurationWithKafka

from myapp.dependencies.events_dependency import dependency as ed

manager = MetricsConfigurationWithKafka().make_manager()
await manager.initialize()
await ed.initialize(manager)
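# In app code
from myapp.dependencies.events_dependency import Event1, Event2
from myapp.dependencies.events_dependency import dependency as ed

# __call__ is async, so the dependency must be awaited when called directly.
events = await ed()

await events.event1.publish(Event1(foo="foo", bar="bar"))
await events.event2.publish(Event2(baz=123, buz="buz"))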
Attributes Summary
events
Return the instantiated event maker.
Methods Summary
__call__()
Return the initialized event maker for use as a FastAPI dependency.
initialize(manager)
Initialize the event maker and set the attribute.
Attributes Documentation
- events
Return the instantiated event maker.
- Raises:
RuntimeError – If this EventsDependency hasn’t been initialized yet.
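The initialize-before-use contract described here can be illustrated with a small self-contained sketch. It does not use Safir and is not Safir's implementation: `FakeManager`, `SketchMaker`, and `SketchDependency` are hypothetical stand-ins invented for this example, and the exact error message is an assumption. It only shows the general pattern of a dependency that holds an event maker, raises `RuntimeError` until `initialize` has run, and returns the maker from an async `__call__`.

```python
import asyncio
from typing import Generic, Optional, TypeVar


class FakeManager:
    """Hypothetical stand-in for an event manager; records publisher names."""

    def __init__(self) -> None:
        self.created: list[str] = []

    async def create_publisher(self, name: str) -> str:
        # A real manager would return a publisher object; a string is enough here.
        self.created.append(name)
        return f"publisher:{name}"


class SketchMaker:
    """Hypothetical stand-in for an EventMaker implementation."""

    async def initialize(self, manager: FakeManager) -> None:
        self.event1 = await manager.create_publisher("event1")


E = TypeVar("E", bound=SketchMaker)


class SketchDependency(Generic[E]):
    """Hypothetical dependency that exposes the maker only after initialize()."""

    def __init__(self, event_maker: E) -> None:
        self._event_maker = event_maker
        self._events: Optional[E] = None  # unset until initialize() runs

    async def initialize(self, manager: FakeManager) -> None:
        # Initialize the maker first, then expose it through the attribute.
        await self._event_maker.initialize(manager)
        self._events = self._event_maker

    @property
    def events(self) -> E:
        if self._events is None:
            raise RuntimeError("SketchDependency has not been initialized")
        return self._events

    async def __call__(self) -> E:
        # Async so that it can be awaited or used as a FastAPI-style dependency.
        return self.events


async def demo() -> tuple[bool, str]:
    dep = SketchDependency(SketchMaker())
    try:
        dep.events
        raised = False
    except RuntimeError:
        raised = True  # accessing events before initialize() fails
    await dep.initialize(FakeManager())
    events = await dep()
    return raised, events.event1
```

Running `asyncio.run(demo())` exercises both paths: the access before `initialize` raises, and the call after it returns the initialized maker.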
Methods Documentation
- async __call__()
Return the initialized event maker for use as a FastAPI dependency.
- Return type:
TypeVar(E, bound=EventMaker)
- async initialize(manager)
Initialize the event maker and set the attribute.
- Parameters:
manager (EventManager) – An EventManager to register and publish events.
- Return type: