package com.test; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @Slf4j public class FluxFindWithSubscribe { // 1. Both have doOnError and onErrorContinue, only onErrorContinue will work no matter outside or inside of them // 2. Both have onErrorContinue inside and outside, onErrorContinue inside will work, onErrorContinue outside will not // 3. Both have doOnError inside and outside, both work, inside will log first // 4. if it is a API, will also output exception in log if do not has onErrorContinue just has doOnError public static void main(String[] args){ FluxFind fluxFind = new FluxFind(); Flux.from(fluxFind.find()) .doOnError(e -> log.error("Has Error outside")) //.onErrorContinue((e, o) -> log.error("Error occurred outside")) .subscribe(i ->"{}", i), e -> log.error("{}", e)); } }
package com.test; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; @Component @Slf4j public class FluxFind { public Flux<Integer> find(){ return Flux.from(doFind(1)) .concatWith(doFind(2)) //.doOnError(e -> log.error("Has Error inside")); .onErrorContinue((e, o) -> log.error("Error occurred inside")); } private Flux<Integer> doFind(int i){ return Flux.just(i).map(s ->{ if(s ==1){ throw new RuntimeException(); }else{ return s; }}); } }
18:15:06.147 [main] ERROR com.test.FluxFind - Error occurred inside
18:15:06.147 [main] INFO com.test.FluxFindWithSubscribe - 2
package com.test; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; @Component @Slf4j public class FluxFind { // For 3 layers of flux, inner layer and outside layer with onErrorContinue but no exception, // middle layer with exception. // Finally, outside catch the exception and onErrorContinue will work, result: // 18:34:03.217 [main] ERROR com.test.FluxFind - Error occurred outside // 18:34:03.218 [main] INFO com.test.FluxFind - 2 public static void main(String[] args) { FluxFind fluxFind = new FluxFind(); Flux.from(fluxFind.find()) .doOnError(e -> log.error("Has Error outside")) .onErrorContinue((e, o) -> log.error("Error occurred outside")) .subscribe(i ->"{}", i), e -> log.error("{}", e)); } public Flux<Integer> find(){ return Flux.from(doFind(1)) .concatWith(doFind(2)); //.doOnError(e -> log.error("Has Error inside")); //.onErrorContinue((e, o) -> log.error("Error occurred inside")); } private Flux<Integer> doFind(int i){ return getMapFlux(i).map(s ->{ if(s ==1){ throw new RuntimeException(); }else{ return s; }}); } private Flux<Integer> getMapFlux(int s){ return Flux.just(s) .map(i -> i) .onErrorContinue((e, o) -> log.error("Error occurred inside")); } }
package com.test; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Slf4j public class FluxFindString { // onErrorContinue will not work when Mono.error // 18:54:30.305 [main] ERROR com.test.FluxFindString - Has Error outside // 18:54:30.308 [main] ERROR com.test.FluxFindString - {} // java.lang.RuntimeException: null // at com.test.FluxFindString.doFind( // at com.test.FluxFindString.find( // at com.test.FluxFindString.main( public static void main(String[] args) { FluxFindString fluxFind = new FluxFindString(); Flux.from(fluxFind.find()) .doOnError(e -> log.error("Has Error outside")) .onErrorContinue((e, o) -> log.error("Error occurred outside")) .subscribe(i ->"{}", i), e -> log.error("{}", e)); } public Flux<String> find() { return Flux.from(doFind(null)) .concatWith(doFind("2")) .onErrorContinue((e, o) -> log.error("Error occurred inside")); } private Mono<String> doFind(String i) { return Mono.justOrEmpty(i).switchIfEmpty(Mono.error(new RuntimeException())); } }
public class OnErrorContinue2 { public static void main(String[] args) { // -5 // 数据:0,发生错误:java.lang.ArithmeticException: / by zero // -10 // 10 // 5 // Exception in thread "main" java.lang.ArithmeticException: / by zero // at com.test.reactor.OnErrorContinue2.main( Flux.just(-2, -1, 0, 1, 2) .map(n -> 10 / n) .onErrorContinue((e, n) -> System.err.println("数据:" + n + ",发生错误:" + e)) .subscribe(System.out::println, System.err::println); // Exception in thread "main" java.lang.ArithmeticException: / by zero // at com.test.reactor.OnErrorContinue2.main( /*Flux.just(10 / -2, 10 / -1, 10 / 0, 10 / 1, 10 / 2) .onErrorContinue((e, n) -> System.err.println("数据:" + n + ",发生错误:" + e)) .subscribe(System.out::println, System.err::println);*/ } }
 * {@link Flux#onErrorContinue} handles a {@link Throwable} thrown while an operator processes an element; it does
 * not handle {@link Mono#error}, which creates a {@link Mono} that terminates with the specified error immediately
 * after being subscribed to.