Retrying database transactions¶
To aid in retrying transactions, Safir provides a decorator function, safir.database.retry_async_transaction
.
Retrying transactions is often useful in conjunction with a custom transaction isolation level.
Setting an isolation level¶
If you have multiple simultaneous database writers and need to coordinate their writes to ensure consistent results, you may have to set a custom isolation level, such as REPEATABLE READ
,
In this case, transactions that attempt to modify an object that was modified by a different connection will raise an exception and can be retried.
create_database_engine
and the initialize
method of db_session_dependency
take an optional isolation_level
argument that can be used to set a non-default isolation level.
If given, this parameter is passed through to the underlying SQLAlchemy engine.
See the SQLAlchemy isolation level documentation for more information. See Using a database session in request handlers and Creating a database session for more information about initializing the database engine.
Retrying transactions¶
To retry failed transactions in a function or method, decorate that function or method with retry_async_transaction
.
If the decorated function or method raises sqlalchemy.exc.DBAPIError
(the parent exception for exceptions raised by the underlying database API), it will be re-ran with the same arguments.
This will be repeated a configurable number of times (three by default).
The decorated function or method must therefore be idempotent and safe to run repeatedly.
Here’s a simplified example from the storage layer of a Safir application:
from datetime import datetime
from safir.database import datetime_to_db, retry_async_transaction
from sqlalchemy.ext.asyncio import async_scoped_session
class Storage:
def __init__(self, session: async_scoped_session) -> None:
self._session = session
@retry_async_transaction
async def mark_start(self, job_id: str, start: datetime) -> None:
async with self._session.begin():
job = await self._get_job(job_id)
if job.phase in ("PENDING", "QUEUED"):
job.phase = "EXECUTING"
job.start_time = start
If this method races with other methods updating the same job, the custom isolation level will force this update to fail with an exception, and it will then be retried by the decorator.
Changing the retry delay¶
The decorator will delay for half a second (configurable with the delay
parameter) between attempts, and by default the method is attempted three times.
These can be changed with a parameter to the decorator, such as:
class Storage:
@retry_async_transaction(max_tries=5, delay=2.5)
async def mark_start(self, job_id: str, start: datetime) -> None:
async with self._session.begin():
...