Use KSQL on Kubernetes with Strimzi

Ludovico Russo

4 min read

Strimzi is a Kubernetes Operator that allows you to run a Kafka cluster on Kubernetes in minutes. In this post, we will see how to install and use Strimzi inside a K8s cluster and how to run KSQL against a Strimzi-managed Kafka cluster.

Set up Kubernetes and Strimzi

I have a Kubernetes cluster running on my machine, started with minikube using the command

$ minikube start --memory=4096

and I've created a namespace named "kafka".
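The namespace can be created with `kubectl create namespace`. A minimal sketch, assuming `kubectl` already points at the minikube cluster; the helper name `ensure_namespace` is made up for this post:

```shell
# ensure_namespace NAME: create the namespace only if it is missing,
# so the helper can be re-run safely. (Helper name is hypothetical.)
ensure_namespace() {
  if kubectl get namespace "$1" >/dev/null 2>&1; then
    echo "namespace $1 already exists"
  else
    kubectl create namespace "$1"
  fi
}
```

Call it as `ensure_namespace kafka` before installing the operator.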

To install the Strimzi operator, we simply need to run the following command:

$ kubectl apply -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

Note: this does not run any Kafka cluster; it installs all the custom resource definitions (CRDs) needed by Strimzi and instantiates the Strimzi cluster operator.

$ kubectl get pods

NAME                                       READY   STATUS    RESTARTS   AGE
strimzi-cluster-operator-9968fd8c9-fzh7g   1/1     Running   0          86s
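Instead of re-running `kubectl get pods` until the operator is ready, one option is to wait on its Deployment. This is a sketch, assuming the Deployment is called `strimzi-cluster-operator` (inferred from the pod name above); the helper name and the 300s timeout are arbitrary choices:

```shell
# wait_for_operator [NAMESPACE]: block until the operator Deployment
# has finished rolling out. The namespace defaults to "kafka".
wait_for_operator() {
  kubectl -n "${1:-kafka}" rollout status deployment/strimzi-cluster-operator --timeout=300s
}
```

Running `wait_for_operator` returns once the rollout is complete, or fails after the timeout.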

Start the first Kafka cluster!

To run a Kafka cluster, we need to create a Kafka resource as follows:

# kafka.yaml

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: kafka-cluster
spec:
  kafka:
    version: 2.5.0
    replicas: 1
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.5"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 10Gi
          deleteClaim: false
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

This defines a Kafka cluster with 1 ZooKeeper node and 1 Kafka node. We can scale it up simply by editing the replicas values in the file.
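As a rough illustration of that edit, the replicas values can be rewritten with `sed` rather than by hand. The helper name `scale_manifest` is hypothetical, and the pattern assumes the manifest layout shown above, where each `replicas:` key sits on its own line:

```shell
# scale_manifest FILE N: print FILE with every "replicas:" value set to N.
# Keys such as offsets.topic.replication.factor are not touched.
scale_manifest() {
  sed -E "s/^([[:space:]]*replicas:)[[:space:]]*[0-9]+/\1 $2/" "$1"
}

# Example:
#   scale_manifest kafka.yaml 3 > kafka-3.yaml
```

This changes both the Kafka and the ZooKeeper replica counts. With more than one broker you would probably also want to raise the `*.replication.factor` settings in `config`, which this sketch leaves at 1.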

Once the resource has been created using

$ kubectl apply -f kafka.yaml

we should see the new pods in the k8s cluster

$ kubectl get pods

NAME                                             READY   STATUS    RESTARTS   AGE
kafka-cluster-entity-operator-5f7954489b-bbqxv   3/3     Running   0          40s
kafka-cluster-kafka-0                            2/2     Running   0          64s
kafka-cluster-zookeeper-0                        1/1     Running   0          2m12s
strimzi-cluster-operator-9968fd8c9-fzh7g         1/1     Running   0          7m54s

and the new services

$ kubectl get service
NAME                             TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
kafka-cluster-kafka-bootstrap    ClusterIP   10.96.67.26     <none>        9091/TCP,9092/TCP,9093/TCP   2m25s
kafka-cluster-kafka-brokers      ClusterIP   None            <none>        9091/TCP,9092/TCP,9093/TCP   2m25s
kafka-cluster-zookeeper-client   ClusterIP   10.96.215.241   <none>        2181/TCP                     3m33s
kafka-cluster-zookeeper-nodes    ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP   3m33s
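The pods take a couple of minutes to come up. A possible way to block until the cluster is usable is to wait for the `Ready` condition on the Kafka resource itself. A sketch, with a made-up helper name and an arbitrary timeout:

```shell
# wait_for_kafka NAME: block until the Kafka resource NAME in the
# "kafka" namespace reports the Ready condition.
wait_for_kafka() {
  kubectl -n kafka wait kafka/"$1" --for=condition=Ready --timeout=300s
}
```

For the cluster defined in kafka.yaml this would be `wait_for_kafka kafka-cluster`.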

Now we can reach Kafka from inside the cluster through the kafka-cluster-kafka-bootstrap service on port 9092.
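The short service name only resolves for pods in the same namespace. From another namespace, the usual Kubernetes convention is to qualify it as `<service>.<namespace>.svc`. A small sketch that builds such an address; the helper name is hypothetical:

```shell
# service_address SERVICE NAMESPACE PORT
# Prints the namespace-qualified DNS name of a Service, with its port.
service_address() {
  printf '%s.%s.svc:%s\n' "$1" "$2" "$3"
}

# Example:
#   service_address kafka-cluster-kafka-bootstrap kafka 9092
```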

Run the producer

$ kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.19.0-kafka-2.5.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list kafka-cluster-kafka-bootstrap:9092 --topic my-topic

Run the consumer

$ kubectl -n kafka run kafka-consumer -ti --image=strimzi/kafka:0.19.0-kafka-2.5.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
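Both commands share the same `kubectl run` boilerplate, so it can be factored into a small helper. This is only a convenience sketch; the function name `kafka_client` is made up, while the image and flags are the ones used above:

```shell
# kafka_client POD_NAME COMMAND [ARGS...]
# Runs COMMAND in a one-off interactive pod based on the Strimzi Kafka
# image; the pod is removed when the command exits.
kafka_client() {
  pod_name="$1"
  shift
  kubectl -n kafka run "$pod_name" -ti \
    --image=strimzi/kafka:0.19.0-kafka-2.5.0 \
    --rm=true --restart=Never -- "$@"
}
```

The producer then becomes `kafka_client kafka-producer bin/kafka-console-producer.sh --broker-list kafka-cluster-kafka-bootstrap:9092 --topic my-topic`.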

Set up KSQL

To install a KSQL node, we first need a Schema Registry node, which KSQL requires.

We will use Helm (version 3 in my case) to install both the Schema Registry and KSQL, using the Helm charts provided by Confluent.

Set up the Helm charts

I was not able to install the charts directly from the Confluent Helm repository, so I downloaded the GitHub repo and ran the charts from the source code.
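Getting the sources could look like the sketch below. It assumes the repo in question is `confluentinc/cp-helm-charts` on GitHub, which contains the `charts/cp-schema-registry` and `charts/cp-ksql-server` directories used in the next steps; the helper name is hypothetical:

```shell
# fetch_cp_charts DEST: clone Confluent's chart repository into DEST
# unless DEST already exists, then print the directory holding the charts.
fetch_cp_charts() {
  [ -d "$1" ] || git clone https://github.com/confluentinc/cp-helm-charts.git "$1" || return 1
  echo "$1/charts"
}

# Example:
#   cd "$(fetch_cp_charts cp-helm-charts)/cp-schema-registry"
```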

Install the schema registry

The schema registry is simple to install: we only need to provide the kafka.bootstrapServers configuration and can leave all the remaining values at their defaults.

To install it using helm, simply enter the directory charts/cp-schema-registry and run the following command

$ helm install kafka-schema-registry --set kafka.bootstrapServers="PLAINTEXT://kafka-cluster-kafka-bootstrap:9092" . -n kafka

This will install and configure the schema registry pod and the schema registry service on k8s!

$ kubectl get pods

NAME                                                        READY   STATUS    RESTARTS   AGE
#...
kafka-schema-registry-cp-schema-registry-65c6f58f48-hbpvp   2/2     Running   0          7m5s

$ kubectl get services

NAME                                       TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
#...
kafka-schema-registry-cp-schema-registry   ClusterIP   10.96.106.63    <none>        8081/TCP                     18m
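A quick way to check that the registry answers is to query its REST API for the list of subjects. A sketch, assuming `curl` is available on your machine and the service is port-forwarded to localhost; the helper name is made up:

```shell
# list_subjects BASE_URL: ask Schema Registry for its registered subjects.
list_subjects() {
  curl -fsS "$1/subjects"
}

# Example (needs the running cluster), with the port-forward in a second terminal:
#   kubectl -n kafka port-forward svc/kafka-schema-registry-cp-schema-registry 8081:8081
#   list_subjects http://localhost:8081
```

On a fresh install the registry should answer with an empty JSON list, since nothing has been registered yet.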

Install the ksql server

The last step is to install the ksql server. You need to enter the directory charts/cp-ksql-server to install the chart. We need to override two parameters in this case: kafka.bootstrapServers and cp-schema-registry.url, which has to point to the service of the schema registry we've just created. The command also sets ksql.headless=false, which runs the server in interactive mode so that we can connect to it with a CLI.

$ helm install ksql-server --set cp-schema-registry.url="http://kafka-schema-registry-cp-schema-registry:8081",kafka.bootstrapServers="PLAINTEXT://kafka-cluster-kafka-bootstrap:9092",ksql.headless=false . -n kafka

We will finally have a ksql pod and a ksql service to interact with our Kafka cluster.

$ kubectl get pod -n kafka

NAME                                                        READY   STATUS    RESTARTS   AGE
#...
ksql-server-cp-ksql-server-798bfc5859-dfdlm                 2/2     Running   0          4m3s

$ kubectl get services -n kafka

NAME                                       TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
# ...
ksql-server-cp-ksql-server                 ClusterIP   10.96.13.224    <none>        8088/TCP                     77s
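The long `--set` string is easy to mistype. The same three overrides can also live in a values file passed to Helm with `-f`. A sketch; the helper name and the file name `ksql-values.yaml` are arbitrary:

```shell
# ksql_values: print a Helm values file equivalent to the --set flags above.
ksql_values() {
  cat <<'EOF'
kafka:
  bootstrapServers: "PLAINTEXT://kafka-cluster-kafka-bootstrap:9092"
cp-schema-registry:
  url: "http://kafka-schema-registry-cp-schema-registry:8081"
ksql:
  headless: false
EOF
}

# Example, from charts/cp-ksql-server:
#   ksql_values > ksql-values.yaml
#   helm install ksql-server . -n kafka -f ksql-values.yaml
```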

Attach to the ksql server

To access the ksql server, you need to run a pod with the ksql command installed. I use the image confluentinc/cp-ksql-cli provided by Confluent for that.

$ kubectl -n kafka run tmp-ksql-cli --rm -i --tty --image confluentinc/cp-ksql-cli:5.2.1 http://ksql-server-cp-ksql-server:8088

kubectl run --generator=deployment/apps.v1 is DEPRECATED and will be removed in a future version. Use kubectl run --generator=run-pod/v1 or kubectl create instead.
If you don't see a command prompt, try pressing enter.





                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2017-2018 Confluent Inc.

CLI v5.2.1, Server v5.5.0 located at http://ksql-server-cp-ksql-server:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> show topics;


 Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------------
 my-topic    | false      | 1          | 1                  | 0         | 0
-----------------------------------------------------------------------------------------
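The same statement can also be sent without the CLI, through the server's REST endpoint `POST /ksql`. A minimal sketch, assuming the service is port-forwarded to localhost; the helper name is made up, and it does no JSON escaping, so the statement must not contain double quotes or backslashes:

```shell
# ksql_request STATEMENT: wrap a statement in the JSON body expected by
# the POST /ksql endpoint. No escaping is performed on STATEMENT.
ksql_request() {
  printf '{"ksql": "%s", "streamsProperties": {}}\n' "$1"
}

# Example (needs the running cluster), with the port-forward in a second terminal:
#   kubectl -n kafka port-forward svc/ksql-server-cp-ksql-server 8088:8088
#   curl -s -X POST http://localhost:8088/ksql \
#     -H 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
#     -d "$(ksql_request 'SHOW TOPICS;')"
```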