ArqDependency

class safir.dependencies.arq.ArqDependency

Bases: object

A FastAPI dependency that maintains a Redis client for enqueuing tasks to the worker pool.

Methods Summary

__call__()

Get the arq queue.

initialize(*, mode, redis_settings)

Initialize the dependency (call during the FastAPI start-up event).

Methods Documentation

async __call__()

Get the arq queue.

This method is called for you by fastapi.Depends.

Return type:

ArqQueue
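The two-step shape described here (an initialize coroutine run at start-up, then an async __call__ that hands back the queue) can be illustrated with a stand-alone sketch. The classes InMemoryQueue and QueueDependency below are hypothetical stand-ins written with only the standard library; they are not Safir's implementation, and the RuntimeError raised before initialization is an assumption made for the sketch.

```python
import asyncio
from typing import Optional


class InMemoryQueue:
    """Hypothetical stand-in for a queue client (not a Safir class)."""

    def __init__(self) -> None:
        self.jobs: list[tuple[str, tuple[object, ...]]] = []

    async def enqueue(self, task_name: str, *args: object) -> int:
        # Record the job and return its 1-based position as a made-up job ID.
        self.jobs.append((task_name, args))
        return len(self.jobs)


class QueueDependency:
    """Hypothetical dependency object with the same two-step shape."""

    def __init__(self) -> None:
        self._queue: Optional[InMemoryQueue] = None

    async def initialize(self) -> None:
        # Run once at application start-up, before any request is served.
        self._queue = InMemoryQueue()

    async def __call__(self) -> InMemoryQueue:
        # A framework such as FastAPI awaits this call when resolving Depends.
        # Assumption for this sketch: fail loudly if initialize was skipped.
        if self._queue is None:
            raise RuntimeError("QueueDependency is not initialized")
        return self._queue


async def demo() -> int:
    dependency = QueueDependency()
    await dependency.initialize()
    queue = await dependency()  # what Depends(dependency) would resolve to
    return await queue.enqueue("test_task", "hello")
```

Because the instance itself is callable, a single module-level object can be shared by every route, which is how arq_dependency is used in the Examples section.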

async initialize(*, mode, redis_settings)

Initialize the dependency (call during the FastAPI start-up event).

Parameters:
  • mode (ArqMode) –

    The mode to operate the queue dependency in. With safir.arq.ArqMode.production, this method initializes a Redis-based arq queue and the dependency creates a safir.arq.RedisArqQueue client.

    With safir.arq.ArqMode.test, this method instead initializes an in-memory mocked version of arq that you use with the safir.arq.MockArqQueue client.

  • redis_settings (Optional[RedisSettings]) – The arq Redis settings, required when the mode is safir.arq.ArqMode.production. See the arq documentation for arq.connections.RedisSettings for details on this object.

Return type:

None
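For production mode, a minimal sketch of building the Redis settings and passing them to initialize might look like the following. The environment variable names (REDIS_HOST, REDIS_PORT, REDIS_DB) and both helper functions are hypothetical and chosen only for illustration; the sketch assumes the host, port, and database fields of arq's RedisSettings.

```python
import os
from collections.abc import Mapping


def redis_settings_kwargs(environ: Mapping[str, str]) -> dict[str, object]:
    """Collect host, port, and database for RedisSettings.

    The variable names are hypothetical; adapt them to your deployment.
    """
    return {
        "host": environ.get("REDIS_HOST", "localhost"),
        "port": int(environ.get("REDIS_PORT", "6379")),
        "database": int(environ.get("REDIS_DB", "0")),
    }


async def initialize_for_production(
    environ: Mapping[str, str] = os.environ,
) -> None:
    """Initialize arq_dependency against a real Redis server."""
    # Imported here so the helper above can be used and tested
    # without arq or Safir installed.
    from arq.connections import RedisSettings
    from safir.arq import ArqMode
    from safir.dependencies.arq import arq_dependency

    await arq_dependency.initialize(
        mode=ArqMode.production,
        redis_settings=RedisSettings(**redis_settings_kwargs(environ)),
    )
```

Call initialize_for_production from the application's start-up handler. Keeping the settings construction in a plain function makes it easy to check the parsed values without a running Redis server.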

Examples

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

from fastapi import Depends, FastAPI
from safir.arq import ArqMode, ArqQueue
from safir.dependencies.arq import arq_dependency

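@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    # Test mode uses the in-memory mock queue, so no Redis settings are
    # needed. The signature above shows no default for redis_settings,
    # so None is passed explicitly.
    await arq_dependency.initialize(mode=ArqMode.test, redis_settings=None)
    yield


# Register the lifespan handler so the dependency is initialized at start-up.
app = FastAPI(lifespan=lifespan)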


@app.post("/")
async def post_job(
    arq_queue: ArqQueue = Depends(arq_dependency),
) -> dict[str, Any]:
    job = await arq_queue.enqueue("test_task", "hello", an_int=42)
    return {"job_id": job.id}