ArqDependency#

class safir.dependencies.arq.ArqDependency#

Bases: object

A FastAPI dependency that maintains a Redis client for enqueuing tasks to the worker pool.

Methods Summary

__call__()

Get the arq queue.

initialize(*, mode, redis_settings)

Initialize the dependency (call during the FastAPI start-up event).

Methods Documentation

async __call__()#

Get the arq queue.

This method is called for you by fastapi.Depends.

Return type:

ArqQueue

async initialize(*, mode, redis_settings)#

Initialize the dependency (call during the FastAPI start-up event).

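Parameters:

mode (ArqMode) – The mode to operate the queue dependency in.

redis_settings (RedisSettings | None) – The arq Redis settings; required when mode is ArqMode.production.
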
Return type:

None

Examples

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

from fastapi import Depends, FastAPI
from safir.arq import ArqMode, ArqQueue
from safir.dependencies.arq import arq_dependency

@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Initialize the arq dependency before the application starts serving."""
    # ``redis_settings`` is a keyword-only argument of ``initialize`` with no
    # default, so it must be passed explicitly. None is used here on the
    # assumption that test mode does not connect to Redis.
    await arq_dependency.initialize(mode=ArqMode.test, redis_settings=None)
    yield


# Register the lifespan handler so the arq dependency is initialized at start-up.
app = FastAPI(lifespan=lifespan)


@app.post("/")
async def post_job(
    arq_queue: ArqQueue = Depends(arq_dependency),
) -> dict[str, Any]:
    """Enqueue ``test_task`` on the arq queue and return the new job's ID."""
    queued_job = await arq_queue.enqueue("test_task", "hello", an_int=42)
    job_id = queued_job.id
    return {"job_id": job_id}