Application metrics

Safir provides helpers for publishing application metrics to Sasquatch. Using these helpers, you can instrument your app to push custom events to a Sasquatch-managed InfluxDB instance, and then use the Sasquatch-managed Chronograf to query and graph them.

Metrics vs. telemetry

Note that this system is not meant to handle telemetry. Telemetry includes things like:

  • The amount of CPU and memory an app is using

  • Kubernetes events and usage

Metrics, which this system deals with, are things like:

  • A user generated a token. This event might include the username and token details.

  • A user made a database query. This event might include the username and the type of query (sync vs async).

  • A user starts a Nublado lab. This event might include the username, the size of the pod, and the image used.

For more details and examples of the difference between metrics and telemetry, see https://sqr-089.lsst.io/ (still in progress).

Full example

Here’s an example of how you might use these tools in a typical FastAPI app. Individual components in this example are detailed in subsequent sections.

To publish events, you’ll instantiate one of the subclasses of EventManager, generally KafkaEventManager. When your app starts up, you’ll use it to create an event publisher for every type of event that your app will publish. Then, throughout your app, you’ll call the publish method on these publishers to publish individual events.

Sasquatch config

Add your app to the Sasquatch app metrics configuration!

Config

We need to put some config in our environment for the metrics functionality, a Kafka connection, and a schema registry connection. The Kafka and schema manager values come from the Sasquatch configuration that you added in the previous step, and they are usually set in a Kubernetes Deployment template in your app’s Phalanx config.

METRICS_APPLICATION=myapp
METRICS_EVENTS_TOPIC_PREFIX=what.ever
KAFKA_SECURITY_PROTOCOL=SSL
KAFKA_BOOTSTRAP_SERVERS=sasquatch.kafka-1:9092,sasquatch.kafka-2:9092
KAFKA_CLUSTER_CA_PATH=/some/path/ca.crt
KAFKA_CLIENT_CERT_PATH=/some/path/user.crt
KAFKA_CLIENT_KEY_PATH=/some/path/user.key
SCHEMA_MANAGER_REGISTRY_URL=https://sasquatch-schema-registry.sasquatch:8081

Then we can use the metrics config helpers and the Safir Kafka connection helpers to write a Pydantic BaseSettings config model and singleton for our app. Instantiating the model will pull values from the above environment variables. See Configuration details for more info.

config.py
from pydantic import Field, HttpUrl
from pydantic_settings import BaseSettings, SettingsConfigDict
from safir.metrics import KafkaMetricsConfiguration, MetricsConfiguration


class Config(BaseSettings):
    an_important_url: HttpUrl = Field(
        ...,
        title="URL to something important",
    )

    metrics: MetricsConfiguration = Field(
        default_factory=KafkaMetricsConfiguration,
        title="Metrics configuration",
    )

    model_config = SettingsConfigDict(
        env_prefix="MYAPP_", case_sensitive=False
    )


config = Config()

Define events

Next, we need to:

  • Define our event payloads

  • Define an events container that takes an EventManager and creates a publisher for each event our app will ever publish

  • Instantiate an EventDependency, which we’ll initialize in our app start up later

We can do this all in an events.py file.

Note

Fields in metrics events can’t be other models or other nested types like dicts, because the current event datastore (InfluxDB) does not support this. Basing our event payloads on safir.metrics.EventPayload will enable the EventManager to ensure at runtime when our events are registered that they don’t contain incompatible fields.

Note

Any timedelta fields will be serialized as an Avro double number of seconds.
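
As a quick illustration of what "a double number of seconds" means for a timedelta, the standard library's total_seconds method gives the equivalent float. This only shows the conversion; the actual Avro encoding is handled by Safir.

```python
from datetime import timedelta

duration = timedelta(minutes=1, seconds=30, milliseconds=250)

# A float number of seconds, the kind of value the note above describes.
seconds = duration.total_seconds()
print(seconds)  # 90.25
```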

events.py
from enum import Enum
from datetime import timedelta

from pydantic import Field
from safir.metrics import (
    EventManager,
    EventPayload,
)
from safir.dependencies.metrics import EventDependency, EventMaker


class QueryType(Enum):
    async_ = "async"
    sync = "sync"


class QueryEvent(EventPayload):
    """Information about a user-submitted query."""

    type: QueryType = Field(
        title="Query type", description="The kind of query"
    )

    duration: timedelta = Field(
        title="Query duration", description="How long the query took to run"
    )


class Events(EventMaker):
    async def initialize(self, manager: EventManager) -> None:
        self.query = await manager.create_publisher("query", QueryEvent)


# We'll call .initialize on this in our app start up
events_dependency = EventDependency(Events())

Initialize

Then, in a FastAPI lifespan function, we’ll create a safir.metrics.EventManager and initialize our events_dependency with it. We need to do this in a lifespan function, because we need to do it only once for our whole application, not once for each request. In more complex apps, this would probably use the ProcessContext pattern.

main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI
from safir.metrics import EventManager

from .config import config
from .events import events_dependency


@asynccontextmanager
async def lifespan(app: FastAPI):
    event_manager = config.metrics.make_manager()
    await event_manager.initialize()
    await events_dependency.initialize(event_manager)

    yield

    await event_manager.aclose()


app = FastAPI(lifespan=lifespan)
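
A common variation on this pattern wraps the yield in try/finally so that cleanup still runs if an exception is raised inside the context. The sketch below demonstrates the idea with the standard library only. FakeManager is a hypothetical stand-in for an event manager, and the lifespan here takes the manager as its argument rather than a FastAPI app, purely to keep the example self-contained.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeManager:
    """Hypothetical stand-in for an event manager; records lifecycle calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def initialize(self) -> None:
        self.calls.append("initialize")

    async def aclose(self) -> None:
        self.calls.append("aclose")


@asynccontextmanager
async def lifespan(manager: FakeManager) -> AsyncIterator[None]:
    await manager.initialize()
    try:
        yield
    finally:
        # Runs even if an exception is raised inside the context.
        await manager.aclose()


async def run_app(manager: FakeManager) -> None:
    try:
        async with lifespan(manager):
            raise RuntimeError("simulated failure while serving")
    except RuntimeError:
        pass


manager = FakeManager()
asyncio.run(run_app(manager))
print(manager.calls)  # ['initialize', 'aclose']
```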

Handlers

In your handler functions, you can inject your events container as a FastAPI dependency. You can then publish events using the attributes on the dependency. A static type checker can verify that each call to a publisher’s publish method receives an instance of the payload type that the publisher was registered with.

In real apps:

  • The injection would probably happen via a RequestContext

  • The request handling and event publishing would probably happen in a Service

But the principle remains the same:

main.py (continued)
from datetime import timedelta
from typing import Annotated

from fastapi import Depends

from .events import Events, events_dependency, QueryEvent
from .models import QueryRequest  # Not shown


@app.get("/query")
async def query(
    query: QueryRequest,
    events: Annotated[Events, Depends(events_dependency)],
):
    duration: timedelta = do_the_query(query.type, query.query)
    await events.query.publish(
        QueryEvent(type=query.type, duration=duration)
    )

Unit testing

Setting enabled to False and mock to True in your metrics configuration will give you a safir.metrics.MockEventManager. This is a no-op event manager that produces publishers that record all of the events that they publish. You can make assertions about these published events in your unit tests.

Warning

Do not use the safir.metrics.MockEventManager in any deployed instance of your application. Recorded events are never cleaned up, and memory usage will grow unbounded.

METRICS_APPLICATION=myapp
METRICS_ENABLED=false
METRICS_MOCK=true
METRICS_EVENTS_TOPIC_PREFIX=what.ever

from typing import cast

from pydantic import ConfigDict

from safir.metrics import (
    EventPayload,
    MockEventPublisher,
    metrics_configuration_factory,
)


config = metrics_configuration_factory()
manager = config.make_manager()


class SomeEvent(EventPayload):
    model_config = ConfigDict(ser_json_timedelta="float")

    foo: str
    count: int
    duration: float | None


await manager.initialize()
publisher = await manager.create_publisher("someevent", SomeEvent)

await publisher.publish(SomeEvent(foo="foo1", count=1, duration=1.234))
await publisher.publish(SomeEvent(foo="foo2", count=2, duration=2.345))
await publisher.publish(SomeEvent(foo="foo3", count=3, duration=3.456))
await publisher.publish(SomeEvent(foo="foo4", count=4, duration=None))
await publisher.publish(SomeEvent(foo="foo5", count=5, duration=5.678))

await manager.aclose()

pub = cast(MockEventPublisher, publisher).published

A mock publisher has a safir.metrics.MockEventPublisher.published attribute, which is a safir.metrics.PublishedList containing all of the safir.metrics.EventPayload objects published by that publisher. A safir.metrics.PublishedList is a regular Python list with some mixed-in assertion methods. All of these assertion methods take a list of dicts and compare them to the model_dump(mode="json") serialization of the published EventPayloads.

assert_published

Use safir.metrics.PublishedList.assert_published to assert that some set of payloads is an ordered subset of all of the payloads that were published, with no events in between. If not, an exception (a subclass of AssertionError) will be raised. Other events could have been published before or after the expected payloads.

pub.assert_published(
    [
        {"foo": "foo1", "count": 1, "duration": 1.234},
        {"foo": "foo2", "count": 2, "duration": 2.345},
        {"foo": "foo3", "count": 3, "duration": 3.456},
    ]
)
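
To spell out what "an ordered subset with no events in between" means, the following plain-Python sketch checks whether one list appears as a contiguous run inside another. The contains_run helper is hypothetical and is not Safir's implementation; it only illustrates the matching rule described above.

```python
from typing import Any


def contains_run(published: list[Any], expected: list[Any]) -> bool:
    """Return True if expected appears in published as a contiguous run."""
    n = len(expected)
    return any(
        published[i : i + n] == expected
        for i in range(len(published) - n + 1)
    )


published = [
    {"foo": "foo1"},
    {"foo": "foo2"},
    {"foo": "foo3"},
    {"foo": "foo4"},
]

# foo2 and foo3 are adjacent and in order, so this matches.
print(contains_run(published, [{"foo": "foo2"}, {"foo": "foo3"}]))  # True
# foo2 sits between foo1 and foo3, so this does not.
print(contains_run(published, [{"foo": "foo1"}, {"foo": "foo3"}]))  # False
```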

You can also assert that all of the expected payloads were published in any order, and possibly with events in between:

pub.assert_published(
    [
        {"foo": "foo1", "count": 1, "duration": 1.234},
        {"foo": "foo3", "count": 3, "duration": 3.456},
        {"foo": "foo2", "count": 2, "duration": 2.345},
    ],
    any_order=True,
)
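
One way to read the any_order behavior is as a containment check that ignores position. The sketch below is an assumption about the semantics rather than Safir's actual code: contains_all is a hypothetical helper that uses plain equality and removes each match, so that a payload expected twice must also have been published twice.

```python
from typing import Any


def contains_all(published: list[Any], expected: list[Any]) -> bool:
    """Return True if every expected item was published, in any order."""
    remaining = list(published)
    for item in expected:
        if item not in remaining:
            return False
        # Consume the match so duplicates are counted correctly.
        remaining.remove(item)
    return True


published = [
    {"foo": "foo1"},
    {"foo": "foo2"},
    {"foo": "foo3"},
    {"foo": "foo4"},
]

print(contains_all(published, [{"foo": "foo3"}, {"foo": "foo1"}]))  # True
print(contains_all(published, [{"foo": "foo1"}, {"foo": "foo1"}]))  # False
```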

assert_published_all

Use safir.metrics.PublishedList.assert_published_all to assert that the expected payloads, and only the expected payloads, were published:

pub.assert_published_all(
    [
        {"foo": "foo1", "count": 1, "duration": 1.234},
        {"foo": "foo2", "count": 2, "duration": 2.345},
        {"foo": "foo3", "count": 3, "duration": 3.456},
        {"foo": "foo4", "count": 4, "duration": None},
        {"foo": "foo5", "count": 5, "duration": 5.678},
    ],
)

This would raise an exception because it is missing the foo5 event:

pub.assert_published_all(
    [
        {"foo": "foo1", "count": 1, "duration": 1.234},
        {"foo": "foo2", "count": 2, "duration": 2.345},
        {"foo": "foo3", "count": 3, "duration": 3.456},
        {"foo": "foo4", "count": 4, "duration": None},
    ],
)

You can use any_order here too:

pub.assert_published_all(
    [
        {"foo": "foo2", "count": 2, "duration": 2.345},
        {"foo": "foo5", "count": 5, "duration": 5.678},
        {"foo": "foo3", "count": 3, "duration": 3.456},
        {"foo": "foo1", "count": 1, "duration": 1.234},
        {"foo": "foo4", "count": 4, "duration": None},
    ],
    any_order=True,
)

ANY and NOT_NONE

You can use safir.metrics.ANY to indicate that any value, even None, is OK. This is just a re-export of unittest.mock.ANY.

from safir.metrics import ANY


pub.assert_published(
    [
        {"foo": "foo3", "count": 3, "duration": ANY},
        {"foo": "foo4", "count": 4, "duration": ANY},
    ],
)

You can use safir.metrics.NOT_NONE to indicate that any value except None is OK:

from safir.metrics import ANY, NOT_NONE


pub.assert_published(
    [
        {"foo": "foo3", "count": 3, "duration": NOT_NONE},
        {"foo": "foo4", "count": 4, "duration": ANY},
    ],
)

This would raise an exception, because duration for the foo4 payload is None:

from safir.metrics import NOT_NONE


pub.assert_published(
    [
        {"foo": "foo3", "count": 3, "duration": NOT_NONE},
        {"foo": "foo4", "count": 4, "duration": NOT_NONE},
    ],
)
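
Both matchers work through ordinary equality, which is why they can sit inside the expected dicts. The sketch below shows this with unittest.mock.ANY from the standard library and a hypothetical look-alike for NOT_NONE. The _NotNone class and the NOT_NONE_SKETCH name are assumptions for illustration; Safir's own NOT_NONE may be implemented differently.

```python
from unittest.mock import ANY


class _NotNone:
    """Hypothetical matcher: compares equal to every value except None."""

    def __eq__(self, other: object) -> bool:
        return other is not None

    def __repr__(self) -> str:
        return "<NOT_NONE>"


NOT_NONE_SKETCH = _NotNone()

payload = {"foo": "foo4", "count": 4, "duration": None}

# ANY matches None, so the dicts compare equal.
any_matches = payload == {"foo": "foo4", "count": 4, "duration": ANY}
# The NOT_NONE-style matcher rejects None.
not_none_matches = payload == {
    "foo": "foo4",
    "count": 4,
    "duration": NOT_NONE_SKETCH,
}
# It accepts any other value.
float_matches = {"duration": 3.456} == {"duration": NOT_NONE_SKETCH}

print(any_matches, not_none_matches, float_matches)  # True False True
```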

Configuration details

Initializing an EventManager requires some information about your app (currently just the name), and both Kafka and schema registry clients. Safir provides a configuration type and some Pydantic BaseSettings models to help get the necessary config for these things into your app via environment variables.

You’ll need to provide some metrics-specific info, Kafka connection settings, and schema registry connection settings:

export METRICS_APPLICATION=myapp
export KAFKA_SECURITY_PROTOCOL=SSL
export KAFKA_BOOTSTRAP_SERVERS=sasquatch.kafka-1:9092,sasquatch.kafka-2:9092
export KAFKA_CLUSTER_CA_PATH=/some/path/ca.crt
export KAFKA_CLIENT_CERT_PATH=/some/path/user.crt
export KAFKA_CLIENT_KEY_PATH=/some/path/user.key
export SCHEMA_MANAGER_REGISTRY_URL=https://sasquatch-schema-registry.sasquatch:8081

To disable metrics at runtime, set METRICS_ENABLED to false. This will still verify that the event objects are valid, but will then discard them rather than trying to record them.

Your app doesn’t use Kafka

If your app won’t use Kafka for anything except publishing metrics, add a metrics member to your application’s BaseSettings class with the type MetricsConfiguration. This will become an appropriate instance of BaseMetricsConfiguration at runtime, based on the configuration from any of the normal sources that BaseSettings supports.

config.py
from pydantic import Field
from pydantic_settings import BaseSettings
from safir.metrics import (
    MetricsConfiguration,
    metrics_configuration_factory,
)


class Config(BaseSettings):
    metrics: MetricsConfiguration = Field(
        default_factory=metrics_configuration_factory,
        title="Metrics configuration",
    )


config = Config()
manager = config.metrics.make_manager()

Unfortunately, due to limitations in Pydantic, you need to specify metrics_configuration_factory as a default factory. This will choose an appropriate metrics configuration based on which environment variables are set. This default_factory setting is not required if the configuration is provided via a YAML file or similar input with a metrics key, rather than purely via the environment.

Your app uses Kafka

If your app uses Kafka for things other than metrics publishing (maybe it’s a FastStream app), you can use the Safir Kafka connection helpers to create clients and pass them to the EventManager constructor.

Note

The manage_kafka parameter is False here. This means that calling aclose on your EventManager will NOT stop the Kafka clients. You are expected to do this yourself somewhere else in your app.

from aiokafka.admin.client import AIOKafkaAdminClient
from faststream.kafka import KafkaBroker
from safir.kafka import KafkaConnectionSettings, SchemaManagerSettings
from safir.metrics import EventManager, EventsConfiguration

kafka_config = KafkaConnectionSettings()
schema_manager_config = SchemaManagerSettings()
events_config = EventsConfiguration()

# You can use this in all parts of your app
broker = KafkaBroker(**kafka_config.to_faststream_params())

admin_client = AIOKafkaAdminClient(
    **kafka_config.to_aiokafka_params(),
)
schema_manager = schema_manager_config.make_manager()

event_manager = EventManager(
    application="myapp",
    topic_prefix=events_config.topic_prefix,
    kafka_broker=broker,
    kafka_admin_client=admin_client,
    schema_manager=schema_manager,
    manage_kafka=False,
)