How to connect to an SSL-secured Kafka cluster (Strimzi example)¶
Kafkit’s kafkit.ssl
module can help you connect to Kafka brokers that require your aiokafka-based clients to connect with the SSL protocol.
SSL is commonly used to mutually authenticate the client and Kafka brokers: the broker authenticates the client, and the client authenticates the broker.
SSL authentication is also commonly used in conjunction with Kafka’s ACL-based authorization system, which ensures that certain clients can only perform a specific set of operations.
This page describes how to use kafkit.ssl
to help connect your aiokafka client for the specific case of a Strimzi-based Kafka cluster.
Strimzi makes it convenient to deploy secured Kafka clusters in Kubernetes.
The basic ideas in this tutorial, however, apply to any SSL-secured Kafka cluster.
Gathering the SSL client and broker certificates¶
In a Strimzi-based deployment, the Kafka broker’s SSL certificate and the client’s SSL certificates are in separate Kubernetes Secret
resources.
The Kafka broker CA certificate is in a secret named
{clustername}-cluster-ca-cert
, where{clustername}}
matches the name of the StrimziKafka
resource.The client certificates are in a secret named
kafkauser-{clientname}
, where{clientname}
matches the name of the client’s StrimziKafkaUser
resource.
In your client’s Kubernetes Deployment
resource, you can mount these secrets are files in your pod’s filesystem (extraneous Deployment
fields omitted):
apiVersion: apps/v1
kind: Deployment
spec:
template:
spec:
containers:
- name: myapp
volumeMounts:
- name: "client-ssl"
mountPath: "/var/strimzi-client"
readOnly: True
- name: "broker-ssl"
mountPath: "/var/strimzi-broker"
readOnly: True
volumes:
# Mount the TLS secret created by KafkaUser
- name: "client-tls"
secret:
# matches name of KafkaUser
secretName: kafkauser-myapp
- name: "broker-tls"
secret:
# matches name of Strimzi cluster cluster CA cert secret
secretName: "events-cluster-ca-cert"
In your Python application code, you can create paths to the individual certificates and key files:
from pathlib import Path
broker_ca_path = Path("/var/strimzi-broker/ca.crt")
client_ca_path = Path("/var/strimzi-client/ca.crt")
client_cert_path = Path("/var/strimzi-client/user.crt")
client_key_path = Path("/var/strimzi-client/user.key")
Concatenate the client’s certificate and CA¶
Before using the certificates, you need to concatenate the client’s certificate with the CA for Kafka clients in your cluster.
You can do this with the kafkit.ssl.concatenate_certificates
function:
from kafkit.ssl import concatenate_certificates
client_concatenated_path = Path("./client.crt")
concatenate_certificates(
output_path=client_concatenated_path,
cert_path=client_cert_path,
ca_path=client_ca_path,
)
Now at the client_concatenated_path
path on your pod’s filesystem there is the client certificate with the client CA appended to it.
Create the SSLContext¶
Both aiokafka.AIOKafkaConsumer
and aiokafka.AIOKafkaProducer
use an SSL context (ssl.SSLContext
) to support SSL communication with the Kafka brokers.
The kafkit.ssl.create_ssl_context
function lets you create an SSLContext
instance with your certificates and keys:
from kafkit.ssl import create_ssl_context
ssl_context = create_ssl_context(
cluster_ca_path=broker_ca_path,
client_cert_path=client_concatenated_path,
client_key_path=client_key_path,
)
Using the SSLContext¶
Finally you can use the ssl_context
as the ssl_context
argument to aiokafka.AIOKafkaProducer
or aiokafka.AIOKafkaConsumer
:
import asyncio
from aiokafka import AIOKafkaProducer
producer = AIOKafkaProducer(
loop=asyncio.get_running_loop(),
bootstrap_servers="kafka:9093",
ssl_context=ssl_context,
security_protocol="SSL",
)
await producer.start()
...
await producer.stop()