zoukankan      html  css  js  c++  java
  • 理解 RxJava 的线程模型


    来源:鸟窝,

    colobu.com/2016/07/25/understanding-rxjava-thread-model/

    如有好文章投稿,请点击 → 这里了解详情


    ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io。


    Netflix参考微软的Reactive Extensions创建了Java的实现RxJava,主要是为了简化服务器端的并发。2013年二月份,Ben Christensen 和 Jafar Husain发在Netflix技术博客的一篇文章第一次向世界展示了RxJava。


    RxJava也在Android开发中得到广泛的应用。


    ReactiveX

    An API for asynchronous programming with observable streams.

    A combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.


    虽然RxJava是为异步编程实现的库,但是如果不清楚它的使用,或者错误地使用了它的线程调度,反而不能很好的利用它的异步编程提到系统的处理速度。本文通过实例演示错误的RxJava的使用,解释RxJava的线程调度模型,主要介绍Scheduler、observeOn和subscribeOn的使用。


    本文中的例子以并发发送http request请求为基础,通过性能检验RxJava的线程调度。


    第一个例子,性能超好?


    我们首先看第一个例子:


      public static void testRxJavaWithoutBlocking(int count) throws Exception {

        CountDownLatch finishedLatch = new CountDownLatch(1);

        long t = System.nanoTime();

        Observable.range(0, count).map(i -> {

            //System.out.println("A:" + Thread.currentThread().getName());

            return 200;

        }).subscribe(statusCode -> {

            //System.out.println("B:" + Thread.currentThread().getName());

        }, error -> {

        }, () -> {

            finishedLatch.countDown();

        });

        finishedLatch.await();

        t = (System.nanoTime() - t) / 1000000; //ms

        System.out.println("RxJavaWithoutBlocking TPS: " + count * 1000 / t);

    }


    这个例子是一个基本的RxJava的使用,利用Range创建一个Observable, subscriber处理接收的数据。因为整个逻辑没有阻塞,程序运行起来很快,

    输出结果为:


    RxJavaWithoutBlocking TPS: 7692307 。


    2 加上业务的模拟,性能超差


    上面的例子是一个理想化的程序,没雨任何阻塞。我们模拟一下实际的应用,加上业务处理。


    业务逻辑是发送一个http的请求,httpserver是一个模拟器,针对每个请求有30毫秒的延迟。subscriber统计请求结果:


    public static void testRxJavaWithBlocking(int count) throws Exception {

            URL url = new URL("http://127.0.0.1:8999/");

            CountDownLatch finishedLatch = new CountDownLatch(1);

            long t = System.nanoTime();

            Observable.range(0, count).map(i -> {

                try {

                    HttpURLConnection conn = (HttpURLConnection) url.openConnection();

                    conn.setRequestMethod("GET");

                    int responseCode = conn.getResponseCode();

                    BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));

                    String inputLine;

                    while ((inputLine = in.readLine()) != null) {

                        //response.append(inputLine);

                    }

                    in.close();

                    return responseCode;

                } catch (Exception ex) {

                    return -1;

                }

            }).subscribe(statusCode -> {

            }, error -> {

            }, () -> {

                finishedLatch.countDown();

            });

            finishedLatch.await();

            t = (System.nanoTime() - t) / 1000000; //ms

            System.out.println("RxJavaWithBlocking TPS: " + count * 1000 / t);

        }


    运行结果如下:


    RxJavaWithBlocking TPS: 29。


    性能怎么突降呢,第一个例子看起来性能超好啊,http server只增加了一个30毫秒的延迟,导致这个方法每秒只能处理29个请求。


    如果我们估算一下, 29*30= 870 毫秒,大约1秒,正好和单个线程发送处理所有的请求的TPS差不多。


    后面我们也会看到,实际的确是一个线程处理的,你可以在代码中加入


    3 加上调度器,不起作用?


    如果你对subscribeOn和observeOn方法有些印象的话,可能会尝试使用调度器去解决:


    public static void testRxJavaWithBlocking(int count) throws Exception {

            URL url = new URL("http://127.0.0.1:8999/");

            CountDownLatch finishedLatch = new CountDownLatch(1);

            long t = System.nanoTime();

            Observable.range(0, count).map(i -> {

                try {

                    HttpURLConnection conn = (HttpURLConnection) url.openConnection();

                    conn.setRequestMethod("GET");

                    int responseCode = conn.getResponseCode();

                    BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));

                    String inputLine;

                    while ((inputLine = in.readLine()) != null) {

                        //response.append(inputLine);

                    }

                    in.close();

                    return responseCode;

                } catch (Exception ex) {

                    return -1;

                }

            }).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).subscribe(statusCode -> {

            }, error -> {

            }, () -> {

                finishedLatch.countDown();

            });

            finishedLatch.await();

            t = (System.nanoTime() - t) / 1000000; //ms

            System.out.println("RxJavaWithBlocking TPS: " + count * 1000 / t);

        }


    加上.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation())看一下性能:


    RxJavaWithBlocking TPS: 30。


    性能没有改观,是时候了解一下RxJava线程调度的问题了。


    4 RxJava的线程模型


    首先,依照Observable Contract, onNext是顺序执行的,不会同时由多个线程并发执行。



    默认情况下,它是在调用subscribe方法的那个线程中执行的。如第一个例子和第二个例子,Rx的操作和消息接收处理都是在同一个线程中执行的。一旦由阻塞,比如第二个例子,久会导致这个线程被阻塞,吞吐量下降。



    但是subscribeOn可以改变Observable的运行线程。



    上图中可以看到,如果你使用了subscribeOn方法,则Rx的运行将会切换到另外的线程上,而不是默认的调用线程。


    需要注意的是,如果在Observable链中调用了多个subscribeOn方法,无论调用点在哪里,Observable链只会使用第一个subscribeOn指定的调度器,正所谓”一见倾情”。


    但是onNext还是顺序执行的,所以第二个例子的性能依然低下。


    observeOn可以中途改变Observable链的线程。前面说了,subscribeOn方法改变的源Observable的整个的运行线程,要想中途切换线程,就需要observeOn方法。



    官方的一个简略晦涩的解释如下:


    The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.


    一图胜千言:



    注意箭头的颜色和横轴的颜色,不同的颜色代表不同的线程。


    5 Schedulers


    上面我们了解了RxJava可以使用subscribeOn和observeOn可以改变和切换线程,以及onNext是顺序执行的,不是并发执行,至多也就切换到另外一个线程,如果它中间的操作是阻塞的,久会影响整个Rx的执行。


    Rx是通过调度器来选择哪个线程执行的,RxJava内置了几种调度器,分别为不同的case提供线程:


    • io() : 这个调度器时用于I/O操作, 它可以增长或缩减来确定线程池的大小它是使用CachedThreadScheduler来实现的。需要注意的是,它的线程池是无限制的,如果你使用了大量的线程的话,可能会导致OutOfMemory等资源用尽的异常。


    • computation() : 这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认调度器:buffer(), debounce(), delay(), interval(), sample(), skip()。


    因为这些方法内部已经调用的调度器,所以你再调用subscribeOn是无效的,比如下面的例子总是使用computation调度器的线程。


    Observable.just(1,2,3)

                    .delay(1, TimeUnit.SECONDS)

                    .subscribeOn(Schedulers.newThread())

                    .map(i -> {

                        System.out.println("map: " + Thread.currentThread().getName());

                        return i;

                    })

                    .subscribe(i -> {});


    • immediate() :这个调度器允许你立即在当前线程执行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默认的调度器。


    • newThread() :创建一个新的线程只从。


    • trampoline() :为当前线程建立一个队列,将当前任务加入到队列中依次执行。


    同时,Schedulers还提供了from静态方法,用户可以定制线程池:


    ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());

    Schedulers.from(es)


    6 改造,异步执行


    现在,我们已经了解了RxJava的线程运行,以及相关的调度器。可以看到上面的例子还是顺序阻塞执行的,即使是切换到另外的线程上,依然是顺序阻塞执行,显示它的吞吐率非常非常的低。下一步我们就要改造这个例子,让它能异步的执行。


    下面是一种改造方案,我先把代码贴出来,再解释:


    public static void testRxJavaWithFlatMap(int count) throws Exception {

        ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());

        URL url = new URL("http://127.0.0.1:8999/");

        CountDownLatch finishedLatch = new CountDownLatch(1);

        long t = System.nanoTime();

        Observable.range(0, count).subscribeOn(Schedulers.io()).flatMap(i -> {

                    //System.out.println("A: " + Thread.currentThread().getName());

                    return Observable.just(i).subscribeOn(Schedulers.from(es)).map(v -> {

                                //System.out.println("B: " + Thread.currentThread().getName());

                                try {

                                    HttpURLConnection conn = (HttpURLConnection) url.openConnection();

                                    conn.setRequestMethod("GET");

                                    int responseCode = conn.getResponseCode();

                                    BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));

                                    String inputLine;

                                    while ((inputLine = in.readLine()) != null) {

                                        //response.append(inputLine);

                                    }

                                    in.close();

                                    return responseCode;

                                } catch (Exception ex) {

                                    return -1;

                                }

                            }

                    );

                }

        ).observeOn(Schedulers.computation()).subscribe(statusCode -> {

            //System.out.println("C: " + Thread.currentThread().getName());

        }, error -> {

        }, () -> {

            finishedLatch.countDown();

        });

        finishedLatch.await();

        t = (System.nanoTime() - t) / 1000000; //ms

        System.out.println("RxJavaWithFlatMap TPS: " + count * 1000 / t);

        es.shutdownNow();

    }


    通过flatmap可以将源Observable的元素项转成n个Observable,生成的每个Observable可以使用线程池并发的执行,同时flatmap还会将这n个Observable merge成一个Observable。你可以将其中的注释打开,看看线程的执行情况。


    性能还不错:


    RxJavaWithFlatMap TPS: 3906。


    FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable



    7 另一种解决方案


    我们已经清楚了要并行执行提高吞吐率的解决办法就是创建多个Observable并且并发执行。基于这种解决方案,我们还可以有其它的解决方案。


    上一方案中利用flatmap创建多个Observable,针对我们的例子,我们何不直接创建多个Observable呢?


    public static void testRxJavaWithParallel(int count) throws Exception {

        ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());

        URL url = new URL("http://127.0.0.1:8999/");

        CountDownLatch finishedLatch = new CountDownLatch(count);

        long t = System.nanoTime();

        for (int k = 0; k < count; k++) {

            Observable.just(k).map(i -> {

                //System.out.println("A: " + Thread.currentThread().getName());

                try {

                    HttpURLConnection conn = (HttpURLConnection) url.openConnection();

                    conn.setRequestMethod("GET");

                    int responseCode = conn.getResponseCode();

                    BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));

                    String inputLine;

                    while ((inputLine = in.readLine()) != null) {

                        //response.append(inputLine);

                    }

                    in.close();

                    return responseCode;

                } catch (Exception ex) {

                    return -1;

                }

            }).subscribeOn(Schedulers.from(es)).observeOn(Schedulers.computation()).subscribe(statusCode -> {

            }, error -> {

            }, () -> {

                finishedLatch.countDown();

            });

        }

        finishedLatch.await();

        t = (System.nanoTime() - t) / 1000000; //ms

        System.out.println("RxJavaWithParallel TPS: " + count * 1000 / t);

        es.shutdownNow();

    }


    性能更好一点:


    RxJavaWithParallel2 TPS: 4716。


    这个例子没有使用Schedulers.io()作为它的调度器,这是因为如果在大并发的情况下,可能会出现创建过多的线程导致资源不错,所以我们限定使用200个线程。


    8 总结


    • subscribeOn() 改变的Observable运行(operate)使用的调度器,多次调用无效。


    • observeOn() 改变Observable发送notifications的调度器,会影响后续的操作,可以多次调用


    • 默认情况下, 操作链使用的线程是调用subscribe()的线程


    • Schedulers提供了多个调度器,可以并行运行多个Observable


    • 使用RxJava可以实现异步编程,但是依然要小心线程阻塞。而且由于这种异步的编程,调试代码可能更加的困难


    9 参考文档


    • http://reactivex.io/documentation/contract.html


    • http://reactivex.io/documentation/operators/subscribeon.html 中文翻译


    • http://reactivex.io/documentation/operators/observeon.html 中文翻译


    • http://reactivex.io/documentation/scheduler.html


    • http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html


    • http://tomstechnicalblog.blogspot.com/2015/11/rxjava-achieving-parallelization.html


    • https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2 中文翻译


    • https://github.com/mcxiaoke/RxDocs

  • 相关阅读:
    HDU 1850 Being a Good Boy in Spring Festival
    UESTC 1080 空心矩阵
    HDU 2491 Priest John's Busiest Day
    UVALive 6181
    ZOJ 2674 Strange Limit
    UVA 12532 Interval Product
    UESTC 1237 质因子分解
    UESTC 1014 Shot
    xe5 android listbox的 TMetropolisUIListBoxItem
    xe5 android tts(Text To Speech)
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7452603.html
Copyright © 2011-2022 走看看