In this tutorial, we will see the usage of important methods of Mono and Flux implementation classes.
Project Reactor Library Overview
The Project Reactor is a fourth-generation reactive library that implements Reactive Streams specifications, for building non-blocking applications on the JVM.Project reactor libraries provide two implementations of the Publisher interface:- Mono
- Flux
Mono: Returns 0 or 1 element.
The Mono API allows producing only one value.Flux: Returns 0…N elements.
The Flux can be endless, it can produce multiple values.
- Mono
- Flux
The Mono API allows producing only one value.
The Flux can be endless, it can produce multiple values.
Mono vs Flux
Mono and Flux are both implementations of the Publisher interface. In simple terms, we can say that when we're doing something like a computation or making a request to a database or an external service, and expecting a maximum of one result, then we should use Mono.When we're expecting multiple results from our computation, database, or external service call, then we should use Flux.
Add Project Reactor Dependency
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.5.1</version>
</dependency>
Mono Examples
Mono Example 1 - Create Mono Object
public class FluxAndMonoServices {
public Mono<String> fruitMono() {
return Mono.just("Mango").log();
}
public static void main(String[] args) {
FluxAndMonoServices fluxAndMonoServices
= new FluxAndMonoServices();
fluxAndMonoServices.fruitMono()
.subscribe(s -> {
System.out.println("Mono -> s = " + s);
});
}
}
Output:
15:22:17.647 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:22:17.652 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
15:22:17.653 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
15:22:17.654 [main] INFO reactor.Mono.Just.1 - | onNext(Mango)
Mono -> s = Mango
15:22:17.654 [main] INFO reactor.Mono.Just.1 - | onComplete()
Mono Example 2 - onError() Method
public class FluxAndMonoServices {
public Mono<String> fruitMono() {
return Mono.just("Mango").log()
.then(Mono.error(new RuntimeException("Error Occurred while publishing data")));
}
public static void main(String[] args) {
FluxAndMonoServices fluxAndMonoServices
= new FluxAndMonoServices();
fluxAndMonoServices.fruitMono()
.subscribe(s -> {
System.out.println("Mono -> s = " + s);
});
}
}
Output:
15:23:23.600 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:23:23.608 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
15:23:23.609 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
15:23:23.609 [main] INFO reactor.Mono.Just.1 - | onNext(Mango)
15:23:23.609 [main] INFO reactor.Mono.Just.1 - | onComplete()
15:23:23.611 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Error Occurred while publishing data
Caused by: java.lang.RuntimeException: Error Occurred while publishing data
at net.javaguides.springbootwebfluxdemo.FluxAndMonoServices.fruitMono(FluxAndMonoServices.java:14)
at net.javaguides.springbootwebfluxdemo.FluxAndMonoServices.main(FluxAndMonoServices.java:23)
Mono Example 3 - zipWith() Method
public class FluxAndMonoServices {
public Mono<String> fruitsMonoZipWith() {
var fruits = Mono.just("Mango");
var veggies = Mono.just("Tomato");
return fruits.zipWith(veggies,
(first,second) -> first+second).log();
}
public static void main(String[] args) {
FluxAndMonoServices fluxAndMonoServices
= new FluxAndMonoServices();
fluxAndMonoServices.fruitsMonoZipWith()
.subscribe(s -> {
System.out.println("Mono -> s = " + s);
});
}
}
Output:
15:26:43.501 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:26:43.509 [main] INFO reactor.Mono.Zip.1 - onSubscribe([Fuseable] MonoZip.ZipCoordinator)
15:26:43.510 [main] INFO reactor.Mono.Zip.1 - request(unbounded)
15:26:43.511 [main] INFO reactor.Mono.Zip.1 - onNext(MangoTomato)
Mono -> s = MangoTomato
15:26:43.511 [main] INFO reactor.Mono.Zip.1 - onComplete()
Mono Example 4 - flatMap() and flatMapMany() Methods
public class FluxAndMonoExamples {
public Mono<List<String>> fruitMonoFlatMap() {
return Mono.just("Mango")
.flatMap(s -> Mono.just(List.of(s.split(""))))
.log();
}
public Flux<String> fruitMonoFlatMapMany() {
return Mono.just("Mango")
.flatMapMany(s -> Flux.just(s.split("")))
.log();
}
public static void main(String[] args) {
FluxAndMonoExamples fluxAndMonoServices
= new FluxAndMonoExamples();
fluxAndMonoServices.fruitMonoFlatMap()
.subscribe(s -> {
System.out.println("s = " + s);
});
fluxAndMonoServices.fruitMonoFlatMapMany()
.subscribe(s -> {
System.out.println("Mono -> s = " + s);
});
}
}
Output:
16:01:59.002 [main] INFO reactor.Mono.FlatMap.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
16:01:59.003 [main] INFO reactor.Mono.FlatMap.1 - | request(unbounded)
16:01:59.003 [main] INFO reactor.Mono.FlatMap.1 - | onNext([M, a, n, g, o])
s = [M, a, n, g, o]
16:01:59.004 [main] INFO reactor.Mono.FlatMap.1 - | onComplete()
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - request(unbounded)
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(M)
Mono -> s = M
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(a)
Mono -> s = a
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(n)
Mono -> s = n
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(g)
Mono -> s = g
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(o)
Mono -> s = o
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onComplete()
Flux Examples
Flux Example 1 - Create Flux Object
Flux represents an Asynchronous Sequence of 0-N Items. This is like a stream of 0 to N items, and we can do various transformations to this stream, including transforming it to an entirely different type of 0-N item stream.
public class FluxAndMonoServices {
public Flux<String> fruitsFlux() {
return Flux.fromIterable(List.of("Mango","Orange","Banana")).log();
}
public static void main(String[] args) {
FluxAndMonoServices fluxAndMonoServices
= new FluxAndMonoServices();
fluxAndMonoServices.fruitsFlux()
.subscribe(s -> {
System.out.println("s = " + s);
});
}
}
Output:
15:31:01.901 [main] INFO reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:31:01.902 [main] INFO reactor.Flux.Iterable.1 - | request(unbounded)
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onNext(Mango)
s = Mango
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onNext(Orange)
s = Orange
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onNext(Banana)
s = Banana
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onComplete()
Flux Example 2 - map() Method Example
The map() transform the items emitted by this Flux by applying a synchronous function to each item.
public class FluxAndMonoServices {
public Flux<String> fruitsFluxMap() {
return Flux.fromIterable(List.of("Mango","Orange","Banana"))
.map(String::toUpperCase);
}
public static void main(String[] args) {
FluxAndMonoServices fluxAndMonoServices
= new FluxAndMonoServices();
fluxAndMonoServices.fruitsFluxMap()
.subscribe(s -> {
System.out.println("s = " + s);
});
}
}
Output:
s = MANGO
s = ORANGE
s = BANANA
Flux Example 3 - filter() Method Example
The filter() method evaluates each source value against the given Predicate.public class FluxAndMonoServices {
public Flux<String> fruitsFluxFilter(int number) {
return Flux.fromIterable(List.of("Mango","Orange","Banana"))
.filter(s -> s.length() > number);
}
public static void main(String[] args) {
FluxAndMonoServices fluxAndMonoServices
= new FluxAndMonoServices();
fluxAndMonoServices.fruitsFluxFilter(5)
.subscribe(s -> {
System.out.println("s = " + s);
});
}
}
Output:
s = Orange
s = Banana
Flux Example 4 - flatMap() and delayElements() Methods Example
public class FluxAndMonoServices {
public Flux<String> fruitsFluxFlatMap() {
return Flux.fromIterable(List.of("Mango","Orange","Banana"))
.flatMap(s -> Flux.just(s.split("")))
.log();
}
public Flux<String> fruitsFluxFlatMapAsync() {
return Flux.fromIterable(List.of("Mango","Orange","Banana"))
.flatMap(s -> Flux.just(s.split(""))
.delayElements(Duration.ofMillis(
new Random().nextInt(1000)
)))
.log();
}
public static void main(String[] args) {
FluxAndMonoServices fluxAndMonoServices
= new FluxAndMonoServices();
fluxAndMonoServices.fruitsFluxFlatMap()
.subscribe(s -> {
System.out.println("s = " + s);
});
fluxAndMonoServices.fruitsFluxFlatMapAsync()
.subscribe(s -> {
System.out.println("s = " + s);
});
}
}
Output:
15:38:03.789 [main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(M)
s = M
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(g)
s = g
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(o)
s = o
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(O)
s = O
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(r)
s = r
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(g)
s = g
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(e)
s = e
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(B)
s = B
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onComplete()
15:38:03.793 [main] INFO reactor.Flux.FlatMap.2 - onSubscribe(FluxFlatMap.FlatMapMain)
15:38:03.793 [main] INFO reactor.Flux.FlatMap.2 - request(unbounded)
Flux Example 5 - transform(), defaultIfEmpty() and switchIfEmpty() Methods
public class FluxAndMonoServices {
public Flux<String> fruitsFluxTransform(int number) {
Function<Flux<String>,Flux<String>> filterData
= data -> data.filter(s -> s.length() > number);
return Flux.fromIterable(List.of("Mango","Orange","Banana"))
.transform(filterData)
.log();
//.filter(s -> s.length() > number);
}
public Flux<String> fruitsFluxTransformDefaultIfEmpty(int number) {
Function<Flux<String>,Flux<String>> filterData
= data -> data.filter(s -> s.length() > number);
return Flux.fromIterable(List.of("Mango","Orange","Banana"))
.transform(filterData)
.defaultIfEmpty("Default")
.log();
}
public Flux<String> fruitsFluxTransformSwitchIfEmpty(int number) {
Function<Flux<String>,Flux<String>> filterData
= data -> data.filter(s -> s.length() > number);
return Flux.fromIterable(List.of("Mango","Orange","Banana"))
.transform(filterData)
.switchIfEmpty(Flux.just("Pineapple","Jack Fruit")
.transform(filterData))
.log();
}
public static void main(String[] args) {
FluxAndMonoServices fluxAndMonoServices
= new FluxAndMonoServices();
fluxAndMonoServices.fruitsFluxTransform(5)
.subscribe(s -> {
System.out.println("s = " + s);
});
fluxAndMonoServices.fruitsFluxTransformDefaultIfEmpty(6)
.subscribe(s -> {
System.out.println("s = " + s);
});
fluxAndMonoServices.fruitsFluxTransformSwitchIfEmpty(6)
.subscribe(s -> {
System.out.println("s = " + s);
});
}
}
Output:
15:42:00.075 [main] INFO reactor.Flux.FilterFuseable.1 - | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | request(unbounded)
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | onNext(Orange)
s = Orange
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | onNext(Banana)
s = Banana
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | onComplete()
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - onSubscribe([Fuseable] FluxDefaultIfEmpty.DefaultIfEmptySubscriber)
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - request(unbounded)
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - onNext(Default)
s = Default
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - onComplete()
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - request(unbounded)
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onNext(Pineapple)
s = Pineapple
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onNext(Jack Fruit)
s = Jack Fruit
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onComplete()
Flux Example 6 - concat() and concatWith() Methods
public class FluxAndMonoServices {
public Flux<String> fruitsFluxConcat() {
var fruits = Flux.just("Mango","Orange");
var veggies = Flux.just("Tomato","Lemon");
return Flux.concat(fruits,veggies);
}
public Flux<String> fruitsFluxConcatWith() {
var fruits = Flux.just("Mango","Orange");
var veggies = Flux.just("Tomato","Lemon");
return fruits.concatWith(veggies);
}
public Flux<String> fruitsMonoConcatWith() {
var fruits = Mono.just("Mango");
var veggies = Mono.just("Tomato");
return fruits.concatWith(veggies);
}
public static void main(String[] args) {
FluxAndMonoServices fluxAndMonoServices
= new FluxAndMonoServices();
fluxAndMonoServices.fruitsFluxConcat()
.subscribe(s -> {
System.out.println("s = " + s);
});
fluxAndMonoServices.fruitsFluxConcatWith()
.subscribe(s -> {
System.out.println("s = " + s);
});
fluxAndMonoServices.fruitsMonoConcatWith()
.subscribe(s -> {
System.out.println("s = " + s);
});
}
}
Output:
s = Mango
s = Orange
s = Tomato
s = Lemon
s = Mango
s = Orange
s = Tomato
s = Lemon
s = Mango
s = Tomato
Flux Example 7 - merge() and mergeWith() Methods
Here is an example that demonstrates the above methods:
public class FluxAndMonoServices {
public Flux<String> fruitsFluxMerge() {
var fruits = Flux.just("Mango","Orange");
var veggies = Flux.just("Tomato","Lemon");
return Flux.merge(fruits,veggies);
}
public Flux<String> fruitsFluxMergeWith() {
var fruits = Flux.just("Mango","Orange");
var veggies = Flux.just("Tomato","Lemon");
return fruits.mergeWith(veggies);
}
public static void main(String[] args) {
FluxAndMonoServices fluxAndMonoServices
= new FluxAndMonoServices();
fluxAndMonoServices.fruitsFluxMerge()
.subscribe(s -> {
System.out.println("s = " + s);
});
fluxAndMonoServices.fruitsFluxMergeWith()
.subscribe(s -> {
System.out.println("s = " + s);
});
}
}
Output:
s = Mango
s = Orange
s = Tomato
s = Lemon
s = Mango
s = Orange
s = Tomato
s = Lemon
Flux Example 8 - zip() and zipWith() Methods
public class FluxAndMonoExamples {
public Flux<String> fruitsFluxZip() {
var fruits = Flux.just("Mango","Orange");
var veggies = Flux.just("Tomato","Lemon");
return Flux.zip(fruits,veggies,
(first,second) -> first+second).log();
}
public Flux<String> fruitsFluxZipWith() {
var fruits = Flux.just("Mango","Orange");
var veggies = Flux.just("Tomato","Lemon");
return fruits.zipWith(veggies,
(first,second) -> first+second).log();
}
public Flux<String> fruitsFluxZipTuple() {
var fruits = Flux.just("Mango","Orange");
var veggies = Flux.just("Tomato","Lemon");
var moreVeggies = Flux.just("Potato","Beans");
return Flux.zip(fruits,veggies,moreVeggies)
.map(objects -> objects.getT1() + objects.getT2() + objects.getT3());
}
public static void main(String[] args) {
FluxAndMonoExamples fluxAndMonoServices
= new FluxAndMonoExamples();
fluxAndMonoServices.fruitsFluxZip()
.subscribe(s -> {
System.out.println("s = " + s);
});
fluxAndMonoServices.fruitsFluxZipWith()
.subscribe(s -> {
System.out.println("Mono -> s = " + s);
});
fluxAndMonoServices.fruitsFluxZipTuple()
.subscribe(s -> {
System.out.println("Mono -> s = " + s);
});
}
}
Output:
16:07:40.867 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:07:40.874 [main] INFO reactor.Flux.Zip.1 - onSubscribe(FluxZip.ZipCoordinator)
16:07:40.875 [main] INFO reactor.Flux.Zip.1 - request(unbounded)
16:07:40.876 [main] INFO reactor.Flux.Zip.1 - onNext(MangoTomato)
s = MangoTomato
16:07:40.876 [main] INFO reactor.Flux.Zip.1 - onNext(OrangeLemon)
s = OrangeLemon
16:07:40.876 [main] INFO reactor.Flux.Zip.1 - onComplete()
16:07:40.877 [main] INFO reactor.Flux.Zip.2 - onSubscribe(FluxZip.ZipCoordinator)
16:07:40.877 [main] INFO reactor.Flux.Zip.2 - request(unbounded)
16:07:40.877 [main] INFO reactor.Flux.Zip.2 - onNext(MangoTomato)
Mono -> s = MangoTomato
16:07:40.877 [main] INFO reactor.Flux.Zip.2 - onNext(OrangeLemon)
Mono -> s = OrangeLemon
16:07:40.877 [main] INFO reactor.Flux.Zip.2 - onComplete()
Mono -> s = MangoTomatoPotato
Mono -> s = OrangeLemonBeans
Flux Example 9 - Different onError() Method Scenarios
public class FluxAndMonoExamples {
public Flux<String> fruitsFluxOnErrorReturn() {
return Flux.just("Apple","Mango")
.concatWith(Flux.error(
new RuntimeException("Exception Occurred")
))
.onErrorReturn("Orange");
}
public Flux<String> fruitsFluxOnErrorContinue() {
return Flux.just("Apple","Mango","Orange")
.map(s -> {
if (s.equalsIgnoreCase("Mango"))
throw new RuntimeException("Exception Occurred");
return s.toUpperCase();
})
.onErrorContinue((e,f) -> {
System.out.println("e = " + e);
System.out.println("f = " + f);
});
}
public Flux<String> fruitsFluxOnErrorMap() {
return Flux.just("Apple","Mango","Orange")
.checkpoint("Error Checkpoint1")
.map(s -> {
if (s.equalsIgnoreCase("Mango"))
throw new RuntimeException("Exception Occurred");
return s.toUpperCase();
})
.checkpoint("Error Checkpoint2")
.onErrorMap(throwable -> {
System.out.println("throwable = " + throwable);
return new IllegalStateException("From onError Map");
});
}
public Flux<String> fruitsFluxOnError() {
return Flux.just("Apple","Mango","Orange")
.map(s -> {
if (s.equalsIgnoreCase("Mango"))
throw new RuntimeException("Exception Occurred");
return s.toUpperCase();
})
.doOnError(throwable -> {
System.out.println("throwable = " + throwable);
});
}
public static void main(String[] args) {
FluxAndMonoExamples fluxAndMonoServices
= new FluxAndMonoExamples();
fluxAndMonoServices.fruitsFluxOnError()
.subscribe(s -> {
System.out.println("s = " + s);
});
fluxAndMonoServices.fruitsFluxOnErrorReturn()
.subscribe(s -> {
System.out.println("Mono -> s = " + s);
});
fluxAndMonoServices.fruitsFluxOnErrorContinue()
.subscribe(s -> {
System.out.println("Mono -> s = " + s);
});
fluxAndMonoServices.fruitsFluxOnErrorMap()
.subscribe(s -> {
System.out.println("Mono -> s = " + s);
});
}
}
Output
16:10:52.445 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
s = APPLE
throwable = java.lang.RuntimeException: Exception Occurred
16:10:52.452 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Exception Occurred
Caused by: java.lang.RuntimeException: Exception Occurred
at net.javaguides.springbootwebfluxdemo.FluxAndMonoExamples.lambda$fruitsFluxOnError$4(FluxAndMonoExamples.java:53)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:172)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:97)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:119)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8660)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8781)
at reactor.core.publisher.Flux.subscribe(Flux.java:8626)
at reactor.core.publisher.Flux.subscribe(Flux.java:8550)
at reactor.core.publisher.Flux.subscribe(Flux.java:8493)
at net.javaguides.springbootwebfluxdemo.FluxAndMonoExamples.main(FluxAndMonoExamples.java:70)
Mono -> s = Apple
Mono -> s = Mango
Mono -> s = Orange
Mono -> s = APPLE
e = java.lang.RuntimeException: Exception Occurred
f = Mango
Mono -> s = ORANGE
Mono -> s = APPLE
throwable = java.lang.RuntimeException: Exception Occurred
16:10:52.472 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: From onError Map
Caused by: java.lang.IllegalStateException: From onError Map
at net.javaguides.springbootwebfluxdemo.FluxAndMonoExamples.lambda$fruitsFluxOnErrorMap$3(FluxAndMonoExamples.java:45)
at reactor.core.publisher.Flux.lambda$onErrorMap$27(Flux.java:7088)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:544)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:172)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:97)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:649)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:649)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2341)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:633)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:633)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.Flux.subscribe(Flux.java:8660)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8781)
at reactor.core.publisher.Flux.subscribe(Flux.java:8626)
at reactor.core.publisher.Flux.subscribe(Flux.java:8550)
at reactor.core.publisher.Flux.subscribe(Flux.java:8493)
at net.javaguides.springbootwebfluxdemo.FluxAndMonoExamples.main(FluxAndMonoExamples.java:85)
Related Java Reactive Programming Examples
Reactive Programming in Java
Spring WebFlux Tutorial
Spring Boot WebFlux MongoDB CRUD REST API Tutorial
In this tutorial, you will learn how to build CRUD REST APIs using Spring Boot, Spring WebFlux, and MongoDB NoSQL database.
Testing Spring WebFlux Reactive CRUD Rest APIs using WebTestClient
Spring WebFlux Functional Endpoints CRUD REST API Example
Unit Testing Spring WebFlux CRUD REST API using JUnit and Mockito
In this tutorial, we will learn how to unit test Spring WebFlux controller (Reactive CRUD REST APIs) using JUnit and Mockito frameworks.
Comments
Post a Comment
Leave Comment