Managing schema registry schemas with Pydantic Models

Safir provides a PydanticSchemaManager to register and evolve Avro schemas in a Confluent-compatible schema registry via Pydantic models. Specifically, it can:

  • Create schemas in the registry

  • Create new versions of schemas in the registry from changes in the Pydantic models, while validating that those changes are compatible with the compatibility strategies specified in the registry

  • Serialize Pydantic model instances to Avro schemas with the registry’s schema ID

Interactions with the remote registry are cached, so your app will rarely have to make calls to it after initialization.

Constructing the manager

You can use the SchemaManagerSettings Pydantic settings model to take settings from environment variables and construct a .`~safir.kafka.PydanticSchemaManager` with the make_manager method. The SchemaManagerSettings model will try to find its attributes from SCHEMA_MANAGER-prefixed environment variables:

$ export SCHEMA_MANAGER_REGISTRY_URL=https://sasquatch-schema-registry.sasquatch:8081
from safir.kafka import SchemaManagerSettings

config = SchemaManagerSettings()
manager = config.make_manager()

You can also construct the manager manually by giving it an instance of AsyncSchemaRegistryClient:

from safir.kafka import PydanticSchemaManager
from schema_registry.client import AsyncSchemaRegistryClient

registry = AsyncSchemaRegistryClient(url="https://some.url")
manager = PydanticSchemaManager(registry=registry)

Using the manager

Before you can serialize model instances, you need to register the model class with the manager. Models must be subclasses of AvroBaseModel from dataclasses-avroschema. Then, you can serialize instances of the model into Avro messages with the registry schema ID embeded in them.

from dataclasses_avroschema.pydantic import AvroBaseModel


# Define a model
class MyModel(AvroBaseModel):
    field1: int = Field(default=0)
    field2: str


# Register your model, which will:
#
# * Create the schema in the registry if it doesn't exist
# * Create a new version of the schema in the registry if this model has
#   changed since the last time it was registered
# * Do nothing if the schema already exists in the registry and the
#   model hasn't changed.
await manager.register(MyModel)

# Serialize an instance
instance = MyModel(field1=1, field2="woohoo!")
avro: bytes = manager.serialize(instance)

You probably want to register all of your models at the start of your application so that:

  • Any schema registry lookups are cached before your app starts doing stuff

  • Any incompatible changes you have made to the models will prevent the app from starting

Compatibility

One of the main benefits of using the schema registry is that it can tell you if you try to update a schema in a way that is incompatible with how other users of the schema are expecting it to be updated. The specific changes that are incompatible could vary from subject to subject, and depend on that subject’s compatibility type.

You can specify the compatibility type when registering a model. If you don’t specify a compatibility type, the subject will have the default compatibility type set on the schema registry server.

Once the initial version of the schema is created, if you change the model in your app in an incompatible way and try to register it again, the manager will throw an IncompatibleSchemaError.

Warning

Do not deploy your application to an int or dev environment until any changes to your schema are finalized! If an incorrect version of a schema gets registered in one of these environments, and the corrected schema is incompatible with the the incorrect one, you will have to manually delete the incorrect version from the registry.

from safir.kafka import (
    IncompatibleSchemaError,
    SchemaRegistryCompatibility,
    PydanticSchemaManager,
)


class MyModel(AvroBaseModel):
    field1: int
    field2: str


await manager.register(
    MyModel, compatibility=SchemaManagerCompatibility.FORWARD
)

Sometime in the future, if the model changes like this, an exception will be raised upon registration:

class MyModel(AvroBaseModel):
    field1: int


# This will throw an exception!
await manager.register(MyModel)

Subject names

The subject that a schema is registered under is completely independent of any Kafka topics that serialized messages may or may not be published to. In other words, it uses the RecordNameStrategy. The manager uses the combined Avro namespace and record name as the subject name. The record name and namespace come from certain fields on an inner class named Meta:

  • schema_name

  • namespace

Subject: my.namespace.mymodelcustom
class MyModel(AvroBaseModel):
    str_field: str
    int_field: int

    class Meta:
        schema_name = "mymodel"
        namespace = "my.namespace"

If Meta.namespace is absent, then the avro record will have no namespace. You should always include it unless you have a very good reason not to, so that your record names won’t conflict with any other record names in the schema registry. If Meta.schema_name is absent, then the class name will be used as the schema name, but it is good practice to explicitly define Meta.schema_name to avoid unintentionaly changing the schema_name and subject in the process of otherwise routine code refactoring.

Subject suffixes for development

When you’re developing and testing your app, you probably don’t want to register new versions of its schemas in the subjects that actual deployed versions of the app are using. You can instantiate the PydanticSchemaManager with a suffix argument to add that suffix onto all subjects used by the manager:

registry: AsyncSchemaRegistryClient
manager = PydanticSchemaManager(registry=registry, suffix="_testing")

Or by using the helper:

$ export SCHEMA_MANAGER_REGISTRY_URL=https://sasquatch-schema-registry.sasquatch:8081
$ export SCHEMA_MANAGER_SUFFIX=_testing
from safir.kafka import SchemaManagerSettings

config = SchemaManagerSettings()
manager = config.make_manager()

Then the subjects are modified like this:

# subject: my.namespace.mymodelcustom_testing
class MyModel(AvroBaseModel):
    str_field: str
    int_field: int

    class Meta:
        schema_name = "mymodelcustom"
        namespace = "my.namespace"


# subject: my.namespace.MyModel_testing
class MyModel(AvroBaseModel):
    str_field: str
    int_field: int

    class Meta:
        namespace = "my.namespace"


# ...etc.

You shouldn’t use suffixes in production environments.