ArqDependency#

class safir.dependencies.arq.ArqDependency#

Bases: object

A FastAPI dependency that maintains a Redis client for enqueuing tasks to the worker pool.

Methods Summary

__call__()

Get the arq queue.

initialize(*, mode, redis_settings)

Initialize the dependency (call during the FastAPI start-up event).

Methods Documentation

async __call__()#

Get the arq queue.

This method is called for your by fastapi.Depends.

Return type:

ArqQueue

async initialize(*, mode, redis_settings)#

Initialize the dependency (call during the FastAPI start-up event).

Parameters:
  • mode (ArqMode) –

    The mode to operate the queue dependency in. With safir.arq.ArqMode.production, this method initializes a Redis-based arq queue and the dependency creates a safir.arq.RedisArqQueue client.

    With safir.arq.ArqMode.test, this method instead initializes an in-memory mocked version of arq that you use with the safir.arq.MockArqQueue client.

  • redis_settings (Optional[RedisSettings]) – The arq Redis settings, required when the mode is safir.arq.ArqMode.production. See arq’s ~arq.connections.RedisSettings documentation for details on this object.

Return type:

None

Examples

from fastapi import Depends, FastAPI
from safir.arq import ArqMode, ArqQueue
from safir.dependencies.arq import arq_dependency

app = FastAPI()

@app.on_event("startup")
async def startup() -> None:
    await arq_dependency.initialize(mode=ArqMode.test)


@app.post("/")
async def post_job(
    arq_queue: ArqQueue = Depends(arq_dependency),
) -> Dict[str, Any]:
    job = await arq_queue.enqueue("test_task", "hello", an_int=42)
    return {"job_id": job.id}