In this tutorial, we discuss reactive programming and how to apply it in Java using the Project Reactor library.
Reactive programming is a programming paradigm focused on building asynchronous, non-blocking, event-driven applications.
Project Reactor is a fourth-generation reactive library, based on the Reactive Streams specification, for building non-blocking applications on the JVM.
Reactive Streams Specification in Java
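The specification defines four interfaces:
- Publisher
- Subscriber
- Subscription
- Processor
Implementations of the specification include:
- Project Reactor
- RxJava
- JDK 9 Flow API (java.util.concurrent.Flow)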
Project Reactor
Project Reactor provides two Publisher types:
- Mono
- Flux
Mono: Returns 0 or 1 element.
A Mono produces at most one value and then completes.
Flux: Returns 0…N elements.
A Flux can produce multiple values and can even be endless.
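As a minimal sketch of this difference in cardinality, assuming reactor-core is on the classpath, the snippet below blocks on each publisher to inspect what it emitted. The class MonoFluxCardinality and its method names are hypothetical and used only for illustration; blocking is used here purely to make the results easy to inspect.

```java
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class; not part of the tutorial's FluxAndMonoServices.
class MonoFluxCardinality {

    // A Mono carrying exactly one value.
    public static Optional<String> oneValue() {
        return Mono.just("Mango").blockOptional();
    }

    // A Mono that completes without emitting any value.
    public static Optional<String> noValue() {
        return Mono.<String>empty().blockOptional();
    }

    // A Flux emitting several values, collected into a List.
    public static List<String> manyValues() {
        return Flux.just("Mango", "Orange", "Banana").collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(oneValue());
        System.out.println(noValue());
        System.out.println(manyValues());
    }
}
```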
Mono vs Flux
Reactive Stream Workflow
Let's walk through the Reactive Streams workflow:
1. The Subscriber calls the subscribe() method of the Publisher to register with it.
2. The Publisher creates a Subscription instance and passes it to the Subscriber through onSubscribe(), signaling that the subscription succeeded.
3. The Subscriber calls request(n) on the Subscription to tell the Publisher how many items it is ready to receive.
4. Next, the Publisher calls onNext(data) once per item to send data to the Subscriber. If there are 10 items, the Publisher calls onNext(data) 10 times.
5. Once all the data has been sent, the Publisher calls onComplete() to notify the Subscriber. If an error occurs while sending the data, the Publisher calls onError() instead to pass the error details to the Subscriber.
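The workflow above can be sketched with the JDK's own java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces. This is a simplified illustration, not a specification-compliant publisher: ListPublisher and FlowWorkflowSketch are hypothetical names, everything runs synchronously on the calling thread, and checks such as rejecting a non-positive request are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class FlowWorkflowSketch {

    // Hypothetical synchronous publisher that emits a fixed list of items.
    static class ListPublisher implements Flow.Publisher<String> {
        private final List<String> items;

        ListPublisher(List<String> items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            // The Publisher creates a Subscription and hands it to the Subscriber.
            subscriber.onSubscribe(new Flow.Subscription() {
                private int index = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Emit at most n items, one onNext call per item.
                    for (long i = 0; i < n && index < items.size() && !done; i++) {
                        subscriber.onNext(items.get(index++));
                    }
                    // After the last item, signal completion exactly once.
                    if (index == items.size() && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Subscribes to the publisher and records every signal in the order received.
    public static List<String> run() {
        List<String> events = new ArrayList<>();
        new ListPublisher(List.of("Mango", "Orange")).subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                events.add("onSubscribe");
                // Ask for everything, like the request(unbounded) seen in Reactor logs.
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String item) {
                events.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable throwable) {
                events.add("onError(" + throwable.getMessage() + ")");
            }

            @Override
            public void onComplete() {
                events.add("onComplete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded signals follow the same order as the log output in the Mono and Flux examples: onSubscribe, then one onNext per item, then onComplete.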
Mono and Flux Examples
Add the reactor-core dependency to your Maven project:
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.5.1</version>
</dependency>
Mono Example 1
import reactor.core.publisher.Mono;

public class FluxAndMonoServices {

    // Emits a single value; log() prints every signal that passes through.
    public Mono<String> fruitMono() {
        return Mono.just("Mango").log();
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitMono()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}
Output:
15:22:17.647 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:22:17.652 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
15:22:17.653 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
15:22:17.654 [main] INFO reactor.Mono.Just.1 - | onNext(Mango)
Mono -> s = Mango
15:22:17.654 [main] INFO reactor.Mono.Just.1 - | onComplete()
Mono Example 2
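import reactor.core.publisher.Mono;

public class FluxAndMonoServices {

    // then() ignores the value of the first Mono and, once it completes,
    // continues with the second Mono, which here signals an error.
    public Mono<String> fruitMono() {
        return Mono.just("Mango").log()
                .then(Mono.error(new RuntimeException("Error Occurred while publishing data")));
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        // Only a value consumer is supplied, so the error is reported as unhandled.
        fluxAndMonoServices.fruitMono()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}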
Output:
15:23:23.600 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:23:23.608 [main] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
15:23:23.609 [main] INFO reactor.Mono.Just.1 - | request(unbounded)
15:23:23.609 [main] INFO reactor.Mono.Just.1 - | onNext(Mango)
15:23:23.609 [main] INFO reactor.Mono.Just.1 - | onComplete()
15:23:23.611 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Error Occurred while publishing data
Caused by: java.lang.RuntimeException: Error Occurred while publishing data
at net.javaguides.springbootwebfluxdemo.FluxAndMonoServices.fruitMono(FluxAndMonoServices.java:14)
at net.javaguides.springbootwebfluxdemo.FluxAndMonoServices.main(FluxAndMonoServices.java:23)
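The ErrorCallbackNotImplemented entry in the log appears because subscribe() was given only a value consumer. A minimal sketch of one way to handle the error, assuming reactor-core is on the classpath, is to pass a second lambda as the error consumer. The class name MonoErrorHandling and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

class MonoErrorHandling {

    // Same pipeline as in the example above: a value followed by an error.
    public static Mono<String> failingMono() {
        return Mono.just("Mango")
                .then(Mono.error(new RuntimeException("Error Occurred while publishing data")));
    }

    // Subscribes with both a value consumer and an error consumer
    // and records what each of them receives.
    public static List<String> run() {
        List<String> events = new ArrayList<>();
        failingMono().subscribe(
                s -> events.add("value: " + s),
                e -> events.add("error: " + e.getMessage()));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because then() discards the value of the first Mono, only the error consumer is invoked here; the value consumer never sees "Mango".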
Mono Example 3 - zipWith() Method
import reactor.core.publisher.Mono;

public class FluxAndMonoServices {

    // Combines the values of both Monos once each has emitted.
    public Mono<String> fruitsMonoZipWith() {
        var fruits = Mono.just("Mango");
        var veggies = Mono.just("Tomato");

        return fruits.zipWith(veggies,
                (first, second) -> first + second).log();
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsMonoZipWith()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}
Output:
15:26:43.501 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:26:43.509 [main] INFO reactor.Mono.Zip.1 - onSubscribe([Fuseable] MonoZip.ZipCoordinator)
15:26:43.510 [main] INFO reactor.Mono.Zip.1 - request(unbounded)
15:26:43.511 [main] INFO reactor.Mono.Zip.1 - onNext(MangoTomato)
Mono -> s = MangoTomato
15:26:43.511 [main] INFO reactor.Mono.Zip.1 - onComplete()
Mono - flatMap() and flatMapMany() Methods
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxAndMonoExamples {

    // flatMap() maps the value to another Mono, so the result is still a Mono.
    public Mono<List<String>> fruitMonoFlatMap() {
        return Mono.just("Mango")
                .flatMap(s -> Mono.just(List.of(s.split(""))))
                .log();
    }

    // flatMapMany() maps the value to a Publisher of many items, so the result is a Flux.
    public Flux<String> fruitMonoFlatMapMany() {
        return Mono.just("Mango")
                .flatMapMany(s -> Flux.just(s.split("")))
                .log();
    }

    public static void main(String[] args) {
        FluxAndMonoExamples fluxAndMonoServices
                = new FluxAndMonoExamples();

        fluxAndMonoServices.fruitMonoFlatMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitMonoFlatMapMany()
                .subscribe(s -> {
                    System.out.println("Mono -> s = " + s);
                });
    }
}
Output:
16:01:59.002 [main] INFO reactor.Mono.FlatMap.1 - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
16:01:59.003 [main] INFO reactor.Mono.FlatMap.1 - | request(unbounded)
16:01:59.003 [main] INFO reactor.Mono.FlatMap.1 - | onNext([M, a, n, g, o])
s = [M, a, n, g, o]
16:01:59.004 [main] INFO reactor.Mono.FlatMap.1 - | onComplete()
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - request(unbounded)
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(M)
Mono -> s = M
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(a)
Mono -> s = a
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(n)
Mono -> s = n
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(g)
Mono -> s = g
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onNext(o)
Mono -> s = o
16:01:59.006 [main] INFO reactor.Flux.MonoFlatMapMany.2 - onComplete()
Flux Simple Example
Flux represents an asynchronous sequence of 0-N items. It is like a stream of 0 to N items, and we can apply various transformations to it, including transforming it into an entirely different type of 0-N item stream.
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {

    // Emits each element of the list, then completes.
    public Flux<String> fruitsFlux() {
        return Flux.fromIterable(List.of("Mango", "Orange", "Banana")).log();
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFlux()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output:
15:31:01.901 [main] INFO reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:31:01.902 [main] INFO reactor.Flux.Iterable.1 - | request(unbounded)
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onNext(Mango)
s = Mango
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onNext(Orange)
s = Orange
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onNext(Banana)
s = Banana
15:31:01.903 [main] INFO reactor.Flux.Iterable.1 - | onComplete()
Flux - map() Method Example
The map() method transforms the items emitted by this Flux by applying a synchronous function to each item.
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {

    // Converts every fruit name to upper case.
    public Flux<String> fruitsFluxMap() {
        return Flux.fromIterable(List.of("Mango", "Orange", "Banana"))
                .map(String::toUpperCase);
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output:
s = MANGO
s = ORANGE
s = BANANA
Flux - filter() Method Example
The filter() method evaluates each source value against the given Predicate and emits only the values that pass the test.
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {

    // Keeps only the names that are longer than the given number of characters.
    public Flux<String> fruitsFluxFilter(int number) {
        return Flux.fromIterable(List.of("Mango", "Orange", "Banana"))
                .filter(s -> s.length() > number);
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxFilter(5)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output:
s = Orange
s = Banana
Flux - flatMap() and delayElements() Methods Example
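import java.time.Duration;
import java.util.List;
import java.util.Random;
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {

    // Splits every fruit name into letters and flattens them into one Flux.
    public Flux<String> fruitsFluxFlatMap() {
        return Flux.fromIterable(List.of("Mango", "Orange", "Banana"))
                .flatMap(s -> Flux.just(s.split("")))
                .log();
    }

    // Same as above, but each inner Flux delays its letters by a random interval.
    public Flux<String> fruitsFluxFlatMapAsync() {
        return Flux.fromIterable(List.of("Mango", "Orange", "Banana"))
                .flatMap(s -> Flux.just(s.split(""))
                        .delayElements(Duration.ofMillis(
                                new Random().nextInt(1000)
                        )))
                .log();
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxFlatMap()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        // Note: main() may return before the delayed letters arrive,
        // so this subscription can print little or nothing.
        fluxAndMonoServices.fruitsFluxFlatMapAsync()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}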
Output:
15:38:03.789 [main] INFO reactor.Flux.FlatMap.1 - onSubscribe(FluxFlatMap.FlatMapMain)
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - request(unbounded)
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(M)
s = M
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(g)
s = g
15:38:03.791 [main] INFO reactor.Flux.FlatMap.1 - onNext(o)
s = o
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(O)
s = O
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(r)
s = r
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(g)
s = g
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(e)
s = e
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(B)
s = B
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(n)
s = n
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onNext(a)
s = a
15:38:03.792 [main] INFO reactor.Flux.FlatMap.1 - onComplete()
15:38:03.793 [main] INFO reactor.Flux.FlatMap.2 - onSubscribe(FluxFlatMap.FlatMapMain)
15:38:03.793 [main] INFO reactor.Flux.FlatMap.2 - request(unbounded)
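The log for the second Flux stops after request(unbounded): delayElements() emits on a separate scheduler thread, and the main thread can finish before any delayed letter arrives. One way to observe the delayed letters in a small demo, sketched here assuming reactor-core is on the classpath, is to block until the Flux completes. The class name FlatMapAsyncWait, its method names, and the 50 ms delay bound are assumptions chosen to keep the run short.

```java
import java.time.Duration;
import java.util.List;
import java.util.Random;
import reactor.core.publisher.Flux;

class FlatMapAsyncWait {

    // Same shape as fruitsFluxFlatMapAsync(), with a shorter, assumed delay bound.
    public static Flux<String> lettersWithDelay() {
        Random random = new Random();
        return Flux.fromIterable(List.of("Mango", "Orange", "Banana"))
                .flatMap(s -> Flux.just(s.split(""))
                        .delayElements(Duration.ofMillis(1 + random.nextInt(50))));
    }

    // Blocks the calling thread until the Flux completes, then returns every letter.
    public static List<String> collectAll() {
        return lettersWithDelay().collectList().block();
    }

    public static void main(String[] args) {
        // Letters from different fruits may interleave, so the order can vary per run.
        System.out.println(collectAll());
    }
}
```

Blocking is acceptable in a demo main() method; in a fully reactive application the result would normally be consumed without blocking.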
Flux - transform(), defaultIfEmpty() and switchIfEmpty() Methods
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {

    // transform() applies a reusable Function to the whole Flux.
    public Flux<String> fruitsFluxTransform(int number) {
        Function<Flux<String>, Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);

        // Equivalent to calling .filter(s -> s.length() > number) directly.
        return Flux.fromIterable(List.of("Mango", "Orange", "Banana"))
                .transform(filterData)
                .log();
    }

    // defaultIfEmpty() emits a single fallback value when the source is empty.
    public Flux<String> fruitsFluxTransformDefaultIfEmpty(int number) {
        Function<Flux<String>, Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);

        return Flux.fromIterable(List.of("Mango", "Orange", "Banana"))
                .transform(filterData)
                .defaultIfEmpty("Default")
                .log();
    }

    // switchIfEmpty() switches to a fallback Publisher when the source is empty.
    public Flux<String> fruitsFluxTransformSwitchIfEmpty(int number) {
        Function<Flux<String>, Flux<String>> filterData
                = data -> data.filter(s -> s.length() > number);

        return Flux.fromIterable(List.of("Mango", "Orange", "Banana"))
                .transform(filterData)
                .switchIfEmpty(Flux.just("Pineapple", "Jack Fruit")
                        .transform(filterData))
                .log();
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxTransform(5)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxTransformDefaultIfEmpty(6)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxTransformSwitchIfEmpty(6)
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output:
15:42:00.075 [main] INFO reactor.Flux.FilterFuseable.1 - | onSubscribe([Fuseable] FluxFilterFuseable.FilterFuseableSubscriber)
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | request(unbounded)
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | onNext(Orange)
s = Orange
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | onNext(Banana)
s = Banana
15:42:00.076 [main] INFO reactor.Flux.FilterFuseable.1 - | onComplete()
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - onSubscribe([Fuseable] FluxDefaultIfEmpty.DefaultIfEmptySubscriber)
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - request(unbounded)
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - onNext(Default)
s = Default
15:42:00.078 [main] INFO reactor.Flux.DefaultIfEmpty.2 - onComplete()
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - request(unbounded)
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onNext(Pineapple)
s = Pineapple
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onNext(Jack Fruit)
s = Jack Fruit
15:42:00.079 [main] INFO reactor.Flux.SwitchIfEmpty.3 - onComplete()
Flux - concat() and concatWith() Methods
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxAndMonoServices {

    // Static concat(): subscribes to the sources one after another.
    public Flux<String> fruitsFluxConcat() {
        var fruits = Flux.just("Mango", "Orange");
        var veggies = Flux.just("Tomato", "Lemon");

        return Flux.concat(fruits, veggies);
    }

    // Instance concatWith(): appends another Publisher to this Flux.
    public Flux<String> fruitsFluxConcatWith() {
        var fruits = Flux.just("Mango", "Orange");
        var veggies = Flux.just("Tomato", "Lemon");

        return fruits.concatWith(veggies);
    }

    // Concatenating two Monos produces a Flux.
    public Flux<String> fruitsMonoConcatWith() {
        var fruits = Mono.just("Mango");
        var veggies = Mono.just("Tomato");

        return fruits.concatWith(veggies);
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxConcat()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxConcatWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsMonoConcatWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output:
s = Mango
s = Orange
s = Tomato
s = Lemon
s = Mango
s = Orange
s = Tomato
s = Lemon
s = Mango
s = Tomato
Flux - merge() and mergeWith() Methods
Here is an example that demonstrates the merge() and mergeWith() methods:
import reactor.core.publisher.Flux;

public class FluxAndMonoServices {

    // Static merge(): subscribes to all sources and emits items as they arrive.
    public Flux<String> fruitsFluxMerge() {
        var fruits = Flux.just("Mango", "Orange");
        var veggies = Flux.just("Tomato", "Lemon");

        return Flux.merge(fruits, veggies);
    }

    // Instance mergeWith(): merges another Publisher into this Flux.
    public Flux<String> fruitsFluxMergeWith() {
        var fruits = Flux.just("Mango", "Orange");
        var veggies = Flux.just("Tomato", "Lemon");

        return fruits.mergeWith(veggies);
    }

    public static void main(String[] args) {
        FluxAndMonoServices fluxAndMonoServices
                = new FluxAndMonoServices();

        fluxAndMonoServices.fruitsFluxMerge()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });

        fluxAndMonoServices.fruitsFluxMergeWith()
                .subscribe(s -> {
                    System.out.println("s = " + s);
                });
    }
}
Output:
s = Mango
s = Orange
s = Tomato
s = Lemon
s = Mango
s = Orange
s = Tomato
s = Lemon
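With synchronous sources, the merge() output above looks identical to the concat() output, because the first source finishes before the second one gets a chance to emit. The difference shows up once the sources emit over time. The following sketch, assuming reactor-core is on the classpath, adds delays to both sources; the class name MergeVsConcat, its method names, and the delay values are hypothetical.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

class MergeVsConcat {

    // Emits a fruit every 100 ms (assumed delay for illustration).
    public static Flux<String> fruits() {
        return Flux.just("Mango", "Orange").delayElements(Duration.ofMillis(100));
    }

    // Emits a vegetable every 150 ms (assumed delay for illustration).
    public static Flux<String> veggies() {
        return Flux.just("Tomato", "Lemon").delayElements(Duration.ofMillis(150));
    }

    // merge() subscribes to both sources eagerly, so items may interleave by arrival time.
    public static List<String> merged() {
        return Flux.merge(fruits(), veggies()).collectList().block();
    }

    // concat() subscribes to the second source only after the first completes.
    public static List<String> concatenated() {
        return Flux.concat(fruits(), veggies()).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println("merge  -> " + merged());
        System.out.println("concat -> " + concatenated());
    }
}
```

The concat() result always keeps all fruits before all vegetables, while the order of the merge() result depends on timing and is not guaranteed.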