Deserializer¶
-
class
kafkit.registry.Deserializer(*, registry: RegistryApi)¶ Bases:
objectAn Avro message deserializer that understands the Confluent Wire Format and obtains schemas on-demand from a Confluent Schema Registry.
- Parameters
registry (
kafkit.registry.sansio.RegistryApi) – A registry client.
Notes
The Deserializer works exclusively with Avro-encoded messages in the Confluent Wire Format. This means that schemas for messages must be available from a Confluent Schema Registry.
When an encoded message is deserialized in the
deserializemethod, it does the following steps:Unpacks the wire format prefix to discover the ID of the message’s schema in the schema registry.
Obtains the schema from the
RegistryApi. Schemas are cached, so this is a fast operation.Decodes the message using
fastavro.schemaless_reader.
Why not implement a __call__ method?
The
Serializerimplements a__call__method so that it can be used as a key or value serializer by the aiokafka producer. This Deserializer doesn’t do that becauseDeserializer.deserializeis a coroutine (internally it works with the asynchronousRegistryApi) and magic methods can’t be coroutines. It’s not the end of the world, though, just calldeserializemanually on by bytes obtained by the consumer.Methods Summary
deserialize(data[, include_schema])Deserialize a message.
Methods Documentation
-
async
deserialize(data: bytes, include_schema: bool = False) → Dict[str, Any]¶ Deserialize a message.
- Parameters
data (
bytes) – The encoded message, usually obtained directly from a Kafka consumer. The message must be in the Confluent Wire Format.include_schema (
bool, optional) – IfTrue, the schema itself is included in the returned value. This is useful if your application operates on many different types of messages, and needs a convenient way to introspect a message’s type.
- Returns
message_info – The deserialized message is wrapped in a dictionary to include metadata. The keys are:
- Return type