KafkaConnectionSettings¶
- pydantic settings safir.kafka.KafkaConnectionSettings¶
Settings for connecting to Kafka.
This settings model supports different authentication methods, which each have different sets of required settings. All of these settings can be provided in
KAFKA_
prefixed environment variables. Instances of this model have properties that can be used to construct different types of kafka clients:from faststream.broker import KafkaBroker from safir.kafka import KafkaConnectionSettings config = KafkaConnectionSettings() kafka_broker = KafkaBroker(**config.faststream_broker_params)
When using this model directly, The
validated
property enforces at runtime that the correct settings were provided for the desired authentication method, and returns models to access those settings in a type-safe way:from pathlib import Path # ValidationError at runtime: ``client_key_path`` is not provided config = KafkaConnectionSettings( bootstrap_servers="something:1234", security_protocol=KafkaSecurityProtocol.SSL, cluster_ca_path=Path("/some/cert.crt"), client_cert_path=Path("/some/other/cert.crt"), ) config = KafkaConnectionSettings( bootstrap_servers="something:1234", security_protocol=KafkaSecurityProtocol.SSL, cluster_ca_path=Path("/some/path/ca.crt"), client_cert_path=Path("/some/path/user.crt"), client_key_path=Path("/some/path/user.key"), ) blah = config.validated.sasl_username # Static type error
- Parameters:
_nested_model_default_partial_update (
bool
|None
, default:None
)_env_file (
Union
[Path
,str
,List
[Union
[str
,Path
]],Tuple
[Union
[Path
,str
],...
],None
], default:PosixPath('.')
)_cli_parse_args (
bool
|list
[str
] |tuple
[str
,...
] |None
, default:None
)_cli_settings_source (
Optional
[CliSettingsSource
[Any
]], default:None
)_secrets_dir (
Union
[Path
,str
,List
[Union
[str
,Path
]],Tuple
[Union
[Path
,str
],...
],None
], default:None
)values (
Any
)
Show JSON schema
{ "title": "KafkaConnectionSettings", "description": "Settings for connecting to Kafka.\n\nThis settings model supports different authentication methods, which each\nhave different sets of required settings. All of these settings can be\nprovided in ``KAFKA_`` prefixed environment variables. Instances of this\nmodel have properties that can be used to construct different types of\nkafka clients:\n\n.. code-block:: python\n\n from faststream.broker import KafkaBroker\n\n from safir.kafka import KafkaConnectionSettings\n\n\n config = KafkaConnectionSettings()\n kafka_broker = KafkaBroker(**config.faststream_broker_params)\n\nWhen using this model directly, The ``validated`` property enforces at\nruntime that the correct settings were provided for the desired\nauthentication method, and returns models to access those settings in a\ntype-safe way:\n\n.. code-block:: python\n\n from pathlib import Path\n\n\n # ValidationError at runtime: ``client_key_path`` is not provided\n config = KafkaConnectionSettings(\n bootstrap_servers=\"something:1234\",\n security_protocol=KafkaSecurityProtocol.SSL,\n cluster_ca_path=Path(\"/some/cert.crt\"),\n client_cert_path=Path(\"/some/other/cert.crt\"),\n )\n\n config = KafkaConnectionSettings(\n bootstrap_servers=\"something:1234\",\n security_protocol=KafkaSecurityProtocol.SSL,\n cluster_ca_path=Path(\"/some/path/ca.crt\"),\n client_cert_path=Path(\"/some/path/user.crt\"),\n client_key_path=Path(\"/some/path/user.key\"),\n )\n\n blah = config.validated.sasl_username # Static type error", "type": "object", "properties": { "bootstrapServers": { "description": "A comma-separated list of Kafka brokers to connect to. This should be a list of hostnames or IP addresses, each optionally followed by a port number, separated by commas.", "examples": [ "kafka-1:9092,kafka-2:9092,kafka-3:9092", "kafka:9092" ], "title": "Kafka bootstrap servers", "type": "string" }, "securityProtocol": { "$ref": "#/$defs/SecurityProtocol", "description": "The authentication and encryption mode for the connection.", "title": "Security Protocol" }, "clusterCaPath": { "anyOf": [ { "format": "file-path", "type": "string" }, { "type": "null" } ], "default": null, "description": "The path to the PEM-formatted CA certificate file to use for verifying the broker's certificate. This is only needed for SSL and SASL_SSL security protocols, andeven in those cases, only when the broker's certificate is not signed by a CA trusted by the operating system.", "examples": [ "/some/dir/ca.crt" ], "title": "Path to CA certificate file" }, "clientCertPath": { "anyOf": [ { "format": "file-path", "type": "string" }, { "type": "null" } ], "default": null, "description": "The path to the PEM-formated client certificate file to use for authentication. This is only needed if the broker is configured to require SSL client authentication.", "examples": [ "/some/dir/user.crt" ], "title": "Path to client certificate file" }, "clientKeyPath": { "anyOf": [ { "format": "file-path", "type": "string" }, { "type": "null" } ], "default": null, "description": "The path to the PEM-formatted client key file to use for authentication. This is only needed if for the SSL securityprotocol.", "examples": [ "/some/dir/user.key" ], "title": "Path to client key file" }, "saslMechanism": { "anyOf": [ { "$ref": "#/$defs/SaslMechanism" }, { "type": "null" } ], "default": null, "description": "The SASL mechanism to use for authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.", "title": "SASL mechanism" }, "saslUsername": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "The username to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.", "title": "SASL username" }, "saslPassword": { "anyOf": [ { "format": "password", "type": "string", "writeOnly": true }, { "type": "null" } ], "default": null, "description": "The password to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.", "title": "SASL password" } }, "$defs": { "SaslMechanism": { "description": "Kafka SASL mechanisms.", "enum": [ "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512" ], "title": "SaslMechanism", "type": "string" }, "SecurityProtocol": { "description": "Kafka SASL security protocols.", "enum": [ "SASL_PLAINTEXT", "SASL_SSL", "PLAINTEXT", "SSL" ], "title": "SecurityProtocol", "type": "string" } }, "additionalProperties": false, "required": [ "bootstrapServers", "securityProtocol" ] }
- Config:
env_prefix: str = KAFKA_
populate_by_name: bool = True
- Fields:
- Validators:
validate_auth_settings
»all fields
- field bootstrap_servers: str [Required]¶
A comma-separated list of Kafka brokers to connect to. This should be a list of hostnames or IP addresses, each optionally followed by a port number, separated by commas.
- Validated by:
- field client_cert_path: FilePath | None = None¶
The path to the PEM-formated client certificate file to use for authentication. This is only needed if the broker is configured to require SSL client authentication.
- Validated by:
- field client_key_path: FilePath | None = None¶
The path to the PEM-formatted client key file to use for authentication. This is only needed if for the SSL securityprotocol.
- Validated by:
- field cluster_ca_path: FilePath | None = None¶
The path to the PEM-formatted CA certificate file to use for verifying the broker’s certificate. This is only needed for SSL and SASL_SSL security protocols, andeven in those cases, only when the broker’s certificate is not signed by a CA trusted by the operating system.
- Validated by:
- field sasl_mechanism: SaslMechanism | None = None¶
The SASL mechanism to use for authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.
- Validated by:
- field sasl_password: SecretStr | None = None¶
The password to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.
- Validated by:
- field sasl_username: str | None = None¶
The username to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.
- Validated by:
- field security_protocol: SecurityProtocol [Required]¶
The authentication and encryption mode for the connection.
- Validated by:
- to_aiokafka_params()¶
- Return type:
- to_faststream_params()¶
- Return type:
- validator validate_auth_settings » all fields¶
Validate that the correct combination of parameters is specified.
- Return type:
Self
- property validated: SslSettings | SaslSslSettings | SaslPlaintextSettings | PlaintextSettings¶
Return a model with a subset of settings for a Kafka auth method.
This method will fail with a ValidationError if an invalid set of settings were provided.