KafkaConnectionSettings
- pydantic settings safir.kafka.KafkaConnectionSettings
Settings for connecting to Kafka.
This settings model supports several authentication methods, each with its own set of required settings. All of these settings can be provided in KAFKA_-prefixed environment variables. Instances of this model have methods that can be used to construct different types of Kafka clients:

    from faststream.kafka import KafkaBroker

    from safir.kafka import KafkaConnectionSettings

    # With no arguments, settings are read from KAFKA_-prefixed variables.
    config = KafkaConnectionSettings()
    kafka_broker = KafkaBroker(**config.to_faststream_params())
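The prefix convention can be pictured with a small standard-library sketch. It only illustrates how KAFKA_-prefixed variables might line up with field names; it is not how pydantic-settings is implemented. The helper name `collect_kafka_env` is hypothetical, and the sketch assumes each variable is the prefix followed by the upper-cased field name.

```python
from collections.abc import Mapping

PREFIX = "KAFKA_"


def collect_kafka_env(environ: Mapping[str, str]) -> dict[str, str]:
    """Return prefixed variables keyed by lower-cased field name.

    Hypothetical helper for illustration only; the settings model does
    this work itself when it is instantiated without arguments.
    """
    return {
        name[len(PREFIX):].lower(): value
        for name, value in environ.items()
        if name.upper().startswith(PREFIX)
    }


example_env = {
    "KAFKA_BOOTSTRAP_SERVERS": "kafka:9092",
    "KAFKA_SECURITY_PROTOCOL": "PLAINTEXT",
    "HOME": "/home/user",  # ignored: no KAFKA_ prefix
}
fields = collect_kafka_env(example_env)
print(fields)
```

In a real deployment the same variables would simply be exported in the process environment before the settings model is created.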
When using this model directly, the validated property enforces at runtime that the correct settings were provided for the desired authentication method, and returns models that give type-safe access to those settings:

    from pathlib import Path

    from safir.kafka import KafkaConnectionSettings, SecurityProtocol

    # ValidationError at runtime: client_key_path is not provided
    config = KafkaConnectionSettings(
        bootstrap_servers="something:1234",
        security_protocol=SecurityProtocol.SSL,
        cluster_ca_path=Path("/some/cert.crt"),
        client_cert_path=Path("/some/other/cert.crt"),
    )

    # Valid: every setting needed for SSL is provided
    config = KafkaConnectionSettings(
        bootstrap_servers="something:1234",
        security_protocol=SecurityProtocol.SSL,
        cluster_ca_path=Path("/some/path/ca.crt"),
        client_cert_path=Path("/some/path/user.crt"),
        client_key_path=Path("/some/path/user.key"),
    )

    blah = config.validated.sasl_username  # Static type error
- Parameters:
_nested_model_default_partial_update (Optional[bool], default: None)
_env_prefix_target (Optional[Literal['variable', 'alias', 'all']], default: None)
_env_file (Union[Path, str, Sequence[Union[Path, str]], None], default: PosixPath('.'))
_cli_parse_args (Union[bool, list[str], tuple[str, ...], None], default: None)
_cli_settings_source (Optional[CliSettingsSource[Any]], default: None)
_cli_use_class_docs_for_groups (Optional[bool], default: None)
_cli_implicit_flags (Union[bool, Literal['dual', 'toggle'], None], default: None)
_cli_kebab_case (Union[bool, Literal['all', 'no_enums'], None], default: None)
_cli_shortcuts (Optional[Mapping[str, Union[str, list[str]]]], default: None)
_secrets_dir (Union[Path, str, Sequence[Union[Path, str]], None], default: None)
_build_sources (Optional[tuple[tuple[PydanticBaseSettingsSource, ...], dict[str, Any]]], default: None)
values (Any)
JSON schema:
{
  "title": "KafkaConnectionSettings",
  "description": "Settings for connecting to Kafka.\n\nThis settings model supports several authentication methods, each with its\nown set of required settings. All of these settings can be provided in\n``KAFKA_``-prefixed environment variables. Instances of this model have\nmethods that can be used to construct different types of Kafka clients:\n\n.. code-block:: python\n\n    from faststream.kafka import KafkaBroker\n\n    from safir.kafka import KafkaConnectionSettings\n\n\n    config = KafkaConnectionSettings()\n    kafka_broker = KafkaBroker(**config.to_faststream_params())\n\nWhen using this model directly, the ``validated`` property enforces at\nruntime that the correct settings were provided for the desired\nauthentication method, and returns models that give type-safe access to\nthose settings:\n\n.. code-block:: python\n\n    from pathlib import Path\n\n    from safir.kafka import KafkaConnectionSettings, SecurityProtocol\n\n\n    # ValidationError at runtime: ``client_key_path`` is not provided\n    config = KafkaConnectionSettings(\n        bootstrap_servers=\"something:1234\",\n        security_protocol=SecurityProtocol.SSL,\n        cluster_ca_path=Path(\"/some/cert.crt\"),\n        client_cert_path=Path(\"/some/other/cert.crt\"),\n    )\n\n    config = KafkaConnectionSettings(\n        bootstrap_servers=\"something:1234\",\n        security_protocol=SecurityProtocol.SSL,\n        cluster_ca_path=Path(\"/some/path/ca.crt\"),\n        client_cert_path=Path(\"/some/path/user.crt\"),\n        client_key_path=Path(\"/some/path/user.key\"),\n    )\n\n    blah = config.validated.sasl_username  # Static type error",
  "type": "object",
  "properties": {
    "bootstrapServers": {
      "description": "A comma-separated list of Kafka brokers to connect to. This should be a list of hostnames or IP addresses, each optionally followed by a port number, separated by commas.",
      "examples": [
        "kafka-1:9092,kafka-2:9092,kafka-3:9092",
        "kafka:9092"
      ],
      "title": "Kafka bootstrap servers",
      "type": "string"
    },
    "securityProtocol": {
      "$ref": "#/$defs/SecurityProtocol",
      "description": "The authentication and encryption mode for the connection.",
      "title": "Security Protocol"
    },
    "clusterCaPath": {
      "anyOf": [
        { "format": "file-path", "type": "string" },
        { "type": "null" }
      ],
      "default": null,
      "description": "The path to the PEM-formatted CA certificate file to use for verifying the broker's certificate. This is only needed for the SSL and SASL_SSL security protocols, and even in those cases, only when the broker's certificate is not signed by a CA trusted by the operating system.",
      "examples": [
        "/some/dir/ca.crt"
      ],
      "title": "Path to CA certificate file"
    },
    "clientCertPath": {
      "anyOf": [
        { "format": "file-path", "type": "string" },
        { "type": "null" }
      ],
      "default": null,
      "description": "The path to the PEM-formatted client certificate file to use for authentication. This is only needed if the broker is configured to require SSL client authentication.",
      "examples": [
        "/some/dir/user.crt"
      ],
      "title": "Path to client certificate file"
    },
    "clientKeyPath": {
      "anyOf": [
        { "format": "file-path", "type": "string" },
        { "type": "null" }
      ],
      "default": null,
      "description": "The path to the PEM-formatted client key file to use for authentication. This is only needed for the SSL security protocol.",
      "examples": [
        "/some/dir/user.key"
      ],
      "title": "Path to client key file"
    },
    "saslMechanism": {
      "anyOf": [
        { "$ref": "#/$defs/SaslMechanism" },
        { "type": "null" }
      ],
      "default": null,
      "description": "The SASL mechanism to use for authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT security protocols.",
      "title": "SASL mechanism"
    },
    "saslUsername": {
      "anyOf": [
        { "type": "string" },
        { "type": "null" }
      ],
      "default": null,
      "description": "The username to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT security protocols.",
      "title": "SASL username"
    },
    "saslPassword": {
      "anyOf": [
        { "format": "password", "type": "string", "writeOnly": true },
        { "type": "null" }
      ],
      "default": null,
      "description": "The password to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT security protocols.",
      "title": "SASL password"
    }
  },
  "$defs": {
    "SaslMechanism": {
      "description": "Kafka SASL mechanisms.",
      "enum": [
        "PLAIN",
        "SCRAM-SHA-256",
        "SCRAM-SHA-512"
      ],
      "title": "SaslMechanism",
      "type": "string"
    },
    "SecurityProtocol": {
      "description": "Kafka security protocols.",
      "enum": [
        "SASL_PLAINTEXT",
        "SASL_SSL",
        "PLAINTEXT",
        "SSL"
      ],
      "title": "SecurityProtocol",
      "type": "string"
    }
  },
  "additionalProperties": false,
  "required": [
    "bootstrapServers",
    "securityProtocol"
  ]
}
- Config:
populate_by_name: bool = True
validate_by_alias: bool = True
validate_by_name: bool = True
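- Fields:
bootstrap_servers (str)
client_cert_path (Optional[Annotated[Path]])
client_key_path (Optional[Annotated[Path]])
cluster_ca_path (Optional[Annotated[Path]])
sasl_mechanism (Optional[SaslMechanism])
sasl_password (Optional[SecretStr])
sasl_username (Optional[str])
security_protocol (SecurityProtocol)
- Validators:
validate_auth_settings » all fields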
- field bootstrap_servers: str [Required]
A comma-separated list of Kafka brokers to connect to. Each entry is a hostname or IP address, optionally followed by a port number.
- Validated by: validate_auth_settings
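To make the expected shape of this value concrete, here is a minimal sketch that splits such a string into host and port pairs. The function name `split_bootstrap_servers` is hypothetical and is not part of Safir; the sketch assumes hostnames or IPv4 addresses only and does not handle bare IPv6 literals. Kafka clients normally accept the comma-separated string as is, so this kind of parsing is rarely needed in application code.

```python
from typing import Optional


def split_bootstrap_servers(value: str) -> list[tuple[str, Optional[int]]]:
    """Split "host:port,host:port" into (host, port) pairs.

    Hypothetical helper for illustration. The port is None when an
    entry does not include one.
    """
    servers: list[tuple[str, Optional[int]]] = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray commas
        host, sep, port = entry.rpartition(":")
        if sep and port.isdigit():
            servers.append((host, int(port)))
        else:
            servers.append((entry, None))
    return servers


pairs = split_bootstrap_servers("kafka-1:9092,kafka-2:9092,kafka-3:9092")
print(pairs)
```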
- field client_cert_path: Optional[Annotated[Path]] = None
The path to the PEM-formatted client certificate file to use for authentication. This is only needed if the broker is configured to require SSL client authentication.
- Validated by: validate_auth_settings
- field client_key_path: Optional[Annotated[Path]] = None
The path to the PEM-formatted client key file to use for authentication. This is only needed for the SSL security protocol.
- Validated by: validate_auth_settings
- field cluster_ca_path: Optional[Annotated[Path]] = None
The path to the PEM-formatted CA certificate file to use for verifying the broker's certificate. This is only needed for the SSL and SASL_SSL security protocols, and even in those cases, only when the broker's certificate is not signed by a CA trusted by the operating system.
- Validated by: validate_auth_settings
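The reason cluster_ca_path is optional can be illustrated with Python's standard ssl module. This is a sketch of the general TLS behavior, not necessarily how Safir or the Kafka client libraries build their TLS configuration; the function name `build_ssl_context` is hypothetical.

```python
import ssl
from pathlib import Path
from typing import Optional


def build_ssl_context(cluster_ca_path: Optional[Path] = None) -> ssl.SSLContext:
    """Create a client-side TLS context for verifying a broker certificate.

    Hypothetical helper for illustration only.
    """
    # With cafile=None the standard library loads the operating system's
    # default trusted CAs, so a custom CA file is needed only when the
    # broker's certificate was issued by a CA outside that set.
    cafile = str(cluster_ca_path) if cluster_ca_path is not None else None
    return ssl.create_default_context(cafile=cafile)


context = build_ssl_context()
print(context.verify_mode, context.check_hostname)
```

Passing a path such as Path("/some/dir/ca.crt") would instead make the context trust the CA in that file.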
- field sasl_mechanism: Optional[SaslMechanism] = None
The SASL mechanism to use for authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT security protocols.
- Validated by: validate_auth_settings
- field sasl_password: Optional[SecretStr] = None
The password to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT security protocols.
- Validated by: validate_auth_settings
- field sasl_username: Optional[str] = None
The username to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT security protocols.
- Validated by: validate_auth_settings
- field security_protocol: SecurityProtocol [Required]
The authentication and encryption mode for the connection.
- Validated by: validate_auth_settings
- to_aiokafka_params()
- Return type:
- to_faststream_params()
- Return type:
- validator validate_auth_settings » all fields
Validate that the correct combination of parameters is specified.
- Return type:
Self
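The kind of check this validator performs can be sketched with a lookup table of required settings per security protocol. The table below is an assumption inferred from the field descriptions and from the SSL example in the class description; the real validator may require a different set (for instance, it may also require a CA path). The names `REQUIRED_FIELDS` and `missing_settings` are hypothetical.

```python
from collections.abc import Mapping
from typing import Any

# Assumed requirements per security protocol, inferred from the field
# descriptions. The real validator may enforce a different set.
REQUIRED_FIELDS: dict[str, tuple[str, ...]] = {
    "PLAINTEXT": (),
    "SSL": ("client_cert_path", "client_key_path"),
    "SASL_PLAINTEXT": ("sasl_mechanism", "sasl_username", "sasl_password"),
    "SASL_SSL": ("sasl_mechanism", "sasl_username", "sasl_password"),
}


def missing_settings(protocol: str, settings: Mapping[str, Any]) -> list[str]:
    """Return the names of assumed-required settings that are unset."""
    return [
        name
        for name in REQUIRED_FIELDS[protocol]
        if settings.get(name) is None
    ]


# Mirrors the failing example above: the client key is not provided.
missing = missing_settings("SSL", {"client_cert_path": "/some/other/cert.crt"})
print(missing)
```

A validator built on this idea would raise an error whenever the returned list is non-empty.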
- property validated: SslSettings | SaslSslSettings | SaslPlaintextSettings | PlaintextSettings
Return a model with the subset of settings that applies to the configured Kafka authentication method.
Accessing this property raises a ValidationError if an invalid set of settings was provided.
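The narrowing pattern behind this property can be sketched with plain dataclasses. This is not Safir's implementation: the class names `LooseSettings`, `PlaintextView`, and `SslView` are hypothetical, only two protocols are covered, and the sketch raises ValueError where the real property raises a ValidationError.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass(frozen=True)
class PlaintextView:
    """Settings that are meaningful for an unauthenticated connection."""

    bootstrap_servers: str


@dataclass(frozen=True)
class SslView:
    """Settings that are meaningful for SSL client authentication."""

    bootstrap_servers: str
    client_cert_path: str
    client_key_path: str


@dataclass
class LooseSettings:
    """Most settings are optional here; ``validated`` narrows them."""

    bootstrap_servers: str
    security_protocol: str
    client_cert_path: Optional[str] = None
    client_key_path: Optional[str] = None

    @property
    def validated(self) -> Union[PlaintextView, SslView]:
        if self.security_protocol == "PLAINTEXT":
            return PlaintextView(self.bootstrap_servers)
        if self.security_protocol == "SSL":
            if self.client_cert_path is None or self.client_key_path is None:
                raise ValueError("SSL needs client_cert_path and client_key_path")
            return SslView(
                self.bootstrap_servers,
                self.client_cert_path,
                self.client_key_path,
            )
        raise ValueError(f"Not covered by this sketch: {self.security_protocol}")


settings = LooseSettings(
    bootstrap_servers="something:1234",
    security_protocol="SSL",
    client_cert_path="/some/path/user.crt",
    client_key_path="/some/path/user.key",
)
view = settings.validated
if isinstance(view, SslView):
    # In this branch a type checker knows client_key_path is a str.
    print(view.client_key_path)
```

Because each narrowed class declares only the attributes relevant to its protocol, an access such as `view.sasl_username` on the SSL variant is rejected by a static type checker, which is the behavior the class description refers to.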