Serializer¶
-
class
kafkit.registry.Serializer(*, schema: Dict[str, Any], schema_id: int)¶ Bases:
objectAn Avro message serializer that writes in the Confluent Wire Format.
Use the
Serializer.registerclass method to create a Serializer instance.Notes
A
Serializerinstance is dedicated to serializing messages of a single schema. The serializer is a callable. Send the message to the serializer to encode it.This is an example using the MockRegistryApi client. Real-world applications would use the
kafkit.registry.aiohttp.RegistryApiinstead.>>> from kafkit.registry.sansio import MockRegistryApi >>> client = MockRegistryApi()
For demonstration purposes, assume the schema is already cached:
>>> schema = { ... 'type': 'record', ... 'name': 'schema1', ... 'namespace': 'test-schemas', ... 'fields': [ ... {'name': 'a', 'type': 'int'}, ... {'name': 'b', 'type': 'string'} ... ] ... } >>> client.schemas.insert(schema, 1) >>> serializer = await Serializer.register( ... registry=client, ... schema=schema)
Serialize messages:
>>> data = serializer({'a': 42, 'b': 'Hello world!'})
This serializer works well as
key_serializerorvalue_serializerparameters to theaiokafka.AIOKafkaProducer. See https://aiokafka.readthedocs.io/en/stable/examples/serialize_and_compress.htmlMethods Summary
__call__(data)Serialize a dataset in the Confluent Schema Registry Wire format, which is an Avro-encoded message with a schema-identifying prefix.
register(*, registry, schema[, subject])Create a serializer ensuring that the schema is registered with the schema registry.
Methods Documentation
-
__call__(data: Any) → bytes¶ Serialize a dataset in the Confluent Schema Registry Wire format, which is an Avro-encoded message with a schema-identifying prefix.
-
async classmethod
register(*, registry: RegistryApi, schema: Dict[str, Any], subject: Optional[str] = None) → Serializer¶ Create a serializer ensuring that the schema is registered with the schema registry.
- Parameters
registry (
kafkit.registry.sansio.RegistryApi) – A registry client.schema (
dict) – An Avro schema.subject (
str, optional) – The name of the Schema Registry subject that the schema is registered under. If not provided, the schema is automatically set from the fully-qualified name of the schema (the'name'field of the schema’s record type). Seekafkit.registry.sansio.RegistryApi.register_schemafor details.
- Returns
serializer – A serializer instance.
- Return type
Notes
It’s safe to call this method even if you know that the schema has been registered before. This process is necessary to get the schema’s ID, and won’t create a new schema if an identical schema is already registered.
-