ArqDependency

class safir.dependencies.arq.ArqDependency

Bases: object

A FastAPI dependency that maintains a Redis client for enqueuing tasks to the worker pool.

Methods Summary

__call__()

Get the arq queue.

initialize(*, mode, redis_settings)

Initialize the dependency (call during the FastAPI start-up event).

Methods Documentation

async __call__() → ArqQueue

Get the arq queue.

This method is called for you by fastapi.Depends.

async initialize(*, mode: ArqMode, redis_settings: Optional[RedisSettings]) → None

Initialize the dependency (call during the FastAPI start-up event).

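Parameters:

mode (ArqMode) – The mode in which to operate the queue dependency.

redis_settings (Optional[RedisSettings]) – The Redis connection settings for arq; may be None.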

Examples

from typing import Any, Dict

from fastapi import Depends, FastAPI
from safir.arq import ArqMode, ArqQueue
from safir.dependencies.arq import arq_dependency

app = FastAPI()


@app.on_event("startup")
async def startup() -> None:
    # redis_settings is keyword-only with no default in the signature above,
    # so it is passed explicitly.
    await arq_dependency.initialize(mode=ArqMode.test, redis_settings=None)


@app.post("/")
async def post_job(
    arq_queue: ArqQueue = Depends(arq_dependency),
) -> Dict[str, Any]:
    # Enqueue the task by name with positional and keyword arguments.
    job = await arq_queue.enqueue("test_task", "hello", an_int=42)
    return {"job_id": job.id}
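
The pattern behind this class (a callable object that is initialized once at start-up and then invoked by fastapi.Depends) can be illustrated with a minimal standard-library sketch. This is not Safir's implementation: SketchQueue, SketchQueueDependency, sketch_dependency, and demo are hypothetical names invented for illustration, and the in-memory queue merely stands in for a Redis-backed one.

```python
import asyncio
from typing import Any, Dict, List, Optional, Tuple


class SketchQueue:
    """Hypothetical in-memory stand-in for a job queue (not safir's ArqQueue)."""

    def __init__(self) -> None:
        self.jobs: List[Tuple[str, Tuple[Any, ...], Dict[str, Any]]] = []

    async def enqueue(self, task_name: str, *args: Any, **kwargs: Any) -> int:
        # Record the job and return its index as a simple job ID.
        self.jobs.append((task_name, args, kwargs))
        return len(self.jobs) - 1


class SketchQueueDependency:
    """Hypothetical callable dependency that holds a queue created at start-up."""

    def __init__(self) -> None:
        self._queue: Optional[SketchQueue] = None

    async def initialize(self) -> None:
        # Intended to be called once, during application start-up.
        self._queue = SketchQueue()

    async def __call__(self) -> SketchQueue:
        # A framework such as FastAPI would call this for each request.
        if self._queue is None:
            raise RuntimeError("dependency not initialized")
        return self._queue


sketch_dependency = SketchQueueDependency()


async def demo() -> int:
    # Mimic start-up followed by a single request handler.
    await sketch_dependency.initialize()
    queue = await sketch_dependency()
    return await queue.enqueue("test_task", "hello", an_int=42)
```

Running asyncio.run(demo()) enqueues a single job in the in-memory list. In this sketch, calling the dependency before initialize() raises RuntimeError, which is one plausible way to surface a missing start-up hook early.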