Spring Boot Kafka Producer Consumer Example Tutorial

In this tutorial, we will learn how to create Kafka Producer and Consumer in Spring Boot Kafka project.

If you are new to Apache Kafka then you should check out my article - Apache Kafka Core Concepts.

The Spring Team provides Spring for Apache Kafka dependency to work with the development of Kafka-based messaging solutions. 

In this tutorial, we use Kafka as a messaging system to send messages between Producers and Consumers.

Video

1. Install and Setup Apache Kafka

1. Download Kafka from the official website at https://kafka.apache.org/downloads

2. Extract Kafka zip in the local file system

Run the following commands in order to start all services in the correct order:

3. Start Zookeeper service. 

Use the below command to start the Zookeeper service:
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

4. Start Kafka Broker

Open another terminal session and run the below command to start the Kafka broker:
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.

2. Create and Setup Spring Boot Project in IntelliJ

Create a Spring boot project using https://start.spring.io/

Add dependencies:

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
</dependency>
   
 <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
</dependency>
Import in IntelliJ and run the spring boot application

3. Configure Kafka Producer and Consumer in an application.properties File

In the application.properties file, add Kafka broker address as well as Consumer and Producer related configuration.

Open the application.properties file and the following content to it:

spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer

Let's understand the above spring boot provided Kafka properties:

spring.kafka.consumer.group-id - specifies a unique string that identifies the consumer group this consumer belongs to.

spring.kafka.consumer.auto-offset-reset property - specifies what to do when there is no initial offset in Kafka or if the current offset does not exist anymore on the server (e.g. because that data has been deleted):
  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw an exception to the consumer if no previous offset is found for the consumer’s group
  • anything else: throw an exception to the consumer.

spring.kafka.consumer.key-deserializer - specifies the deserializer class for keys.

spring.kafka.consumer.value-deserializer - specifies the deserializer class for values.

spring.kafka.producer.key-deserializer - specifies the serializer class for keys.

spring.kafka.producer.value-deserializer - specifies the serializer class for values.

4. Create Kafka Topic

To create a topic on startup, add a bean of type NewTopic. If the topic already exists, the bean is ignored. We will use the topic name "javaguides" in this example.

Let's create a KafkaTopicConfig file and add the following content:
package net.javaguides.springbootkafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic javaguidesTopic(){
        return TopicBuilder.name("javaguides")
                .build();
    }
}

5. Create Kafka Producer

Creating a producer will write our messages on the topic.

KafkaTemplate

Well, Spring boot provides an auto-configuration for Spring’s KafkaTemplate so you can autowire it directly in your own beans.

For example:

package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.utils.AppConstants;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message){
        LOGGER.info(String.format("Message sent -> %s", message));
        kafkaTemplate.send(AppConstants.TOPIC_NAME, message);
    }
}

Create a utils package and within this package create AppConstants with the following content:

package net.javaguides.springbootkafka.utils;

public class AppConstants {
    public static final String TOPIC_NAME = "javaguides";
    public static final String GROUP_ID = "group_id";
}

KafKaProducer class uses KafkaTemplate to send messages to the configured topic name.

6. Create REST API to Send Message

Create controller package, within controller package create KafkaProducerController with the following content to it:

package net.javaguides.springbootkafka;

import net.javaguides.springbootkafka.kafka.KafkaProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {

    private KafkaProducer kafkaProducer;

    public KafkaProducerController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @GetMapping("/publish")
    public ResponseEntity<String> publish(@RequestParam("message") String message){
        kafkaProducer.sendMessage(message);
        return ResponseEntity.ok("Message sent to kafka topic");
    }
}

See Topic Messages via Command Line:

bin/kafka-console-consumer.sh --topic javaguides --from-beginning --bootstrap-server localhost:9092

Make sure to change the topic name. In our case "javaguides" is the topic name.

7. Create Kafka Consumer

Kafka Consumer is  the service that will be responsible for reading messages and processing them according to the needs of your own business logic. 

To set it up, enter the following:

package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafKaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafKaConsumer.class);

    @KafkaListener(topics = AppConstants.TOPIC_NAME,
                    groupId = AppConstants.GROUP_ID)
    public void consume(String message){
        LOGGER.info(String.format("Message received -> %s", message));
    }
}

Here, we told our method void to consume (String message) to subscribe to the user’s topic and just emit every message to the application log. In your real application, you can handle messages the way your business requires you to.

KafkaListener endpoint:

    @KafkaListener(topics = AppConstants.TOPIC_NAME,
                    groupId = AppConstants.GROUP_ID)
    public void consume(String message){
        LOGGER.info(String.format("Message received -> %s", message));
    }

8. Demo

Let's run the Spring boot application and have the demo. Make sure that Zookeeper and Kafka services should be up and running.

Open a browser and hit the below link to call a REST API:


From the command line, you can view the topic messages:

You can view the topic messages in a console:

Conclusion

In this tutorial, we have learned how to create Kafka Producer and Consumer in Spring Boot Kafka project with a demo live example.

Comments