Project Reactor
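- An implementation of reactive programming in Java, built on the Reactive Streams specification
- It is the reactive library behind Spring WebFlux and is pulled in by the spring-boot-starter-webflux starter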
Publisher
A publisher (source) can send any of the following signals; nothing more is emitted after a terminal event:
- An Item
- A complete event (terminal event)
- A failure event (terminal event)
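To make the three signals concrete, here is a minimal sketch that uses the JDK's java.util.concurrent.Flow interfaces (the JDK's copy of the Reactive Streams contract that Reactor also follows) rather than Reactor itself. The class PublisherSignals, the RecordingSubscriber helper, and the publisherOf and run methods are hypothetical names chosen for illustration, and the publisher is deliberately simplified (single-threaded, no re-entrancy handling), so treat it as a teaching aid and not as a spec-compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class PublisherSignals {

    /** Records every signal it receives so the sequence can be inspected. */
    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> signals = new ArrayList<>();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // ask for every item
        }

        @Override
        public void onNext(String item) {
            signals.add("item:" + item); // an item
        }

        @Override
        public void onError(Throwable failure) {
            signals.add("error:" + failure.getMessage()); // failure event (terminal)
        }

        @Override
        public void onComplete() {
            signals.add("complete"); // complete event (terminal)
        }
    }

    /** Simplified synchronous publisher: emits the items, then exactly one terminal event. */
    static Flow.Publisher<String> publisherOf(List<String> items, Throwable failure) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                // emit at most n items per request
                for (long i = 0; i < n && next < items.size() && !done; i++) {
                    subscriber.onNext(items.get(next++));
                }
                // once all items are out, send the terminal event a single time
                if (!done && next == items.size()) {
                    done = true;
                    if (failure == null) {
                        subscriber.onComplete();
                    } else {
                        subscriber.onError(failure);
                    }
                }
            }

            @Override
            public void cancel() {
                done = true; // stop emitting anything further
            }
        });
    }

    /** Subscribes a recorder and returns the signals it saw, in order. */
    static List<String> run(List<String> items, Throwable failure) {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        publisherOf(items, failure).subscribe(subscriber);
        return subscriber.signals;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("one", "two"), null));
        // prints [item:one, item:two, complete]
        System.out.println(run(List.of("one"), new IllegalStateException("boom")));
        // prints [item:one, error:boom]
    }
}
```

In both runs the terminal event is the last signal recorded, which is the same ordering a Mono or Flux subscriber sees.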
Spring Boot Reactive Web
- We return Mono<> from the controllers
- The advantage is that no thread sits blocked waiting for the result; the waiting threads are eliminated
- The Netty server is reactive and subscribes to the returned Monos; the programmer does not
Create Mono/Flux
// assumes: import static java.time.temporal.ChronoUnit.SECONDS;
// a Mono emits at most one item: here the whole list, after a 2-second delay
var monoSource = Mono.just(List.of("one", "two", "three"))
    .delayElement(Duration.of(2, SECONDS));
monoSource.subscribe(System.out::println);
// we use an iterable to emit items one by one
// here each element is emitted 2 seconds after the previous one
var fluxSource = Flux.fromIterable(List.of("f-one", "f-two", "f-three"))
    .delayElements(Duration.of(2, SECONDS));
fluxSource.subscribe(System.out::println);
Operators
- log: useful for logging the events/data received from the source
- filter
- take
- map
- flatMap
- count: returns Mono<Long>; subscribe to it to get the count. The Mono emits its value only once the source completes.
ReactiveSources.intNumbersFlux()
    .count()
    .subscribe(System.out::println);
- collectList: returns Mono<List<T>>; subscribe to it to get all items as a single list
ReactiveSources.intNumbersFlux()
    .collectList()
    .subscribe(System.out::println);
- buffer(2): returns Flux<List<T>>; every two elements are collected into a list, and each list is passed down the chain as one item. So we get pairs of items down the chain.
// source: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
// assumes an even number of items: a trailing one-element buffer would make get(1) throw
ReactiveSources.intNumbersFlux()
    .buffer(2)
    .map(list -> list.get(0) + list.get(1))
    .subscribe(System.out::println);
/* Output
3
7
11
15
19
*/
Error related Operators
- doOnError: do something when an error happens, but still pass the error down the operator chain
- onErrorContinue: handle the error without passing it down the chain; the failing element is dropped and processing continues with the next element
// On error, the subscribe callback is not called for the failing element; processing continues with the next one
ReactiveSources.intNumbersFluxWithException()
    .onErrorContinue((e, item) -> System.out.println("Error!!! " + e.getMessage()))
    .subscribe(num -> System.out.println(num));
/* Output
1
2
Error!!! An error happened in the flux
4
5
*/
- onErrorResume: on error, swallow the error and switch to the fallback flux returned by the function; its items are passed down the chain
ReactiveSources.intNumbersFluxWithException()
    .onErrorResume(e -> Flux.just(-1, -2)) // switch to a new flux on error
    .subscribe(num -> System.out.println(num));
- doFinally: runs when the flux terminates for any reason (completion, error, or cancellation)
// doFinally only registers the callback; it runs once the resulting flux is subscribed to and terminates
flux.doFinally(signalType -> {
    if (signalType == SignalType.ON_COMPLETE) {
        System.out.println("Done!!!");
    } else if (signalType == SignalType.ON_ERROR) {
        System.out.println("Error!!!");
    }
});
Operators to Combine Reactive Sources
- zipWith: combines a Mono with another Mono into a single Mono<Tuple2<T1, T2>> that emits once both have emitted
// we build another Mono that emits the sum of the two Monos' values
var result = mono1.zipWith(mono2)
    .map(tuple -> tuple.getT1() + tuple.getT2());