Kafka cluster installation: practical experience only.
Disclaimer: the story below relates to OpenShift Dedicated v3.
I have a project which requires an event sourcing architecture. The project runs in OpenShift, and my idea is to try both Kafka and RabbitMQ in the broker role.
Kafka installation on OpenShift is supported through Strimzi, which Red Hat productizes as AMQ Streams. This means that, in the case of OpenShift Dedicated clusters on v3, Red Hat has to enable AMQ Streams support for you.
Once AMQ Streams is enabled, you can create a Kafka cluster with a simple command, as described in How to run Kafka on OpenShift, the enterprise Kubernetes, with AMQ Streams. Just execute:
$ cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      external:
        type: route
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
EOF
And you will get your cluster... however, this is just part of the work. You will probably want to write a message to your cluster, but that turns out to be tricky: with the object recommended above, port 9092 is not open for you.
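To see what the operator has actually created, you can list the pods and services; a quick sketch (the resource names follow Strimzi's <cluster>-kafka-... naming convention, if I recall it correctly):

# the operator creates ZooKeeper and Kafka pods plus a set of services
oc get pods
# with no plain listener defined, port 9092 should be absent from
# the my-cluster-kafka-bootstrap service
oc get svc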
So the next step is to open port 9092. To do this, run:
# oc edit kafka $CLUSTER_NAME
# in our case the cluster name is my-cluster
oc edit kafka my-cluster
You will see the Kafka object's YAML. Here you have to add support for the plain listener (port 9092):
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    #...
    listeners:
      plain: {}
After saving this change, a rolling update will start within a couple of minutes, and once it finishes you should be able to connect to port 9092.
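You can watch the rolling update as it happens; a sketch, assuming the my-cluster example above:

# the brokers are restarted one at a time
oc get pods -w
# afterwards the bootstrap service should list port 9092
oc get svc my-cluster-kafka-bootstrap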
Now you should be able to write and consume messages. Let's try with the built-in Kafka consumer and producer scripts:
oc exec -it bus-kafka-0 -c kafka -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic twitter
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
[2020-02-21 10:35:05,253] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:439)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:105)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages
The producer fails with the same error: OutOfMemoryError: Java heap space. What is the root of the problem? Java needs more memory. I started the containers with the default memory request of 512M and limit of 1Gi. Through practical experimentation we found that the minimal workable memory value for the Kafka container is 2Gi. So the next step is to update the memory request for the Kafka container, and probably the project limits as well.
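Before editing anything, you can check what the Kafka container currently gets; a sketch using a jsonpath query (the pod name bus-kafka-0 matches the log above):

oc get pod bus-kafka-0 -o jsonpath='{.spec.containers[?(@.name=="kafka")].resources}'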
Let's open the Kafka cluster object with the already familiar command:
# oc edit kafka $CLUSTER_NAME
# in our case the cluster name is my-cluster
oc edit kafka my-cluster
Now we should set the memory request and limit for the Kafka containers. A Kafka pod contains two containers, kafka and a sidecar; we need to update only the kafka container.
spec:
  entityOperator:
    topicOperator: {}
  kafka:
    listeners:
      external:
        type: route
      plain: {}
    replicas: 3
    # This part should be added
    resources:
      limits:
        memory: 2Gi
      requests:
        cpu: "100m"
        memory: 2Gi
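Saving this triggers another rolling update; once it finishes, you can confirm the new values took effect, for example:

oc describe pod bus-kafka-0 | grep -A 5 Limits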
Please verify that your limit ranges allow a 2Gi request. In my case, I don't have other services in the project, so I would recommend the following project limits:
apiVersion: "v1"
kind: "LimitRange"
metadata:
name: "core-resource-limits"
spec:
limits:
- type: "Pod"
max:
cpu: "2"
memory: "3Gi"
min:
cpu: "50m"
memory: "256M"
- type: "Container"
max:
cpu: "2"
memory: "2Gi"
min:
cpu: "50m"
memory: "256M"
default:
cpu: "1"
memory: "512M"
defaultRequest:
cpu: "50m"
memory: "256M"
maxLimitRequestRatio:
cpu: "20"
After that, your cluster should be fully working. My Kafka cluster description looks like this:
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: bus
spec:
  entityOperator:
    topicOperator: {}
  kafka:
    listeners:
      external:
        type: route
      plain: {}
    replicas: 3
    resources:
      limits:
        memory: 2Gi
      requests:
        cpu: "100m"
        memory: 2Gi
    storage:
      size: 2Gi
      type: persistent-claim
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100M
    resources:
      limits:
        memory: 1Gi
      requests:
        cpu: "100m"
        memory: 512M
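Note that with persistent-claim storage the operator creates PersistentVolumeClaims that survive pod restarts, which is also why the storage setup cannot be changed by a simple edit. You can list them with:

# PVC names follow Strimzi's data-<cluster>-kafka-N convention
oc get pvc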
You can change some settings of your Kafka cluster (but not the storage setup) with the command:
oc edit kafka $CLUSTER_NAME
If you need to, you can easily delete a cluster:
oc delete kafka $CLUSTER_NAME
list Kafka clusters:
oc get kafka
run Kafka console consumer:
oc exec -it bus-kafka-1 -c kafka -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic twitter
run Kafka console producer:
oc exec -it bus-kafka-1 -c kafka -- bin/kafka-console-producer.sh --broker-list 0.0.0.0:9092 --topic twitter
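create the twitter topic declaratively via the Topic Operator we enabled (a sketch; the partition and replica counts here are my own illustrative choices):

$ cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: twitter
  labels:
    strimzi.io/cluster: bus
spec:
  partitions: 3
  replicas: 3
EOF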
P.S. Great thanks to the Red Hat guys for their support during my acquaintance with AMQ Streams.
Strimzi Docs