How to connect to an SSL-secured Kafka cluster (Strimzi example)

Kafkit’s kafkit.ssl module helps your aiokafka-based clients connect to Kafka brokers that require the SSL protocol. SSL is commonly used for mutual authentication: the broker authenticates the client, and the client authenticates the broker. SSL authentication is also commonly combined with Kafka’s ACL-based authorization system, which restricts each client to a specific set of operations.

This page describes how to use kafkit.ssl to connect an aiokafka client to a Strimzi-based Kafka cluster. Strimzi makes it convenient to deploy secured Kafka clusters in Kubernetes. The basic ideas in this tutorial, however, apply to any SSL-secured Kafka cluster.

Gathering the SSL client and broker certificates

In a Strimzi-based deployment, the Kafka broker’s SSL certificate and the client’s SSL certificates are in separate Kubernetes Secret resources.

  • The Kafka broker CA certificate is in a secret named {clustername}-cluster-ca-cert, where {clustername} matches the name of the Strimzi Kafka resource.

  • The client certificates are in a secret named kafkauser-{clientname}, where {clientname} matches the name of the client’s Strimzi KafkaUser resource.

In your client’s Kubernetes Deployment resource, you can mount these secrets as files in your pod’s filesystem (extraneous Deployment fields omitted):

apiVersion: apps/v1
kind: Deployment
spec:
  template:
    spec:
      containers:
        - name: myapp
          volumeMounts:
            # Each name must match an entry under "volumes" below
            - name: "client-tls"
              mountPath: "/var/strimzi-client"
              readOnly: true
            - name: "broker-tls"
              mountPath: "/var/strimzi-broker"
              readOnly: true
      volumes:
        # Mount the TLS secret created by KafkaUser
        - name: "client-tls"
          secret:
            # matches name of KafkaUser
            secretName: kafkauser-myapp
        - name: "broker-tls"
          secret:
            # matches name of the Strimzi cluster CA cert secret
            secretName: "events-cluster-ca-cert"

In your Python application code, you can create paths to the individual certificates and key files:

from pathlib import Path

broker_ca_path = Path("/var/strimzi-broker/ca.crt")

client_ca_path = Path("/var/strimzi-client/ca.crt")
client_cert_path = Path("/var/strimzi-client/user.crt")
client_key_path = Path("/var/strimzi-client/user.key")
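
If a secret is not mounted, or a path is mistyped, the error tends to surface later when the SSL context is built. A minimal sketch of an early check follows, using only the standard library. The helper names find_missing_files and require_files are hypothetical and are not part of kafkit:

```python
from pathlib import Path
from typing import Iterable, List


def find_missing_files(paths: Iterable[Path]) -> List[Path]:
    """Return the paths that do not point at a regular file."""
    # Path.is_file follows symlinks, which is how Kubernetes
    # exposes the keys of a mounted Secret.
    return [path for path in paths if not path.is_file()]


def require_files(paths: Iterable[Path]) -> None:
    """Raise FileNotFoundError naming every missing certificate or key file."""
    missing = find_missing_files(paths)
    if missing:
        names = ", ".join(str(path) for path in missing)
        raise FileNotFoundError(f"Missing SSL files: {names}")
```

For example, calling require_files([broker_ca_path, client_ca_path, client_cert_path, client_key_path]) at application startup would fail fast with the list of files that are absent.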

Concatenate the client’s certificate and CA

Before using the certificates, you need to concatenate the client’s certificate (user.crt) with the CA certificate for Kafka clients in your cluster (the ca.crt file from the client’s secret).

You can do this with the kafkit.ssl.concatenate_certificates function:

from kafkit.ssl import concatenate_certificates

client_concatenated_path = Path("./client.crt")

concatenate_certificates(
    output_path=client_concatenated_path,
    cert_path=client_cert_path,
    ca_path=client_ca_path,
)

The file at client_concatenated_path on your pod’s filesystem now contains the client certificate with the client CA certificate appended to it.
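
To make the result concrete, here is a standard-library sketch that produces the same kind of file: the certificate first, then the CA certificate. It is an illustration only, not kafkit’s implementation, and the function name concatenate_pem_files is hypothetical:

```python
from pathlib import Path


def concatenate_pem_files(output_path: Path, cert_path: Path, ca_path: Path) -> None:
    """Write the certificate followed by the CA certificate to output_path."""
    cert_text = cert_path.read_text()
    ca_text = ca_path.read_text()
    # Ensure the first PEM block ends with a newline so the two
    # blocks do not run together on a single line.
    if not cert_text.endswith("\n"):
        cert_text += "\n"
    output_path.write_text(cert_text + ca_text)
```

The order matters for a certificate chain file: the client’s own certificate comes first, followed by the CA that signed it.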

Create the SSLContext

Both aiokafka.AIOKafkaConsumer and aiokafka.AIOKafkaProducer use an SSL context (ssl.SSLContext) to support SSL communication with the Kafka brokers. The kafkit.ssl.create_ssl_context function lets you create an SSLContext instance with your certificates and keys:

from kafkit.ssl import create_ssl_context

ssl_context = create_ssl_context(
    cluster_ca_path=broker_ca_path,
    client_cert_path=client_concatenated_path,
    client_key_path=client_key_path,
)
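
For readers who want to see what such a context involves, the following sketch builds a comparable one with the standard library’s ssl module. It is written under assumptions and is not kafkit’s implementation: the function name build_ssl_context is hypothetical, and making every argument optional (falling back to the system trust store when no CA file is given, and skipping the client certificate when none is given) is a choice made for this example:

```python
import ssl
from pathlib import Path
from typing import Optional


def build_ssl_context(
    cluster_ca_path: Optional[Path] = None,
    client_cert_path: Optional[Path] = None,
    client_key_path: Optional[Path] = None,
) -> ssl.SSLContext:
    """Build a client-side SSLContext that verifies the broker's certificate."""
    # Purpose.SERVER_AUTH creates a context for a client that authenticates
    # servers. With cafile=None the system's default CA certificates are used.
    context = ssl.create_default_context(
        purpose=ssl.Purpose.SERVER_AUTH,
        cafile=str(cluster_ca_path) if cluster_ca_path is not None else None,
    )
    if client_cert_path is not None:
        # Present the client certificate chain and private key so the
        # broker can authenticate this client.
        context.load_cert_chain(
            certfile=str(client_cert_path),
            keyfile=str(client_key_path) if client_key_path is not None else None,
        )
    return context
```

A context created this way requires a valid broker certificate and checks the broker’s hostname, which are the defaults of ssl.create_default_context for this purpose.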

Using the SSLContext

Finally, pass ssl_context as the ssl_context argument to aiokafka.AIOKafkaProducer or aiokafka.AIOKafkaConsumer, and set security_protocol to "SSL":

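import asyncio
from aiokafka import AIOKafkaProducer


async def main() -> None:
    # The loop argument is omitted: aiokafka uses the running event loop.
    producer = AIOKafkaProducer(
        bootstrap_servers="kafka:9093",
        ssl_context=ssl_context,
        security_protocol="SSL",
    )
    await producer.start()
    try:
        ...  # send messages here
    finally:
        # Always stop the producer, even if sending fails.
        await producer.stop()


asyncio.run(main())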
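
Because producers and consumers take the same connection settings, one option is to build those settings once and share them. The sketch below assumes this approach. The helper name ssl_client_kwargs and the topic name "events" are hypothetical:

```python
import ssl
from typing import Any, Dict


def ssl_client_kwargs(bootstrap_servers: str, ssl_context: ssl.SSLContext) -> Dict[str, Any]:
    """Return keyword arguments shared by SSL-enabled aiokafka clients."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SSL",
        "ssl_context": ssl_context,
    }


# Usage inside a coroutine, assuming aiokafka is installed:
#
#   from aiokafka import AIOKafkaConsumer
#
#   consumer = AIOKafkaConsumer(
#       "events", **ssl_client_kwargs("kafka:9093", ssl_context)
#   )
#   await consumer.start()
#   try:
#       async for message in consumer:
#           print(message.value)
#   finally:
#       await consumer.stop()
```

Keeping the settings in one place avoids a producer and a consumer drifting apart, for example one of them silently falling back to the default PLAINTEXT protocol.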