class safir.dependencies.arq.ArqDependency#

Bases: object

A FastAPI dependency that maintains a Redis client for enqueuing tasks to the worker pool.

Methods Summary


__call__()

Get the arq queue.

initialize(*, mode, redis_settings)

Initialize the dependency (call during the FastAPI start-up event).

Methods Documentation

async __call__()#

Get the arq queue.

This method is called for you by fastapi.Depends.

Return type:

ArqQueue

async initialize(*, mode, redis_settings)#

Initialize the dependency (call during the FastAPI start-up event).

Return type:

None

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

from fastapi import Depends, FastAPI
from safir.arq import ArqMode, ArqQueue
from safir.dependencies.arq import arq_dependency


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Initialize the arq dependency when the application starts up."""
    # ``initialize`` takes both ``mode`` and ``redis_settings`` as
    # keyword-only arguments. No Redis settings are supplied in this
    # test-mode example.
    # NOTE(review): confirm that ``None`` is accepted for ``ArqMode.test``.
    await arq_dependency.initialize(mode=ArqMode.test, redis_settings=None)
    # Hand control to the application; nothing is torn down afterwards here.
    yield


app = FastAPI(lifespan=lifespan)


@app.post("/")
async def post_job(
    arq_queue: ArqQueue = Depends(arq_dependency),
) -> dict[str, Any]:
    """Enqueue a ``test_task`` job and return its identifier."""
    job = await arq_queue.enqueue("test_task", "hello", an_int=42)
    return {"job_id": job.id}