Kafka cluster installation: practical experience only.

Disclaimer: the story below relates to OpenShift Dedicated v3.
I have a project that requires an event sourcing architecture. The project runs in OpenShift, and my idea is to try both Kafka and RabbitMQ in the broker role.
Kafka installation is supported in OpenShift by the Strimzi-based product called AMQ Streams. This means that, in the case of OpenShift Dedicated clusters, Red Hat has to enable Streams support for you if you are on v3.
After Streams is enabled, you can create a Kafka cluster with a single command, as described in How to run Kafka on Openshift, the enterprise Kubernetes, with AMQ Streams. Just execute:

    
$ cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      external:
        type: route
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
EOF
    
And you will get your cluster... however, this is just part of the work.
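To confirm that the operator actually created everything, here is a quick sanity check (a sketch, assuming Strimzi's default strimzi.io/cluster pod label and <cluster>-kafka-N / <cluster>-zookeeper-N pod naming):

# Watch the pods come up
oc get pods -l strimzi.io/cluster=my-cluster -w

# Inspect the Kafka resource itself
oc get kafka my-cluster -o yaml
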
You would probably like to write a message to your cluster, but this turns out to be quite tricky, because with the object recommended above port 9092 is not open for you.
So, the next step is to open port 9092. To do this, run:


# oc edit kafka $CLUSTER_NAME
# in our case the cluster name is my-cluster
oc edit kafka my-cluster

You will see the Kafka object's YAML. Here you have to add support for the plain listener (port 9092):

apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    #...
    listeners:
      plain: {}

After saving this change, a rolling update will start within a couple of minutes, and after that you should be able to run your commands against the plain listener.
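You can watch the rolling update and confirm the new listener is exposed (a sketch, assuming the default <cluster>-kafka-bootstrap service naming):

# Pods restart one by one during the rolling update
oc get pods -l strimzi.io/cluster=my-cluster -w

# The bootstrap service should now list port 9092 alongside the TLS ports
oc get svc my-cluster-kafka-bootstrap
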
Now you should be able to write and consume messages. Let's try the built-in Kafka console consumer and producer scripts:


oc exec -it bus-kafka-0 -c kafka -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9094  --topic twitter
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2020-02-21 10:35:05,253] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
java.lang.OutOfMemoryError: Java heap space
 at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
 at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
 at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
 at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
 at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
 at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
 at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235)
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317)
 at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
 at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:439)
 at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:105)
 at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
 at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
 at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages
The producer gives the same error: OutOfMemoryError: Java heap space. What is the root of the problem? Java needs more memory.
I started the containers with the default memory request of 512M and limit of 1Gi. Through practical experimentation, I found that the minimal workable memory value for the Kafka container is 2Gi. So the next step is to update the memory request for the Kafka container and, probably, the project limits.
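Before editing anything, you can check what the kafka container currently gets (a sketch; bus-kafka-0 is the pod from the failed consumer run above):

# Print the resource requests/limits of the kafka container only
oc get pod bus-kafka-0 -o jsonpath='{.spec.containers[?(@.name=="kafka")].resources}'
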
Let's open the Kafka cluster object with the already familiar command:


# oc edit kafka $CLUSTER_NAME
# in our case the cluster name is my-cluster
oc edit kafka my-cluster

Now we should set the memory request and limit for the Kafka containers. A Kafka pod contains two containers, kafka and a sidecar; we only need to update the kafka container.


  spec:
    entityOperator:
      topicOperator: {}
    kafka:
      listeners:
        external:
          type: route
        plain: {}
      replicas: 3
      # This part should be added
      resources:
        limits:
          memory: 2Gi
        requests:
          cpu: "100m"
          memory: 2Gi
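Besides container resources, Strimzi also lets you pin the JVM heap explicitly through the jvmOptions field of the Kafka resource; a minimal sketch (the 1g values here are an assumption, size them for your workload):

  spec:
    kafka:
      jvmOptions:
        "-Xms": "1g"
        "-Xmx": "1g"
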
Please verify that your limit ranges allow a 2Gi request. In my case, with no other services in the project, I would recommend the following project limits:

apiVersion: "v1"
kind: "LimitRange"
metadata:
  name: "core-resource-limits" 
spec:
  limits:
    - type: "Pod"
      max:
        cpu: "2" 
        memory: "3Gi" 
      min:
        cpu: "50m" 
        memory: "256M" 
    - type: "Container"
      max:
        cpu: "2" 
        memory: "2Gi" 
      min:
        cpu: "50m" 
        memory: "256M" 
      default:
        cpu: "1" 
        memory: "512M" 
      defaultRequest:
        cpu: "50m" 
        memory: "256M" 
      maxLimitRequestRatio:
        cpu: "20"
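To apply and verify the limit range (a sketch; core-resource-limits.yaml is just an assumed file name for the YAML above):

oc create -f core-resource-limits.yaml
oc describe limitrange core-resource-limits
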
After that, your cluster should be fully functional.
My Kafka cluster description looks like this:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: bus
spec:
  entityOperator:
    topicOperator: {}
  kafka:
    listeners:
      external:
        type: route
      plain: {}
    replicas: 3
    resources:
      limits:
        memory: 2Gi
      requests:
        cpu: "100m"
        memory: 2Gi
    storage:
      size: 2Gi
      type: persistent-claim
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100M
    resources:
      limits:
        memory: 1Gi
      requests:
        cpu: "100m"
        memory: 512M
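Since both Kafka and ZooKeeper use persistent-claim storage, the operator creates a PersistentVolumeClaim per replica; you can check them like this (a sketch, assuming Strimzi labels the PVCs with strimzi.io/cluster):

oc get pvc -l strimzi.io/cluster=bus
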
You can change some settings of your Kafka cluster (but not the storage setup) with the command:

    oc edit kafka $CLUSTER_NAME
If you need to, you can easily delete a cluster:

    oc delete kafka $CLUSTER_NAME
List Kafka clusters:

    oc get kafka
Run the Kafka console consumer:

    oc exec -it bus-kafka-1 -c kafka -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic twitter
Run the Kafka console producer:

    oc exec -it bus-kafka-1 -c kafka -- bin/kafka-console-producer.sh --broker-list localhost:9092 --topic twitter
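Since the cluster runs the Topic Operator (entityOperator.topicOperator above), topics can also be managed declaratively. A minimal sketch for the twitter topic used in the examples (the partition and replica counts are assumptions):

cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: twitter
  labels:
    strimzi.io/cluster: bus
spec:
  partitions: 3
  replicas: 3
EOF
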
P.S. Many thanks to the Red Hat folks for their support during my acquaintance with AMQ Streams.

Strimzi Docs
