ArqDependency
- class safir.dependencies.arq.ArqDependency
Bases: object
A FastAPI dependency that maintains a Redis client for enqueuing tasks to the worker pool.
Methods Summary
__call__(): Get the arq queue.
initialize(*, mode, redis_settings): Initialize the dependency (call during the FastAPI start-up event).
Methods Documentation
- async __call__()
Get the arq queue.
This method is called for you by fastapi.Depends.
- Return type:
- async initialize(*, mode, redis_settings)
Initialize the dependency (call during the FastAPI start-up event).
- Parameters:
mode (ArqMode) – The mode to operate the queue dependency in. With safir.arq.ArqMode.production, this method initializes a Redis-based arq queue and the dependency creates a safir.arq.RedisArqQueue client. With safir.arq.ArqMode.test, this method instead initializes an in-memory mocked version of arq that you use with the safir.arq.MockArqQueue client.
redis_settings (Optional[RedisSettings]) – The arq Redis settings, required when the mode is safir.arq.ArqMode.production. See arq's arq.connections.RedisSettings documentation for details on this object.
- Return type:
Examples
from typing import Any, Dict

from fastapi import Depends, FastAPI
from safir.arq import ArqMode, ArqQueue
from safir.dependencies.arq import arq_dependency

app = FastAPI()


@app.on_event("startup")
async def startup() -> None:
    # Test mode uses the in-memory mock queue, so no Redis settings are passed.
    await arq_dependency.initialize(mode=ArqMode.test)


@app.post("/")
async def post_job(
    arq_queue: ArqQueue = Depends(arq_dependency),
) -> Dict[str, Any]:
    # Enqueue a task by name with positional and keyword arguments.
    job = await arq_queue.enqueue("test_task", "hello", an_int=42)
    return {"job_id": job.id}
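The mode switch described under initialize can be pictured with a small standard-library sketch. This is not safir's implementation: SketchMode, SketchRedisSettings, InMemoryQueue, RedisBackedQueue, and SketchQueueDependency are hypothetical stand-ins invented for illustration, no Redis connection is opened, and the exception types raised here are assumptions rather than documented behavior. It only shows the contract: initialize selects the queue once at start-up, and calling the dependency returns that queue.

```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, List, Optional


class SketchMode(Enum):
    """Hypothetical stand-in for safir.arq.ArqMode."""

    production = "production"
    test = "test"


@dataclass
class SketchRedisSettings:
    """Hypothetical stand-in for arq's RedisSettings."""

    host: str = "localhost"
    port: int = 6379


@dataclass
class InMemoryQueue:
    """Hypothetical mock queue that records enqueued jobs in a list."""

    jobs: List[Any] = field(default_factory=list)

    async def enqueue(self, task_name: str, *args: Any, **kwargs: Any) -> int:
        self.jobs.append((task_name, args, kwargs))
        # Illustrative job identifier only; not arq's ID format.
        return len(self.jobs)


@dataclass
class RedisBackedQueue:
    """Hypothetical Redis-backed queue; it only stores the settings."""

    settings: SketchRedisSettings


class SketchQueueDependency:
    """Callable dependency that returns the queue chosen at start-up."""

    def __init__(self) -> None:
        self._queue: Optional[Any] = None

    async def initialize(
        self,
        *,
        mode: SketchMode,
        redis_settings: Optional[SketchRedisSettings] = None,
    ) -> None:
        if mode is SketchMode.production:
            # Assumption: a missing settings object is rejected up front.
            if redis_settings is None:
                raise ValueError("redis_settings is required in production")
            self._queue = RedisBackedQueue(redis_settings)
        else:
            self._queue = InMemoryQueue()

    async def __call__(self) -> Any:
        if self._queue is None:
            raise RuntimeError("initialize() must be called first")
        return self._queue


async def demo() -> List[Any]:
    """Initialize in test mode, enqueue one job, and return the record."""
    dependency = SketchQueueDependency()
    await dependency.initialize(mode=SketchMode.test)
    queue = await dependency()
    await queue.enqueue("test_task", "hello", an_int=42)
    return queue.jobs
```

Running asyncio.run(demo()) returns the list of recorded jobs, which is the kind of state a test can assert on when the in-memory mode is selected. In production mode the same call site would receive the Redis-backed object instead, so endpoint code does not need to know which mode is active.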