KafkaMetricsConfiguration

pydantic settings safir.metrics.KafkaMetricsConfiguration

Metrics configuration when enabled, including Kafka configuration.

JSON schema:
{
   "title": "KafkaMetricsConfiguration",
   "description": "Metrics configuration when enabled, including Kafka configuration.",
   "type": "object",
   "properties": {
      "appName": {
         "description": "The name of the application that is emitting these metrics",
         "title": "Application name",
         "type": "string"
      },
      "events": {
         "$ref": "#/$defs/EventsConfiguration",
         "title": "Events configuration"
      },
      "enabled": {
         "default": true,
         "description": "If set to false, no events will be sent and all calls to publish events will be no-ops.",
         "title": "Whether to send events",
         "type": "boolean"
      },
      "kafka": {
         "$ref": "#/$defs/KafkaConnectionSettings",
         "title": "Kafka connection settings"
      },
      "schemaManager": {
         "$ref": "#/$defs/SchemaManagerSettings",
         "title": "Kafka schema manager settings"
      }
   },
   "$defs": {
      "EventsConfiguration": {
         "additionalProperties": false,
         "description": "Configuration for emitting events.",
         "properties": {
            "topicPrefix": {
               "default": "lsst.square.metrics.events",
               "description": "You probably should use the default here. It could be useful in development scenarios to change this.",
               "title": "Metrics topic prefix",
               "type": "string"
            }
         },
         "title": "EventsConfiguration",
         "type": "object"
      },
      "KafkaConnectionSettings": {
         "additionalProperties": false,
         "description": "Settings for connecting to Kafka.\n\nThis settings model supports different authentication methods, which each\nhave different sets of required settings. All of these settings can be\nprovided in ``KAFKA_`` prefixed environment variables. Instances of this\nmodel have properties that can be used to construct different types of\nkafka clients:\n\n.. code-block:: python\n\n   from faststream.broker import KafkaBroker\n\n   from safir.kafka import KafkaConnectionSettings\n\n\n   config = KafkaConnectionSettings()\n   kafka_broker = KafkaBroker(**config.faststream_broker_params)\n\nWhen using this model directly, The ``validated`` property enforces at\nruntime that the correct settings were provided for the desired\nauthentication method, and returns models to access those settings in a\ntype-safe way:\n\n.. code-block:: python\n\n   from pathlib import Path\n\n\n   # ValidationError at runtime: ``client_key_path`` is not provided\n   config = KafkaConnectionSettings(\n       bootstrap_servers=\"something:1234\",\n       security_protocol=KafkaSecurityProtocol.SSL,\n       cluster_ca_path=Path(\"/some/cert.crt\"),\n       client_cert_path=Path(\"/some/other/cert.crt\"),\n   )\n\n   config = KafkaConnectionSettings(\n       bootstrap_servers=\"something:1234\",\n       security_protocol=KafkaSecurityProtocol.SSL,\n       cluster_ca_path=Path(\"/some/path/ca.crt\"),\n       client_cert_path=Path(\"/some/path/user.crt\"),\n       client_key_path=Path(\"/some/path/user.key\"),\n   )\n\n   blah = config.validated.sasl_username  # Static type error",
         "properties": {
            "bootstrapServers": {
               "description": "A comma-separated list of Kafka brokers to connect to. This should be a list of hostnames or IP addresses, each optionally followed by a port number, separated by commas.",
               "examples": [
                  "kafka-1:9092,kafka-2:9092,kafka-3:9092",
                  "kafka:9092"
               ],
               "title": "Kafka bootstrap servers",
               "type": "string"
            },
            "securityProtocol": {
               "$ref": "#/$defs/SecurityProtocol",
               "description": "The authentication and encryption mode for the connection.",
               "title": "Security Protocol"
            },
            "clusterCaPath": {
               "anyOf": [
                  {
                     "format": "file-path",
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "The path to the PEM-formatted CA certificate file to use for verifying the broker's certificate. This is only needed for SSL and SASL_SSL security protocols, andeven in those cases, only when the broker's certificate is not signed by a CA trusted by the operating system.",
               "examples": [
                  "/some/dir/ca.crt"
               ],
               "title": "Path to CA certificate file"
            },
            "clientCertPath": {
               "anyOf": [
                  {
                     "format": "file-path",
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "The path to the PEM-formated client certificate file to use for authentication. This is only needed if the broker is configured to require SSL client authentication.",
               "examples": [
                  "/some/dir/user.crt"
               ],
               "title": "Path to client certificate file"
            },
            "clientKeyPath": {
               "anyOf": [
                  {
                     "format": "file-path",
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "The path to the PEM-formatted client key file to use for authentication. This is only needed if for the SSL securityprotocol.",
               "examples": [
                  "/some/dir/user.key"
               ],
               "title": "Path to client key file"
            },
            "saslMechanism": {
               "anyOf": [
                  {
                     "$ref": "#/$defs/SaslMechanism"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "The SASL mechanism to use for authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.",
               "title": "SASL mechanism"
            },
            "saslUsername": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "The username to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.",
               "title": "SASL username"
            },
            "saslPassword": {
               "anyOf": [
                  {
                     "format": "password",
                     "type": "string",
                     "writeOnly": true
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "The password to use for SASL authentication. This is only needed for the SASL_SSL and SASL_PLAINTEXT securityprotocols.",
               "title": "SASL password"
            }
         },
         "required": [
            "bootstrapServers",
            "securityProtocol"
         ],
         "title": "KafkaConnectionSettings",
         "type": "object"
      },
      "SaslMechanism": {
         "description": "Kafka SASL mechanisms.",
         "enum": [
            "PLAIN",
            "SCRAM-SHA-256",
            "SCRAM-SHA-512"
         ],
         "title": "SaslMechanism",
         "type": "string"
      },
      "SchemaManagerSettings": {
         "additionalProperties": false,
         "description": "Settings for constructing a `~safir.kafka.PydanticSchemaManager`.",
         "properties": {
            "registryUrl": {
               "description": "URL of a a Confluent-compatible schema registry",
               "format": "uri",
               "minLength": 1,
               "title": "Schema registry URL",
               "type": "string"
            },
            "suffix": {
               "default": "",
               "description": "A suffix that is added to the schema name (and thus the subject name). The suffix creates alternate subjects in the Schema Registry so schemas registered during testing and staging don't affect the compatibility continuity of a production subject. For production, it's best to not set a suffix.",
               "examples": [
                  "_dev1"
               ],
               "title": "Suffix",
               "type": "string"
            }
         },
         "required": [
            "registryUrl"
         ],
         "title": "SchemaManagerSettings",
         "type": "object"
      },
      "SecurityProtocol": {
         "description": "Kafka SASL security protocols.",
         "enum": [
            "SASL_PLAINTEXT",
            "SASL_SSL",
            "PLAINTEXT",
            "SSL"
         ],
         "title": "SecurityProtocol",
         "type": "string"
      }
   },
   "additionalProperties": false,
   "required": [
      "appName"
   ]
}
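
As the `KafkaConnectionSettings` docstring notes, all connection settings can be supplied through ``KAFKA_``-prefixed environment variables. A hedged sketch of what that could look like for SASL_SSL authentication (the values are placeholders, and the variable names assume the standard pydantic-settings uppercase mapping of the field names above):

```shell
# Comma-separated broker list (required)
export KAFKA_BOOTSTRAP_SERVERS="kafka-1:9092,kafka-2:9092"
# Authentication and encryption mode (required)
export KAFKA_SECURITY_PROTOCOL="SASL_SSL"
# SASL settings, needed only for SASL_SSL and SASL_PLAINTEXT
export KAFKA_SASL_MECHANISM="SCRAM-SHA-512"
export KAFKA_SASL_USERNAME="someuser"
export KAFKA_SASL_PASSWORD="somepassword"
```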

Config:
  • alias_generator: function = to_camel

  • populate_by_name: bool = True
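
Because of the `to_camel` alias generator combined with `populate_by_name`, each field accepts both its snake_case Python name and its camelCase alias. A minimal sketch of that behavior using plain pydantic (the `Example` model below is an illustrative stand-in, not part of safir):

```python
from pydantic import BaseModel, ConfigDict
from pydantic.alias_generators import to_camel


class Example(BaseModel):
    """Stand-in model mirroring the alias configuration above."""

    model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)

    app_name: str


# The camelCase alias works, e.g. when parsing a YAML/JSON config file...
by_alias = Example.model_validate({"appName": "myapp"})
# ...and so does the Python field name, because populate_by_name is True.
by_name = Example(app_name="myapp")
```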

Fields:
field enabled: Annotated[bool, AfterValidator(lambda x: _require_bool(x, True))] = True

If set to false, no events will be sent and all calls to publish events will be no-ops.

Constraints:
  • func = <lambda>

field kafka: KafkaConnectionSettings [Optional]
field schema_manager: SchemaManagerSettings [Optional] (alias 'schemaManager')
make_manager(logger=None)

Construct an EventManager and all of its Kafka dependencies.

If your app doesn’t otherwise use Kafka or the Schema Registry, this is a shortcut for getting a working event manager without having to construct all of the Kafka dependencies manually.

Parameters:

logger (BoundLogger | None, default: None) – Logger to use for internal logging. If not given, the safir.metrics logger will be used.

Returns:

An event manager appropriate to the configuration.

Return type:

EventManager
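
A hedged sketch of the shortcut described above, assuming the required settings (application name, Kafka connection, and schema registry URL) are supplied via environment variables or passed explicitly; actually publishing events additionally requires a running Kafka cluster and schema registry, so this is a sketch rather than a runnable example:

```python
import structlog

from safir.metrics import KafkaMetricsConfiguration

# Settings are read from the environment by pydantic-settings.
config = KafkaMetricsConfiguration()

# Build the event manager and its Kafka dependencies in one call; the
# logger argument is optional and defaults to the safir.metrics logger.
manager = config.make_manager(logger=structlog.get_logger("myapp"))
```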