PydanticSchemaManager

class safir.kafka.PydanticSchemaManager(registry, suffix='', logger=None)

Bases: object

A manager for schemas that are represented as Pydantic models in Python, and translated into Avro for the Confluent Schema Registry.

It can be used to:

  • Register new schemas from Pydantic models

  • Register new versions of existing schemas from changed Pydantic models, after checking compatibility with the latest version in the registry

Parameters:
  • registry (AsyncSchemaRegistryClient) – A schema registry client, or connection settings for creating one

  • suffix (str, default: '') –

    A suffix that is added to the schema name (and thus subject name), for example _dev1.

    The suffix creates alternate subjects in the Schema Registry so schemas registered during testing and staging don’t affect the compatibility continuity of a production subject.

    For production, it’s best to not set a suffix.

  • logger (BoundLogger | None, default: None) – Logger to use for internal logging. If not given, the safir.kafka.manager logger will be used.

Methods Summary

register_model(model[, compatibility])

Register the model with the registry.

serialize(data)

Serialize the data.

Methods Documentation

async register_model(model, compatibility=None)

Register the model with the registry.

Parameters:
  • model (type[AvroBaseModel]) – The model to register.

  • compatibility (Compatibility | None, default: None) – The registry compatibility type for this model. If not provided, it will default to the default type configured in the registry.

Raises:

IncompatibleSchemaError – If the schema is incompatible with the latest version of schema in the registry, according to the subject’s compatibility type.

Return type:

SchemaInfo

async serialize(data)

Serialize the data.

The model’s schema must have been registered before calling this method.

Parameters:

data (AvroBaseModel) – The data to serialize.

Returns:

The serialized data in the Confluent Wire Format.

Return type:

bytes