KafkaConnectionSettings

pydantic settings safir.kafka.KafkaConnectionSettings

Settings for connecting to Kafka.

This settings model supports several authentication methods, each with its own set of required settings. All of these settings can be provided in KAFKA_-prefixed environment variables. Instances of this model have methods that return the parameters needed to construct different types of Kafka clients:

from faststream.kafka import KafkaBroker

from safir.kafka import KafkaConnectionSettings


config = KafkaConnectionSettings()
kafka_broker = KafkaBroker(**config.to_faststream_params())
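
To illustrate the environment-variable route, the following standard-library sketch collects KAFKA_-prefixed variables into field-style names. It is not how the settings model loads its values. The variable names (for example KAFKA_BOOTSTRAP_SERVERS) assume that the prefix is followed by the upper-cased field name, and collect_kafka_env is a hypothetical helper.

```python
from __future__ import annotations


def collect_kafka_env(
    environ: dict[str, str], prefix: str = "KAFKA_"
) -> dict[str, str]:
    """Map PREFIX_NAME variables to lower-case names (hypothetical helper)."""
    return {
        key[len(prefix):].lower(): value
        for key, value in environ.items()
        if key.startswith(prefix)
    }


# Assumed variable names: the prefix plus the upper-cased field name.
example_env = {
    "KAFKA_BOOTSTRAP_SERVERS": "kafka-1:9092,kafka-2:9092",
    "KAFKA_SECURITY_PROTOCOL": "PLAINTEXT",
    "UNRELATED": "ignored",
}
settings_kwargs = collect_kafka_env(example_env)
```

In a deployment the same names would be set in the process environment, and KafkaConnectionSettings() would be constructed with no arguments.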

When using this model directly, the validated property enforces at runtime that the correct settings were provided for the desired authentication method, and returns a model that gives type-safe access to those settings:

from pathlib import Path

from safir.kafka import KafkaConnectionSettings, SecurityProtocol


# ValidationError at runtime: ``client_key_path`` is not provided
config = KafkaConnectionSettings(
    bootstrap_servers="something:1234",
    security_protocol=SecurityProtocol.SSL,
    cluster_ca_path=Path("/some/cert.crt"),
    client_cert_path=Path("/some/other/cert.crt"),
)

# Valid: the settings needed for the SSL security protocol are provided
config = KafkaConnectionSettings(
    bootstrap_servers="something:1234",
    security_protocol=SecurityProtocol.SSL,
    cluster_ca_path=Path("/some/path/ca.crt"),
    client_cert_path=Path("/some/path/user.crt"),
    client_key_path=Path("/some/path/user.key"),
)

blah = config.validated.sasl_username  # Static type error
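
The static type error on the last line comes from the fact that validated returns a union of per-method models. A minimal sketch of that pattern follows, using plain dataclasses as hypothetical stand-ins (SslView and SaslView are not part of Safir): an isinstance check narrows the union so that each branch can only reach the attributes its model defines.

```python
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class SslView:
    """Hypothetical stand-in for a validated SSL settings model."""

    bootstrap_servers: str
    client_cert_path: Path
    client_key_path: Path


@dataclass(frozen=True)
class SaslView:
    """Hypothetical stand-in for a validated SASL settings model."""

    bootstrap_servers: str
    sasl_username: str


def describe(view: SslView | SaslView) -> str:
    # isinstance narrows the union, so each branch sees only its own fields.
    if isinstance(view, SslView):
        return f"SSL with key {view.client_key_path}"
    return f"SASL as {view.sasl_username}"


summary = describe(
    SslView(
        "something:1234",
        Path("/some/path/user.crt"),
        Path("/some/path/user.key"),
    )
)
```

A type checker would reject view.sasl_username inside the SslView branch, which is the same class of error the example above shows.
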
Parameters:
  • _case_sensitive (bool | None, default: None)

  • _nested_model_default_partial_update (bool | None, default: None)

  • _env_prefix (str | None, default: None)

  • _env_file (Union[Path, str, Sequence[Union[Path, str]], None], default: PosixPath('.'))

  • _env_file_encoding (str | None, default: None)

  • _env_ignore_empty (bool | None, default: None)

  • _env_nested_delimiter (str | None, default: None)

  • _env_nested_max_split (int | None, default: None)

  • _env_parse_none_str (str | None, default: None)

  • _env_parse_enums (bool | None, default: None)

  • _cli_prog_name (str | None, default: None)

  • _cli_parse_args (bool | list[str] | tuple[str, ...] | None, default: None)

  • _cli_settings_source (Optional[CliSettingsSource[Any]], default: None)

  • _cli_parse_none_str (str | None, default: None)

  • _cli_hide_none_type (bool | None, default: None)

  • _cli_avoid_json (bool | None, default: None)

  • _cli_enforce_required (bool | None, default: None)

  • _cli_use_class_docs_for_groups (bool | None, default: None)

  • _cli_exit_on_error (bool | None, default: None)

  • _cli_prefix (str | None, default: None)

  • _cli_flag_prefix_char (str | None, default: None)

  • _cli_implicit_flags (bool | None, default: None)

  • _cli_ignore_unknown_args (bool | None, default: None)

  • _cli_kebab_case (bool | None, default: None)

  • _cli_shortcuts (Mapping[str, str | list[str]] | None, default: None)

  • _secrets_dir (Union[Path, str, Sequence[Union[Path, str]], None], default: None)

  • values (Any)

JSON schema:
{
   "title": "KafkaConnectionSettings",
   "description": "Settings for connecting to Kafka.\n\nThis settings model supports different authentication methods, which each\nhave different sets of required settings. All of these settings can be\nprovided in ``KAFKA_`` prefixed environment variables. Instances of this\nmodel have methods that can be used to construct different types of\nKafka clients:\n\n.. code-block:: python\n\n   from faststream.kafka import KafkaBroker\n\n   from safir.kafka import KafkaConnectionSettings\n\n\n   config = KafkaConnectionSettings()\n   kafka_broker = KafkaBroker(**config.to_faststream_params())\n\nWhen using this model directly, the ``validated`` property enforces at\nruntime that the correct settings were provided for the desired\nauthentication method, and returns models to access those settings in a\ntype-safe way:\n\n.. code-block:: python\n\n   from pathlib import Path\n\n   from safir.kafka import KafkaConnectionSettings, SecurityProtocol\n\n\n   # ValidationError at runtime: ``client_key_path`` is not provided\n   config = KafkaConnectionSettings(\n       bootstrap_servers=\"something:1234\",\n       security_protocol=SecurityProtocol.SSL,\n       cluster_ca_path=Path(\"/some/cert.crt\"),\n       client_cert_path=Path(\"/some/other/cert.crt\"),\n   )\n\n   config = KafkaConnectionSettings(\n       bootstrap_servers=\"something:1234\",\n       security_protocol=SecurityProtocol.SSL,\n       cluster_ca_path=Path(\"/some/path/ca.crt\"),\n       client_cert_path=Path(\"/some/path/user.crt\"),\n       client_key_path=Path(\"/some/path/user.key\"),\n   )\n\n   blah = config.validated.sasl_username  # Static type error",
   "type": "object",
   "properties": {
      "bootstrapServers": {
         "description": "A comma-separated list of Kafka brokers to connect to. This should be a list of hostnames or IP addresses, each optionally followed by a port number, separated by commas.",
         "examples": [
            "kafka-1:9092,kafka-2:9092,kafka-3:9092",
            "kafka:9092"
         ],
         "title": "Kafka bootstrap servers",
         "type": "string"
      },
      "securityProtocol": {
         "$ref": "#/$defs/SecurityProtocol",
         "description": "The authentication and encryption mode for the connection.",
         "title": "Security Protocol"
      },
      "clusterCaPath": {
         "anyOf": [
            {
               "format": "file-path",
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The path to the PEM-formatted CA certificate file to use for verifying the broker's certificate. This is only needed for SSL and SASL_SSL security protocols, and even in those cases, only when the broker's certificate is not signed by a CA trusted by the operating system.",
         "examples": [
            "/some/dir/ca.crt"
         ],
         "title": "Path to CA certificate file"
      },
      "clientCertPath": {
         "anyOf": [
            {
               "format": "file-path",
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The path to the PEM-formatted client certificate file to use for authentication. This is only needed if the broker is configured to require SSL client authentication.",
         "examples": [
            "/some/dir/user.crt"
         ],
         "title": "Path to client certificate file"
      },
      "clientKeyPath": {
         "anyOf": [
            {
               "format": "file-path",
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The path to the PEM-formatted client key file to use for authentication. This is only needed for the SSL security protocol.",
         "examples": [
            "/some/dir/user.key"
         ],
         "title": "Path to client key file"
      },
      "saslMechanism": {
         "anyOf": [
            {
               "$ref": "#/$defs/SaslMechanism"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The SASL mechanism to use for authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT security protocols.",
         "title": "SASL mechanism"
      },
      "saslUsername": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The username to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT security protocols.",
         "title": "SASL username"
      },
      "saslPassword": {
         "anyOf": [
            {
               "format": "password",
               "type": "string",
               "writeOnly": true
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "The password to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT security protocols.",
         "title": "SASL password"
      }
   },
   "$defs": {
      "SaslMechanism": {
         "description": "Kafka SASL mechanisms.",
         "enum": [
            "PLAIN",
            "SCRAM-SHA-256",
            "SCRAM-SHA-512"
         ],
         "title": "SaslMechanism",
         "type": "string"
      },
      "SecurityProtocol": {
         "description": "Kafka security protocols.",
         "enum": [
            "SASL_PLAINTEXT",
            "SASL_SSL",
            "PLAINTEXT",
            "SSL"
         ],
         "title": "SecurityProtocol",
         "type": "string"
      }
   },
   "additionalProperties": false,
   "required": [
      "bootstrapServers",
      "securityProtocol"
   ]
}

Config:
  • populate_by_name: bool = True

  • validate_by_alias: bool = True

  • validate_by_name: bool = True

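Fields:
  • bootstrap_servers (str)

  • client_cert_path (FilePath | None)

  • client_key_path (FilePath | None)

  • cluster_ca_path (FilePath | None)

  • sasl_mechanism (SaslMechanism | None)

  • sasl_password (SecretStr | None)

  • sasl_username (str | None)

  • security_protocol (SecurityProtocol)

Validators:
  • validate_auth_settings » all fields
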
field bootstrap_servers: str [Required]

A comma-separated list of Kafka brokers to connect to. Each entry is a hostname or IP address, optionally followed by a port number.

Validated by:
  • validate_auth_settings

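
To make the expected bootstrap_servers format concrete, here is a small sketch that splits such a string into (host, port) pairs. The helper split_bootstrap_servers is hypothetical and not part of Safir. The fallback port of 9092 is an assumption based on the port used in the examples, and bracketed IPv6 addresses are not handled.

```python
from __future__ import annotations


def split_bootstrap_servers(
    value: str, default_port: int = 9092
) -> list[tuple[str, int]]:
    """Split 'host:port,host' into (host, port) pairs (hypothetical helper)."""
    servers: list[tuple[str, int]] = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, sep, port = entry.rpartition(":")
        if sep:
            servers.append((host, int(port)))
        else:
            # No port was given; fall back to the assumed default.
            servers.append((entry, default_port))
    return servers


servers = split_bootstrap_servers("kafka-1:9092,kafka-2:9093,kafka-3")
```
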
field client_cert_path: FilePath | None = None

The path to the PEM-formatted client certificate file to use for authentication. This is only needed if the broker is configured to require SSL client authentication.

Validated by:
  • validate_auth_settings

field client_key_path: FilePath | None = None

The path to the PEM-formatted client key file to use for authentication. This is only needed for the SSL security protocol.

Validated by:
  • validate_auth_settings

field cluster_ca_path: FilePath | None = None

The path to the PEM-formatted CA certificate file to use for verifying the broker’s certificate. This is only needed for the SSL and SASL_SSL security protocols, and even in those cases, only when the broker’s certificate is not signed by a CA trusted by the operating system.

Validated by:
  • validate_auth_settings

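
For orientation, the three path settings above correspond to the inputs that Python's standard ssl module takes when building a client TLS context. The sketch below shows that generic mapping and makes no claim about how Safir or the Kafka clients build their contexts. build_ssl_context is a hypothetical helper.

```python
from __future__ import annotations

import ssl
from pathlib import Path


def build_ssl_context(
    cluster_ca_path: Path | None = None,
    client_cert_path: Path | None = None,
    client_key_path: Path | None = None,
) -> ssl.SSLContext:
    """Build a client TLS context from optional PEM paths (hypothetical)."""
    # With cafile=None the operating system's trusted CAs are used, which
    # is why the CA path is optional when the broker uses a public CA.
    context = ssl.create_default_context(
        cafile=str(cluster_ca_path) if cluster_ca_path else None
    )
    if client_cert_path and client_key_path:
        # Client authentication needs both the certificate and its key.
        context.load_cert_chain(
            certfile=str(client_cert_path), keyfile=str(client_key_path)
        )
    return context


context = build_ssl_context()
```
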
field sasl_mechanism: SaslMechanism | None = None

The SASL mechanism to use for authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT security protocols.

Validated by:
  • validate_auth_settings

field sasl_password: SecretStr | None = None

The password to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT security protocols.

Validated by:
  • validate_auth_settings

field sasl_username: str | None = None

The username to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT security protocols.

Validated by:
  • validate_auth_settings

field security_protocol: SecurityProtocol [Required]

The authentication and encryption mode for the connection.

Validated by:
  • validate_auth_settings

to_aiokafka_params()
Return type:

AIOKafkaParams

to_faststream_params()
Return type:

FastStreamBrokerParams

validator validate_auth_settings » all fields

Validate that the correct combination of parameters is specified.

Return type:

Self
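
The kind of check this validator performs can be sketched as a lookup from security protocol to required settings. The table below is an assumption pieced together from the field descriptions and the example in this reference, and is not Safir's actual rule set. Protocol, REQUIRED, and missing_settings are hypothetical names.

```python
from __future__ import annotations

from enum import Enum


class Protocol(Enum):
    """Local stand-in for the documented security protocol values."""

    PLAINTEXT = "PLAINTEXT"
    SSL = "SSL"
    SASL_PLAINTEXT = "SASL_PLAINTEXT"
    SASL_SSL = "SASL_SSL"


# Assumed requirements, read off the field descriptions in this reference.
SASL_FIELDS = ("sasl_mechanism", "sasl_username", "sasl_password")
REQUIRED: dict[Protocol, tuple[str, ...]] = {
    Protocol.PLAINTEXT: (),
    Protocol.SSL: ("client_cert_path", "client_key_path"),
    Protocol.SASL_PLAINTEXT: SASL_FIELDS,
    Protocol.SASL_SSL: SASL_FIELDS,
}


def missing_settings(
    protocol: Protocol, provided: dict[str, object]
) -> list[str]:
    """Return the required setting names that are absent or None."""
    return [
        name for name in REQUIRED[protocol] if provided.get(name) is None
    ]


missing = missing_settings(
    Protocol.SSL, {"client_cert_path": "/some/other/cert.crt"}
)
```

A real validator would raise an error when the returned list is non-empty instead of returning it.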

property validated: SslSettings | SaslSslSettings | SaslPlaintextSettings | PlaintextSettings

Return a model with the subset of settings for the configured Kafka authentication method.

Accessing this property raises a ValidationError if an invalid set of settings was provided.