Application metrics

Safir provides helpers for publishing application metrics to Sasquatch. Using these helpers, you can instrument your app to push custom events to a Sasquatch-managed InfluxDB instance, and then use the Sasquatch-managed Chronograf to query and graph them.

Metrics vs. telemetry

Note that this system is not meant to handle telemetry. Telemetry includes things like:

  • The amount of CPU and memory an app is using

  • Kubernetes events and usage

Metrics, which this system deals with, are things like:

  • A user generated a token. This event might include the username and token details.

  • A user made a database query. This event might include the username and the type of query (sync vs. async).

  • A user started a Nublado lab. This event might include the username, the size of the pod, and the image used.

For more details and examples of the difference between metrics and telemetry, see https://sqr-089.lsst.io/ (still in progress).

Full example

Here’s an example of how you might use these tools in a typical FastAPI app. Individual components in this example are detailed in subsequent sections.

To publish events, you’ll instantiate an EventManager. When your app starts up, you’ll use it to create an event publisher for every kind of event that your app will publish. Then, throughout your app, you’ll call the publish method on these publishers to publish individual events.

Sasquatch config

Add your app to the Sasquatch app metrics configuration!

Config

We need to put some config in our environment for the metrics functionality, the Kafka connection, and the schema registry connection. The Kafka and schema registry values come from the Sasquatch configuration you did above, and they are usually set in a Kubernetes Deployment template in your app’s Phalanx config.

METRICS_APP_NAME=myapp
KAFKA_SECURITY_PROTOCOL=SSL
KAFKA_BOOTSTRAP_SERVERS=sasquatch.kafka-1:9092,sasquatch.kafka-2:9092
KAFKA_CLUSTER_CA_PATH=/some/path/ca.crt
KAFKA_CLIENT_CERT_PATH=/some/path/user.crt
KAFKA_CLIENT_KEY_PATH=/some/path/user.key
SCHEMA_MANAGER_REGISTRY_URL=https://sasquatch-schema-registry.sasquatch:8081

Then we can use the metrics config helpers and the Safir Kafka connection helpers to write a Pydantic BaseSettings config model and singleton for our app. Instantiating the model will pull values from the above environment variables. See Configuration details for more info.

config.py
from pydantic import Field, HttpUrl
from pydantic_settings import BaseSettings, SettingsConfigDict
from safir.metrics import KafkaMetricsConfiguration


class Configuration(BaseSettings):
    an_important_url: HttpUrl = Field(
        ...,
        title="URL to something important",
    )

    metrics: KafkaMetricsConfiguration = Field(
        default_factory=KafkaMetricsConfiguration
    )

    model_config = SettingsConfigDict(
        env_prefix="MYAPP_", case_sensitive=False
    )


config = Configuration()

Define events

Next, we need to:

  • Define our event payloads

  • Define an events container that takes an EventManager and creates a publisher for each event our app will ever publish

  • Instantiate an EventDependency, which we’ll initialize in our app startup later.

We can do this all in an events.py file.

Note

Fields in metrics events can’t be other models or other nested types like dicts, because the current event datastore (InfluxDB) does not support them. Basing our event payloads on safir.metrics.EventPayload enables the EventManager to ensure at runtime, when our events are registered, that they don’t contain incompatible fields.
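
For illustration, here is a hypothetical payload that would be rejected because it nests another model, along with a flattened version that is allowed (QueryInfo, BadQueryEvent, and GoodQueryEvent are made-up names):

from pydantic import BaseModel, Field
from safir.metrics import EventPayload


class QueryInfo(BaseModel):
    type: str
    duration_seconds: float


class BadQueryEvent(EventPayload):
    # Nested models aren't supported, so registering a publisher for this
    # payload will fail when the event is registered.
    info: QueryInfo = Field(title="Query info")


class GoodQueryEvent(EventPayload):
    # Flatten the nested values into scalar fields instead.
    type: str = Field(title="Query type")
    duration_seconds: float = Field(title="Query duration in seconds")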

events.py
from enum import Enum
from datetime import timedelta

from pydantic import Field
from safir.metrics import (
    EventManager,
    EventPayload,
)
from safir.dependencies.metrics import EventDependency, EventMaker


class QueryType(Enum):
    async_ = "async"
    sync = "sync"


class QueryEvent(EventPayload):
    """Information about a user-submitted query."""

    type: QueryType = Field(
        title="Query type", description="The kind of query"
    )

    duration: timedelta = Field(
        title="Query duration", description="How long the query took to run"
    )


class Events(EventMaker):
    async def initialize(self, manager: EventManager) -> None:
        self.query = await manager.create_publisher("query", QueryEvent)


# We'll call .initialize on this in our app startup
events_dependency = EventDependency(Events())

Initialize

Then, in a FastAPI lifespan function, we’ll create a safir.metrics.EventManager and initialize our events_dependency with it. We need to do this in a lifespan function because it must happen only once for our whole application, not once per request. In more complex apps, this would probably use the ProcessContext pattern, as sketched after the example below.

main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI
from safir.metrics import EventManager

from .config import config
from .events import events_dependency


@asynccontextmanager
async def lifespan(app: FastAPI):
    event_manager = config.metrics.make_manager()
    await event_manager.initialize()
    await events_dependency.initialize(event_manager)

    yield

    await event_manager.aclose()


app = FastAPI(lifespan=lifespan)
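
In an app that uses the process context pattern, the EventManager and the events container might live on a shared per-process object instead of module globals. Here is a rough sketch; ProcessContext is an app-defined class (not something Safir provides):

from dataclasses import dataclass

from safir.metrics import EventManager

from .config import Configuration
from .events import Events


@dataclass
class ProcessContext:
    """Hypothetical per-process state shared by every request."""

    event_manager: EventManager
    events: Events

    @classmethod
    async def create(cls, config: Configuration) -> "ProcessContext":
        # Create and initialize the manager once per process, then build
        # the events container from it.
        event_manager = config.metrics.make_manager()
        await event_manager.initialize()
        events = Events()
        await events.initialize(event_manager)
        return cls(event_manager=event_manager, events=events)

    async def aclose(self) -> None:
        await self.event_manager.aclose()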

Handlers

In your handler functions, you can inject your events container as a FastAPI dependency. You can then publish events using the attributes on the dependency. It is statically checked that calls to the publishers’ publish methods receive instances of the payload types that they were registered with.

In real apps:

  • The injection would probably happen via a RequestContext

  • The request handling and event publishing would probably happen in a Service (see the sketch after the example below)

But the principle remains the same:

main.py (continued)
from datetime import timedelta
from typing import Annotated

from fastapi import Depends

from .events import Events, QueryEvent, events_dependency
from .models import QueryRequest  # Not shown


@app.get("/query")
async def query(
    query: QueryRequest,
    events: Annotated[Events, Depends(events_dependency)],
):
    duration: timedelta = do_the_query(query.type, query.query)  # Not shown
    await events.query.publish(
        QueryEvent(type=query.type, duration=duration)
    )
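
In a service-based app, the handler would likely resolve a RequestContext and delegate to a service that does both the work and the publishing. Here is a rough sketch of what that service might look like; QueryService and do_the_query are hypothetical:

from datetime import timedelta

from .events import Events, QueryEvent
from .models import QueryRequest  # Not shown


class QueryService:
    """Hypothetical service that runs queries and publishes query events."""

    def __init__(self, events: Events) -> None:
        self._events = events

    async def run_query(self, request: QueryRequest) -> None:
        # Do the work, then publish an event describing it.
        duration: timedelta = do_the_query(request.type, request.query)
        await self._events.query.publish(
            QueryEvent(type=request.type, duration=duration)
        )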

Configuration details

Initializing an EventManager requires some information about your app (currently just the name), a Kafka client, and a schema registry client. Safir provides some Pydantic BaseSettings models to help get the necessary config for these things into your app via environment variables.

You’ll need to provide some metrics-specific info, Kafka connection settings, and schema registry connection settings:

export METRICS_APP_NAME=myapp
export METRICS_DISABLE=false
export KAFKA_SECURITY_PROTOCOL=SSL
export KAFKA_BOOTSTRAP_SERVERS=sasquatch.kafka-1:9092,sasquatch.kafka-2:9092
export KAFKA_CLUSTER_CA_PATH=/some/path/ca.crt
export KAFKA_CLIENT_CERT_PATH=/some/path/user.crt
export KAFKA_CLIENT_KEY_PATH=/some/path/user.key
export SCHEMA_MANAGER_REGISTRY_URL=https://sasquatch-schema-registry.sasquatch:8081

Your app doesn’t use Kafka

If your app won’t use Kafka for anything except publishing metrics, the KafkaMetricsConfiguration config helper will construct an EventManager and all of its Kafka dependencies:

from safir.metrics import KafkaMetricsConfiguration

config = KafkaMetricsConfiguration()
manager = config.make_manager()

Your app uses Kafka

If your app uses Kafka for things other than metrics publishing (maybe it’s a FastStream app), you can use the Safir Kafka connection helpers to create clients and pass them to the EventManager constructor.

Note

The manage_kafka parameter is False here. This means that calling aclose on your EventManager will NOT stop the Kafka clients. You are expected to close them yourself somewhere else in your app, as sketched after the example below.

from aiokafka.admin import AIOKafkaAdminClient
from faststream.kafka import KafkaBroker
from safir.kafka import KafkaConnectionSettings, SchemaManagerSettings
from safir.metrics import EventManager, MetricsConfiguration

kafka_config = KafkaConnectionSettings()
schema_manager_config = SchemaManagerSettings()
metrics_config = MetricsConfiguration()

# You can use this in all parts of your app
broker = KafkaBroker(**kafka_config.to_faststream_params())

admin_client = AIOKafkaAdminClient(
    **kafka_config.to_aiokafka_params(),
)
schema_manager = schema_manager_config.make_manager()

event_manager = EventManager(
    app_name=metrics_config.app_name,
    base_topic_prefix=metrics_config.topic_prefix,
    kafka_broker=broker,
    kafka_admin_client=admin_client,
    schema_manager=schema_manager,
    manage_kafka=False,
    disable=metrics_config.disable,
)
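
Then, in your app’s shutdown logic, close the EventManager and the Kafka clients yourself. A rough sketch, continuing the example above (the exact close calls depend on how your app manages its Kafka clients):

# Because manage_kafka=False, aclose() shuts down only the EventManager
# itself; the Kafka clients created above must be closed separately.
await event_manager.aclose()
await admin_client.close()

# If FastStream starts and stops the broker as part of its own lifecycle,
# you don't need to close it here.
await broker.close()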