Spring Boot + Kafka + Zookeeper

This project uses Java, Spring Boot, Kafka, and ZooKeeper to show you how to integrate these services in a composition.

Just head over to the example repository on GitHub and follow the instructions there.

Zookeeper Docker image

Kafka uses ZooKeeper, so you first need to start a ZooKeeper server if you don’t already have one:


  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

Kafka Docker image

Now start the Kafka server. In docker-compose.yml, the service definition can look something like this:


  kafka:
    build:
      context: kafka
      dockerfile: Dockerfile
    links:
      - zookeeper:zk
    ports:
      - "9092:9092"
    environment:
      KAFKA_CREATE_TOPICS: "Topic1:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper

To start the Kafka server with a specific pre-configuration, use environment variables. The sections below describe which environment variables are available for this service.

Broker IDs

You can configure the broker id in different ways:

  1. Explicitly, using KAFKA_BROKER_ID
  2. Via a command, using BROKER_ID_COMMAND, e.g. BROKER_ID_COMMAND: "hostname | awk -F'-' '{print $$2}'"

If you don’t specify a broker id in your docker-compose file, it will automatically be generated (see https://issues.apache.org/jira/browse/KAFKA-1070). This allows scaling up and down. In this case it is recommended to use the --no-recreate option of docker-compose to ensure that containers are not re-created and thus keep their names and ids.

Automatically create topics

If you want kafka-docker to create topics in Kafka automatically when the container is created, add a KAFKA_CREATE_TOPICS environment variable to docker-compose.yml.

Here is an example snippet from docker-compose.yml:

      KAFKA_CREATE_TOPICS: "Topic1:1:3,Topic2:1:1:compact"

Topic1 will have 1 partition and 3 replicas. Topic2 will have 1 partition, 1 replica, and a cleanup.policy set to compact.
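To make the format concrete, here is a minimal sketch that splits such a value into its fields. It assumes each entry follows the name:partitions:replicas[:cleanup.policy] layout described above. The class CreateTopicsSpec is a hypothetical helper written for illustration and is not part of kafka-docker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of kafka-docker): it only mirrors the
// name:partitions:replicas[:cleanup.policy] layout described in the text.
final class CreateTopicsSpec {

    /** One entry of the comma-separated KAFKA_CREATE_TOPICS value. */
    static final class Topic {
        final String name;
        final int partitions;
        final int replicas;
        final String cleanupPolicy; // null when the optional fourth field is absent

        Topic(String name, int partitions, int replicas, String cleanupPolicy) {
            this.name = name;
            this.partitions = partitions;
            this.replicas = replicas;
            this.cleanupPolicy = cleanupPolicy;
        }
    }

    /** Splits the value on commas, then each entry on colons. */
    static List<Topic> parse(String spec) {
        List<Topic> topics = new ArrayList<>();
        for (String entry : spec.split(",")) {
            String[] fields = entry.trim().split(":");
            if (fields.length < 3 || fields.length > 4) {
                throw new IllegalArgumentException(
                        "Expected name:partitions:replicas[:cleanup.policy], got: " + entry);
            }
            String policy = fields.length == 4 ? fields[3] : null;
            topics.add(new Topic(fields[0], Integer.parseInt(fields[1]),
                    Integer.parseInt(fields[2]), policy));
        }
        return topics;
    }

    public static void main(String[] args) {
        for (Topic t : parse("Topic1:1:3,Topic2:1:1:compact")) {
            System.out.println(t.name + " partitions=" + t.partitions
                    + " replicas=" + t.replicas + " cleanup.policy=" + t.cleanupPolicy);
        }
    }
}
```

A check like this can be handy for validating the value in a build script before the container starts.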

Advertised hostname

You can configure the advertised hostname in different ways:

  1. Explicitly, using KAFKA_ADVERTISED_HOST_NAME
  2. Via a command, using HOSTNAME_COMMAND, e.g. HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"

When using commands, make sure you review the “Variable Substitution” section in https://docs.docker.com/compose/compose-file/. A literal dollar sign in a command must be escaped as $$, as in the awk expression above.

If KAFKA_ADVERTISED_HOST_NAME is specified, it takes precedence over HOSTNAME_COMMAND.
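The precedence rule can be sketched as follows. This is only an illustration of the documented behaviour, not the image's actual start-up script: AdvertisedHostResolver and its runCommand parameter are hypothetical, and the command runner is passed in as a function so the sketch does not need to spawn a shell.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical illustration of the precedence rule; the real container
// start-up logic may differ in detail.
final class AdvertisedHostResolver {

    /**
     * Returns the explicit host name when it is set; otherwise the output of
     * HOSTNAME_COMMAND as produced by runCommand; otherwise null.
     */
    static String resolve(Map<String, String> env, UnaryOperator<String> runCommand) {
        String explicit = env.get("KAFKA_ADVERTISED_HOST_NAME");
        if (explicit != null && !explicit.isEmpty()) {
            return explicit; // explicit value wins, the command is never run
        }
        String command = env.get("HOSTNAME_COMMAND");
        if (command != null && !command.isEmpty()) {
            return runCommand.apply(command).trim();
        }
        return null; // neither variable is set
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        env.put("KAFKA_ADVERTISED_HOST_NAME", "broker.example");
        env.put("HOSTNAME_COMMAND", "hostname");
        // The lambda stands in for actually executing the command.
        System.out.println(resolve(env, command -> "host-from-command"));
    }
}
```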

For AWS deployment, you can use the Metadata service to get the container host’s IP:

HOSTNAME_COMMAND=wget -t3 -T2 -qO-

Reference: http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html


JMX

For monitoring purposes, you may wish to configure JMX. Beyond the standard JMX parameters, the underlying RMI protocol used to connect can cause problems, so two further properties matter:

  • java.rmi.server.hostname - interface to bind listening port.
  • com.sun.management.jmxremote.rmi.port - the port to service RMI requests.

For example, to connect to a Kafka broker running locally (assuming port 1099 is exposed):

  KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname= -Dcom.sun.management.jmxremote.rmi.port=1099"
  JMX_PORT: 1099
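With those options in place, a client can reach the broker's MBean server over RMI. The following is a minimal sketch using the standard javax.management.remote API. It assumes the broker is reachable on localhost and that port 1099 is exposed as above. KafkaJmxProbe is a hypothetical class name, and the program only prints the number of registered MBeans.

```java
import java.io.IOException;
import java.net.MalformedURLException;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical probe: connects to a JMX endpoint and reports the MBean count.
final class KafkaJmxProbe {

    /** Builds the conventional RMI-based JMX service URL for host and port. */
    static JMXServiceURL serviceUrl(String host, int port) throws MalformedURLException {
        return new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
    }

    public static void main(String[] args) {
        // Assumption: host defaults to localhost; pass another host as the first argument.
        String host = args.length > 0 ? args[0] : "localhost";
        try (JMXConnector connector = JMXConnectorFactory.connect(serviceUrl(host, 1099))) {
            MBeanServerConnection mbeans = connector.getMBeanServerConnection();
            System.out.println("Registered MBeans: " + mbeans.getMBeanCount());
        } catch (IOException e) {
            // Typical causes: port not exposed, or java.rmi.server.hostname not
            // resolvable from the client side.
            System.out.println("Could not connect over JMX: " + e.getMessage());
        }
    }
}
```

If the connection fails even though the port is open, the value of java.rmi.server.hostname is usually the first thing to check, since the RMI stub tells the client which address to call back.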

Spring Boot + Kafka

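Next, add the spring-kafka JAR and all of its dependencies. The easiest way to do that is to declare a dependency in your build tool, e.g. for Maven:

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <!-- version omitted on the assumption that the Spring Boot parent POM manages it; otherwise add a <version> element -->
    </dependency>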

Using plain Java to send and receive a message:


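@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringKafkaApplicationTest {

  private static String BOOT_TOPIC = "boot.t";

  @Autowired
  private Sender sender;

  @Autowired
  private Receiver receiver;

  @ClassRule
  public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, BOOT_TOPIC);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Point Spring Boot's Kafka auto-configuration at the embedded broker.
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
  }

  @Test
  public void testReceive() throws Exception {
    sender.send(BOOT_TOPIC, "Hello Boot!");

    // Wait up to 10 seconds; a latch count of zero means the message arrived.
    receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
    assertThat(receiver.getLatch().getCount()).isEqualTo(0);
  }
}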
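The test waits on a latch because the message is consumed on a different thread from the one that sends it. The plain-Java sketch below illustrates only that waiting pattern. It makes some loud assumptions: an in-memory queue stands in for the Kafka topic, and the Receiver class here is a hypothetical stand-in, not the project's actual Receiver bean.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustration only: an in-memory queue replaces Kafka so the latch
// pattern can be run without a broker.
final class LatchHandoffDemo {

    /** Hypothetical stand-in for the Receiver bean: counts down on receipt. */
    static final class Receiver {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String lastMessage;

        void receive(String message) {
            lastMessage = message;
            latch.countDown();
        }

        CountDownLatch getLatch() {
            return latch;
        }

        String getLastMessage() {
            return lastMessage;
        }
    }

    /** Publishes one message and waits until the consumer thread has seen it. */
    static boolean sendAndAwait(String message, Receiver receiver) throws InterruptedException {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                receiver.receive(topic.take()); // blocks until a message is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        topic.put(message);
        // Same timeout as the test above: returns true once the latch hits zero.
        return receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        Receiver receiver = new Receiver();
        boolean delivered = sendAndAwait("Hello Boot!", receiver);
        System.out.println("delivered=" + delivered + ", message=" + receiver.getLastMessage());
    }
}
```

Checking the return value of await (or the remaining latch count) matters: without it, a test would pass even if the timeout expired and nothing was received.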

Maven will download the needed dependencies, compile the code, and run the unit test case. The result should be a successful build, during which the following logs are generated:


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.5.2.RELEASE)

08:36:56.175 [main] INFO  c.c.kafka.SpringKafkaApplicationTest - Starting SpringKafkaApplicationTest on cnf-pc with PID 700 (started by CodeNotFound in c:\code\st\spring-kafka\spring-kafka-avro)
08:36:56.175 [main] INFO  c.c.kafka.SpringKafkaApplicationTest - No active profile set, falling back to default profiles: default
08:36:56.889 [main] INFO  c.c.kafka.SpringKafkaApplicationTest - Started SpringKafkaApplicationTest in 1.068 seconds (JVM running for 5.293)
08:36:58.223 [main] INFO  c.codenotfound.kafka.producer.Sender - sending user='{"name": "John Doe", "favorite_number": null, "favorite_color": "green"}'
08:36:58.271 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-L-1] INFO  c.c.kafka.consumer.Receiver - received user='{"name": "John Doe", "favorite_number": null, "favorite_color": "green"}'
08:37:00.240 [main] ERROR o.a.zookeeper.server.ZooKeeperServer - ZKShutdownHandler is not registered, so ZooKeeper server won't take any action on ERROR or SHUTDOWN server state changes
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 8.871 sec - in com.codenotfound.kafka.SpringKafkaApplicationTest


Tests run: 3, Failures: 0, Errors: 0, Skipped: 0

[INFO] ------------------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 41.632 s
[INFO] Finished at: 2017-04-17T08:37:31+02:00
[INFO] Final Memory: 18M/212M
[INFO] ------------------------------------------------------------------------
