ArqDependency
- class safir.dependencies.arq.ArqDependency
Bases: object
A FastAPI dependency that maintains a Redis client for enqueuing tasks to the worker pool.
Methods Summary
- __call__(): Get the arq queue.
- initialize(*, mode, redis_settings): Initialize the dependency (call during the FastAPI start-up event).
Methods Documentation
- async __call__()
Get the arq queue.
This method is called for you by fastapi.Depends.
- Return type:
- async initialize(*, mode, redis_settings)
Initialize the dependency (call during the FastAPI start-up event).
- Parameters:
mode (ArqMode) – The mode to operate the queue dependency in. With safir.arq.ArqMode.production, this method initializes a Redis-based arq queue and the dependency creates a safir.arq.RedisArqQueue client. With safir.arq.ArqMode.test, this method instead initializes an in-memory mocked version of arq that you use with the safir.arq.MockArqQueue client.
redis_settings (RedisSettings | None) – The arq Redis settings, required when the mode is safir.arq.ArqMode.production. See arq’s RedisSettings documentation for details on this object.
- Return type:
Examples
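from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

from fastapi import Depends, FastAPI
from safir.arq import ArqMode, ArqQueue
from safir.dependencies.arq import arq_dependency


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    # Test mode uses the in-memory mock queue, so no Redis settings are needed.
    await arq_dependency.initialize(mode=ArqMode.test, redis_settings=None)
    yield


# Register the lifespan handler so the dependency is initialized at start-up.
app = FastAPI(lifespan=lifespan)


@app.post("/")
async def post_job(
    arq_queue: ArqQueue = Depends(arq_dependency),
) -> dict[str, Any]:
    # Enqueue a task by name with positional and keyword arguments.
    job = await arq_queue.enqueue("test_task", "hello", an_int=42)
    return {"job_id": job.id}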
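To make the mode-dependent behavior of initialize easier to picture, here is a minimal, self-contained sketch of the general pattern: a callable dependency object that is configured once at start-up and then returns a queue client on each call. This is not Safir's implementation. Mode, InMemoryQueue, and QueueDependency are hypothetical stand-ins for ArqMode, MockArqQueue, and ArqDependency, and the production branch is left unimplemented because it would need a real Redis connection.

```python
import asyncio
from enum import Enum


class Mode(Enum):
    """Hypothetical stand-in for safir.arq.ArqMode."""

    production = "production"
    test = "test"


class InMemoryQueue:
    """Hypothetical stand-in for a mocked queue: records jobs in a list."""

    def __init__(self) -> None:
        self.jobs: list[tuple[str, tuple, dict]] = []

    async def enqueue(self, task_name: str, *args, **kwargs) -> int:
        self.jobs.append((task_name, args, kwargs))
        return len(self.jobs)  # 1-based position of the recorded job


class QueueDependency:
    """Hypothetical dependency: configure once, then call to get the queue."""

    def __init__(self) -> None:
        self._queue = None

    async def initialize(self, *, mode, redis_settings) -> None:
        if mode is Mode.production:
            if redis_settings is None:
                raise RuntimeError("redis_settings is required in production mode")
            # A real implementation would create a Redis-backed client here.
            raise NotImplementedError("Redis client creation is out of scope")
        # Test mode: no Redis needed, jobs are kept in memory.
        self._queue = InMemoryQueue()

    async def __call__(self):
        if self._queue is None:
            raise RuntimeError("initialize() has not been called")
        return self._queue


async def demo() -> list:
    dependency = QueueDependency()
    await dependency.initialize(mode=Mode.test, redis_settings=None)
    queue = await dependency()  # a framework would make this call for you
    await queue.enqueue("test_task", "hello", an_int=42)
    return queue.jobs
```

In a test, the recorded jobs can be inspected directly through the list that demo() returns, which is the main appeal of an in-memory mode: handlers that enqueue work can be exercised without a running Redis server.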