How to connect to an SSL-secured Kafka cluster (Strimzi example)
Kafkit’s kafkit.ssl module helps your aiokafka-based clients connect to Kafka brokers that require the SSL protocol.
SSL is commonly used to mutually authenticate the client and Kafka brokers: the broker authenticates the client, and the client authenticates the broker.
SSL authentication is also commonly used in conjunction with Kafka’s ACL-based authorization system, which ensures that certain clients can only perform a specific set of operations.
This page describes how to use kafkit.ssl to help connect your aiokafka client for the specific case of a Strimzi-based Kafka cluster.
Strimzi makes it convenient to deploy secured Kafka clusters in Kubernetes.
The basic ideas in this tutorial, however, apply to any SSL-secured Kafka cluster.
Gathering the SSL client and broker certificates
In a Strimzi-based deployment, the Kafka broker’s SSL certificate and the client’s SSL certificates are in separate Kubernetes Secret resources.
The Kafka broker CA certificate is in a secret named {clustername}-cluster-ca-cert, where {clustername} matches the name of the Strimzi Kafka resource.
The client certificates are in a secret named kafkauser-{clientname}, where {clientname} matches the name of the client’s Strimzi KafkaUser resource.
In your client’s Kubernetes Deployment resource, you can mount these secrets as files in your pod’s filesystem (extraneous Deployment fields omitted):
apiVersion: apps/v1
kind: Deployment
spec:
  template:
    spec:
      containers:
        - name: myapp
          volumeMounts:
            # Each name must match an entry under "volumes" below
            - name: "client-tls"
              mountPath: "/var/strimzi-client"
              readOnly: true
            - name: "broker-tls"
              mountPath: "/var/strimzi-broker"
              readOnly: true
      volumes:
        # Mount the TLS secret created by KafkaUser
        - name: "client-tls"
          secret:
            # matches name of KafkaUser
            secretName: "kafkauser-myapp"
        - name: "broker-tls"
          secret:
            # matches name of the Strimzi cluster CA cert secret
            secretName: "events-cluster-ca-cert"
In your Python application code, you can create paths to the individual certificates and key files:
from pathlib import Path
broker_ca_path = Path("/var/strimzi-broker/ca.crt")
client_ca_path = Path("/var/strimzi-client/ca.crt")
client_cert_path = Path("/var/strimzi-client/user.crt")
client_key_path = Path("/var/strimzi-client/user.key")
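A missing or misnamed secret mount otherwise tends to surface only later as an SSL error, so it can help to check for the files at startup. The following is a minimal sketch using only the standard library; the find_missing_files helper is hypothetical and not part of Kafkit.

```python
from pathlib import Path
from typing import Iterable, List


def find_missing_files(paths: Iterable[Path]) -> List[Path]:
    """Return the paths that do not exist as regular files."""
    return [path for path in paths if not path.is_file()]


# Paths from the Deployment example above.
expected_paths = [
    Path("/var/strimzi-broker/ca.crt"),
    Path("/var/strimzi-client/ca.crt"),
    Path("/var/strimzi-client/user.crt"),
    Path("/var/strimzi-client/user.key"),
]

missing = find_missing_files(expected_paths)
if missing:
    print("Missing certificate files:", ", ".join(str(p) for p in missing))
```

Whether to log, raise, or exit when files are missing is left to the application.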
Concatenate the client’s certificate and CA
Before using the certificates, you need to concatenate the client’s certificate with the CA for Kafka clients in your cluster.
You can do this with the kafkit.ssl.concatenate_certificates function:
from kafkit.ssl import concatenate_certificates

client_concatenated_path = Path("./client.crt")
concatenate_certificates(
    output_path=client_concatenated_path,
    cert_path=client_cert_path,
    ca_path=client_ca_path,
)
The file at client_concatenated_path on your pod’s filesystem now contains the client certificate with the client CA appended to it.
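Conceptually, the concatenated file is the PEM text of the client certificate followed by the PEM text of the client CA. The sketch below illustrates that idea with the standard library only; concatenate_pem_files is a hypothetical helper for illustration and is not Kafkit’s implementation, which may differ in its details.

```python
from pathlib import Path


def concatenate_pem_files(
    cert_path: Path, ca_path: Path, output_path: Path
) -> None:
    """Write the certificate followed by its CA to output_path."""
    cert_text = cert_path.read_text()
    ca_text = ca_path.read_text()
    # Make sure the first PEM block ends with a newline before
    # the second block is appended.
    if not cert_text.endswith("\n"):
        cert_text += "\n"
    output_path.write_text(cert_text + ca_text)
```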
Create the SSLContext
Both aiokafka.AIOKafkaConsumer and aiokafka.AIOKafkaProducer use an SSL context (ssl.SSLContext) to support SSL communication with the Kafka brokers.
The kafkit.ssl.create_ssl_context function lets you create an SSLContext instance with your certificates and keys:
from kafkit.ssl import create_ssl_context

ssl_context = create_ssl_context(
    cluster_ca_path=broker_ca_path,
    client_cert_path=client_concatenated_path,
    client_key_path=client_key_path,
)
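For orientation, a comparable context can be assembled directly with the standard library’s ssl module. This is a sketch of the general pattern, assuming the broker CA is used to verify the brokers and the concatenated certificate and key identify the client. The name build_ssl_context is hypothetical, and Kafkit’s own function may set additional options.

```python
import ssl
from pathlib import Path


def build_ssl_context(
    cluster_ca_path: Path, client_cert_path: Path, client_key_path: Path
) -> ssl.SSLContext:
    """Build a client-side SSLContext for mutual TLS (illustrative only)."""
    # Passing cafile means only the cluster CA is trusted when
    # verifying the brokers' certificates.
    context = ssl.create_default_context(
        purpose=ssl.Purpose.SERVER_AUTH, cafile=str(cluster_ca_path)
    )
    # Present the client certificate chain and private key to the brokers.
    context.load_cert_chain(
        certfile=str(client_cert_path), keyfile=str(client_key_path)
    )
    return context
```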
Using the SSLContext
Finally, pass the context as the ssl_context argument to aiokafka.AIOKafkaProducer or aiokafka.AIOKafkaConsumer, together with security_protocol="SSL":
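import asyncio

from aiokafka import AIOKafkaProducer


async def main() -> None:
    # Recent aiokafka releases use the running event loop by default,
    # so the deprecated loop argument is omitted.
    producer = AIOKafkaProducer(
        bootstrap_servers="kafka:9093",
        ssl_context=ssl_context,
        security_protocol="SSL",
    )
    await producer.start()
    try:
        ...  # Send messages here.
    finally:
        # Always stop the producer so pending messages are flushed.
        await producer.stop()


asyncio.run(main())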
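The same settings apply to a consumer. One way to avoid repeating them is to collect the shared keyword arguments in one place. The sketch below uses a hypothetical ssl_client_kwargs helper that is not part of Kafkit or aiokafka, and the topic name in the usage comment is a placeholder.

```python
import ssl
from typing import Any, Dict


def ssl_client_kwargs(
    bootstrap_servers: str, ssl_context: ssl.SSLContext
) -> Dict[str, Any]:
    """Keyword arguments shared by AIOKafkaProducer and AIOKafkaConsumer."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SSL",
        "ssl_context": ssl_context,
    }


# Usage (requires aiokafka and a reachable broker):
#   kwargs = ssl_client_kwargs("kafka:9093", ssl_context)
#   consumer = AIOKafkaConsumer("my-topic", **kwargs)
#   producer = AIOKafkaProducer(**kwargs)
```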