Project Reactor - Mono and Flux Examples

In this tutorial, we will see the usage of important methods of Mono and Flux implementation classes.

Project Reactor Library Overview

The Project Reactor is a fourth-generation reactive library that implements Reactive Streams specifications, for building non-blocking applications on the JVM.
Project reactor libraries provide two implementations of the Publisher interface:
  1. Mono
  2. Flux
Mono: Returns 0 or 1 element.
The Mono API allows producing only one value.
Flux: Returns 0…N elements.
The Flux can be endless, it can produce multiple values.

Mono vs Flux

Mono and Flux are both implementations of the Publisher interface. In simple terms, we can say that when we're doing something like a computation or making a request to a database or an external service, and expecting a maximum of one result, then we should use Mono.
When we're expecting multiple results from our computation, database, or external service call, then we should use Flux.

Add Project Reactor Dependency

To use Mono and Flux, make sure that you add Project Reactor Core dependency:
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
      <version>3.5.1</version>
    </dependency>

Mono Examples

Mono Example 1 - Create Mono Object

Let's create an example to demonstrate the usage of Mono implementation of the Publisher interface:
public class FluxAndMonoServices {

    public Mono<String> fruitMono() {
        return Mono.just("Mango").log();
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitMono()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}

Output:

15:22:17.647 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:22:17.652 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
15:22:17.653 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
15:22:17.654 [main] INFO reactor.Mono.Just.1 - | onNext(Mango)
Mono -> s = Mango
15:22:17.654 [main] INFO reactor.Mono.Just.1 - | onComplete()

Mono Example 2 - onError() Method

Let me demonstrate the usage of the onError() method. If there are any errors while sending the data then the Publisher call onError() method to send error details to the Subscriber.

public class FluxAndMonoServices {

    public Mono<String> fruitMono() {
        return Mono.just("Mango").log()
                .then(Mono.error(new RuntimeException("Error Occurred while publishing data")));
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitMono()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}

Output:

15:23:23.600 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:23:23.608 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
15:23:23.609 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
15:23:23.609 [main] INFO reactor.Mono.Just.1 - | onNext(Mango)
15:23:23.609 [main] INFO reactor.Mono.Just.1 - | onComplete()
15:23:23.611 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Error Occurred while publishing data
Caused by: java.lang.RuntimeException: Error Occurred while publishing data
	at net.javaguides.springbootwebfluxdemo.FluxAndMonoServices.fruitMono(FluxAndMonoServices.java:14)
	at net.javaguides.springbootwebfluxdemo.FluxAndMonoServices.main(FluxAndMonoServices.java:23)

Mono Example 3 - zipWith() Method

The zipWith() method combines the result from this mono and another mono object.
public class FluxAndMonoServices {

    public Mono<String> fruitsMonoZipWith() {
        var fruits = Mono.just("Mango");
        var veggies = Mono.just("Tomato");

        return fruits.zipWith(veggies,
                (first,second) -> first+second).log();
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsMonoZipWith()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}

Output:

15:26:43.501 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:26:43.509 [main] INFO reactor.Mono.Zip.1 - onSubscribe([Fuseable] MonoZip.ZipCoordinator)
15:26:43.510 [main] INFO reactor.Mono.Zip.1 - request(unbounded)
15:26:43.511 [main] INFO reactor.Mono.Zip.1 - onNext(MangoTomato)
Mono -> s = MangoTomato
15:26:43.511 [main] INFO reactor.Mono.Zip.1 - onComplete()

Mono Example 4 - flatMap() and flatMapMany() Methods

flatMap() - Transform the item emitted by this Mono asynchronously, returning the value emitted by another Mono (possibly changing the value type).

flatMapMany() - Transform the item emitted by this Mono into a Publisher, then forward its emissions into the returned Flux.

Here is an example that demonstrates the above methods:
public class FluxAndMonoExamples {

    public Mono<List<String>> fruitMonoFlatMap() {
        return Mono.just("Mango")
                .flatMap(s -> Mono.just(List.of(s.split(""))))
                .log();
    }

    public Flux<String> fruitMonoFlatMapMany() {
        return Mono.just("Mango")
                .flatMapMany(s -> Flux.just(s.split("")))
                .log();
    }

    public static void main(String[] args) {

        FluxAndMonoExamples fluxAndMonoServices
                = new FluxAndMonoExamples();

        fluxAndMonoServices.fruitMonoFlatMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitMonoFlatMapMany()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}

Output:

16:01:59.002 [main] INFO reactor.Mono.FlatMap.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
16:01:59.003 [main] INFO reactor.Mono.FlatMap.1 - | request(unbounded)
16:01:59.003 [main] INFO reactor.Mono.FlatMap.1 - | onNext([M, a, n, g, o])
s = [M, a, n, g, o]
16:01:59.004 [main] INFO reactor.Mono.FlatMap.1 - | onComplete()
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - request(unbounded)
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(M)
Mono -> s = M
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(a)
Mono -> s = a
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(n)
Mono -> s = n
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(g)
Mono -> s = g
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(o)
Mono -> s = o
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onComplete()

Flux Examples

Flux Example 1 - Create Flux Object

Flux represents an Asynchronous Sequence of 0-N Items. This is like a stream of 0 to N items, and we can do various transformations to this stream, including transforming it to an entirely different type of 0-N item stream.


Here is a simple Flux example that returns multiple elements:
public class FluxAndMonoServices {

    public Flux<String> fruitsFlux() {
        return Flux.fromIterable(List.of("Mango","Orange","Banana")).log();
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFlux()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

15:31:01.901 [main] INFO reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:31:01.902 [main] INFO reactor.Flux.Iterable.1 - | request(unbounded)
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onNext(Mango)
s = Mango
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onNext(Orange)
s = Orange
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onNext(Banana)
s = Banana
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onComplete()

Flux Example 2 - map() Method Example

The map() transform the items emitted by this Flux by applying a synchronous function to each item.

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxMap() {
        return Flux.fromIterable(List.of("Mango","Orange","Banana"))
                .map(String::toUpperCase);
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

s = MANGO
s = ORANGE
s = BANANA

Flux Example 3 - filter() Method Example

The filter() method evaluates each source value against the given Predicate.

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxFilter(int number) {
        return Flux.fromIterable(List.of("Mango","Orange","Banana"))
                .filter(s -> s.length() > number);
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxFilter(5)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

s = Orange
s = Banana

Flux Example 4 - flatMap() and delayElements() Methods Example

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxFlatMap() {
        return Flux.fromIterable(List.of("Mango","Orange","Banana"))
                .flatMap(s -> Flux.just(s.split("")))
                .log();
    }

    public Flux<String> fruitsFluxFlatMapAsync() {
        return Flux.fromIterable(List.of("Mango","Orange","Banana"))
                .flatMap(s -> Flux.just(s.split(""))
                .delayElements(Duration.ofMillis(
                        new Random().nextInt(1000)
                )))
                .log();
    }


    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxFlatMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxFlatMapAsync()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

15:38:03.789 [main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(M)
s = M
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(g)
s = g
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(o)
s = o
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(O)
s = O
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(r)
s = r
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(g)
s = g
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(e)
s = e
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(B)
s = B
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onComplete()
15:38:03.793 [main] INFO reactor.Flux.FlatMap.2 - onSubscribe(FluxFlatMap.FlatMapMain)
15:38:03.793 [main] INFO reactor.Flux.FlatMap.2 - request(unbounded)

Flux Example 5 - transform(), defaultIfEmpty() and switchIfEmpty() Methods

transform() - Transform this Flux in order to generate a target Flux.

defaultIfEmpty() - Provide a default unique value if this sequence is completed without any data.

switchIfEmpty() - Switch to an alternative Publisher if this sequence is completed without any data.

Here is an example that demonstrates the above methods:

public class FluxAndMonoServices {


    public Flux<String> fruitsFluxTransform(int number) {

        Function<Flux<String>,Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);

        return Flux.fromIterable(List.of("Mango","Orange","Banana"))
                .transform(filterData)
                .log();
                //.filter(s -> s.length() > number);
    }

    public Flux<String> fruitsFluxTransformDefaultIfEmpty(int number) {

        Function<Flux<String>,Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);

        return Flux.fromIterable(List.of("Mango","Orange","Banana"))
                .transform(filterData)
                .defaultIfEmpty("Default")
                .log();

    }

    public Flux<String> fruitsFluxTransformSwitchIfEmpty(int number) {

        Function<Flux<String>,Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);

        return Flux.fromIterable(List.of("Mango","Orange","Banana"))
                .transform(filterData)
                .switchIfEmpty(Flux.just("Pineapple","Jack Fruit")
                            .transform(filterData))
                .log();

    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxTransform(5)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxTransformDefaultIfEmpty(6)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxTransformSwitchIfEmpty(6)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

15:42:00.075 [main] INFO reactor.Flux.FilterFuseable.1 - | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | request(unbounded)
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | onNext(Orange)
s = Orange
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | onNext(Banana)
s = Banana
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | onComplete()
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - onSubscribe([Fuseable] FluxDefaultIfEmpty.DefaultIfEmptySubscriber)
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - request(unbounded)
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - onNext(Default)
s = Default
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - onComplete()
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - request(unbounded)
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onNext(Pineapple)
s = Pineapple
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onNext(Jack Fruit)
s = Jack Fruit
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onComplete()

Flux Example 6 - concat() and concatWith() Methods

concat() - Concatenate all sources provided in an Iterable, forwarding element emitted by the sources downstream.

concatWith() - Concatenate emissions of this Flux with the provided Publisher (no interleave).

Here is an example that demonstrates the above methods:

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxConcat() {
        var fruits = Flux.just("Mango","Orange");
        var veggies = Flux.just("Tomato","Lemon");

        return Flux.concat(fruits,veggies);
    }

    public Flux<String> fruitsFluxConcatWith() {
        var fruits = Flux.just("Mango","Orange");
        var veggies = Flux.just("Tomato","Lemon");

        return fruits.concatWith(veggies);
    }


    public Flux<String> fruitsMonoConcatWith() {
        var fruits = Mono.just("Mango");
        var veggies = Mono.just("Tomato");

        return fruits.concatWith(veggies);
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxConcat()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxConcatWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsMonoConcatWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

s = Mango
s = Orange
s = Tomato
s = Lemon
s = Mango
s = Orange
s = Tomato
s = Lemon
s = Mango
s = Tomato

Flux Example 7 - merge() and mergeWith() Methods

merge() - Merge data from Publisher sequences contained in an array / vararg into an interleaved merged sequence.

mergeWith() - Merge data from this Flux and a Publisher into an interleaved merged sequence.

Here is an example that demonstrates the above methods:

public class FluxAndMonoServices {

    public Flux<String> fruitsFluxMerge() {
        var fruits = Flux.just("Mango","Orange");
        var veggies = Flux.just("Tomato","Lemon");
        return Flux.merge(fruits,veggies);
    }

    public Flux<String> fruitsFluxMergeWith() {
        var fruits = Flux.just("Mango","Orange");
        var veggies = Flux.just("Tomato","Lemon");
        return fruits.mergeWith(veggies);
    }

    public static void main(String[] args) {

        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxMerge()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxMergeWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}

Output:

s = Mango
s = Orange
s = Tomato
s = Lemon
s = Mango
s = Orange
s = Tomato
s = Lemon

Flux Example 8 - zip() and zipWith() Methods

public class FluxAndMonoExamples {

    public Flux<String> fruitsFluxZip() {
        var fruits = Flux.just("Mango","Orange");
        var veggies = Flux.just("Tomato","Lemon");

        return Flux.zip(fruits,veggies,
                (first,second) -> first+second).log();
    }

    public Flux<String> fruitsFluxZipWith() {
        var fruits = Flux.just("Mango","Orange");
        var veggies = Flux.just("Tomato","Lemon");

        return fruits.zipWith(veggies,
                (first,second) -> first+second).log();
    }

    public Flux<String> fruitsFluxZipTuple() {
        var fruits = Flux.just("Mango","Orange");
        var veggies = Flux.just("Tomato","Lemon");
        var moreVeggies = Flux.just("Potato","Beans");

        return Flux.zip(fruits,veggies,moreVeggies)
                .map(objects -> objects.getT1() + objects.getT2() + objects.getT3());
    }

    public static void main(String[] args) {

        FluxAndMonoExamples fluxAndMonoServices
                = new FluxAndMonoExamples();

        fluxAndMonoServices.fruitsFluxZip()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxZipWith()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });

        fluxAndMonoServices.fruitsFluxZipTuple()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}

Output:

16:07:40.867 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:07:40.874 [main] INFO reactor.Flux.Zip.1 - onSubscribe(FluxZip.ZipCoordinator)
16:07:40.875 [main] INFO reactor.Flux.Zip.1 - request(unbounded)
16:07:40.876 [main] INFO reactor.Flux.Zip.1 - onNext(MangoTomato)
s = MangoTomato
16:07:40.876 [main] INFO reactor.Flux.Zip.1 - onNext(OrangeLemon)
s = OrangeLemon
16:07:40.876 [main] INFO reactor.Flux.Zip.1 - onComplete()
16:07:40.877 [main] INFO reactor.Flux.Zip.2 - onSubscribe(FluxZip.ZipCoordinator)
16:07:40.877 [main] INFO reactor.Flux.Zip.2 - request(unbounded)
16:07:40.877 [main] INFO reactor.Flux.Zip.2 - onNext(MangoTomato)
Mono -> s = MangoTomato
16:07:40.877 [main] INFO reactor.Flux.Zip.2 - onNext(OrangeLemon)
Mono -> s = OrangeLemon
16:07:40.877 [main] INFO reactor.Flux.Zip.2 - onComplete()
Mono -> s = MangoTomatoPotato
Mono -> s = OrangeLemonBeans

Flux Example 9 - Different onError() Method Scenarios

public class FluxAndMonoExamples {

    public Flux<String> fruitsFluxOnErrorReturn() {
        return Flux.just("Apple","Mango")
                .concatWith(Flux.error(
                        new RuntimeException("Exception Occurred")
                ))
                .onErrorReturn("Orange");
    }

    public Flux<String> fruitsFluxOnErrorContinue() {
        return Flux.just("Apple","Mango","Orange")
                .map(s -> {
                    if (s.equalsIgnoreCase("Mango"))
                        throw new RuntimeException("Exception Occurred");
                    return s.toUpperCase();
                })
                .onErrorContinue((e,f) -> {
                    System.out.println("e = " + e);
                    System.out.println("f = " + f);
                });
    }

    public Flux<String> fruitsFluxOnErrorMap() {
        return Flux.just("Apple","Mango","Orange")
                .checkpoint("Error Checkpoint1")
                .map(s -> {
                    if (s.equalsIgnoreCase("Mango"))
                        throw new RuntimeException("Exception Occurred");
                    return s.toUpperCase();
                })
                .checkpoint("Error Checkpoint2")
                .onErrorMap(throwable -> {
                    System.out.println("throwable = " + throwable);
                    return new IllegalStateException("From onError Map");
                });
    }

    public Flux<String> fruitsFluxOnError() {
        return Flux.just("Apple","Mango","Orange")
                .map(s -> {
                    if (s.equalsIgnoreCase("Mango"))
                        throw new RuntimeException("Exception Occurred");
                    return s.toUpperCase();
                })
                .doOnError(throwable -> {
                    System.out.println("throwable = " + throwable);

                });
    }



    public static void main(String[] args) {

        FluxAndMonoExamples fluxAndMonoServices
                = new FluxAndMonoExamples();

        fluxAndMonoServices.fruitsFluxOnError()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxOnErrorReturn()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });

        fluxAndMonoServices.fruitsFluxOnErrorContinue()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });

        fluxAndMonoServices.fruitsFluxOnErrorMap()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}

Output

16:10:52.445 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
s = APPLE
throwable = java.lang.RuntimeException: Exception Occurred
16:10:52.452 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Exception Occurred
Caused by: java.lang.RuntimeException: Exception Occurred
	at net.javaguides.springbootwebfluxdemo.FluxAndMonoExamples.lambda$fruitsFluxOnError$4(FluxAndMonoExamples.java:53)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
	at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:172)
	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:97)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
	at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:119)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8660)
	at reactor.core.publisher.Flux.subscribeWith(Flux.java:8781)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8626)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8550)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8493)
	at net.javaguides.springbootwebfluxdemo.FluxAndMonoExamples.main(FluxAndMonoExamples.java:70)
Mono -> s = Apple
Mono -> s = Mango
Mono -> s = Orange
Mono -> s = APPLE
e = java.lang.RuntimeException: Exception Occurred
f = Mango
Mono -> s = ORANGE
Mono -> s = APPLE
throwable = java.lang.RuntimeException: Exception Occurred
16:10:52.472 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: From onError Map
Caused by: java.lang.IllegalStateException: From onError Map
	at net.javaguides.springbootwebfluxdemo.FluxAndMonoExamples.lambda$fruitsFluxOnErrorMap$3(FluxAndMonoExamples.java:45)
	at reactor.core.publisher.Flux.lambda$onErrorMap$27(Flux.java:7088)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:544)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
	at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:172)
	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:97)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:649)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:649)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2341)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:633)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:633)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8660)
	at reactor.core.publisher.Flux.subscribeWith(Flux.java:8781)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8626)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8550)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8493)
	at net.javaguides.springbootwebfluxdemo.FluxAndMonoExamples.main(FluxAndMonoExamples.java:85)

Related Java Reactive Programming Examples

Reactive Programming in Java

In this tutorial, we will discuss Reactive programming and how to achieve Reactive programming in Java.

Spring WebFlux Tutorial

In this tutorial, we will learn about Spring WebFlux and how to build reactive REST APIs using Spring WebFlux, Spring Boot, and MongoDB database.

Spring Boot WebFlux MongoDB CRUD REST API Tutorial

In this tutorial, you will learn how to build CRUD REST APIs using Spring Boot, Spring WebFlux, and MongoDB NoSQL database.

Testing Spring WebFlux Reactive CRUD Rest APIs using WebTestClient

In this tutorial, we will learn how to write Integration tests to test Spring WebFlux reactive CRUD REST APIs using WebTestClient.

Spring WebFlux Functional Endpoints CRUD REST API Example

In this tutorial, we will new functional-style programming model to build reactive CRUD REST APIs using Spring Boot 3, Spring WebFlux, MongoDB, and IntelliJ IDEA.

Unit Testing Spring WebFlux CRUD REST API using JUnit and Mockito

In this tutorial, we will learn how to unit test Spring WebFlux controller (Reactive CRUD REST APIs) using JUnit and Mockito frameworks.

Comments