Spring Boot Kafka JsonSerializer and JsonDeserializer Example


In this tutorial, we will learn how to use the JsonSerializer and JsonDeserializer classes provided by the Spring Kafka library to store JSON in Apache Kafka topics and read it back as Java model objects.

If you are new to Apache Kafka then you should check out my article - Apache Kafka Core Concepts.

In short, you will learn how to send a Java object to Apache Kafka as a JSON byte[] and how to receive it back as a Java object.

Apache Kafka stores and transports byte[]. Its client library ships with a number of built-in serializers and deserializers, but none for JSON. Spring Kafka provides JsonSerializer and JsonDeserializer classes, which we can use to convert Java objects to and from JSON.

We’ll send a Java Object as JSON byte[] to a Kafka Topic using a JsonSerializer. Afterward, we’ll configure how to receive a JSON byte[] and automatically convert it to a Java Object using a JsonDeserializer.

1. Install and Set Up Apache Kafka

1. Download Kafka from the official website at https://kafka.apache.org/downloads

2. Extract the downloaded Kafka archive to the local file system

Run the following commands in order to start all services in the correct order:

3. Start Zookeeper service. 

Use the below command to start the Zookeeper service:
# Start the ZooKeeper service
# Note: Kafka 4.0 and later no longer use ZooKeeper; this step applies to earlier, ZooKeeper-based releases.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

4. Start Kafka Broker

Open another terminal session and run the below command to start the Kafka broker:
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.

2. Create and Set Up Spring Boot Project in IntelliJ

Create a Spring Boot project using https://start.spring.io/

Add dependencies:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Import the project into IntelliJ and run the Spring Boot application.

3. Configure Kafka Producer and Consumer in an application.properties File

Open the application.properties file and add the Kafka broker address along with the Consumer and Producer configuration needed to exchange JSON messages:

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

The following Consumer property converts incoming JSON into a Java object:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

The following Producer property converts an outgoing Java object into JSON:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Let's go through the meaning of the properties used above.

spring.kafka.consumer.bootstrap-servers - Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers.

spring.kafka.consumer.group-id - A unique string that identifies the consumer group to which this consumer belongs.

spring.kafka.consumer.auto-offset-reset - What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.

spring.kafka.consumer.key-deserializer - Deserializer class for keys.

spring.kafka.consumer.value-deserializer - Deserializer class for values.

spring.kafka.producer.key-serializer - Serializer class for keys.

spring.kafka.producer.value-serializer - Serializer class for values.
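
To make the mapping concrete, here is a small sketch of the plain Kafka client property names that the spring.kafka.consumer.* entries above correspond to. It only builds a Map and does not connect to a broker. The class name ConsumerPropertySketch is made up for illustration, and narrowing spring.json.trusted.packages to the payload package (instead of *) is an assumption about what this project needs, not something the configuration above does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConsumerPropertySketch {

    // Returns the Kafka client properties that correspond to the
    // spring.kafka.consumer.* entries in application.properties.
    static Map<String, Object> consumerProperties() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "group-id");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Assumption: trust only the package that holds the User class,
        // rather than "*" (every package) as in the tutorial's configuration.
        props.put("spring.json.trusted.packages",
                "net.javaguides.springbootkafka.payload");
        return props;
    }

    public static void main(String[] args) {
        // Print each property so the mapping can be compared with the file above.
        consumerProperties().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With * every package is trusted, which is convenient for a demo. Listing only the packages you expect to deserialize is the more cautious choice.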

4. Create Kafka Topic

To create a topic on startup, add a bean of type NewTopic. If the topic already exists, the bean is ignored. We will use the topic name "javaguides" in this example.

Let's create a KafkaTopicConfig class and add the following content:
package net.javaguides.springbootkafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic javaguidesTopic(){
        return TopicBuilder.name("javaguides")
                .build();
    }
}

5. Create Simple POJO to Serialize / Deserialize

Let's create a User class to send and receive a User object to and from a Kafka topic.

The JsonSerializer serializes the User instance to a JSON byte array, and Kafka stores this byte array in a partition of the target topic.

During deserialization, the JsonDeserializer receives the JSON byte array from Kafka, converts it to a User object, and returns it to the application.

package net.javaguides.springbootkafka.payload;

public class User {
    private int id;
    private String firstName;
    private String lastName;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                '}';
    }
}
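
To illustrate the round trip described at the start of this step, here is a deliberately simplified sketch that uses only the JDK. It does not use Spring Kafka's JsonSerializer or JsonDeserializer (which rely on Jackson). Instead it hand-writes the JSON, purely to show what "object to JSON byte[] and back" means. The class JsonRoundTripSketch, its nested User stand-in, and the sample values are hypothetical. The hand-rolled parsing assumes a fixed field order, a non-negative id, and no quotes inside the names.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JsonRoundTripSketch {

    // Simplified stand-in for the tutorial's User class.
    static class User {
        final int id;
        final String firstName;
        final String lastName;

        User(int id, String firstName, String lastName) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
        }
    }

    // Matches exactly the JSON shape produced by toJsonBytes below.
    private static final Pattern USER_JSON = Pattern.compile(
            "\\{\"id\":(\\d+),\"firstName\":\"([^\"]*)\",\"lastName\":\"([^\"]*)\"\\}");

    // "Serializer" side: object -> JSON text -> UTF-8 bytes.
    static byte[] toJsonBytes(User user) {
        String json = "{\"id\":" + user.id
                + ",\"firstName\":\"" + user.firstName + "\""
                + ",\"lastName\":\"" + user.lastName + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // "Deserializer" side: UTF-8 bytes -> JSON text -> object.
    static User fromJsonBytes(byte[] bytes) {
        String json = new String(bytes, StandardCharsets.UTF_8);
        Matcher matcher = USER_JSON.matcher(json);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unexpected JSON: " + json);
        }
        return new User(Integer.parseInt(matcher.group(1)), matcher.group(2), matcher.group(3));
    }

    public static void main(String[] args) {
        byte[] payload = toJsonBytes(new User(1, "Jane", "Doe"));
        System.out.println(new String(payload, StandardCharsets.UTF_8));

        User copy = fromJsonBytes(payload);
        System.out.println(copy.id + " " + copy.firstName + " " + copy.lastName);
    }
}
```

In the real application none of this code is needed: the serializer and deserializer configured in application.properties do the conversion for you.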

6. Create Kafka Producer to Produce JSON Message

Let's create a Kafka producer that sends JSON messages using Spring Kafka.

KafkaTemplate

Spring Boot auto-configures Spring's KafkaTemplate, so you can autowire it directly into your own beans.

package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.payload.User;
import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendMessage(User data){
        LOGGER.info(String.format("Message sent -> %s", data.toString()));

        Message<User> message = MessageBuilder
                .withPayload(data)
                .setHeader(KafkaHeaders.TOPIC, AppConstants.TOPIC_NAME)
                .build();

        kafkaTemplate.send(message);
    }
}

The sendMessage() method sends a User object to a Kafka topic.

Note that we declared a KafkaTemplate<String, User>: the Java objects we send are automatically converted to a JSON byte[] by the configured JsonSerializer.

In this example, we created a Message using the MessageBuilder. It's important to set the KafkaHeaders.TOPIC header, which names the topic the message is sent to.
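
The producer imports an AppConstants class, as does the consumer in step 8, but that class is not listed in this tutorial. A minimal sketch of what it might look like follows. The values are assumptions chosen to match the topic created in step 4 and the group id from application.properties; the actual class in the project may differ.

```java
// Assumed to live in the net.javaguides.springbootkafka.utils package, matching the
// import used by the producer; add the package declaration when copying it into the project.
public final class AppConstants {

    // Assumption: the same topic name as the NewTopic bean in KafkaTopicConfig.
    public static final String TOPIC_NAME = "javaguides";

    // Assumption: the same value as spring.kafka.consumer.group-id.
    public static final String GROUP_ID = "group-id";

    private AppConstants() {
        // Constants holder; not meant to be instantiated.
    }
}
```

Both fields are compile-time constants, so they can be used as annotation attribute values such as the topics and groupId of @KafkaListener.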

7. Create REST API to Send JSON Object

Let's create a simple POST REST API to send User information as a JSON object.

Instead of sending a plain string, the client posts a complete User object as JSON, and the Kafka producer writes that User object to the Kafka topic.

package net.javaguides.springbootkafka.controller;

import net.javaguides.springbootkafka.kafka.KafkaProducer;
import net.javaguides.springbootkafka.payload.User;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {

    private KafkaProducer kafkaProducer;

    public KafkaProducerController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> publish(@RequestBody User user){
        kafkaProducer.sendMessage(user);
        return ResponseEntity.ok("Message sent to kafka topic");
    }
}

8. Create Kafka Consumer to Consume JSON Message

Let's create a Kafka consumer to receive JSON messages from the topic. In the consumer class, we simply declare the User Java object as the parameter of the listener method.

package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.payload.User;
import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafKaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafKaConsumer.class);

    @KafkaListener(topics = AppConstants.TOPIC_NAME,
                    groupId = AppConstants.GROUP_ID)
    public void consume(User data){
        LOGGER.info(String.format("Message received -> %s", data.toString()));
    }
}

9. Demo

Let's run the Spring Boot application and try the demo. Make sure that the ZooKeeper and Kafka services are up and running.

Use the Postman client to send a POST request to the /api/v1/kafka/publish endpoint, with a User JSON object (id, firstName, lastName) as the request body.
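
As an alternative to Postman, the same request can be sent from a small Java program. The following is a sketch rather than part of the original project. It assumes the application is listening on localhost at Spring Boot's default port 8080. The class name PublishUserRequest and the sample user values are made up for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PublishUserRequest {

    // Assumption: the app runs locally on Spring Boot's default port 8080.
    static final String PUBLISH_URL = "http://localhost:8080/api/v1/kafka/publish";

    // Builds a POST request carrying the given JSON document as its body.
    static HttpRequest buildRequest(String userJson) {
        return HttpRequest.newBuilder(URI.create(PUBLISH_URL))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(userJson))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Sample payload; the field names match the User class.
        String userJson = "{\"id\":1,\"firstName\":\"Jane\",\"lastName\":\"Doe\"}";

        // Sends the request; this fails with an exception if the application is not running.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest(userJson), HttpResponse.BodyHandlers.ofString());

        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

If the call succeeds, the controller from step 7 responds with the text "Message sent to kafka topic".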


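Observe the console logs: the producer logs a line starting with "Message sent ->" and the consumer logs a line starting with "Message received ->", each followed by the toString() output of the User object.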

Conclusion

In this tutorial, we have learned how to use the JsonSerializer and JsonDeserializer classes provided by the Spring Kafka library to store JSON in Apache Kafka topics and read it back as Java model objects.

