Project Reactor - Adding Delay to Mono and Flux
https://www.woolha.com/tutorials/project-reactor-adding-delay-to-mono-and-flux
package com.test.reactor;

import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;

/**
 * Demonstrates chaining several {@code Mono.delayUntil} steps on a single value.
 *
 * <p>A mutable list is wrapped in a {@code Mono}; each {@code delayUntil} step
 * receives that list, appends one entry and returns an empty {@code Mono<Void>}.
 * The subscriber finally prints the list once all three steps have run.
 */
public class DelayTest {

    public static void main(String[] args) {
        DelayTest test = new DelayTest();
        test.testDelay();
    }

    /**
     * Builds the three-step pipeline and subscribes to it, printing the
     * resulting list to standard output.
     */
    public void testDelay() {
        Mono.just(new ArrayList<String>())
                .delayUntil(this::before)
                .delayUntil(this::doing)
                .delayUntil(this::after)
                .subscribe(System.out::println);
    }

    /**
     * First step: prints {@code "add 1"} and appends {@code "1"}.
     *
     * @param arrayList the list carried through the pipeline; mutated in place
     * @return an empty {@code Mono<Void>}
     */
    Mono<Void> before(List<String> arrayList) {
        return appendStep(arrayList, "1");
    }

    /**
     * Second step: prints {@code "add 2"} and appends {@code "2"}.
     *
     * @param arrayList the list carried through the pipeline; mutated in place
     * @return an empty {@code Mono<Void>}
     */
    Mono<Void> doing(List<String> arrayList) {
        return appendStep(arrayList, "2");
    }

    /**
     * Third step: prints {@code "add 3"} and appends {@code "3"}.
     *
     * @param arrayList the list carried through the pipeline; mutated in place
     * @return an empty {@code Mono<Void>}
     */
    Mono<Void> after(List<String> arrayList) {
        return appendStep(arrayList, "3");
    }

    /**
     * Shared body of the three steps: logs the value, appends it to the list
     * and returns an empty trigger.
     *
     * <p>The print and the append happen when this method is called, not when
     * the returned {@code Mono} is subscribed to.
     */
    private static Mono<Void> appendStep(List<String> target, String value) {
        System.out.println("add " + value);
        target.add(value);
        // Mono.empty() is already an empty Mono<Void>; a trailing then() adds nothing.
        return Mono.empty();
    }
}
add 1
add 2
add 3
[1, 2, 3]
package com.test.reactor;

import java.time.Duration;

import reactor.core.publisher.Flux;

/**
 * Demonstrates {@code Flux.delayElements} combined with a time-based
 * {@code buffer}: eight integers are delayed by 1000 ms each and collected
 * into lists over 2000 ms windows before being printed.
 */
public class DelayTest2 {

    public static void main(String[] args) {
        DelayTest2 test = new DelayTest2();
        test.testDelay();
    }

    /**
     * Subscribes to the delayed, buffered sequence and then keeps the calling
     * thread waiting for 20 seconds so the output can be printed.
     */
    public void testDelay() {
        Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
                .delayElements(Duration.ofMillis(1000))
                // Buffer span and buffer period are both 2000 ms.
                .buffer(Duration.ofMillis(2000), Duration.ofMillis(2000))
                .subscribe(System.out::println);

        // NOTE(review): the delayed elements are presumably delivered on a Reactor
        // scheduler thread, so subscribe() returns at once and the caller has to
        // wait for the output - confirm against the Reactor documentation.
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
[1]
[2, 3]
[4, 5]
[6, 7]
[8]
After removing the buffer operator, one element is emitted every 1000 ms:
1
2
3
4
5
6
7
8
package com.test.reactor;

import java.time.Duration;

import reactor.core.publisher.Flux;

/**
 * Demonstrates {@code Flux.delayUntil}: each of eight integers is passed to a
 * function that returns a three-item trigger {@code Flux} whose items are
 * delayed by 1000 ms each.
 */
public class DelayTest2 {

    public static void main(String[] args) {
        DelayTest2 test = new DelayTest2();
        test.testDelay();
    }

    /**
     * Subscribes to the sequence and then keeps the calling thread waiting for
     * 20 seconds so the output can be printed.
     */
    public void testDelay() {
        Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
                // The trigger has three items delayed by 1000 ms each (about 3 s in total).
                .delayUntil(a -> Flux.just(11, 21, 31).hide().delayElements(Duration.ofMillis(1000)))
                .subscribe(System.out::println);

        // NOTE(review): at roughly 3 s per element, all eight elements would need
        // about 24 s, so this 20 s wait presumably ends before 7 and 8 are printed
        // when the program exits after main returns - confirm whether that is intended.
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
One element is emitted every 3000 ms (only the first six appear before the 20-second sleep ends):
1
2
3
4
5
6