zoukankan      html  css  js  c++  java
  • RxJava 和 RxAndroid 五(线程调度)

    对rxJava不了解的同学可以先看

    RxJava 和 RxAndroid 一 (基础)
    RxJava 和 RxAndroid 二(操作符的使用)
    RxJava 和 RxAndroid 三(生命周期控制和内存优化)

    RxJava 和 RxAndroid 四(RxBinding的使用)

    本文将有几个例子说明,rxjava线程调度的正确使用姿势。

    例1

     Observable
                    .create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            Logger.v( "rx_call" , Thread.currentThread().getName()  );
    
                            subscriber.onNext( "dd");
                            subscriber.onCompleted();
                        }
                    })
                    .map(new Func1<String, String >() {
                        @Override
                        public String call(String s) {
                            Logger.v( "rx_map" , Thread.currentThread().getName()  );
                            return s + "88";
                        }
                    })
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                        }
                    }) ;
    

      结果

    /rx_call: main           -- 主线程
    /rx_map: main        --  主线程
    /rx_subscribe: main   -- 主线程

    例2

       new Thread(new Runnable() {
                @Override
                public void run() {
                    Logger.v( "rx_newThread" , Thread.currentThread().getName()  );
                    rx();
                }
            }).start();
    
     void rx(){
            Observable
                    .create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            Logger.v( "rx_call" , Thread.currentThread().getName()  );
    
                            subscriber.onNext( "dd");
                            subscriber.onCompleted();
                        }
                    })
                    .map(new Func1<String, String >() {
                        @Override
                        public String call(String s) {
                            Logger.v( "rx_map" , Thread.currentThread().getName()  );
                            return s + "88";
                        }
                    })
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                        }
                    }) ;
    
        }
    

     

          结果

    /rx_newThread: Thread-564   -- 子线程
    /rx_call: Thread-564              -- 子线程
    /rx_map: Thread-564            -- 子线程 
    /rx_subscribe: Thread-564    -- 子线程

    • 通过例1和例2,说明,Rxjava默认运行在当前线程中。如果当前线程是子线程,则rxjava运行在子线程;同样,当前线程是主线程,则rxjava运行在主线程

    例3

     Observable
                    .create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            Logger.v( "rx_call" , Thread.currentThread().getName()  );
    
                            subscriber.onNext( "dd");
                            subscriber.onCompleted();
                        }
                    })
    
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
    
                    .map(new Func1<String, String >() {
                        @Override
                        public String call(String s) {
                            Logger.v( "rx_map" , Thread.currentThread().getName()  );
                            return s + "88";
                        }
                    })
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                        }
                    }) ;
    

      结果

    /rx_call: RxCachedThreadScheduler-1    --io线程
    /rx_map: main                                     --主线程
    /rx_subscribe: main                              --主线程

    例4

     Observable
                    .create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            Logger.v( "rx_call" , Thread.currentThread().getName()  );
    
                            subscriber.onNext( "dd");
                            subscriber.onCompleted();
                        }
                    })
                    .map(new Func1<String, String >() {
                        @Override
                        public String call(String s) {
                            Logger.v( "rx_map" , Thread.currentThread().getName()  );
                            return s + "88";
                        }
                    })
    
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
    
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                        }
                    }) ; 

          结果

    /rx_call: RxCachedThreadScheduler-1     --io线程
    /rx_map: RxCachedThreadScheduler-1   --io线程
    /rx_subscribe: main                              --主线程

       

    • 通过例3、例4 可以看出  .subscribeOn(Schedulers.io())  和 .observeOn(AndroidSchedulers.mainThread()) 写的位置不一样,造成的结果也不一样。从例4中可以看出 map() 操作符默认运行在事件产生的线程之中。事件消费只是在 subscribe() 里面。
    • 对于 create() , just() , from()   等                 --- 事件产生   

                   map() , flapMap() , scan() , filter()  等    --  事件加工

                  subscribe()                                          --  事件消费

    •   事件产生:默认运行在当前线程,可以由 subscribeOn()  自定义线程

             事件加工:默认跟事件产生的线程保持一致, 可以由 observeOn() 自定义线程

           事件消费:默认运行在当前线程,可以有observeOn() 自定义

    例5  多次切换线程

    Observable
                    .create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            Logger.v( "rx_call" , Thread.currentThread().getName()  );
    
                            subscriber.onNext( "dd");
                            subscriber.onCompleted();
                        }
                    })
    
                    .observeOn( Schedulers.newThread() )    //新线程
    
                    .map(new Func1<String, String >() {
                        @Override
                        public String call(String s) {
                            Logger.v( "rx_map" , Thread.currentThread().getName()  );
                            return s + "88";
                        }
                    })
    
                    .observeOn( Schedulers.io() )      //io线程
    
                    .filter(new Func1<String, Boolean>() {
                        @Override
                        public Boolean call(String s) {
                            Logger.v( "rx_filter" , Thread.currentThread().getName()  );
                            return s != null ;
                        }
                    })
    
                    .subscribeOn(Schedulers.io())     //定义事件产生线程:io线程
                    .observeOn(AndroidSchedulers.mainThread())     //事件消费线程:主线程
    
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                        }
                    }) ;

      结果

    /rx_call: RxCachedThreadScheduler-1           -- io 线程
    /rx_map: RxNewThreadScheduler-1             -- new出来的线程
    /rx_filter: RxCachedThreadScheduler-2        -- io线程
    /rx_subscribe: main                                   -- 主线程

    例6:只规定了事件产生的线程

           Observable
                    .create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            Log.v( "rx--create " , Thread.currentThread().getName() ) ;
                            subscriber.onNext( "dd" ) ;
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;
                        }
                    }) ;
    

      结果

    /rx--create: RxCachedThreadScheduler-4                      // io 线程
    /rx--subscribe: RxCachedThreadScheduler-4                 // io 线程

         

    例:7:只规定事件消费线程

     Observable
                    .create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            Log.v( "rx--create " , Thread.currentThread().getName() ) ;
                            subscriber.onNext( "dd" ) ;
                        }
                    })
                    .observeOn( Schedulers.newThread() )
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;
                        }
                    }) ;
    

      结果

    /rx--create: main                                           -- 主线程
    /rx--subscribe: RxNewThreadScheduler-1        --  new 出来的子线程 

          

        从例6可以看出,如果只规定了事件产生的线程,那么事件消费线程将跟随事件产生线程。

        从例7可以看出,如果只规定了事件消费的线程,那么事件产生的线程和 当前线程保持一致。

    例8:线程调度封装

     在Android 常常有这样的场景,后台处理处理数据,前台展示数据。

    一般的用法:

       Observable
                    .just( "123" )
                    .subscribeOn( Schedulers.io())
                    .observeOn( AndroidSchedulers.mainThread() )
                    .subscribe(new Action1() {
                        @Override
                        public void call(Object o) {
                        }
                    }) ;
    

      但是项目中这种场景有很多,所以我们就想能不能把这种场景的调度方式封装起来,方便调用。

    简单的封装

        public Observable apply( Observable observable ){
           return observable.subscribeOn( Schedulers.io() )
                    .observeOn( AndroidSchedulers.mainThread() ) ;
        }
    

    使用

      apply( Observable.just( "123" ) )
                    .subscribe(new Action1() {
                        @Override
                        public void call(Object o) {
    
                        }
                    }) ;
    

    弊端:虽然上面的这种封装可以做到线程调度的目的,但是它破坏了链式编程的结构,是编程风格变得不优雅。

    改进:Transformers 的使用(就是转化器的意思,把一种类型的Observable转换成另一种类型的Observable )

    改进后的封装

        Observable.Transformer schedulersTransformer = new  Observable.Transformer() {
            @Override public Object call(Object observable) {
                return ((Observable)  observable).subscribeOn(Schedulers.newThread())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    

      使用

          Observable
                    .just( "123" )
                    .compose( schedulersTransformer )
                    .subscribe(new Action1() {
                        @Override
                        public void call(Object o) {
                        }
                    }) ;
    

      弊端:虽然保持了链式编程结构的完整,但是每次调用 .compose( schedulersTransformer ) 都是 new 了一个对象的。所以我们需要再次封装,尽量保证单例的模式。

    改进后的封装

    package lib.app.com.myapplication;
    
    import rx.Observable;
    import rx.android.schedulers.AndroidSchedulers;
    import rx.schedulers.Schedulers;
    
    /**
     * Created by ${zyj} on 2016/7/1.
     */
    public class RxUtil {
    
        private final static Observable.Transformer schedulersTransformer = new  Observable.Transformer() {
            @Override public Object call(Object observable) {
                return ((Observable)  observable).subscribeOn(Schedulers.newThread())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    
       public static  <T> Observable.Transformer<T, T> applySchedulers() {
            return (Observable.Transformer<T, T>) schedulersTransformer;
        }
    
    }
    

      使用

        Observable
                    .just( "123" )
                    .compose( RxUtil.<String>applySchedulers() )
                    .subscribe(new Action1() {
                        @Override
                        public void call(Object o) {
                        }
                    }) ;
    

      

     

  • 相关阅读:
    codeforces 587B
    codeforces 552E
    算法竞赛模板 字典树
    算法竞赛模板 二叉树
    算法竞赛模板 图形面积计算
    TZOJ 1545 Hurdles of 110m(动态规划)
    算法竞赛模板 判断线段相交
    算法竞赛模板 折线分割平面
    TZOJ 3005 Triangle(判断点是否在三角形内+卡精度)
    算法竞赛模板 KMP
  • 原文地址:https://www.cnblogs.com/zhaoyanjun/p/5624395.html
Copyright © 2011-2022 走看看