In this tutorial, we will learn how to use the JsonSerializer and JsonDeserializer classes provided by the Spring Kafka library to store JSON in Apache Kafka topics, read it back, and map it to Java model objects.
In other words, you will learn how to send a Java object to Apache Kafka as a JSON byte[] and receive it back as a Java object.
Apache Kafka stores and transports byte[]. It ships with a number of built-in serializers and deserializers, but none of them handle JSON. Spring Kafka provides a JsonSerializer and a JsonDeserializer, which we can use to convert Java objects to and from JSON.
We’ll send a Java object as a JSON byte[] to a Kafka topic using JsonSerializer. Afterward, we’ll configure the consumer to receive the JSON byte[] and automatically convert it back to a Java object using JsonDeserializer.
Run the following commands to start all services in the correct order:
3. Start Zookeeper service.
Use the below command to start the Zookeeper service:
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
4. Start Kafka Broker
Open another terminal session and run the below command to start the Kafka broker:
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.
2. Create and Setup Spring Boot Project in IntelliJ
Let's understand the meaning of the Spring Kafka configuration properties used in this project.
spring.kafka.consumer.bootstrap-servers - Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. Overrides the global property, for consumers.
spring.kafka.consumer.group-id - A unique string that identifies the consumer group to which this consumer belongs.
spring.kafka.consumer.auto-offset-reset - What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.
spring.kafka.consumer.key-deserializer - Deserializer class for keys.
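spring.kafka.consumer.value-deserializer - Deserializer class for values. In this tutorial, this is Spring Kafka's JsonDeserializer.
spring.kafka.producer.key-serializer - Serializer class for keys.
spring.kafka.producer.value-serializer - Serializer class for values. In this tutorial, this is Spring Kafka's JsonSerializer.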
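As a rough sketch, the properties described above could be set as follows in application.properties. The property names and serializer class names are the real Spring Boot and Spring Kafka ones. The values (broker address, group id, offset reset policy, and the trusted-packages setting) are assumptions for a local setup and are not taken from the original project.

```properties
# Assumed values for a local single-broker setup; adjust to your environment.
spring.kafka.consumer.bootstrap-servers=localhost:9092
# Hypothetical group id.
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
# JsonDeserializer only instantiates classes from trusted packages.
# "*" trusts everything, which is convenient for a demo but too broad for production.
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
```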
4. Create Kafka Topic
To create a topic on startup, add a bean of type NewTopic. If the topic already exists, the bean is ignored. We will use the topic name "javaguides" in this example.
Let's create a KafkaTopicConfig file and add the following content:
package net.javaguides.springbootkafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {
    @Bean
    public NewTopic javaguidesTopic(){
        return TopicBuilder.name("javaguides")
                .build();
    }
}
5. Create Simple POJO to Serialize / Deserialize
Let's create a User class to send and receive a User object to and from a Kafka topic.
On the producer side, JsonSerializer serializes the User instance to a byte array, and Kafka stores this byte array in a partition of the given topic.
On the consumer side, JsonDeserializer receives the JSON from Kafka as a byte array, converts it to a User object, and returns it to the application.
package net.javaguides.springbootkafka.payload;

public class User {
    private int id;
    private String firstName;
    private String lastName;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                '}';
    }
}
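To make the serialize and deserialize steps described above more concrete, here is a minimal, dependency-free sketch of the round trip from a Java object to a JSON byte[] and back. It is not how Spring Kafka implements JsonSerializer and JsonDeserializer (those delegate to the Jackson library). The class JsonBytesRoundTrip, its methods, the compact User record standing in for the POJO, and the sample values are all hypothetical and exist only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JsonBytesRoundTrip {

    // Compact, hypothetical stand-in for the tutorial's User POJO.
    record User(int id, String firstName, String lastName) {}

    // Matches exactly the JSON layout produced by serialize() below.
    private static final Pattern USER_JSON = Pattern.compile(
            "\\{\"id\":(-?\\d+),\"firstName\":\"([^\"]*)\",\"lastName\":\"([^\"]*)\"\\}");

    // Object -> JSON text -> UTF-8 bytes.
    // Simplification: assumes names contain no quotes or backslashes (no escaping).
    static byte[] serialize(User user) {
        String json = "{\"id\":" + user.id()
                + ",\"firstName\":\"" + user.firstName() + "\""
                + ",\"lastName\":\"" + user.lastName() + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // UTF-8 bytes -> JSON text -> object.
    static User deserialize(byte[] payload) {
        String json = new String(payload, StandardCharsets.UTF_8);
        Matcher m = USER_JSON.matcher(json);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected JSON: " + json);
        }
        return new User(Integer.parseInt(m.group(1)), m.group(2), m.group(3));
    }

    public static void main(String[] args) {
        User original = new User(1, "Ramesh", "Fadatare"); // sample values
        byte[] payload = serialize(original);              // what Kafka would store
        // prints {"id":1,"firstName":"Ramesh","lastName":"Fadatare"}
        System.out.println(new String(payload, StandardCharsets.UTF_8));
        User restored = deserialize(payload);
        System.out.println(original.equals(restored));     // prints true
    }
}
```

In the real application none of this is written by hand: configuring JsonSerializer and JsonDeserializer as the value serializer and deserializer is enough.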
6. Create Kafka Producer to Produce JSON Message
Let's create Kafka Producer to Produce JSON Messages using Spring Kafka.
KafkaTemplate
Spring Boot auto-configures Spring’s KafkaTemplate, so you can autowire it directly into your own beans.
Let’s start by sending a User object to a Kafka Topic.
Note that we use a KafkaTemplate<String, User>, since we are sending Java objects to the Kafka topic; they are automatically transformed into a JSON byte[].
In this example, the message is built with MessageBuilder. It is important to set the topic to which the message should be sent.
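A minimal sketch of such a producer, assuming the class is named KafkaProducer with a sendMessage(User) method (the names the REST controller in the next step relies on) and that the topic is the "javaguides" topic created earlier. The logging line and overall layout are assumptions; only KafkaTemplate<String, User>, MessageBuilder, and the need to set the topic come from the description above. It requires the Spring Kafka dependency and runs inside the Spring Boot application, not standalone.

```java
package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.payload.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    // Auto-configured by Spring Boot; the value type is the User POJO.
    private final KafkaTemplate<String, User> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, User> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(User data) {
        LOGGER.info("Message sent -> {}", data);

        // The payload is the User object; the topic travels as a message header.
        // Assumption: "javaguides" is the topic created in KafkaTopicConfig.
        Message<User> message = MessageBuilder
                .withPayload(data)
                .setHeader(KafkaHeaders.TOPIC, "javaguides")
                .build();

        // The configured JsonSerializer turns the payload into a JSON byte[].
        kafkaTemplate.send(message);
    }
}
```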
7. Create REST API to Send JSON Object
Let's create a simple POST REST API to send User information as a JSON object.
Instead of sending a message string, the API accepts a complete User object as JSON, so that the Kafka producer can write the User object to the Kafka topic.
package net.javaguides.springbootkafka.controller;

import net.javaguides.springbootkafka.kafka.KafkaProducer;
import net.javaguides.springbootkafka.payload.User;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {

    private KafkaProducer kafkaProducer;

    public KafkaProducerController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> publish(@RequestBody User user){
        kafkaProducer.sendMessage(user);
        return ResponseEntity.ok("Message sent to kafka topic");
    }
}
8. Create Kafka Consumer to Consume JSON Message
Let's create a Kafka consumer to receive JSON messages from the topic. In the consumer class, we simply declare the User Java object as the parameter of the listener method.
package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.payload.User;
import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafKaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafKaConsumer.class);

    @KafkaListener(topics = AppConstants.TOPIC_NAME,
            groupId = AppConstants.GROUP_ID)
    public void consume(User data){
        LOGGER.info(String.format("Message received -> %s", data.toString()));
    }
}
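The consumer refers to an AppConstants class that is not shown in this tutorial. A minimal sketch of what it could look like follows. TOPIC_NAME uses the "javaguides" topic from the topic configuration step; the GROUP_ID value is a hypothetical placeholder and should match whatever consumer group id you configure.

```java
// In the project, this file would sit in the package the consumer imports from:
// net.javaguides.springbootkafka.utils
public final class AppConstants {

    // Topic name used throughout this tutorial.
    public static final String TOPIC_NAME = "javaguides";

    // Hypothetical consumer group id; replace with your own.
    public static final String GROUP_ID = "myGroup";

    private AppConstants() {
        // Constants holder; not meant to be instantiated.
    }
}
```

Both fields must be compile-time constants (static final String literals), because annotation attributes such as topics and groupId only accept constant expressions.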
9. Demo
Let's run the Spring Boot application and try it out. Make sure that the Zookeeper and Kafka services are up and running.
Let's use the Postman client to send a POST request to the /api/v1/kafka/publish endpoint, with a User object as the JSON request body.
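If Postman is not at hand, the same request can be sketched with the JDK's built-in HTTP client. The endpoint path and the JSON field names come from the controller and the User class. The class name PublishUserRequest, the helper method, the sample values, and the base URL http://localhost:8080 (Spring Boot's default port) are assumptions.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PublishUserRequest {

    // Builds a POST request for the tutorial's /api/v1/kafka/publish endpoint.
    static HttpRequest buildPublishRequest(String baseUrl, String userJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/v1/kafka/publish"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(userJson))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        // Sample payload; field names match the User POJO.
        String userJson = "{\"id\":1,\"firstName\":\"Ramesh\",\"lastName\":\"Fadatare\"}";

        // Assumption: the application listens on Spring Boot's default port 8080.
        HttpRequest request = buildPublishRequest("http://localhost:8080", userJson);

        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            // Typically means the Spring Boot application is not running.
            System.out.println("Request failed: " + e);
        }
    }
}
```

When the application is up, the response body should be the controller's "Message sent to kafka topic" string.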
Then check the application's console logs for the consumer's "Message received -> ..." entry.
Conclusion
In this tutorial, we have learned how to use the JsonSerializer and JsonDeserializer classes provided by the Spring Kafka library to store JSON in Apache Kafka topics, read it back, and map it to Java model objects.