This guide is beginner-friendly and explains:
- What is CQRS?
- Why do we need it?
- How does it work?
- A step-by-step example with Spring Boot
- Real-world applications of CQRS
- Using Kafka as a message broker for event-driven CQRS
1️⃣ What is CQRS?
CQRS (Command Query Responsibility Segregation) is a design pattern that separates reading and writing operations into different models. Instead of using a single database for both tasks, it introduces two separate models:
- Command Model → Handles write operations (Insert, Update, Delete).
- Query Model → Handles read operations (Select, Fetch, View).
This means that instead of having one service do both reads and writes, we split it into two services:
- One service for handling commands (creating, updating, or deleting data).
- Another service for handling queries (fetching and displaying data).
🔍 How CQRS Works with Kafka
1️⃣ User sends a command (e.g., create order) → The Command Service writes to the database.
2️⃣ Command Service saves the data in the database and publishes an event to Kafka.
3️⃣ The Query Service listens to the Kafka topic, processes the event, and updates the read database (MongoDB).
4️⃣ User queries for order details → The Query Service retrieves data from the optimized read database.
This architecture ensures better performance by decoupling read and write operations and enables event-driven data consistency.
2️⃣ Implementing CQRS with Kafka as a Message Broker
Step 1: Add Kafka Dependencies
Include Kafka dependencies in pom.xml
:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Step 2: Configure Kafka in application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=order-group
Step 3: Implement Order Command Service (Handles Writes)
✅ Order Command Model (JPA Entity for PostgreSQL)
@Entity
public class OrderCommand {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String product;
private int quantity;
private String status;
}
✅ Order Command Repository
public interface OrderCommandRepository extends JpaRepository<OrderCommand, Long> {}
✅ Kafka Producer for Command Events
@Service
public class OrderCommandService {
private final OrderCommandRepository repository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public OrderCommandService(OrderCommandRepository repository, KafkaTemplate<String, String> kafkaTemplate) {
this.repository = repository;
this.kafkaTemplate = kafkaTemplate;
}
public OrderCommand createOrder(OrderCommand order) {
order.setStatus("CREATED");
OrderCommand savedOrder = repository.save(order);
kafkaTemplate.send("order-events", "OrderCreated: " + savedOrder.getId());
return savedOrder;
}
}
✅ Order Command Controller
@RestController
@RequestMapping("/orders")
public class OrderCommandController {
private final OrderCommandService service;
@Autowired
public OrderCommandController(OrderCommandService service) {
this.service = service;
}
@PostMapping
public ResponseEntity<OrderCommand> createOrder(@RequestBody OrderCommand order) {
return ResponseEntity.ok(service.createOrder(order));
}
}
Step 4: Implement Order Query Service (Handles Reads)
✅ Order Query Model (MongoDB Document)
@Document(collection = "orders")
public class OrderQuery {
@Id
private String id;
private String product;
private int quantity;
private String status;
}
✅ Order Query Repository
public interface OrderQueryRepository extends MongoRepository<OrderQuery, String> {}
✅ Kafka Consumer for Query Updates
@Service
public class OrderQueryService {
private final OrderQueryRepository repository;
@Autowired
public OrderQueryService(OrderQueryRepository repository) {
this.repository = repository;
}
@KafkaListener(topics = "order-events", groupId = "order-group")
public void updateQueryModel(String event) {
String[] data = event.split(": ");
if (data.length == 2 && data[0].equals("OrderCreated")) {
OrderQuery queryModel = new OrderQuery(data[1], "Laptop", 1, "CREATED");
repository.save(queryModel);
}
}
}
✅ Order Query Controller
@RestController
@RequestMapping("/orders")
public class OrderQueryController {
private final OrderQueryService service;
@Autowired
public OrderQueryController(OrderQueryService service) {
this.service = service;
}
@GetMapping
public List<OrderQuery> getAllOrders() {
return service.getAllOrders();
}
}
Step 5: Testing Kafka-based CQRS
# Create a new order (Command Service)
curl -X POST http://localhost:8081/orders -H "Content-Type: application/json" -d '{"product":"Laptop","quantity":1}'
# Verify the order event in Kafka
kafka-console-consumer --bootstrap-server localhost:9092 --topic order-events --from-beginning
# Fetch all orders (Query Service)
curl -X GET http://localhost:8082/orders
3️⃣ Why Use Kafka in CQRS?
✅ Benefits of Kafka in CQRS
✔ Ensures Eventual Consistency – Query service gets updated asynchronously.
✔ Scalable and Decoupled Architecture – Services do not depend on each other directly.
✔ Handles High Traffic Efficiently – Kafka manages event streams for large-scale applications.
✔ Resilient and Fault-Tolerant – Kafka persists events for future recovery.
🎯 Conclusion
By implementing CQRS with Kafka, we achieve separation of concerns, event-driven synchronization, and better scalability.
🚀 Key Takeaways:
✔ CQRS improves data consistency and scalability.
✔ Optimizes read and write operations separately.
✔ Kafka ensures event-driven updates for better efficiency.
✔ Best suited for high-traffic applications requiring real-time updates.
By implementing CQRS, you can enhance system reliability and efficiency! 🚀
Comments
Post a Comment
Leave Comment