Project Reactor Java Tutorial

In this tutorial, we will discuss Reactive programming and how to achieve Reactive programming in Java using Project Reactor reactive library.

Reactive programming is a programming paradigm where the focus is on developing asynchronous and non-blocking applications in an event-driven form.

The Project Reactor is a fourth-generation reactive library, based on the Reactive Streams specification, for building non-blocking applications on the JVM.

Reactive Streams Specification in Java

Reactive Streams Specification is a set of rules or set of guidelines that you need to follow when designing a reactive stream. 

These specifications introduce four interfaces that should be used and overridden when creating a reactive stream.

Publisher

The Publisher is a data source that will always publish events. 

Subscriber

The Subscriber will subscribe/consume the events from the Publisher.

Subscription

The Subscription represents the unique relationship between a Subscriber and a Publisher interface.

Processor

It represents a processing stage – which is both a Subscriber and a Publisher and MUST obey the contracts of both.

Reactive Programming Libraries

A reactive library is nothing but the implementation of reactive specification interfaces. Here are some reactive libraries that are available to us:
  • Project Reactor 
  • RxJava 
  • JDK 9 Flow Reactive Stream
In this tutorial, we are going to use Project Reactor reactive library.

Project Reactor

The Project Reactor is a fourth-generation reactive library that implements Reactive Streams specifications, for building non-blocking applications on the JVM.

Project reactor libraries provide two implementations of the Publisher interface:
  1. Mono
  2. Flux
Mono: Returns 0 or 1 element.
The Mono API allows producing only one value.

Flux: Returns 0…N elements.
The Flux can be endless, it can produce multiple values.

Mono vs Flux

Mono and Flux are both implementations of the Publisher interface. In simple terms, we can say that when we're doing something like a computation or making a request to a database or an external service, and expecting a maximum of one result, then we should use Mono.

When we're expecting multiple results from our computation, database, or external service call, then we should use Flux.

Reactive Stream Workflow


Let's understand the above Reactive stream workflow:
1. The Subscriber will call subscribe() method of the Publisher to subscribe or register with the Publisher.


2. The Publisher creates an instance of Subscription and sends it to Subscriber saying that your subscription is successful.

3. Next, the Subscriber will call the request(n) method of Subscription to request data from the Publisher.

4. Next, Publisher call onNext(data) method to send data to the SubscriberPublisher call onNext(data) n times. It means if there are 10 items then the Publisher call onNext(data) method 10 times.

5. Once the Publisher sends all the data to Subscriber, the next Publisher call onComplete() method to notify Subscriber that all the data has been sent. If there are any errors while sending the data then the Publisher call onError() method to send error details to the Subscriber.

Mono and Flux Examples

Let's create examples of Mono and Flux to understand Reactive Stream workflow in an action.

To use Mono and Flux, make sure that you add Project Reactor Core dependency:
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
      <version>3.5.1</version>
    </dependency>

Mono Example 1

Let's create an example to demonstrate the usage of Mono implementation of the Publisher interface:
public class FluxAndMonoServices {

    public Mono<String> fruitMono() {
        return Mono.just("Mango").log();
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitMono()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}

Output:

15:22:17.647 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:22:17.652 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
15:22:17.653 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
15:22:17.654 [main] INFO reactor.Mono.Just.1 - | onNext(Mango)
Mono -> s = Mango
15:22:17.654 [main] INFO reactor.Mono.Just.1 - | onComplete()

Mono Example 2

Let me demonstrate the usage of the onError() method. If there are any errors while sending the data then the Publisher call onError() method to send error details to the Subscriber.

public class FluxAndMonoServices {

    public Mono<String> fruitMono() {
        return Mono.just("Mango").log()
                .then(Mono.error(new RuntimeException("Error Occurred while publishing data")));
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitMono()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}

Output:

15:23:23.600 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:23:23.608 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
15:23:23.609 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
15:23:23.609 [main] INFO reactor.Mono.Just.1 - | onNext(Mango)
15:23:23.609 [main] INFO reactor.Mono.Just.1 - | onComplete()
15:23:23.611 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Error Occurred while publishing data
Caused by: java.lang.RuntimeException: Error Occurred while publishing data
	at net.javaguides.springbootwebfluxdemo.FluxAndMonoServices.fruitMono(FluxAndMonoServices.java:14)
	at net.javaguides.springbootwebfluxdemo.FluxAndMonoServices.main(FluxAndMonoServices.java:23)

Mono Example 3 - zipWith() Method

The zipWith() method combines the result from this mono and another mono object.
public class FluxAndMonoServices {

    public Mono<String> fruitsMonoZipWith() {
        var fruits = Mono.just("Mango");
        var veggies = Mono.just("Tomato");

        return fruits.zipWith(veggies,
                (first,second) -> first+second).log();
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsMonoZipWith()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}

Output:

15:26:43.501 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:26:43.509 [main] INFO reactor.Mono.Zip.1 - onSubscribe([Fuseable] MonoZip.ZipCoordinator)
15:26:43.510 [main] INFO reactor.Mono.Zip.1 - request(unbounded)
15:26:43.511 [main] INFO reactor.Mono.Zip.1 - onNext(MangoTomato)
Mono -> s = MangoTomato
15:26:43.511 [main] INFO reactor.Mono.Zip.1 - onComplete()

Mono - flatMap() and flatMapMany() Methods

flatMap() - Transform the item emitted by this Mono asynchronously, returning the value emitted by another Mono (possibly changing the value type).

flatMapMany() - Transform the item emitted by this Mono into a Publisher, then forward its emissions into the returned Flux.

Here is an example that demonstrates the above methods:
public class FluxAndMonoExamples {

    public Mono<List<String>> fruitMonoFlatMap() {
        return Mono.just("Mango")
                .flatMap(s -> Mono.just(List.of(s.split(""))))
                .log();
    }

    public Flux<String> fruitMonoFlatMapMany() {
        return Mono.just("Mango")
                .flatMapMany(s -> Flux.just(s.split("")))
                .log();
    }

    public static void main(String[] args) {

        FluxAndMonoExamples fluxAndMonoServices
                = new FluxAndMonoExamples();

        fluxAndMonoServices.fruitMonoFlatMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitMonoFlatMapMany()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}

Output:

16:01:59.002 [main] INFO reactor.Mono.FlatMap.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
16:01:59.003 [main] INFO reactor.Mono.FlatMap.1 - | request(unbounded)
16:01:59.003 [main] INFO reactor.Mono.FlatMap.1 - | onNext([M, a, n, g, o])
s = [M, a, n, g, o]
16:01:59.004 [main] INFO reactor.Mono.FlatMap.1 - | onComplete()
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - request(unbounded)
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(M)
Mono -> s = M
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(a)
Mono -> s = a
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(n)
Mono -> s = n
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(g)
Mono -> s = g
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(o)
Mono -> s = o
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onComplete()

Flux Simple Example

Flux represents an Asynchronous Sequence of 0-N Items. This is like a stream of 0 to N items, and we can do various transformations to this stream, including transforming it to an entirely different type of 0-N item stream.

Here is a simple Flux example that returns multiple elements:
public class FluxAndMonoServices {

    public Flux<String> fruitsFlux() {
        return Flux.fromIterable(List.of("Mango","Orange","Banana")).log();
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFlux()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

15:31:01.901 [main] INFO reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:31:01.902 [main] INFO reactor.Flux.Iterable.1 - | request(unbounded)
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onNext(Mango)
s = Mango
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onNext(Orange)
s = Orange
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onNext(Banana)
s = Banana
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onComplete()

Flux - map() Method Example

The map() transform the items emitted by this Flux by applying a synchronous function to each item.
public class FluxAndMonoServices {

    public Flux<String> fruitsFluxMap() {
        return Flux.fromIterable(List.of("Mango","Orange","Banana"))
                .map(String::toUpperCase);
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

s = MANGO
s = ORANGE
s = BANANA

Flux - filter() Method Example

The filter() method evaluates each source value against the given Predicate.

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxFilter(int number) {
        return Flux.fromIterable(List.of("Mango","Orange","Banana"))
                .filter(s -> s.length() > number);
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxFilter(5)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

s = Orange
s = Banana

Flux - flatMap() and delayElements() Methods Example

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxFlatMap() {
        return Flux.fromIterable(List.of("Mango","Orange","Banana"))
                .flatMap(s -> Flux.just(s.split("")))
                .log();
    }

    public Flux<String> fruitsFluxFlatMapAsync() {
        return Flux.fromIterable(List.of("Mango","Orange","Banana"))
                .flatMap(s -> Flux.just(s.split(""))
                .delayElements(Duration.ofMillis(
                        new Random().nextInt(1000)
                )))
                .log();
    }


    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxFlatMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxFlatMapAsync()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

15:38:03.789 [main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(M)
s = M
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(g)
s = g
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(o)
s = o
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(O)
s = O
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(r)
s = r
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(g)
s = g
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(e)
s = e
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(B)
s = B
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onComplete()
15:38:03.793 [main] INFO reactor.Flux.FlatMap.2 - onSubscribe(FluxFlatMap.FlatMapMain)
15:38:03.793 [main] INFO reactor.Flux.FlatMap.2 - request(unbounded)

Flux - transform(), defaultIfEmpty() and switchIfEmpty() Methods

transform() - Transform this Flux in order to generate a target Flux.

defaultIfEmpty() - Provide a default unique value if this sequence is completed without any data.

switchIfEmpty() - Switch to an alternative Publisher if this sequence is completed without any data.

Here is an example that demonstrates the above methods:

public class FluxAndMonoServices {


    public Flux<String> fruitsFluxTransform(int number) {

        Function<Flux<String>,Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);

        return Flux.fromIterable(List.of("Mango","Orange","Banana"))
                .transform(filterData)
                .log();
                //.filter(s -> s.length() > number);
    }

    public Flux<String> fruitsFluxTransformDefaultIfEmpty(int number) {

        Function<Flux<String>,Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);

        return Flux.fromIterable(List.of("Mango","Orange","Banana"))
                .transform(filterData)
                .defaultIfEmpty("Default")
                .log();

    }

    public Flux<String> fruitsFluxTransformSwitchIfEmpty(int number) {

        Function<Flux<String>,Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);

        return Flux.fromIterable(List.of("Mango","Orange","Banana"))
                .transform(filterData)
                .switchIfEmpty(Flux.just("Pineapple","Jack Fruit")
                            .transform(filterData))
                .log();

    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxTransform(5)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxTransformDefaultIfEmpty(6)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxTransformSwitchIfEmpty(6)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

15:42:00.075 [main] INFO reactor.Flux.FilterFuseable.1 - | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | request(unbounded)
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | onNext(Orange)
s = Orange
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | onNext(Banana)
s = Banana
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | onComplete()
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - onSubscribe([Fuseable] FluxDefaultIfEmpty.DefaultIfEmptySubscriber)
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - request(unbounded)
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - onNext(Default)
s = Default
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - onComplete()
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - request(unbounded)
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onNext(Pineapple)
s = Pineapple
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onNext(Jack Fruit)
s = Jack Fruit
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onComplete()

Flux - concat() and concatWith() Methods

concat() - Concatenate all sources provided in an Iterable, forwarding element emitted by the sources downstream.

concatWith() - Concatenate emissions of this Flux with the provided Publisher (no interleave).

Here is an example that demonstrates the above methods:

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxConcat() {
        var fruits = Flux.just("Mango","Orange");
        var veggies = Flux.just("Tomato","Lemon");

        return Flux.concat(fruits,veggies);
    }

    public Flux<String> fruitsFluxConcatWith() {
        var fruits = Flux.just("Mango","Orange");
        var veggies = Flux.just("Tomato","Lemon");

        return fruits.concatWith(veggies);
    }


    public Flux<String> fruitsMonoConcatWith() {
        var fruits = Mono.just("Mango");
        var veggies = Mono.just("Tomato");

        return fruits.concatWith(veggies);
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxConcat()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxConcatWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsMonoConcatWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

s = Mango
s = Orange
s = Tomato
s = Lemon
s = Mango
s = Orange
s = Tomato
s = Lemon
s = Mango
s = Tomato

Flux - merge() and mergeWith() Methods

merge() - Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence.

mergeWith() - Merge data from this Flux and a Publisher into an interleaved merged sequence.

Here is an example that demonstrates the above methods:

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxMerge() {
        var fruits = Flux.just("Mango","Orange");
        var veggies = Flux.just("Tomato","Lemon");
        return Flux.merge(fruits,veggies);
    }

    public Flux<String> fruitsFluxMergeWith() {
        var fruits = Flux.just("Mango","Orange");
        var veggies = Flux.just("Tomato","Lemon");
        return fruits.mergeWith(veggies);
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxMerge()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxMergeWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

s = Mango
s = Orange
s = Tomato
s = Lemon
s = Mango
s = Orange
s = Tomato
s = Lemon

Related Java Reactive Programming Examples

Reactive Programming in Java

In this tutorial, we will discuss Reactive programming and how to achieve Reactive programming in Java.

Spring WebFlux Tutorial

In this tutorial, we will learn about Spring WebFlux and how to build reactive REST APIs using Spring WebFlux, Spring Boot, and MongoDB database.

Spring Boot WebFlux MongoDB CRUD REST API Tutorial

In this tutorial, you will learn how to build CRUD REST APIs using Spring Boot, Spring WebFlux, and MongoDB NoSQL database.

Testing Spring WebFlux Reactive CRUD Rest APIs using WebTestClient

In this tutorial, we will learn how to write Integration tests to test Spring WebFlux reactive CRUD REST APIs using WebTestClient.

Spring WebFlux Functional Endpoints CRUD REST API Example

In this tutorial, we will new functional-style programming model to build reactive CRUD REST APIs using Spring Boot 3, Spring WebFlux, MongoDB, and IntelliJ IDEA.

Unit Testing Spring WebFlux CRUD REST API using JUnit and Mockito

In this tutorial, we will learn how to unit test Spring WebFlux controller (Reactive CRUD REST APIs) using JUnit and Mockito frameworks.


Comments