ArqDependency
- class safir.dependencies.arq.ArqDependency
Bases: object
A FastAPI dependency that maintains a Redis client for enqueuing tasks to the worker pool.
Methods Summary
__call__(): Get the arq queue.
initialize(*, mode, redis_settings): Initialize the dependency (call during the FastAPI start-up event).
Methods Documentation
- async __call__()
Get the arq queue.
This method is called for you by fastapi.Depends.
- Return type: safir.arq.ArqQueue
- async initialize(*, mode, redis_settings)
Initialize the dependency (call during the FastAPI start-up event).
- Parameters:
mode (safir.arq.ArqMode) – The mode to operate the queue dependency in. With safir.arq.ArqMode.production, this method initializes a Redis-based arq queue and the dependency creates a safir.arq.RedisArqQueue client. With safir.arq.ArqMode.test, this method instead initializes an in-memory mocked version of arq that you use with the safir.arq.MockArqQueue client.
redis_settings (arq.connections.RedisSettings) – The arq Redis settings, required when the mode is safir.arq.ArqMode.production. See arq’s RedisSettings documentation for details on this object.
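The mode switch described above can be pictured with a small stand-in written only against the standard library. This is a sketch of the documented behaviour, not safir's implementation: the names Mode, FakeRedisQueue, FakeMockQueue, and QueueDependency are hypothetical, and the RuntimeError raised for missing settings or for use before initialization is an assumption about reasonable behaviour rather than something this page states.

```python
import asyncio
from enum import Enum
from typing import Any, Optional


class Mode(Enum):
    """Hypothetical stand-in for safir.arq.ArqMode."""

    production = "production"
    test = "test"


class FakeRedisQueue:
    """Hypothetical stand-in for a Redis-backed queue client."""

    def __init__(self, settings: Any) -> None:
        self.settings = settings


class FakeMockQueue:
    """Hypothetical stand-in for an in-memory mock queue client."""


class QueueDependency:
    """Simplified illustration of a mode-switching queue dependency."""

    def __init__(self) -> None:
        self._queue: Optional[object] = None

    async def initialize(
        self, *, mode: Mode, redis_settings: Optional[Any]
    ) -> None:
        if mode is Mode.production:
            # Assumption: production mode cannot work without settings.
            if redis_settings is None:
                raise RuntimeError("redis_settings is required in production")
            self._queue = FakeRedisQueue(redis_settings)
        else:
            # Test mode ignores redis_settings and uses the in-memory mock.
            self._queue = FakeMockQueue()

    async def __call__(self) -> object:
        # Assumption: using the dependency before initialize() is an error.
        if self._queue is None:
            raise RuntimeError("dependency is not initialized")
        return self._queue


async def demo() -> str:
    dependency = QueueDependency()
    await dependency.initialize(mode=Mode.test, redis_settings=None)
    queue = await dependency()
    return type(queue).__name__


if __name__ == "__main__":
    print(asyncio.run(demo()))  # prints "FakeMockQueue"
```

Because the handler only ever sees the object returned by the dependency, the same route code can run against the mock in tests and against Redis in production.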
Examples
    from typing import Any, Dict

    from fastapi import Depends, FastAPI
    from safir.arq import ArqMode, ArqQueue
    from safir.dependencies.arq import arq_dependency

    app = FastAPI()


    @app.on_event("startup")
    async def startup() -> None:
        # Test mode uses the in-memory mock queue, so no Redis settings are needed.
        await arq_dependency.initialize(mode=ArqMode.test, redis_settings=None)


    @app.post("/")
    async def post_job(
        arq_queue: ArqQueue = Depends(arq_dependency),
    ) -> Dict[str, Any]:
        # Enqueue the "test_task" worker function with its arguments.
        job = await arq_queue.enqueue("test_task", "hello", an_int=42)
        return {"job_id": job.id}
- Return type: None