Handling datetimes in database tables¶
When a database column is defined using the SQLAlchemy ORM using the DateTime
generic type, it cannot store a timezone.
The SQL standard type DATETIME
may include a timezone with some database backends, but it is database-specific.
It is therefore normally easier to store times in the database in UTC without timezone information.
However, datetime
objects in regular Python code should always be timezone-aware and use the UTC timezone.
Timezone-naive datetime objects are often interpreted as being in the local timezone, whatever that happens to be.
Keeping all datetime objects as timezone-aware in the UTC timezone will minimize surprises from unexpected timezone conversions.
This unfortunately means that the code for storing and retrieving datetime objects from the database needs a conversion layer.
asyncpg wisely declines to convert datetime objects.
It therefore returns timezone-naive objects from the database, and raises an exception if a timezone-aware datetime object is stored in a DateTime
field.
The conversion must therefore be done in the code making SQLAlchemy calls.
Safir provides datetime_to_db
and datetime_from_db
helper functions to convert from a timezone-aware datetime to a timezone-naive datetime suitable for storing in a DateTime column, and vice versa.
These helper functions should be used wherever DateTime
columns are read or updated.
datetime_to_db
ensures the provided datetime object is timezone-aware and in UTC and converts it to a timezone-naive UTC datetime for database storage.
It raises ValueError
if passed a timezone-naive datetime object.
datetime_from_db
ensures the provided datetime object is either timezone-naive or in UTC and returns a timezone-aware UTC datetime object.
Both raise ValueError
if passed datetime objects in some other timezone.
Both return None
if passed None
.
Examples¶
Here is example of reading an object from the database that includes DateTime columns:
from safir.database import datetime_from_db
stmt = select(SQLJob).where(SQLJob.id == job_id)
result = (await session.execute(stmt)).scalar_one()
job = Job(
job_id=job.id,
# ...
creation_time=datetime_from_db(job.creation_time),
start_time=datetime_from_db(job.start_time),
end_time=datetime_from_db(job.end_time),
destruction_time=datetime_from_db(job.destruction_time),
# ...
)
Here is an example of updating a DateTime field in the database:
from safir.database import datetime_to_db
async with session.begin():
stmt = select(SQLJob).where(SQLJob.id == job_id)
job = (await session.execute(stmt)).scalar_one()
job.destruction_time = datetime_to_db(destruction_time)