KafkaConnectionSettings

pydantic settings safir.kafka.KafkaConnectionSettings

Settings for connecting to Kafka.

This settings model supports different authentication methods, which each have different sets of required settings. All of these settings can be provided in KAFKA_ prefixed environment variables. Instances of this model have properties that can be used to construct different types of kafka clients:

from faststream.broker import KafkaBroker

from safir.kafka import KafkaConnectionSettings


config = KafkaConnectionSettings()
kafka_broker = KafkaBroker(**config.faststream_broker_params)

When using this model directly, The validated property enforces at runtime that the correct settings were provided for the desired authentication method, and returns models to access those settings in a type-safe way:

from pathlib import Path


# ValidationError at runtime: ``client_key_path`` is not provided
config = KafkaConnectionSettings(
    bootstrap_servers="something:1234",
    security_protocol=KafkaSecurityProtocol.SSL,
    cluster_ca_path=Path("/some/cert.crt"),
    client_cert_path=Path("/some/other/cert.crt"),
)

config = KafkaConnectionSettings(
    bootstrap_servers="something:1234",
    security_protocol=KafkaSecurityProtocol.SSL,
    cluster_ca_path=Path("/some/path/ca.crt"),
    client_cert_path=Path("/some/path/user.crt"),
    client_key_path=Path("/some/path/user.key"),
)

blah = config.validated.sasl_username  # Static type error
Parameters:

Show JSON schema
{
   "title": "KafkaConnectionSettings",
   "description": "Settings for connecting to Kafka.\n\nThis settings model supports different authentication methods, which each\nhave different sets of required settings. All of these settings can be\nprovided in ``KAFKA_`` prefixed environment variables. Instances of this\nmodel have properties that can be used to construct different types of\nkafka clients:\n\n.. code-block:: python\n\n   from faststream.broker import KafkaBroker\n\n   from safir.kafka import KafkaConnectionSettings\n\n\n   config = KafkaConnectionSettings()\n   kafka_broker = KafkaBroker(**config.faststream_broker_params)\n\nWhen using this model directly, The ``validated`` property enforces at\nruntime that the correct settings were provided for the desired\nauthentication method, and returns models to access those settings in a\ntype-safe way:\n\n.. code-block:: python\n\n   from pathlib import Path\n\n\n   # ValidationError at runtime: ``client_key_path`` is not provided\n   config = KafkaConnectionSettings(\n       bootstrap_servers=\"something:1234\",\n       security_protocol=KafkaSecurityProtocol.SSL,\n       cluster_ca_path=Path(\"/some/cert.crt\"),\n       client_cert_path=Path(\"/some/other/cert.crt\"),\n   )\n\n   config = KafkaConnectionSettings(\n       bootstrap_servers=\"something:1234\",\n       security_protocol=KafkaSecurityProtocol.SSL,\n       cluster_ca_path=Path(\"/some/path/ca.crt\"),\n       client_cert_path=Path(\"/some/path/user.crt\"),\n       client_key_path=Path(\"/some/path/user.key\"),\n   )\n\n   blah = config.validated.sasl_username  # Static type error",
   "type": "object",
   "properties": {
      "bootstrap_servers": {
         "description": "A comma-separated list of Kafka brokers to connect to. This should be a list of hostnames or IP addresses, each optionally followed by a port number, separated by commas. ",
         "examples": [
            "kafka-1:9092,kafka-2:9092,kafka-3:9092",
            "kafka:9092"
         ],
         "title": "Kafka bootstrap servers",
         "type": "string"
      },
      "security_protocol": {
         "$ref": "#/$defs/SecurityProtocol",
         "description": "The authentication and encryption mode for the connection.",
         "title": "Security Protocol"
      },
      "cluster_ca_path": {
         "anyOf": [
            {
               "format": "file-path",
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The path to the PEM-formatted CA certificate file to use for verifying the broker's certificate. This is only needed for SSL and SASL_SSL security protocols, andeven in those cases, only when the broker's certificate is not signed by a CA trusted by the operating system.",
         "examples": [
            "/some/dir/ca.crt"
         ],
         "title": "Path to CA certificate file"
      },
      "client_cert_path": {
         "anyOf": [
            {
               "format": "file-path",
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The path to the PEM-formated client certificate file to use for authentication. This is only needed if the broker is configured to require SSL client authentication.",
         "examples": [
            "/some/dir/user.crt"
         ],
         "title": "Path to client certificate file"
      },
      "client_key_path": {
         "anyOf": [
            {
               "format": "file-path",
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The path to the PEM-formatted client key file to use for authentication. This is only needed if for the SSL securityprotocol.",
         "examples": [
            "/some/dir/user.key"
         ],
         "title": "Path to client key file"
      },
      "sasl_mechanism": {
         "anyOf": [
            {
               "$ref": "#/$defs/SaslMechanism"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The SASL mechanism to use for authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.",
         "title": "SASL mechanism"
      },
      "sasl_username": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The username to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.",
         "title": "SASL username"
      },
      "sasl_password": {
         "anyOf": [
            {
               "format": "password",
               "type": "string",
               "writeOnly": true
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The password to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.",
         "title": "SASL password"
      }
   },
   "$defs": {
      "SaslMechanism": {
         "description": "Kafka SASL mechanisms.",
         "enum": [
            "PLAIN",
            "SCRAM-SHA-256",
            "SCRAM-SHA-512"
         ],
         "title": "SaslMechanism",
         "type": "string"
      },
      "SecurityProtocol": {
         "description": "Kafka SASL security protocols.",
         "enum": [
            "SASL_PLAINTEXT",
            "SASL_SSL",
            "PLAINTEXT",
            "SSL"
         ],
         "title": "SecurityProtocol",
         "type": "string"
      }
   },
   "additionalProperties": false,
   "required": [
      "bootstrap_servers",
      "security_protocol"
   ]
}

Config:
  • env_prefix: str = KAFKA_

Fields:
Validators:
field bootstrap_servers: str [Required]

A comma-separated list of Kafka brokers to connect to. This should be a list of hostnames or IP addresses, each optionally followed by a port number, separated by commas.

Validated by:
field client_cert_path: FilePath | None = None

The path to the PEM-formated client certificate file to use for authentication. This is only needed if the broker is configured to require SSL client authentication.

Validated by:
field client_key_path: FilePath | None = None

The path to the PEM-formatted client key file to use for authentication. This is only needed if for the SSL securityprotocol.

Validated by:
field cluster_ca_path: FilePath | None = None

The path to the PEM-formatted CA certificate file to use for verifying the broker’s certificate. This is only needed for SSL and SASL_SSL security protocols, andeven in those cases, only when the broker’s certificate is not signed by a CA trusted by the operating system.

Validated by:
field sasl_mechanism: SaslMechanism | None = None

The SASL mechanism to use for authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.

Validated by:
field sasl_password: SecretStr | None = None

The password to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.

Validated by:
field sasl_username: str | None = None

The username to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.

Validated by:
field security_protocol: SecurityProtocol [Required]

The authentication and encryption mode for the connection.

Validated by:
to_aiokafka_params()
Return type:

AIOKafkaParams

to_faststream_params()
Return type:

FastStreamBrokerParams

validator validate_auth_settings  »  all fields

Validate that the correct combination of parameters is specified.

Return type:

Self

property validated: SslSettings | SaslSslSettings | SaslPlaintextSettings | PlaintextSettings

Return a model with a subset of settings for a Kafka auth method.

This method will fail with a ValidationError if an invalid set of settings were provided.