KafkaMetricsConfiguration#
- pydantic settings safir.metrics.KafkaMetricsConfiguration#
Metrics configuration when enabled, including Kafka configuration.
- Parameters:
_nested_model_default_partial_update (
bool
|None
, default:None
)_env_file (
Union
[Path
,str
,Sequence
[Union
[Path
,str
]],None
], default:PosixPath('.')
)_cli_parse_args (
bool
|list
[str
] |tuple
[str
,...
] |None
, default:None
)_cli_settings_source (
Optional
[CliSettingsSource
[Any
]], default:None
)_cli_shortcuts (
Mapping
[str
,str
|list
[str
]] |None
, default:None
)_secrets_dir (
Union
[Path
,str
,Sequence
[Union
[Path
,str
]],None
], default:None
)values (
Any
)
Show JSON schema
{ "title": "KafkaMetricsConfiguration", "description": "Metrics configuration when enabled, including Kafka configuration.", "type": "object", "properties": { "appName": { "description": "The name of the application that is emitting these metrics", "title": "Application name", "type": "string" }, "events": { "$ref": "#/$defs/EventsConfiguration", "title": "Events configuration" }, "enabled": { "default": true, "description": "If set to false, no events will be sent and all calls to publish events will be no-ops.", "title": "Whether to send events", "type": "boolean" }, "kafka": { "$ref": "#/$defs/KafkaConnectionSettings", "title": "Kafka connection settings" }, "schemaManager": { "$ref": "#/$defs/SchemaManagerSettings", "title": "Kafka schema manager settings" }, "raiseOnError": { "default": false, "description": "True if we should raise an exception whenever there is an error with the metrics system dependencies, like Kafka or the Schema Manager. False if we should just log an error instead. This should be False for most production apps so that issues with the metrics infrastructure don't bring down the app.", "title": "Raise on error", "type": "boolean" }, "backoffInterval": { "$ref": "#/$defs/HumanTimedelta", "default": "PT5M", "description": "The amount of time to wait until further operations are attempted when the event manager is in an error state.", "title": "Backoff interval" } }, "$defs": { "EventsConfiguration": { "additionalProperties": false, "description": "Configuration for emitting events.", "properties": { "topicPrefix": { "default": "lsst.square.metrics.events", "description": "You probably should use the default here. It could be useful in development scenarios to change this.", "title": "Metrics topic prefix", "type": "string" } }, "title": "EventsConfiguration", "type": "object" }, "HumanTimedelta": { "format": "duration", "type": "string" }, "KafkaConnectionSettings": { "additionalProperties": false, "description": "Settings for connecting to Kafka.\n\nThis settings model supports different authentication methods, which each\nhave different sets of required settings. All of these settings can be\nprovided in ``KAFKA_`` prefixed environment variables. Instances of this\nmodel have properties that can be used to construct different types of\nkafka clients:\n\n.. code-block:: python\n\n from faststream.broker import KafkaBroker\n\n from safir.kafka import KafkaConnectionSettings\n\n\n config = KafkaConnectionSettings()\n kafka_broker = KafkaBroker(**config.faststream_broker_params)\n\nWhen using this model directly, The ``validated`` property enforces at\nruntime that the correct settings were provided for the desired\nauthentication method, and returns models to access those settings in a\ntype-safe way:\n\n.. code-block:: python\n\n from pathlib import Path\n\n\n # ValidationError at runtime: ``client_key_path`` is not provided\n config = KafkaConnectionSettings(\n bootstrap_servers=\"something:1234\",\n security_protocol=KafkaSecurityProtocol.SSL,\n cluster_ca_path=Path(\"/some/cert.crt\"),\n client_cert_path=Path(\"/some/other/cert.crt\"),\n )\n\n config = KafkaConnectionSettings(\n bootstrap_servers=\"something:1234\",\n security_protocol=KafkaSecurityProtocol.SSL,\n cluster_ca_path=Path(\"/some/path/ca.crt\"),\n client_cert_path=Path(\"/some/path/user.crt\"),\n client_key_path=Path(\"/some/path/user.key\"),\n )\n\n blah = config.validated.sasl_username # Static type error", "properties": { "bootstrapServers": { "description": "A comma-separated list of Kafka brokers to connect to. This should be a list of hostnames or IP addresses, each optionally followed by a port number, separated by commas.", "examples": [ "kafka-1:9092,kafka-2:9092,kafka-3:9092", "kafka:9092" ], "title": "Kafka bootstrap servers", "type": "string" }, "securityProtocol": { "$ref": "#/$defs/SecurityProtocol", "description": "The authentication and encryption mode for the connection.", "title": "Security Protocol" }, "clusterCaPath": { "anyOf": [ { "format": "file-path", "type": "string" }, { "type": "null" } ], "default": null, "description": "The path to the PEM-formatted CA certificate file to use for verifying the broker's certificate. This is only needed for SSL and SASL_SSL security protocols, andeven in those cases, only when the broker's certificate is not signed by a CA trusted by the operating system.", "examples": [ "/some/dir/ca.crt" ], "title": "Path to CA certificate file" }, "clientCertPath": { "anyOf": [ { "format": "file-path", "type": "string" }, { "type": "null" } ], "default": null, "description": "The path to the PEM-formated client certificate file to use for authentication. This is only needed if the broker is configured to require SSL client authentication.", "examples": [ "/some/dir/user.crt" ], "title": "Path to client certificate file" }, "clientKeyPath": { "anyOf": [ { "format": "file-path", "type": "string" }, { "type": "null" } ], "default": null, "description": "The path to the PEM-formatted client key file to use for authentication. This is only needed if for the SSL securityprotocol.", "examples": [ "/some/dir/user.key" ], "title": "Path to client key file" }, "saslMechanism": { "anyOf": [ { "$ref": "#/$defs/SaslMechanism" }, { "type": "null" } ], "default": null, "description": "The SASL mechanism to use for authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.", "title": "SASL mechanism" }, "saslUsername": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "The username to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.", "title": "SASL username" }, "saslPassword": { "anyOf": [ { "format": "password", "type": "string", "writeOnly": true }, { "type": "null" } ], "default": null, "description": "The password to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.", "title": "SASL password" } }, "required": [ "bootstrapServers", "securityProtocol" ], "title": "KafkaConnectionSettings", "type": "object" }, "SaslMechanism": { "description": "Kafka SASL mechanisms.", "enum": [ "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512" ], "title": "SaslMechanism", "type": "string" }, "SchemaManagerSettings": { "additionalProperties": false, "description": "Settings for constructing a `~safir.kafka.PydanticSchemaManager`.", "properties": { "registryUrl": { "description": "URL of a a Confluent-compatible schema registry", "format": "uri", "minLength": 1, "title": "Schema registry URL", "type": "string" }, "suffix": { "default": "", "description": "A suffix that is added to the schema name (and thus the subject name). The suffix creates alternate subjects in the Schema Registry so schemas registered during testing and staging don't affect the compatibility continuity of a production subject. For production, it's best to not set a suffix.", "examples": [ "_dev1" ], "title": "Suffix", "type": "string" } }, "required": [ "registryUrl" ], "title": "SchemaManagerSettings", "type": "object" }, "SecurityProtocol": { "description": "Kafka SASL security protocols.", "enum": [ "SASL_PLAINTEXT", "SASL_SSL", "PLAINTEXT", "SSL" ], "title": "SecurityProtocol", "type": "string" } }, "additionalProperties": false, "required": [ "appName" ] }
- Config:
populate_by_name: bool = True
validate_by_alias: bool = True
validate_by_name: bool = True
alias_generator: function = <function to_camel at 0x7fdd4fcd9f80>
- Fields:
- field backoff_interval: HumanTimedelta = datetime.timedelta(seconds=300) (alias 'backoffInterval')#
The amount of time to wait until further operations are attempted when the event manager is in an error state.
- field enabled: Annotated[bool, AfterValidator(lambda x: _require_bool(x, True))] = True#
If set to false, no events will be sent and all calls to publish events will be no-ops.
- Constraints:
func = <function <lambda> at 0x7fdd4d34fc40>
- field kafka: KafkaConnectionSettings [Optional]#
- field raise_on_error: bool = False (alias 'raiseOnError')#
True if we should raise an exception whenever there is an error with the metrics system dependencies, like Kafka or the Schema Manager. False if we should just log an error instead. This should be False for most production apps so that issues with the metrics infrastructure don’t bring down the app.
- field schema_manager: SchemaManagerSettings [Optional] (alias 'schemaManager')#
- make_manager(logger=None, *, kafka_broker=None, request_timeout_ms=1000, extra_broker_settings=None, extra_admin_client_settings=None)#
Make a KafkaEventManager.
The request timeout is set low by default so we don’t block the instrumented app for too long if kafka is unavailable.
- Parameters:
- Return type: