KafkaConnectionSettings#
- pydantic settings safir.kafka.KafkaConnectionSettings#
Settings for connecting to Kafka.
This settings model supports different authentication methods, which each have different sets of required settings. All of these settings can be provided in
KAFKA_
prefixed environment variables. Instances of this model have properties that can be used to construct different types of kafka clients:from faststream.broker import KafkaBroker from safir.kafka import KafkaConnectionSettings config = KafkaConnectionSettings() kafka_broker = KafkaBroker(**config.faststream_broker_params)
When using this model directly, The
validated
property enforces at runtime that the correct settings were provided for the desired authentication method, and returns models to access those settings in a type-safe way:from pathlib import Path # ValidationError at runtime: ``client_key_path`` is not provided config = KafkaConnectionSettings( bootstrap_servers="something:1234", security_protocol=KafkaSecurityProtocol.SSL, cluster_ca_path=Path("/some/cert.crt"), client_cert_path=Path("/some/other/cert.crt"), ) config = KafkaConnectionSettings( bootstrap_servers="something:1234", security_protocol=KafkaSecurityProtocol.SSL, cluster_ca_path=Path("/some/path/ca.crt"), client_cert_path=Path("/some/path/user.crt"), client_key_path=Path("/some/path/user.key"), ) blah = config.validated.sasl_username # Static type error
- Parameters:
_nested_model_default_partial_update (
bool
|None
, default:None
)_env_file (
Union
[Path
,str
,Sequence
[Union
[Path
,str
]],None
], default:PosixPath('.')
)_cli_parse_args (
bool
|list
[str
] |tuple
[str
,...
] |None
, default:None
)_cli_settings_source (
Optional
[CliSettingsSource
[Any
]], default:None
)_cli_shortcuts (
Mapping
[str
,str
|list
[str
]] |None
, default:None
)_secrets_dir (
Union
[Path
,str
,Sequence
[Union
[Path
,str
]],None
], default:None
)values (
Any
)
Show JSON schema
{ "title": "KafkaConnectionSettings", "description": "Settings for connecting to Kafka.\n\nThis settings model supports different authentication methods, which each\nhave different sets of required settings. All of these settings can be\nprovided in ``KAFKA_`` prefixed environment variables. Instances of this\nmodel have properties that can be used to construct different types of\nkafka clients:\n\n.. code-block:: python\n\n from faststream.broker import KafkaBroker\n\n from safir.kafka import KafkaConnectionSettings\n\n\n config = KafkaConnectionSettings()\n kafka_broker = KafkaBroker(**config.faststream_broker_params)\n\nWhen using this model directly, The ``validated`` property enforces at\nruntime that the correct settings were provided for the desired\nauthentication method, and returns models to access those settings in a\ntype-safe way:\n\n.. code-block:: python\n\n from pathlib import Path\n\n\n # ValidationError at runtime: ``client_key_path`` is not provided\n config = KafkaConnectionSettings(\n bootstrap_servers=\"something:1234\",\n security_protocol=KafkaSecurityProtocol.SSL,\n cluster_ca_path=Path(\"/some/cert.crt\"),\n client_cert_path=Path(\"/some/other/cert.crt\"),\n )\n\n config = KafkaConnectionSettings(\n bootstrap_servers=\"something:1234\",\n security_protocol=KafkaSecurityProtocol.SSL,\n cluster_ca_path=Path(\"/some/path/ca.crt\"),\n client_cert_path=Path(\"/some/path/user.crt\"),\n client_key_path=Path(\"/some/path/user.key\"),\n )\n\n blah = config.validated.sasl_username # Static type error", "type": "object", "properties": { "bootstrapServers": { "description": "A comma-separated list of Kafka brokers to connect to. This should be a list of hostnames or IP addresses, each optionally followed by a port number, separated by commas.", "examples": [ "kafka-1:9092,kafka-2:9092,kafka-3:9092", "kafka:9092" ], "title": "Kafka bootstrap servers", "type": "string" }, "securityProtocol": { "$ref": "#/$defs/SecurityProtocol", "description": "The authentication and encryption mode for the connection.", "title": "Security Protocol" }, "clusterCaPath": { "anyOf": [ { "format": "file-path", "type": "string" }, { "type": "null" } ], "default": null, "description": "The path to the PEM-formatted CA certificate file to use for verifying the broker's certificate. This is only needed for SSL and SASL_SSL security protocols, andeven in those cases, only when the broker's certificate is not signed by a CA trusted by the operating system.", "examples": [ "/some/dir/ca.crt" ], "title": "Path to CA certificate file" }, "clientCertPath": { "anyOf": [ { "format": "file-path", "type": "string" }, { "type": "null" } ], "default": null, "description": "The path to the PEM-formated client certificate file to use for authentication. This is only needed if the broker is configured to require SSL client authentication.", "examples": [ "/some/dir/user.crt" ], "title": "Path to client certificate file" }, "clientKeyPath": { "anyOf": [ { "format": "file-path", "type": "string" }, { "type": "null" } ], "default": null, "description": "The path to the PEM-formatted client key file to use for authentication. This is only needed if for the SSL securityprotocol.", "examples": [ "/some/dir/user.key" ], "title": "Path to client key file" }, "saslMechanism": { "anyOf": [ { "$ref": "#/$defs/SaslMechanism" }, { "type": "null" } ], "default": null, "description": "The SASL mechanism to use for authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.", "title": "SASL mechanism" }, "saslUsername": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "The username to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.", "title": "SASL username" }, "saslPassword": { "anyOf": [ { "format": "password", "type": "string", "writeOnly": true }, { "type": "null" } ], "default": null, "description": "The password to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.", "title": "SASL password" } }, "$defs": { "SaslMechanism": { "description": "Kafka SASL mechanisms.", "enum": [ "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512" ], "title": "SaslMechanism", "type": "string" }, "SecurityProtocol": { "description": "Kafka SASL security protocols.", "enum": [ "SASL_PLAINTEXT", "SASL_SSL", "PLAINTEXT", "SSL" ], "title": "SecurityProtocol", "type": "string" } }, "additionalProperties": false, "required": [ "bootstrapServers", "securityProtocol" ] }
- Config:
populate_by_name: bool = True
validate_by_alias: bool = True
validate_by_name: bool = True
- Fields:
- Validators:
validate_auth_settings
»all fields
- field bootstrap_servers: str [Required]#
A comma-separated list of Kafka brokers to connect to. This should be a list of hostnames or IP addresses, each optionally followed by a port number, separated by commas.
- Validated by:
- field client_cert_path: FilePath | None = None#
The path to the PEM-formated client certificate file to use for authentication. This is only needed if the broker is configured to require SSL client authentication.
- Validated by:
- field client_key_path: FilePath | None = None#
The path to the PEM-formatted client key file to use for authentication. This is only needed if for the SSL securityprotocol.
- Validated by:
- field cluster_ca_path: FilePath | None = None#
The path to the PEM-formatted CA certificate file to use for verifying the broker’s certificate. This is only needed for SSL and SASL_SSL security protocols, andeven in those cases, only when the broker’s certificate is not signed by a CA trusted by the operating system.
- Validated by:
- field sasl_mechanism: SaslMechanism | None = None#
The SASL mechanism to use for authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.
- Validated by:
- field sasl_password: SecretStr | None = None#
The password to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.
- Validated by:
- field sasl_username: str | None = None#
The username to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.
- Validated by:
- field security_protocol: SecurityProtocol [Required]#
The authentication and encryption mode for the connection.
- Validated by:
- to_aiokafka_params()#
- Return type:
- to_faststream_params()#
- Return type:
- validator validate_auth_settings » all fields#
Validate that the correct combination of parameters is specified.
- Return type:
Self
- property validated: SslSettings | SaslSslSettings | SaslPlaintextSettings | PlaintextSettings#
Return a model with a subset of settings for a Kafka auth method.
This method will fail with a ValidationError if an invalid set of settings were provided.