Project Reactor

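  • An implementation of reactive programming in Java, built on the Reactive Streams specification
  • It is the reactive library used by Spring WebFlux, so it comes with Spring Boot's reactive web starter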

Publisher

A publisher (source) can send three kinds of signals:

  • An item (zero or more of these)
  • A complete event (terminal event)
  • A failure event (terminal event)

After a terminal event, the publisher sends nothing further.
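
The three signals can be sketched with the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces that Reactor implements. This is only an illustration: ListPublisher and record are hypothetical names, and the publisher ignores the requested demand (no backpressure) to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Flow;

public class SignalDemo {

    // Hypothetical publisher: emits every item, then completes.
    // A null item is treated as a failure to show the error signal.
    static class ListPublisher implements Flow.Publisher<String> {
        private final List<String> items;

        ListPublisher(List<String> items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    // Simplification: everything is emitted on the first request.
                    if (done) return;
                    done = true;
                    for (String item : items) {
                        if (item == null) {
                            subscriber.onError(new IllegalStateException("null item"));
                            return; // terminal: nothing is sent after an error
                        }
                        subscriber.onNext(item);
                    }
                    subscriber.onComplete(); // terminal: nothing is sent after completion
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Subscribes and records each signal in the order it arrives.
    static List<String> record(List<String> items) {
        List<String> signals = new ArrayList<>();
        new ListPublisher(items).subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { signals.add("item:" + item); }
            @Override public void onError(Throwable t) { signals.add("error:" + t.getMessage()); }
            @Override public void onComplete() { signals.add("complete"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(record(List.of("a", "b")));            // [item:a, item:b, complete]
        System.out.println(record(Arrays.asList("a", null, "b"))); // [item:a, error:null item]
    }
}
```

Note that the second run never reaches "b" or "complete": the failure event ends the stream.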

Spring Boot Reactive Web

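  • Controllers return Mono<T> (or Flux<T>) instead of the value itself
  • The advantage is that no thread sits blocked waiting for a result; request threads are freed while the work is pending
  • The Netty server is reactive: the framework, not the programmer, subscribes to the returned Mono and writes the response when it emits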

Create Mono/Flux

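// Mono.just emits the whole list as a single item, 2 seconds after subscription
var monoSource = Mono.just(List.of("one", "two", "three"))
    .delayElement(Duration.ofSeconds(2));
monoSource.subscribe(System.out::println);
 
// fromIterable emits the items one by one
// here each element is emitted 2 seconds after the previous one
var fluxSource = Flux.fromIterable(List.of("f-one", "f-two", "f-three"))
    .delayElements(Duration.ofSeconds(2));
fluxSource.subscribe(System.out::println);
// subscribe() returns immediately; delayed items arrive on another thread,
// so a plain main method must stay alive long enough to see them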

Operators

  • log: can be useful to log the events/data received from source
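  • filter: passes down only the items that match a predicate
  • take: passes down the first n items, then cancels the source and completes
  • map: transforms each item synchronously into another value
  • flatMap: maps each item to a publisher and merges the results into a single flux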
  • count: returns Mono<Long>; subscribe to it to get the count. The Mono emits only after the source sends its complete event.
ReactiveSources.intNumbersFlux()
    .count()
    .subscribe(System.out::println);
  • collectList: returns Mono<List<T>>, we can subscribe to it to get the list
ReactiveSources.intNumbersFlux()
    .collectList()
    .subscribe(System.out::println);
  • buffer(2): returns Flux<List<T>>. Every two elements are collected into a list, and each list is passed down the chain as one item, so downstream operators receive pairs. If the source has an odd number of elements, the last list holds a single element.
// source: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
ReactiveSources.intNumbersFlux()
    .buffer(2)
    .map(list -> list.get(0) + list.get(1))
    .subscribe(System.out::println);
/** Output
3
7
11
15
19
**/
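
The operators listed above without an example (filter, map, take, flatMap) can be sketched as follows. This assumes reactor-core is on the classpath; OperatorDemo and its method names are made up for illustration, and Flux.range stands in for the ReactiveSources helper used elsewhere in these notes.

```java
import reactor.core.publisher.Flux;

public class OperatorDemo {

    // Keeps the even numbers, squares them, and stops after three results.
    static Flux<Integer> firstThreeEvenSquares() {
        return Flux.range(1, 10)          // 1..10
                .filter(n -> n % 2 == 0)  // 2, 4, 6, 8, 10
                .map(n -> n * n)          // 4, 16, 36, 64, 100
                .take(3);                 // 4, 16, 36
    }

    // flatMap: each item becomes its own publisher and the results are merged.
    // Merging does not guarantee ordering in general.
    static Flux<String> expand() {
        return Flux.just("a", "b")
                .flatMap(s -> Flux.just(s + "1", s + "2"));
    }

    public static void main(String[] args) {
        firstThreeEvenSquares().subscribe(System.out::println);
        expand().subscribe(System.out::println);
    }
}
```

When the order of the inner results matters, concatMap is the usual alternative to flatMap.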

Error related Operators

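  • doOnError: runs a side effect when an error happens, then passes the error on down the operator chain
  • onErrorContinue: runs a handler when an error happens, drops the element that caused it, and continues with the next element instead of terminating. It only works with upstream operators that support this mode.
// On error the subscriber's consumer is not called for the failing element; the flux moves on to the next one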
ReactiveSources.intNumbersFluxWithException()
    .onErrorContinue((e, item) -> System.out.println("Error!!! " + e.getMessage()))
    .subscribe(num -> System.out.println(num));
/*
1
2
Error!!! An error happened in the flux
4
5
*/
  • onErrorResume: when an error happens, swallows it and switches to the fallback publisher returned by the function; the original source does not resume
ReactiveSources.intNumbersFluxWithException()
    .onErrorResume(e -> Flux.just(-1, -2)) // switch to new flux on error
    .subscribe(num -> System.out.println(num));
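  • doFinally: runs when the flux terminates for any reason: completion, error, or cancellation
flux.doFinally(signalType -> {
        // signalType is ON_COMPLETE, ON_ERROR, or CANCEL
        if (signalType == SignalType.ON_COMPLETE) {
            System.out.println("Done!!!");
        } else if (signalType == SignalType.ON_ERROR) {
            System.out.println("Error!!!");
        }
    })
    // the callback only runs once the chain is subscribed
    .subscribe(System.out::println, e -> System.out.println(e.getMessage()));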
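
doOnError is the only operator in the list above without an example. A minimal sketch, assuming reactor-core is on the classpath; DoOnErrorDemo is a hypothetical class, and a division by zero stands in for whatever failure the real source would produce.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class DoOnErrorDemo {

    // Records what the side effect and the subscriber each observe.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux.just(1, 2, 0, 4)
                .map(n -> 10 / n) // throws ArithmeticException when n is 0
                .doOnError(e -> events.add("side effect: " + e.getClass().getSimpleName()))
                .subscribe(
                        n -> events.add("item: " + n),
                        e -> events.add("subscriber saw: " + e.getClass().getSimpleName()));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The subscriber still receives the error after the side effect runs, and the element 4 is never processed because the error is terminal.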

Operators to combine Reactive Sources

  • zipWith: combines this Mono with another Mono into a Mono<Tuple2<T1, T2>> that emits once both have produced a value
// we build a new Mono holding the sum of the two Monos' values
var result = mono1.zipWith(mono2)
                .map(tuple -> tuple.getT1() + tuple.getT2());
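
A self-contained version of the zip example, assuming reactor-core is on the classpath. ZipDemo and its method names are hypothetical; the second method uses the zipWith overload that takes a combinator, which avoids handling the Tuple2 by hand.

```java
import reactor.core.publisher.Mono;

public class ZipDemo {

    // Pairs the two values in a Tuple2, then adds them.
    static Mono<Integer> sum(Mono<Integer> first, Mono<Integer> second) {
        return first.zipWith(second)
                .map(tuple -> tuple.getT1() + tuple.getT2());
    }

    // Same result using the overload that takes a combinator function.
    static Mono<Integer> sumWithCombinator(Mono<Integer> first, Mono<Integer> second) {
        return first.zipWith(second, Integer::sum);
    }

    public static void main(String[] args) {
        sum(Mono.just(2), Mono.just(3)).subscribe(System.out::println);               // 5
        sumWithCombinator(Mono.just(2), Mono.just(3)).subscribe(System.out::println); // 5
    }
}
```

If either Mono completes without a value, the zipped Mono also completes empty, so nothing is printed in that case.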