Reactive Programming in Java

In this tutorial, we will discuss reactive programming and how to implement it in Java.

Reactive Programming Overview

Reactive programming is a programming paradigm where the focus is on developing asynchronous and non-blocking applications in an event-driven form. It is a declarative programming model that enables the processing of data streams in a more efficient, scalable, and resilient way. 

Reactive programming deals with the continuous flow of data, rather than individual events or requests, and enables the handling of complex asynchronous operations in a more streamlined way. In Reactive Programming, data streams are modeled as streams of events that are pushed asynchronously to data consumers. These events trigger reactions in the data consumers, which perform some action in response.

Reactive programming is built on the idea of composing streams of events and applying functional transformations to these streams. Reactive Programming aims to provide a more efficient and scalable way of processing data streams. It enables the development of highly responsive and fault-tolerant applications that can handle large volumes of data in real time. It also enables the development of more modular and composable applications by separating concerns and enabling the composition of small, reusable components. 

Reactive Programming is gaining popularity in the development of modern applications, such as web and mobile applications, IoT applications, and real-time data processing systems. Some popular frameworks for implementing Reactive Programming in Java include Reactor, RxJava, and Akka.

Reactive Streams Specification in Java

The Reactive Streams specification is a set of rules that every reactive stream implementation must follow.

The specification defines four interfaces that a reactive stream implementation provides.

Publisher

The Publisher is a data source that emits a potentially unbounded sequence of events, according to the demand it receives from its Subscribers.

Subscriber

The Subscriber subscribes to a Publisher and consumes the events it emits.

Subscription

The Subscription represents the one-to-one relationship between a Subscriber and a Publisher. The Subscriber uses it to request data and to cancel the stream.

Processor

The Processor represents a processing stage that is both a Subscriber and a Publisher, and it must obey the contracts of both.
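
To make the four roles concrete, here is a minimal sketch that uses the JDK's own copies of these interfaces in java.util.concurrent.Flow (available since Java 9) rather than a third-party library. The names FlowInterfacesSketch, UpperCaseProcessor, and CollectingSubscriber are hypothetical and chosen only for this illustration; SubmissionPublisher is the Publisher implementation that ships with the JDK.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowInterfacesSketch {

    /** Processor: a Subscriber of upstream items and a Publisher of upper-cased ones. */
    static final class UpperCaseProcessor extends SubmissionPublisher<String>
            implements Flow.Processor<String, String> {
        private Flow.Subscription upstream;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            upstream.request(1);          // ask for the first item
        }

        @Override
        public void onNext(String item) {
            submit(item.toUpperCase());   // republish the transformed item
            upstream.request(1);          // then ask for the next one
        }

        @Override
        public void onError(Throwable error) {
            closeExceptionally(error);    // forward the failure downstream
        }

        @Override
        public void onComplete() {
            close();                      // forward completion downstream
        }
    }

    /** Subscriber: collects everything it receives. */
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // unbounded demand
        }

        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable error) { finished.countDown(); }
        @Override public void onComplete() { finished.countDown(); }
    }

    static List<String> run(List<String> input) {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); // Publisher
        UpperCaseProcessor processor = new UpperCaseProcessor();
        CollectingSubscriber subscriber = new CollectingSubscriber();

        processor.subscribe(subscriber);  // the Processor acts as a Publisher here
        publisher.subscribe(processor);   // and as a Subscriber here

        input.forEach(publisher::submit);
        publisher.close();                // onComplete travels down the chain

        try {
            // SubmissionPublisher delivers asynchronously, so wait for the terminal signal.
            subscriber.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("apple", "banana"))); // prints [APPLE, BANANA]
    }
}
```

The Subscription never appears as a class of its own here: SubmissionPublisher creates one per Subscriber and hands it over in onSubscribe(), which is where each Subscriber states its demand.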

Reactive Programming Libraries

A reactive library is an implementation of the Reactive Streams interfaces. Here are some of the options available to us:
  • Project Reactor
  • RxJava
  • JDK 9 Flow API (java.util.concurrent.Flow)
In this tutorial, we are going to use the Project Reactor library.

Project Reactor

Project Reactor is a fourth-generation reactive library, based on the Reactive Streams specification, for building non-blocking applications on the JVM.

Mono and Flux Implementations

Project Reactor provides two implementations of the Publisher interface:
  1. Mono
  2. Flux
Mono: Emits 0 or 1 element.
A Mono produces at most one value.

Flux: Emits 0…N elements.
A Flux can produce multiple values and may be endless.

Mono vs Flux

Mono and Flux are both implementations of the Publisher interface. In simple terms, we can say that when we're doing something like a computation or making a request to a database or an external service, and expecting a maximum of one result, then we should use Mono.

When we're expecting multiple results from our computation, database, or external service call, then we should use Flux.

Reactive Stream Workflow


Let's walk through the Reactive Streams workflow step by step:
1. The Subscriber calls the subscribe() method of the Publisher to register with it.

2. The Publisher creates a Subscription instance and passes it to the Subscriber through onSubscribe(), confirming that the subscription succeeded.

3. Next, the Subscriber calls the request(n) method of the Subscription to ask the Publisher for up to n items.

4. Next, the Publisher calls onNext(data) once for each item it sends, at most n times. For example, if the Subscriber requested 10 items and the Publisher has 10 available, it calls onNext(data) 10 times.

5. Once the Publisher has sent all the data, it calls onComplete() to notify the Subscriber that nothing more will follow. If an error occurs while sending the data, the Publisher calls onError() instead to pass the error details to the Subscriber.
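
The five steps above can be sketched with nothing but the JDK's java.util.concurrent.Flow interfaces. This is a simplified, single-threaded illustration, not a production-ready Publisher: WorkflowSketch and ListPublisher are hypothetical names, and the Subscriber states its demand only once, so you can see how request(n) limits the number of onNext() calls.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

class WorkflowSketch {

    /** Hypothetical single-threaded publisher over a fixed list; for illustration only. */
    static final class ListPublisher implements Flow.Publisher<String> {
        private final List<String> items;

        ListPublisher(List<String> items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            Iterator<String> source = items.iterator();
            // Step 2: create a Subscription and pass it to the Subscriber.
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean terminated;

                @Override
                public void request(long n) {
                    if (terminated) {
                        return;
                    }
                    if (n <= 0) {
                        // Non-positive demand is signalled as an error.
                        terminated = true;
                        subscriber.onError(new IllegalArgumentException("n must be positive"));
                        return;
                    }
                    // Step 4: call onNext once per item, at most n times.
                    for (long i = 0; i < n && !terminated && source.hasNext(); i++) {
                        subscriber.onNext(source.next());
                    }
                    // Step 5: signal completion once every item has been sent.
                    if (!terminated && !source.hasNext()) {
                        terminated = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    terminated = true;
                }
            });
        }
    }

    /** Subscribes with a fixed demand of n and returns the signals in the order seen. */
    static List<String> run(List<String> items, long n) {
        List<String> events = new ArrayList<>();
        Flow.Publisher<String> publisher = new ListPublisher(items);

        // Step 1: the Subscriber registers itself with the Publisher.
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                events.add("onSubscribe");
                events.add("request(" + n + ")");
                subscription.request(n);   // Step 3: ask for n items
            }

            @Override public void onNext(String item) { events.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable error) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, request(3), onNext(A), onNext(B), onNext(C), onComplete]
        System.out.println(run(List.of("A", "B", "C"), 3));
        // prints [onSubscribe, request(2), onNext(A), onNext(B)]
        System.out.println(run(List.of("A", "B", "C"), 2));
    }
}
```

In the second call the Subscriber asks for only two of the three items, so the Publisher stops after two onNext() calls and never reaches onComplete(). This demand-driven behavior is what Reactive Streams calls backpressure.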

Mono and Flux Examples

Let's create an example of Mono and Flux to see the Reactive Streams workflow in action.

To use Mono and Flux, add the Project Reactor Core dependency:
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
      <version>3.5.1</version>
    </dependency>
If you are using Spring Boot, check out the Spring WebFlux Tutorial.

Mono Example

Let's create an example to demonstrate the usage of the Mono implementation of the Publisher interface:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

public class MonoFluxDemo {

    @Test
    public void testMono() {
        // Mono.just wraps a single value; log() traces every Reactive Streams signal.
        Mono<String> stringMono = Mono.just("java guides")
                .log();

        // Nothing is emitted until a Subscriber subscribes.
        stringMono.subscribe(element -> System.out.println(element));
    }
}

Output:

14:36:23.902 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
14:36:23.909 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
14:36:23.910 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
14:36:23.911 [main] INFO reactor.Mono.Just.1 - | onNext(java guides)
java guides
14:36:23.911 [main] INFO reactor.Mono.Just.1 - | onComplete()
Note that the Mono publisher called the onComplete() method to notify the Subscriber that all the data was sent successfully.

Next, let me demonstrate the onError() method. If an error occurs while sending the data, the Publisher calls onError() to pass the error details to the Subscriber.
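import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

public class MonoFluxDemo {

    @Test
    public void testMono() {
        // then(...) ignores the element from Mono.just and continues with the
        // given Mono, which here terminates with an error instead of a value.
        Mono<?> stringMono = Mono.just("java guides")
                .then(Mono.error(new RuntimeException("Exception occured while emitting the data")))
                .log();

        // The second lambda is the error consumer; it receives the onError signal.
        stringMono.subscribe(
                element -> System.out.println(element),
                throwable -> System.out.println(throwable.getMessage()));
    }
}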

Output:

14:41:36.788 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
14:41:36.795 [main] INFO reactor.Mono.IgnoreThen.1 - onSubscribe(MonoIgnoreThen.ThenIgnoreMain)
14:41:36.796 [main] INFO reactor.Mono.IgnoreThen.1 - request(unbounded)
14:41:36.798 [main] ERROR reactor.Mono.IgnoreThen.1 - onError(java.lang.RuntimeException: Exception occured while emitting the data)
14:41:36.798 [main] ERROR reactor.Mono.IgnoreThen.1 - 
java.lang.RuntimeException: Exception occured while emitting the data
	....
Exception occured while emitting the data

Flux Example

Let's create an example to demonstrate the usage of Flux implementation of the Publisher interface:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

public class MonoFluxDemo {
    @Test
    public void testFlux(){
        Flux<String> stringFlux = Flux.just("Apple", "Banana", "Orange", "Mango")
                        .log();
        stringFlux.subscribe((element) -> {
            System.out.println(element);
        });
    }
}

Output:

14:44:22.213 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
14:44:22.218 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | onNext(Apple)
Apple
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | onNext(Banana)
Banana
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | onNext(Orange)
Orange
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | onNext(Mango)
Mango
14:44:22.219 [main] INFO reactor.Flux.Array.1 - | onComplete()
Next, let me demonstrate the usage of the onError() method:
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

public class MonoFluxDemo {
    @Test
    public void testFlux(){
        Flux<String> stringFlux = Flux.just("orange", "banana", "mango", "apple")
                .concatWithValues("watermelon")
                .concatWith(Flux.error(new RuntimeException("Exception while emitting data")))
                .log();

        stringFlux.subscribe((element) -> {
            System.out.println(element);
        }, throwable -> System.out.println(throwable.getMessage()));
    }
}

Output:

14:45:18.406 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
14:45:18.411 [main] INFO reactor.Flux.ConcatArray.1 - onSubscribe(FluxConcatArray.ConcatArraySubscriber)
14:45:18.412 [main] INFO reactor.Flux.ConcatArray.1 - request(unbounded)
14:45:18.412 [main] INFO reactor.Flux.ConcatArray.1 - onNext(orange)
orange
14:45:18.412 [main] INFO reactor.Flux.ConcatArray.1 - onNext(banana)
banana
14:45:18.412 [main] INFO reactor.Flux.ConcatArray.1 - onNext(mango)
mango
14:45:18.412 [main] INFO reactor.Flux.ConcatArray.1 - onNext(apple)
apple
14:45:18.413 [main] INFO reactor.Flux.ConcatArray.1 - onNext(watermelon)
watermelon
14:45:18.413 [main] ERROR reactor.Flux.ConcatArray.1 - onError(java.lang.RuntimeException: Exception while emitting data)
14:45:18.413 [main] ERROR reactor.Flux.ConcatArray.1 - 
java.lang.RuntimeException: Exception while emitting data
	at net.javaguides.springbootwebfluxdemo.MonoFluxDemo.testFlux(MonoFluxDemo.java:10)
.....
Exception while emitting data
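
To see why no onComplete() line appears in the log above, it can help to look at the publisher side. The following is a rough JDK-only sketch of a publisher that emits its items and then fails. It is not how Reactor implements concatWith: ErrorSignalSketch and itemsThenError are hypothetical names, and the sketch skips validation of non-positive demand to stay short.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

class ErrorSignalSketch {

    /** Hypothetical publisher: emits every item on demand, then fails instead of completing. */
    static Flow.Publisher<String> itemsThenError(List<String> items, Throwable failure) {
        return subscriber -> {
            Iterator<String> source = items.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean terminated;

                @Override
                public void request(long n) {
                    // Emit at most n items (assumes n is positive).
                    for (long i = 0; i < n && !terminated && source.hasNext(); i++) {
                        subscriber.onNext(source.next());
                    }
                    if (!terminated && !source.hasNext()) {
                        terminated = true;            // onError is a terminal signal:
                        subscriber.onError(failure);  // no onNext or onComplete may follow
                    }
                }

                @Override
                public void cancel() {
                    terminated = true;
                }
            });
        };
    }

    /** Subscribes with unbounded demand and returns the signals in the order seen. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        itemsThenError(List.of("orange", "banana"),
                new RuntimeException("Exception while emitting data"))
            .subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // unbounded demand
                }

                @Override
                public void onNext(String item) {
                    events.add("onNext(" + item + ")");
                }

                @Override
                public void onError(Throwable error) {
                    events.add("onError(" + error.getMessage() + ")");
                }

                @Override
                public void onComplete() {
                    events.add("onComplete");
                }
            });
        return events;
    }

    public static void main(String[] args) {
        // prints [onNext(orange), onNext(banana), onError(Exception while emitting data)]
        System.out.println(run());
    }
}
```

A stream ends with either onComplete() or onError(), never both, so a Subscriber should treat onError() as the end of the stream and release any resources there.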

Related Java Reactive Programming Examples

Spring Boot WebFlux MongoDB CRUD REST API Tutorial

In this tutorial, you will learn how to build CRUD REST APIs using Spring Boot, Spring WebFlux, and MongoDB NoSQL database.

Testing Spring WebFlux Reactive CRUD Rest APIs using WebTestClient

In this tutorial, we will learn how to write Integration tests to test Spring WebFlux reactive CRUD REST APIs using WebTestClient.

Spring WebFlux Functional Endpoints CRUD REST API Example

In this tutorial, we will use the new functional-style programming model to build reactive CRUD REST APIs using Spring Boot 3, Spring WebFlux, MongoDB, and IntelliJ IDEA.

Unit Testing Spring WebFlux CRUD REST API using JUnit and Mockito

In this tutorial, we will learn how to unit test Spring WebFlux controller (Reactive CRUD REST APIs) using JUnit and Mockito frameworks.
