Spring Boot Kafka Microservices - #14 - Configure and Create Kafka Consumer in EmailService


Welcome to the Spring Boot Kafka Event-Driven Microservices series. In this lecture, we will configure and create a Kafka consumer in the EmailService microservice.

Configure and Create Kafka Consumer in EmailService Microservice

Open the application.properties file of the email-service project and add the following Kafka consumer configuration:

server.port=8082
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=email
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.topic.name=order_topics
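
The following consumer property tells Spring Kafka to convert each JSON message value into a Java object:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
By default, JsonDeserializer only deserializes classes from a small set of trusted packages. Setting spring.json.trusted.packages to * lets it deserialize OrderEvent, which lives in the net.javaguides.basedomains.dto package.

Let's understand the meaning of the above properties.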

spring.kafka.consumer.bootstrap-servers - Comma-delimited list of host:port pairs to use for establishing the initial connections to the Kafka cluster. For consumers, it overrides the global spring.kafka.bootstrap-servers property.

spring.kafka.consumer.group-id - A unique string that identifies the consumer group to which this consumer belongs.

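spring.kafka.consumer.auto-offset-reset - What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server. With earliest, the consumer starts from the oldest available record; with latest (the Kafka default), it reads only records produced after it starts.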

spring.kafka.consumer.key-deserializer - Deserializer class for keys.

spring.kafka.consumer.value-deserializer - Deserializer class for values.
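
To make the auto-offset-reset behaviour more concrete, here is a small, self-contained Java model of the decision a consumer faces when it starts reading a partition. It is only an illustration under simplifying assumptions, not Kafka client code: the class OffsetResetSketch, the ResetPolicy enum, and the startingOffset method are hypothetical names, and the real client signals the none case through its own exception types.

```java
import java.util.OptionalLong;

/** Illustrative model of the auto-offset-reset decision; not Kafka client code. */
final class OffsetResetSketch {

    /** Mirrors the three documented values: earliest, latest, none. */
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * @param committed offset stored for the consumer group, empty if none exists
     * @param logStart  oldest offset still retained in the partition
     * @param logEnd    offset that the next produced record will receive
     * @param policy    the configured auto-offset-reset value
     * @return the offset the consumer would start reading from
     */
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, ResetPolicy policy) {
        // A committed offset that is still inside the log wins; the policy is not consulted.
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            if (offset >= logStart && offset <= logEnd) {
                return offset;
            }
        }
        // No committed offset, or it points at data that no longer exists.
        switch (policy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            default:
                throw new IllegalStateException("No valid offset and auto-offset-reset is none");
        }
    }

    public static void main(String[] args) {
        // New group, earliest: start from the oldest record.
        System.out.println(startingOffset(OptionalLong.empty(), 0, 42, ResetPolicy.EARLIEST)); // 0
        // New group, latest: only records produced from now on.
        System.out.println(startingOffset(OptionalLong.empty(), 0, 42, ResetPolicy.LATEST));   // 42
        // Existing group with a valid committed offset: resume there.
        System.out.println(startingOffset(OptionalLong.of(17), 0, 42, ResetPolicy.LATEST));    // 17
        // Committed offset was deleted by retention: fall back to the policy.
        System.out.println(startingOffset(OptionalLong.of(3), 10, 42, ResetPolicy.EARLIEST));  // 10
    }
}
```

With group-id email and auto-offset-reset set to earliest, the first case is the one that applies the first time the email service starts: it reads every order event already in the topic.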

Let's create a Kafka Consumer to receive JSON messages from the topic. 

In the email-service project, create a package named kafka. Within the kafka package, create a class named OrderConsumer with the following content:
package net.javaguides.emailservice.kafka;

import net.javaguides.basedomains.dto.OrderEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class OrderConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderConsumer.class);

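    // Topic name and group id are resolved from application.properties.
    @KafkaListener(
            topics = "${spring.kafka.topic.name}",
            groupId = "${spring.kafka.consumer.group-id}"
    )
    public void consume(OrderEvent event) {
        // Parameterized logging avoids building the message when INFO is disabled.
        LOGGER.info("Order event received in email service => {}", event);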

        // send an email to the customer
    }
}
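
The consume method above leaves the email step as a comment. One way that step might be structured, sketched in plain Java so that it runs without Spring or Kafka, is to have the listener delegate to a small handler. Everything in this sketch is hypothetical: EmailSender, OrderEmailHandler, and the customerEmail, orderId, and status parameters are illustrative stand-ins, since the fields of OrderEvent are not shown in this lecture.

```java
/** Hypothetical abstraction over whatever actually sends the mail. */
interface EmailSender {
    void send(String to, String subject, String body);
}

/** Hypothetical handler that the listener method could delegate to. */
final class OrderEmailHandler {

    private final EmailSender sender;

    OrderEmailHandler(EmailSender sender) {
        this.sender = sender;
    }

    /** Builds a simple notification from assumed order details and sends it. */
    void handle(String customerEmail, String orderId, String status) {
        String subject = "Order " + orderId + " is " + status;
        String body = String.format("Your order %s is now %s.", orderId, status);
        sender.send(customerEmail, subject, body);
    }
}

final class OrderEmailHandlerDemo {
    public static void main(String[] args) {
        // A console-printing sender stands in for a real mail client.
        EmailSender console = (to, subject, body) ->
                System.out.println("To: " + to + " | " + subject + " | " + body);
        new OrderEmailHandler(console).handle("customer@example.com", "42", "PENDING");
    }
}
```

Keeping the mail logic behind an interface like this means the listener stays a thin adapter, and the handler can be exercised with a fake sender in a unit test.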
