zoukankan      html  css  js  c++  java
  • spring-in-action-初识反应式编程reactor

    一:反应式编程

       

        1.反应式编程的优点

        1)无阻塞

         2)处理的数据可以是无限的

          打水仗:

           命令式编程:使用的是水气球。水气球的容量有限。把水气球装水,再扔向对方。如果想要更多的水,就需要更多的水气球。

           反应式编程:使用的是水管。只要水管的另一头不断的输入水,那么这边喷出的水就不会断。

     二反应式流

      1.基本概念

         他是一种规范。是对无阻塞-回压-异步流处理的标准

          他是一个异步编程范式,主要涉及数据流及变化的传播,可以看做是观察者设计模式的扩展。

                         java里头的iterator是以pull模型,即订阅者使用next去拉取下一个数据;而reactive streams则是以push模型为主

        2.JAVA流和反应式流的基本区别

        Java流:通常是同步的,且处理的数据有限

          反应式流:支持任意大小的数据集,包括无限的。且只要数据就绪,就可以实时的处理数据

        3.反应式流规范的定义

        反应式流的规范主要由4个接口组成     

        (1) Publisher 出版商 - 定义了发布者的方法

        (2) Subscriber 用户 - 接口定义了订阅者的方法

        (3) Subscription 订阅 - 控制数据开始和结束发送,且可以控制数据量

        (4) Processor 他是Publisher 和 Subscriber  的结合。定义了处理器

        1)Publisher源码

            public interface Publisher<T> {
            /**
                消费者订阅
             */
            public void subscribe(Subscriber<? super T> s);
        } 

        2)Subscriber源码

            public interface Subscriber<T> {
        /**
             接收Subscription 对象,获得的Subscription 对象可以控制数据传输开始和结束以及数据量
         */
        public void onSubscribe(Subscription s);
    
        /**
                获取下一个数据
         */
        public void onNext(T t);
    
        /**
         如果出错了,就会调用本方法
         */
        public void onError(Throwable t);
    
        /**
        如果publisher没有数据了,也就是数据传输完成了,会调用本方法
         */
        public void onComplete();
    }

        3)Subscription源码

    public interface Subscription {
        /**
            请求publisher发送数据, n表示数据量
         */
        public void request(long n);
    
        /**
        取消订阅
         */
        public void cancel();
    }

        4)Processor源码

    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }

       

    三:spring reactor

      1.基本概念

          他提供了反应式流规范的实现,提供了一组用于组装反应式流的函数式API

          reactor是push数据,当Subscriber调用onSubscribe方法是,传入一个Subscriber对象,此时,Subscriber对象的onSubscribe(Subscription subscription)方法会被调用,订阅者调用subscribe方法订阅,发布者调用订阅者的onNext通知订阅者新消息。也就是说,是发布者主动把数据推送给订阅者。

      被订阅者 (Publisher) 主动推送数据给 订阅者 (Subscriber),触发 onNext() 方法。异常和完成时触发另外两个方法。

      被订阅者 (Publisher) 发生异常,则触发 订阅者 (Subscriber)onError() 方法进行异常捕获处理。

      被订阅者 (Publisher) 每次推送都会触发一次 onNext() 方法。所有的推送完成且无异常时,onCompleted() 方法将 在最后 触发一次

            @Test
        public void testBackpressure(){
            Flux.just(1, 2, 3, 4)
                    .log()
                    .subscribe(new Subscriber<Integer>() {
                        private Subscription s;
                        int onNextAmount;

                        @Override
                        public void onSubscribe(Subscription s) {
                            this.s = s;
                            s.request(2);
                        }

                        @Override
                        public void onNext(Integer integer) {
                            try {
                                Thread.sleep(500);
                            } catch (InterruptedException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                            System.out.println(integer);
                            onNextAmount++;
                            if (onNextAmount % 2 == 0) {
                                s.request(2);
                            }
                        }

                        @Override
                        public void onError(Throwable t) {}

                        @Override
                        public void onComplete() {}
                    });

            try {
                System.out.println("aaa");
            } catch (Exception e) {
                e.printStackTrace();
            }

    /*
            日志打印和输出结果

    16:41:48.160 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
    16:41:48.174 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription) //自动调用onSubscribe方法
    16:41:48.178 [main] INFO reactor.Flux.Array.1 - | request(2)  //调用request方法表示结束数据2个
    16:41:48.178 [main] INFO reactor.Flux.Array.1 - | onNext(1)   //调用onNext方法接收数据
    1
    16:41:48.680 [main] INFO reactor.Flux.Array.1 - | onNext(2)  //调用onNext方法接收数据
    2
    16:41:49.180 [main] INFO reactor.Flux.Array.1 - | request(2)  //调用request方法表示结束数据2个
    16:41:49.180 [main] INFO reactor.Flux.Array.1 - | onNext(3)  //调用onNext方法接收数据
    3
    16:41:49.681 [main] INFO reactor.Flux.Array.1 - | onNext(4)  //调用onNext方法接收数据
    4
    16:41:50.182 [main] INFO reactor.Flux.Array.1 - | request(2)  //调用request方法表示结束数据2个
    16:41:50.183 [main] INFO reactor.Flux.Array.1 - | onComplete() //发现没有数据了,结束
    aaa
            
            */

        2.Reactor提供的两个核心反应式类型

          (1) Mono:允许0-1个数据项

          (2) Flux:允许任意数量的数据项

        3.Reactor两个核心反应式类型的操作使用

        1)基本说明:由于Mono和Flux类似,且Flux使用频率更大,所以只拿Flux举例

          2)Flux的提供的操作可分为4类  

            ① 创建

            ② 组合

            ③ 转换

            ④ 逻辑

         3)创建操作

           (1)Flux.just创建Flux 

            下面使用 Flux.just创建了一个字符串Flux-just并且通过just.subscribe来订阅flux,把数据打印出来以验证输出的数据是否正确

        @Test
        public void test1(){
            
            //创建一个flux
            Flux<String> just = Flux.just("字符串1","字符串2","字符串3","字符串4","字符串5","字符串6");
            //订阅flux - 这里没有中间操作,数据直接到了订阅者
            just.subscribe(x->{
                System.out.println(x);
            });
            
        }    

          (2)使用StepVerifier来订阅和验证Flux

            上面我们通过just.subscribe来订阅flux,并且把数据打印出来以验证输出的数据是否正确,Reacotr提供了StepVerifier来订阅及验证Flux 

      @Test
        public void test2(){ //使用StepVerifier 来验证flux
            
            //创建一个flux
            Flux<String> just = Flux.just("字符串1","字符串2","字符串3","字符串4","字符串5","字符串6");
            
            //使用StepVerifier来测试flux
            StepVerifier.create(just)  //StepVerifier订阅flux
            .expectNext("字符串1")    //校验数据
            .expectNext("字符串2")
            .expectNext("字符串3")
            .expectNext("字符串4")
            .expectNext("字符串5")
            .expectNext("字符串6")
            .verifyComplete(); //校验正常完成
        }

          (3使用数组来创建flux-Flux.fromArray   

        @Test
        public void test3(){  //使用array创建flux
            
            String[] arr = {"字符串1","字符串2","字符串3","字符串4","字符串5","字符串6"};
            
            
            //创建一个flux
            Flux<String> just = Flux.fromArray(arr);
            
            //使用StepVerifier来测试flux
            StepVerifier.create(just)  //StepVerifier订阅flux
            .expectNext("字符串1")    //校验数据
            .expectNext("字符串2")
            .expectNext("字符串3")
            .expectNext("字符串4")
            .expectNext("字符串5")
            .expectNext("字符串6")
            .verifyComplete(); //校验正常完成
        }

          (4使用集合来创建flux-Flux.fromIterable

        @Test
        public void test4(){  //使用集合创建flux
            
            String[] arr = {"字符串1","字符串2","字符串3","字符串4","字符串5","字符串6"};
            List<Object> asList = Arrays.asList(arr);
            
            //创建一个flux
            Flux<Object> just = Flux.fromIterable(asList);
            
            //使用StepVerifier来测试flux
            StepVerifier.create(just)  //StepVerifier订阅flux
            .expectNext("字符串1")    //校验数据
            .expectNext("字符串2")
            .expectNext("字符串3")
            .expectNext("字符串4")
            .expectNext("字符串5")
            .expectNext("字符串6")
            .verifyComplete(); //校验正常完成
        }        

         (5)使用流创建Flux-Flux.fromStream

           @Test
        public void test5(){  //使用流创建flux
            
            String[] arr = {"字符串1","字符串2","字符串3","字符串4","字符串5","字符串6"};
            List<Object> asList = Arrays.asList(arr);
            
            Stream<Object> stream = asList.stream();
            
            //创建一个flux
            Flux<Object> just = Flux.fromStream(stream);
            
            //使用StepVerifier来测试flux
            StepVerifier.create(just)  //StepVerifier订阅flux
            .expectNext("字符串1")    //校验数据
            .expectNext("字符串2")
            .expectNext("字符串3")
            .expectNext("字符串4")
            .expectNext("字符串5")
            .expectNext("字符串6")
            .verifyComplete(); //校验正常完成
        }    

          (6)生成区间内的整数的flux

            @Test
        public void test6(){  //使用flux生成区间内的整数
            
            //创建一个flux
            Flux<Integer> just = Flux.range(5, 6); //从5开始,生成6个连续的整数
            
            //使用StepVerifier来测试flux
            StepVerifier.create(just)  //StepVerifier订阅flux
            .expectNext(5)    //校验数据
            .expectNext(6)
            .expectNext(7)
            .expectNext(8)
            .expectNext(9)
            .expectNext(10)
            .verifyComplete(); //校验正常完成
        }    

          (7)使用flux按时间间隔生成正整数-Flux.interval

            @Test
        public void test7(){  //使用flux按时间间隔生成正整数
            System.out.println("test7");
            //创建一个flux
            Flux<Long> just = Flux.interval(Duration.ofSeconds(2)).take(6);
            //Duration.ofSeconds(2)每2秒生成一个数字,由0开始,take表示生成6个数字
            //使用StepVerifier来测试flux
            StepVerifier.create(just)  //StepVerifier订阅flux
            .expectNext(0L)    //校验数据
            .expectNext(1L)
            .expectNext(2L)
            .expectNext(3L)
            .expectNext(4L)
            .expectNext(5L)
            .verifyComplete(); //校验正常完成
        }

        4)组合操作

          mergeWith

            把两个Flux组合形成一个新的Flux,新的Flux中的数据顺序和两个Flux的发布数据的时间顺序相同

           (1)使用mergeWithjustjust2组合成mergeWith  

            下面代码吧just和just2组合生成一个新的Flux-mergeWith,这里我们注意mergeWith打印的数据的顺序,just打印完后再打印的just2的数据。

      @Test
        public void test0(){
            
            //创建一个flux
            Flux<String> just = Flux.just("字符串1","字符串2");
            Flux<String> just2 = Flux.just("字符串1x","字符串2x");
            
            //订阅flux - 这里没有中间操作,数据直接到了订阅者
            Flux<String> mergeWith = just.mergeWith(just2);
            
            mergeWith.subscribe(x ->{
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name +"-"+ x);
            });

        /* main-字符串1

    
    

          main-字符串2

    
    

          main-字符串1x

    
    

          main-字符串2x

         */

    }

          (2)使用mergeWithjustjust2组合成mergeWith,且使用delaySubscription方法和delayElements方法来控制组合的数据的顺序     

       @Test
        public void test2(){
            //创建一个flux
            Flux<String> just = Flux.just("字符串1","字符串2").delayElements(Duration.ofSeconds(1)); //每一个数据隔1s发布(第一个数据也是隔1s才发布)
            Flux<String> just2 = Flux.just("字符串1x","字符串2x").
                    delaySubscription(Duration.ofMillis(500)). //订阅后延迟0.5s发布开始数据
                    delayElements(Duration.ofSeconds(1));    ////每一个数据隔1s发布(第一个数据也是隔1s才发布)
            
            //订阅flux - 这里没有中间操作,数据直接到了订阅者
            Flux<String> mergeWith = just.mergeWith(just2);
            
            mergeWith.subscribe(x->{
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name +"=="+ x);
            });
            
            /*
            parallel-1-字符串1
            parallel-5-字符串1x
            parallel-7-字符串2
            parallel-1-字符串2x
            */
            
            StepVerifier.create(mergeWith)  //如果把这段代码去掉,那么打印结果就为空
            .expectNext("字符串1")
            .expectNext("字符串1x")
            .expectNext("字符串2")
            .expectNext("字符串2x")
            .verifyComplete();
            
            Thread t = Thread.currentThread();
            String name = t.getName();
            System.out.println(name +"线程");        
    //打印结果 main线程
    
            System.out.println("结束");   
        }    

          使用delayElements方法对just进行控制,指定它每个数据发布间隔1

          使用delayElements方法对just2进行控制,指定它每个数据发布间隔1

          使用delaySubscription方法对just2进行控制,指定开始时延迟0.5秒发布数据

          注意:delayElements对第一个数据同样有效,也就是说just等了1秒第一个数据发布,后面每隔1秒发布一个数据,而just2等了1.5(0.5+1)发布第一个数据,后面每隔1秒发布一个数据

          Flux特性:这里还有一个有意思的发现,这里出现了4个线程 parallel-1 parallel-5 parallel-7main数据在管道上处理的每一步,可能是同一个线程,也可能是不同的线程,我们不能判断数据会在哪个线程上执行

          组合后Flux数据顺序体现我们使用delayElements和delaySubscription来控制两个源数据的发布顺序,而新的Flux的数据输出和我们预期的一致,也就验证了组合后的Flux的数据顺序和组合的两个Flux的数据的发布顺序相同

          

        zipWith

          把两个Flux组合形成一个新的Flux,默认情况下他会把两个源flux的数据成对的放入Tuple2形成一个Tuple2Flux,也可以自定义数据的组合方式

         (1)使用zipwith来组合两个Flux  

          下面使用zipwith把just和just2组合生成一个新的Tuple2的Flux

        @Test
        public void testz1(){
            
            //创建一个flux
            Flux<String> just = Flux.just("字符串1","字符串2");
            Flux<String> just2 = Flux.just("字符串1x","字符串2x");
            
            //订阅flux - 这里没有中间操作,数据直接到了订阅者
             Flux<Tuple2<String, String>> mergeWith = just.zipWith(just2);  //返回的是一个Tuple2的一个flux,他会把两个源flux的数据承兑放入Tuple2
            
            mergeWith.subscribe(x ->{
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name +"-"+ x);
            });
            
        /*    
            main-[字符串1,字符串1x]
            main-[字符串2,字符串2x]
         */
            }    

          (2)当两个Flux的数据数量不一致时,形成的Tuple2Flux中的数据将不会录入无法形成一对一的数据

          just有4个数据,just2只有2个数据,使用zipmap组合后结果生成了包含2个Tuple2的flux。just的后面的两个数据没有进入新的Flux

      @Test
        public void testz2(){
            
            //创建一个flux
            Flux<String> just = Flux.just("字符串1","字符串2","字符串3","字符串4");
            Flux<String> just2 = Flux.just("字符串1x","字符串2x");
            
             Flux<Tuple2<String, String>> mergeWith = just.zipWith(just2); //返回的是一个Tuple2的一个flux,他会把两个源flux的数据承兑放入Tuple2,如果两个flux的数据数量不一致,多的无法组成Tuple2,则多的数据不会进入新的flux
            
            mergeWith.subscribe(x ->{
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name +"-"+ x);
            });
            
        /*    
            main-[字符串1,字符串1x]
            main-[字符串2,字符串2x]
         */
            
        }

          (3)自定义组合方式

          下面把两个Flux的数据按照 数据1&&数据2的格式拼接组合生成一个String的flux,而不是生成Tuple2的flux

      @Test
        public void testz3(){
            
            //创建一个flux
            Flux<String> just = Flux.just("字符串1","字符串2");
            Flux<String> just2 = Flux.just("字符串1x","字符串2x","字符串3x","字符串4x");
            
            Flux<String> mergeWith = Flux.zip(just, just2,(x,y)->{
                return x + "&&" + y;
            });
            
                mergeWith.subscribe(x ->{
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name +"-"+ x);
            });
            
        /*    
            main-字符串1&&字符串1x
            main-字符串2&&字符串2x
         */
            
        }

        5)中间操作

        Flux的中间操作和Java8的Stream的中间操作基本一致 

    1. skip 跳过指定数量的数据

    2.take 只取前面指定数量的数据

    3. filter过滤

    4. Distinct 去重

    5. sort排序

    6.map

    7.flatmap

     

          (1)skip跳过指定数量的数据 

      @Test
        public void testx1(){
            
            //创建一个flux
            Flux<String> just = Flux.just("字符串1x","字符串2x","字符串3x","字符串4x").skip(2); //跳过前两个数据
            just.subscribe(x ->{
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name +"-"+ x);
            });
            
        /*    
            main-字符串3x
            main-字符串4x
         */
        }

     

          (2)take只取前面指定数量的数据   

      @Test
        public void testx4(){
            
            //创建一个flux
            Flux<String> just = Flux.just("字符串1x","字符串2x","字符串3x","字符串4x").take(2); //只取前两个数据
            just.subscribe(x ->{
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name +"-"+ x);
            });
            
        /*    
            main-字符串1x
            main-字符串2x
         */
            
        }

     

          (3)filter过滤

      @Test
        public void testx5(){
            
            //创建一个flux
            Flux<String> just = Flux.just("字符串1x","字符串2x","字符串3x","字符串4x","字符串","字符串5x").filter(x->{
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name + "-filter过滤中");
                return x.contains("x");
            }); //只取前两个数据
            just.subscribe(x ->{
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name +"-"+ x);
            });
            
        /*    
            main-filter过滤中
            main-字符串1x
            main-filter过滤中
            main-字符串2x
            main-filter过滤中
            main-字符串3x
            main-filter过滤中
            main-字符串4x
            main-filter过滤中
            main-filter过滤中
            main-字符串5x
         */
            
        }

     

          (4)Distinct 去重   

      @Test
        public void testx6(){
            
            //创建一个flux
            Flux<String> just = Flux.just("字符串1x","字符串2x","字符串3x","字符串4x","字符串","字符串").distinct(); //只取前两个数据
            just.subscribe(x ->{
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name +"-"+ x);
            });
            
        /*    
            main-字符串1x
            main-字符串2x
            main-字符串3x
            main-字符串4x
            main-字符串
         */
            
        }

          (5)sort 排序

          自然排序    

      @Test
        public void testx7(){
            
            //创建一个flux
            Flux<String> just = Flux.just("字符串1x","字符串2x","字符串3x","字符串4x","字符串7","字符串5").sort(); //只取前两个数据
            just.subscribe(x ->{
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name +"-"+ x);
            });
            
        /*    
            main-字符串1x
            main-字符串2x
            main-字符串3x
            main-字符串4x
            main-字符串5
            main-字符串7
         */
            
        }

          自定义排序   

    @Test
        public void testx8(){
            
            //创建一个flux
            Flux<Integer> just = Flux.just(1,3,2,6,9,0,5,3).sort((x,y)->{
                return y.compareTo(x); //倒叙
            }); //只取前两个数据
            just.subscribe(x ->{
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name +"-"+ x);
            });
            
        /*    
            main-9
            main-6
            main-5
            main-3
            main-3
            main-2
            main-1
            main-0
         */
        }

          (6)map 

          对Flux的数据进行处理后形成新的Flux,注意,只有一个主线程main

      @Test
        public void testx9(){
            
            //创建一个flux
            Flux<Integer> just = Flux.just(1,3,2,6).map(x->{
                
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name +"-map处理中");
                
                return x * x;
            }); 
            just.subscribe(x ->{
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name +"-"+ x);
            });
            
        /*    
            main-map处理中
            main-1
            main-map处理中
            main-9
            main-map处理中
            main-4
            main-map处理中
            main-36
         */
        }

          (7)flatmap

          对Flux的数据进行处理后形成的Flux合流形成新的Flux,注意,只有一个主线程main

      @Test  //只有一个主线程main
        public void testx10() throws InterruptedException{
            
            //创建一个flux
            Flux just = Flux.just(1,3,2,6).flatMap(x->{
                
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name +"-" + "flatmap转换中");
                
                return Flux.just(x).map(a->{
                    Thread t1 = Thread.currentThread();
                    String name1 = t.getName();
                    System.out.println(name1 +"-" + "map转换中");
                    return a * a;
                });
            }); 
            just.subscribe(x ->{
                Thread t = Thread.currentThread();
                String name = t.getName();
                System.out.println(name +"-"+ x);
            });
            
            Thread.sleep(3000);
            
        /*    
            main-flatmap转换中
            main-map转换中
            main-1
            main-flatmap转换中
            main-map转换中
            main-9
            main-flatmap转换中
            main-map转换中
            main-4
            main-flatmap转换中
            main-map转换中
            main-36
         */
        }

          (8)flatmap+subscribeOn  

          使用了subscribeOn后,为订阅者开辟了一个新的线程parallel-1

         @Test    //使用了subscribeOn 单独开了一个新的线程parallel
            public void testx11() throws InterruptedException{
                
                //创建一个flux
                Flux just = Flux.just(1,3,2,6).flatMap(x->{
                    
                    Thread t = Thread.currentThread();
                    String name = t.getName();
                    System.out.println(name +"-" + "flatmap转换中");
                    
                    return Flux.just(x).map(a->{
                        Thread t1 = Thread.currentThread();
                        String name1 = t.getName();
                        System.out.println(name1 +"-" + "map转换中");
                        return a * a;
                    });
                }).subscribeOn(Schedulers.parallel()); 
                just.subscribe(x ->{
                    Thread t = Thread.currentThread();
                    String name = t.getName();
                    System.out.println(name +"-"+ x);
                });
                
                Thread.sleep(3000);
                
            /*    
                parallel-1-flatmap转换中
                parallel-1-map转换中
                parallel-1-1
                parallel-1-flatmap转换中
                parallel-1-map转换中
                parallel-1-9
                parallel-1-flatmap转换中
                parallel-1-map转换中
                parallel-1-4
                parallel-1-flatmap转换中
                parallel-1-map转换中
                parallel-1-36
             */
            } 

    Flatmap使用了subscribeOn(Schedulers.parallel())会让多个订阅者并行执

      subscribeOn 方法Schedulers支持的并发模型说明

      .immediate()在当前线程中执行订阅

      .single()在一个单一的,可重复的线程中执行订阅。对所有的订阅者重用相同的线程

      .newSingle()针对每个订阅,使用专门的线程

      .elastic() 在从无界弹性线程池中拉取的线程中执行订阅。根据需要创建新的工作线程,并销毁空闲的工作线程(默认情况下,空闲60s后销毁)

      .parallel()从一个固定大小的线程池中拉取线程执行订阅,该线程池大大小和CPU的核心数一致


          (9)Collection操作

            @Test
                public void testx21(){
                    
                    //创建一个flux
                    Mono<List<Integer>> just = Flux.just(1,3,2,6).map(x->{
                        
                        Thread t = Thread.currentThread();
                        String name = t.getName();
                        System.out.println(name +"-map处理中");
                        
                        return x * x;
                    }).collect(Collectors.toList()); 
                    just.subscribe(x ->{
                        Thread t = Thread.currentThread();
                        String name = t.getName();
                        System.out.println(name +"-"+ x);
                    });
                }

     

          

    @Test
                public void testx22(){
                    
                    //创建一个flux
                    Mono<ConcurrentMap<Integer, List<Integer>>> just = Flux.just(1,3,2,6).map(x->{
                        
                        Thread t = Thread.currentThread();
                        String name = t.getName();
                        System.out.println(name +"-map处理中");
                        
                        return x * x;
                    }).collect(Collectors.groupingByConcurrent(x->{
                        return x;
                    }));
                    
                    just.subscribe(x ->{
                        Thread t = Thread.currentThread();
                        String name = t.getName();
                        System.out.println(name +"-"+ x);
                    });
                    
                }

           (10)map有多个订阅者

          下面just有两个订阅者。我们发现map这段代码被执行了两次,线程还是只有一个主线程main

          @Test    //两个订阅者  我们发现flatmap的操作执行了两遍 - 还是只有主线程main
            public void testx12() throws InterruptedException{
                
                //创建一个flux
                Flux just = Flux.just(1,3,2,6).flatMap(x->{
                    
                    Thread t = Thread.currentThread();
                    String name = t.getName();
                    System.out.println(name +"-" + "flatmap转换中");
                    
                    return Flux.just(x).map(a->{
                        Thread t1 = Thread.currentThread();
                        String name1 = t.getName();
                        System.out.println(name1 +"-" + "map转换中");
                        return a * a;
                    });
                }); 
                just.subscribe(x ->{
                    Thread t = Thread.currentThread();
                    String name = t.getName();
                    System.out.println(name +"-"+ x);
                });
                
                just.subscribe(x ->{
                    Thread t = Thread.currentThread();
                    String name = t.getName();
                    System.out.println(name +"*"+ x);
                });
                
                Thread.sleep(3000);
                
            /*    
                main-flatmap转换中
                main-map转换中
                main-1
                main-flatmap转换中
                main-map转换中
                main-9
                main-flatmap转换中
                main-map转换中
                main-4
                main-flatmap转换中
                main-map转换中
                main-36
                main-flatmap转换中
                main-map转换中
                main*1
                main-flatmap转换中
                main-map转换中
                main*9
                main-flatmap转换中
                main-map转换中
                main*4
                main-flatmap转换中
                main-map转换中
                main*36
             */
            }

          (11)flatmap + subscribeOn多个订阅者

        just有两个订阅者,flatmap这段代码执行了两次,且每个订阅者都开辟了一个线程:parallel-1,parallel-2。这里很好的体现了reactor的特点,异步并行处理

    @Test//flatmap使用了subscribeOn两个订阅者  我们发现flatmap的操作执行了两遍 - 每一个订阅者都创建了一个新的线程在执行
                public void testx14() throws InterruptedException{
                    
                    //创建一个flux
                    Flux just = Flux.just(1,3,2,6).flatMap(x->{
                        
                        Thread t = Thread.currentThread();
                        String name = t.getName();
                        System.out.println(name +"-" + "flatmap转换中");
                        
                        return Flux.just(x).map(a->{
                            Thread t1 = Thread.currentThread();
                            String name1 = t.getName();
                            System.out.println(name1 +"-" + "map转换中");
                            return a * a;
                        });
                    }).subscribeOn(Schedulers.parallel()); 
                    just.subscribe(x ->{
                        Thread t = Thread.currentThread();
                        String name = t.getName();
                        System.out.println(name +"-"+ x);
                    });
                    
                    just.subscribe(x ->{
                        Thread t = Thread.currentThread();
                        String name = t.getName();
                        System.out.println(name +"*"+ x);
                    });
                    
                    Thread.sleep(3000);
                    
                /*    
                    parallel-1-flatmap转换中
                    parallel-2-flatmap转换中
                    parallel-1-map转换中
                    parallel-2-map转换中
                    parallel-1-1
                    parallel-2*1
                    parallel-1-flatmap转换中
                    parallel-2-flatmap转换中
                    parallel-1-map转换中
                    parallel-2-map转换中
                    parallel-1-9
                    parallel-2*9
                    parallel-1-flatmap转换中
                    parallel-2-flatmap转换中
                    parallel-1-map转换中
                    parallel-2-map转换中
                    parallel-1-4
                    parallel-2*4
                    parallel-1-flatmap转换中
                    parallel-2-flatmap转换中
                    parallel-1-map转换中
                    parallel-2-map转换中
                    parallel-1-36
                    parallel-2*36
                 */
                }
            

     

        总结每一个订阅者,Flux的中间操作都会执行一遍  

        6)逻辑操作

          (1)ALL全部匹配返回true

            @Test
                public void testx23(){
                    
                    //创建一个flux
                    Mono<Boolean> just = Flux.just(1,3,2,6).map(x->{
                        
                        Thread t = Thread.currentThread();
                        String name = t.getName();
                        System.out.println(name +"-map处理中");
                        
                        return x * x;
                    }).all(y ->{
                        return y > 4;
                    });
                    
                    just.subscribe(x ->{
                        Thread t = Thread.currentThread();
                        String name = t.getName();
                        System.out.println(name +"-"+ x);
                    });
                    
                }

          (2)ANY任意一个匹配返回true

            @Test
                public void testx24(){
                    
                    //创建一个flux
                    Mono<Boolean> just = Flux.just(1,3,2,6).map(x->{
                        
                        Thread t = Thread.currentThread();
                        String name = t.getName();
                        System.out.println(name +"-map处理中");
                        
                        return x * x;
                    }).any(y ->{
                        return y > 4;
                    });
                    
                    just.subscribe(x ->{
                        Thread t = Thread.currentThread();
                        String name = t.getName();
                        System.out.println(name +"-"+ x);
                    });
                    
                }
  • 相关阅读:
    爬虫练习
    爬取豆瓣电影top250
    简单爬虫
    正则提取子域名和ip
    用户体验培训总结
    测试经验总结
    项目管理知识总结
    读书笔记——《留住好员工:爱他们,还是失去他们?》
    ISTQB学习笔记
    数据结构和算法with Python
  • 原文地址:https://www.cnblogs.com/jthr/p/13915445.html
Copyright © 2011-2022 走看看