Handling datetimes in database tables

When a database column is defined with the SQLAlchemy ORM using the generic DateTime type, it cannot store a timezone. The SQL standard type DATETIME may include a timezone with some database backends, but this behavior is database-specific. It is therefore normally easier to store times in the database in UTC without timezone information.

However, datetime objects in regular Python code should always be timezone-aware and use the UTC timezone. Timezone-naive datetime objects are often interpreted as being in the local timezone, whatever that happens to be. Keeping all datetime objects as timezone-aware in the UTC timezone will minimize surprises from unexpected timezone conversions.
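The difference between the two kinds of datetime object can be illustrated with a minimal sketch that uses only the Python standard library. The variable names are illustrative and are not part of Safir.

```python
from datetime import datetime, timedelta, timezone

# Preferred in regular code: a timezone-aware datetime in UTC.
now = datetime.now(tz=timezone.utc)

# The same wall-clock time, once without and once with timezone information.
naive = datetime(2024, 1, 1, 12, 0, 0)
aware = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

# A naive datetime has no tzinfo; an aware UTC datetime has a zero offset.
naive_has_no_tz = naive.tzinfo is None
aware_is_utc = aware.utcoffset() == timedelta(0)

# Python refuses to order naive and aware datetimes against each other,
# which is one reason to keep all datetimes in regular code aware.
try:
    naive < aware
except TypeError:
    mixed_comparison_fails = True
else:
    mixed_comparison_fails = False
```

Mixing the two kinds tends to fail late and in surprising places, so it is usually simplest to convert at a single boundary, such as the database layer.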

This unfortunately means that the code for storing and retrieving datetime objects from the database needs a conversion layer. asyncpg wisely declines to convert datetime objects. It therefore returns timezone-naive objects from the database, and raises an exception if a timezone-aware datetime object is stored in a DateTime field. The conversion must therefore be done in the code making SQLAlchemy calls.

Safir provides datetime_to_db and datetime_from_db helper functions to convert from a timezone-aware datetime to a timezone-naive datetime suitable for storing in a DateTime column, and vice versa. These helper functions should be used wherever DateTime columns are read or updated.

datetime_to_db ensures the provided datetime object is timezone-aware and in UTC and converts it to a timezone-naive UTC datetime for database storage. It raises ValueError if passed a timezone-naive datetime object.

datetime_from_db ensures the provided datetime object is either timezone-naive or in UTC and returns a timezone-aware UTC datetime object.

Both raise ValueError if passed datetime objects in some other timezone. Both return None if passed None.
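The behavior described above can be approximated with the following sketch, which uses only the standard library. The functions to_db_sketch and from_db_sketch are hypothetical stand-ins written for illustration; they are not Safir's implementation, and real code should import datetime_to_db and datetime_from_db from safir.database.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def to_db_sketch(time: Optional[datetime]) -> Optional[datetime]:
    """Hypothetical stand-in for datetime_to_db (aware UTC -> naive UTC)."""
    if time is None:
        return None
    # utcoffset() is None for naive datetimes, so this check rejects both
    # naive datetimes and aware datetimes in a timezone other than UTC.
    if time.utcoffset() != timedelta(0):
        raise ValueError(f"datetime {time} is not timezone-aware UTC")
    return time.replace(tzinfo=None)


def from_db_sketch(time: Optional[datetime]) -> Optional[datetime]:
    """Hypothetical stand-in for datetime_from_db (naive or UTC -> aware UTC)."""
    if time is None:
        return None
    # Naive datetimes from the database are assumed to already be in UTC.
    if time.tzinfo is not None and time.utcoffset() != timedelta(0):
        raise ValueError(f"datetime {time} is not in UTC")
    return time.replace(tzinfo=timezone.utc)


aware = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
stored = to_db_sketch(aware)         # naive, suitable for a DateTime column
restored = from_db_sketch(stored)    # aware again, in UTC
```

In this sketch a round trip through both functions returns a datetime equal to the original, which is the property the conversion layer relies on.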

Examples

Here is an example of reading an object from the database that includes DateTime columns:

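from safir.database import datetime_from_db
from sqlalchemy import select


# SQLJob (the ORM model), Job (the application model), session, and job_id
# are assumed to be defined elsewhere in the application.
stmt = select(SQLJob).where(SQLJob.id == job_id)
result = (await session.execute(stmt)).scalar_one()

# Convert each timezone-naive database value to a timezone-aware UTC datetime.
job = Job(
    job_id=result.id,
    # ...
    creation_time=datetime_from_db(result.creation_time),
    start_time=datetime_from_db(result.start_time),
    end_time=datetime_from_db(result.end_time),
    destruction_time=datetime_from_db(result.destruction_time),
    # ...
)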

Here is an example of updating a DateTime field in the database:

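from safir.database import datetime_to_db
from sqlalchemy import select


# destruction_time must be a timezone-aware UTC datetime (or None).
async with session.begin():
    stmt = select(SQLJob).where(SQLJob.id == job_id)
    job = (await session.execute(stmt)).scalar_one()
    # Convert to a timezone-naive UTC datetime before storing it.
    job.destruction_time = datetime_to_db(destruction_time)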