zoukankan      html  css  js  c++  java
  • RxJava 2.x 理解-2

    操作符总结:

    http://reactivex.io/documentation/operators.html

    https://github.com/ReactiveX/RxJava/wiki

    Operators By Category

    Creating Observables : 创建

    Operators that originate new Observables.

    • Create — create an Observable from scratch by calling observer methods programmatically
    •         —  基本创建方式
    • Defer — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
    •         — 当观察者订阅时,才创建Observable,而不是像Create一样,执行Create就创建好了,在subscribe时才创建这个对象。并且针对每个观察者创建都是一个新的Observable。
    • Empty/Never/Throw — create Observables that have very precise and limited behavior
    •         —  创建一个什么都不做直接onComplete的Observable
    •         —  创建一个什么都不做直接onError的Observable
    •         —  创建一个什么都不做的Observable
    • From — convert some other object or data structure into an Observable
    •         —  发送集合
    • Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
    •         —  创建一个按照给定的时间间隔发射从0开始的整数序列的
    • Just — convert an object or a set of objects into an Observable that emits that or those objects
    •         —  发送可变参数
    • Range — create an Observable that emits a range of sequential integers
    •         —  创建一个发射指定范围的整数序列Observable<Integer>
    • Repeat — create an Observable that emits a particular item or sequence of items repeatedly
    •         —  重复发送,前提是已经有Obserable:  http://www.introtorx.com/Content/v1.0.10621.0/12_CombiningSequences.html#Repeat
    • Start — create an Observable that emits the return value of a function
    • Timer — create an Observable that emits a single item after a given delay
    •         —  计时器,延迟一段时间后执行

    Transforming Observables :变换

    Operators that transform items that are emitted by an Observable.

    • Buffer — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
    •         —  缓存,可以简单的理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
    • FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
    •         —  扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。
    • GroupBy — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key
    •         —  分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据。
    • Map — transform the items emitted by an Observable by applying a function to each item
    •         —  映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
    • Scan — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
    •         —  扫描, 连续地对数据序列的每一项应用一个函数,然后连续发射结果,每一项结果基于之前的结果  --- 可以说Scan是 累加器函数.
    • Window — periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time
    •         —  窗口,定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些Observable窗口,而不是每次发射一项。类似于Buffer,但Buffer发射的是数据,Window发射的是Observable,每一个Observable发射原始Observable的数据的一个子集.

    Filtering Observables :过滤

    Operators that selectively emit items from a source Observable.

    • Debounce — only emit an item from an Observable if a particular timespan has passed without it emitting another item
    •         —   只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作 : RxJava(七) 使用debounce操作符 优化app搜索功能
    • Distinct — suppress duplicate items emitted by an Observable
    •         —   去重,过滤掉重复数据项
    • ElementAt — emit only item n emitted by an Observable
    •         —   取值,取特定位置的数据项
    • Filter — emit only those items from an Observable that pass a predicate test
    •         —   过滤,过滤掉没有通过谓词测试的数据项,只发射通过满足条件的
    • First — emit only the first item, or the first item that meets a condition, from an Observable
    •         —   首项,只发射满足条件的第一条数据,即使后面满足的也不管了
    • IgnoreElements — do not emit any items from an Observable but mirror its termination notification
    •         —   忽略所有的数据,只保留终止通知(onError或onCompleted)
    • Last — emit only the last item emitted by an Observable
    •         —   末项,只发射满足条件的最后一条数据
    • Sample — emit the most recent item emitted by an Observable within periodic time intervals
    •         —  在周期的时间间隔内,发射最新的数据,等于是数据抽样
    • Skip — suppress the first n items emitted by an Observable
    •         —  跳过前面的若干项数据
    • SkipLast — suppress the last n items emitted by an Observable
    •         —  跳过后面的若干项数据
    • Take — emit only the first n items emitted by an Observable
    •         —  只保留前面的若干项数据
    • TakeLast — emit only the last n items emitted by an Observable
    •         — 只保留后面的若干项数据

    Combining Observables:组合

    Operators that work with multiple source Observables to create a single Observable

    • And/Then/When — combine sets of items emitted by two or more Observables by means of Patternand Plan intermediaries
    •         —  通过模式(And条件)和计划(Then次序)组合两个或多个Observable发射的数据集
    • CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
    •         —  当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果
    • Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable
    •         —  无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射
    • Merge — combine multiple Observables into one by merging their emissions
    •         —  将两个Observable发射的数据组合并成一个
    • StartWith — emit a specified sequence of items before beginning to emit the items from the source Observable
    •         —  在发射原来的Observable的数据序列之前,先发射一个指定的数据序列或数据项
    • Switch — convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables
    •         —  将一个发射Observable序列的Observable转换为这样一个Observable:它逐个发射那些Observable最近发射的数据
    • Zip — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function
    •         —  打包,使用一个指定的函数将多个Observable发射的数据组合在一起,然后将这个函数的结果作为单项数据发射

    Error Handling Operators :错误处理

    Operators that help to recover from error notifications from an Observable

    • Catch — recover from an onError notification by continuing the sequence without error
    •         —  捕获,继续序列操作,将错误替换为正常的数据,从onError通知中恢复
    • Retry — if a source Observable sends an onError notification, resubscribe to it in the hopes that it will complete without error
    •         —  重试,如果Observable发射了一个错误通知,重新订阅它,期待它正常终止

    Observable Utility Operators :辅助操作

    A toolbox of useful Operators for working with Observables

    Conditional and Boolean Operators : 条件和布尔操作

    Operators that evaluate one or more Observables or items emitted by Observables

    • All — determine whether all items emitted by an Observable meet some criteria
    •         —  判断Observable发射的所有的数据项是否都满足某个条件
    • Amb — given two or more source Observables, emit all of the items from only the first of these Observables to emit an item
    •         —  给定多个Observable,只让第一个发射数据的Observable发射全部数据
    • Contains — determine whether an Observable emits a particular item or not
    •         —  判断Observable是否会发射一个指定的数据项
    • DefaultIfEmpty — emit items from the source Observable, or a default item if the source Observable emits nothing
    •         —  发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据
    • SequenceEqual — determine whether two Observables emit the same sequence of items
    •         —  判断两个Observable是否按相同的数据序列
    • SkipUntil — discard items emitted by an Observable until a second Observable emits an item
    •         —  丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
    • SkipWhile — discard items emitted by an Observable until a specified condition becomes false
    •         —  丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据
    • TakeUntil — discard items emitted by an Observable after a second Observable emits an item or terminates
    •         —  发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知
    • TakeWhile — discard items emitted by an Observable after a specified condition becomes false
    •         —  发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据

    Mathematical and Aggregate Operators : 算术和聚合操作

    Operators that operate on the entire sequence of items emitted by an Observable

    • Average — calculates the average of numbers emitted by an Observable and emits this average
    •         —  计算Observable发射的数据序列的平均值,然后发射这个结果
    • Concat — emit the emissions from two or more Observables without interleaving them
    •         —  合并,不交错的连接多个Observable的数据(按顺序)/ 和 merge 不一样,这个是混序的
    • Count — count the number of items emitted by the source Observable and emit only this value
    •         —  计算Observable发射的数据个数,然后发射这个结果
    • Max — determine, and emit, the maximum-valued item emitted by an Observable
    •         —  计算并发射数据序列的最大值
    • Min — determine, and emit, the minimum-valued item emitted by an Observable
    •         —  计算并发射数据序列的最小值
    • Reduce — apply a function to each item emitted by an Observable, sequentially, and emit the final value
    •         —  按顺序对数据序列的每一个应用某个函数,然后返回这个值
    • Sum — calculate the sum of numbers emitted by an Observable and emit this sum
    •         —  计算并发射数据序列的和

    Connectable Observable Operators : 连接

    Specialty Observables that have more precisely-controlled subscription dynamics

    • Connect — instruct a connectable Observable to begin emitting items to its subscribers
    •         —  指示一个可连接的Observable开始发射数据给订阅者
    • Publish — convert an ordinary Observable into a connectable Observable
    •         —  将一个普通的Observable转换为可连接的
    • RefCount — make a Connectable Observable behave like an ordinary Observable
    •         —  使一个可连接的Observable表现得像一个普通的Observable
    • Replay — ensure that all observers see the same sequence of emitted items, even if they subscribe after the Observable has begun emitting items
    •         —  确保所有的观察者收到同样的数据序列,即使他们在Observable开始发射数据之后才订阅

    Operators to Convert Observables : 转换操作

    • To — convert an Observable into another object or data structure
    •         —  将Observable转换为其它的对象或数据结构

    操作符决策树

    几种主要的需求:

    • 直接创建一个Observable(创建操作)
    • 组合多个Observable(组合操作)
    • 对Observable发射的数据执行变换操作(变换操作)
    • 从Observable发射的数据中取特定的值(过滤操作)
    • 转发Observable的部分值(条件/布尔/过滤操作)
    • 对Observable发射的数据序列求值(算术/聚合操作)
    package pers.bolin.rxjava2demo;
    
    import android.support.v7.app.AppCompatActivity;
    import android.os.Bundle;
    import android.util.Log;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    import io.reactivex.CompletableObserver;
    import io.reactivex.Notification;
    import io.reactivex.Observable;
    import io.reactivex.ObservableEmitter;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.Observer;
    import io.reactivex.Scheduler;
    import io.reactivex.android.schedulers.AndroidSchedulers;
    import io.reactivex.annotations.NonNull;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.functions.BiFunction;
    import io.reactivex.functions.Consumer;
    import io.reactivex.functions.Function;
    import io.reactivex.functions.Predicate;
    import io.reactivex.observables.GroupedObservable;
    import io.reactivex.schedulers.Schedulers;
    import io.reactivex.schedulers.Timed;
    
    public class MainActivity extends AppCompatActivity {
    
        private static final String TAG = "MainActivity";
    
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
    
            //rxJava2BaseUser();
            //rxJava2Thread();
            //rxJava2Just();
            //rxJava2From();
            //rxJava2Buff();
            //raJava2GroupBy();
            //rxJava2Scan();
            //rxJava2Window();
            //rxJava2Distinct();
            //rxJava2ElementAt();
            //rxJava2Filter();
            //rxJava2First();
            //rxJava2IgnoreElements();
            //rxJava2Last();
            //rxJava2Sample();
            //rxJava2Skip();
            //rxJava2Take();
            //rxJava2CombineLatest();
            //rxJava2Merge();
            //rxJava2StartWith();
            //rxJava2Switch();
            //rxJava2Zip();
            //rxJava2Catch();
            //rxJava2All();
            //rxJava2Delay();
            //rxJava2Do();
            //rxJava2TimeInterval();
            //rxJava2Timeout();
            //rxJava2Timestamp();
            rxJava2Concat();
        }
    
        /**
         * rajava2 的基本使用
         */
        private void rxJava2BaseUser() {
            Observable
                    .create(new ObservableOnSubscribe<String>() {
                        @Override
                        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                            emitter.onNext("1");
                            emitter.onNext("2");
                            emitter.onNext("3");
                            //throw new Exception("发生了错误");
                        }
                    })
                    .subscribe(new Observer<String>() {
    
                        Disposable disposable;
    
                        // 新增该方法
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
                            Log.d(TAG, "onSubscribe");
                            disposable = d;
                        }
    
                        @Override
                        public void onNext(@NonNull String s) {
                            Log.d(TAG, "Item: " + s);
                            if (s.equals("4"))
                                disposable.dispose(); // 在RxJava 2.x 中,新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            Log.d(TAG, "onError:" + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    
            Observable
                    .create(new ObservableOnSubscribe<String>() {
                        @Override
                        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                            emitter.onNext("----- 01 -----");
                            emitter.onNext("----- 02 -----");
                            emitter.onNext("----- 03 -----");
                        }
                    })
                    // Consumer 和 RxJava 1 中的 Action1 类似
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.d(TAG, "Item: " + s);
                        }
                    });
            Log.d(TAG, " ----------------- empty -----------------");
    
            Observable.empty().subscribe(new Observer<Object>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }
    
                @Override
                public void onNext(@NonNull Object o) {
                    Log.d(TAG, "onNext");
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
    
            Log.d(TAG, " ----------------- error -----------------");
    
            Observable.error(new Exception()).subscribe(new Observer<Object>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
    
                }
    
                @Override
                public void onNext(@NonNull Object o) {
    
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    
    
        }
    
        /**
         * rajava2 线程
         */
        private void rxJava2Thread() {
            Observable
                    .create(new ObservableOnSubscribe<String>() {
                        @Override
                        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                            Log.d(TAG, "事件处理线程:" + Thread.currentThread().getId());
                            emitter.onNext("---- 1 ----");
                            emitter.onNext("---- 2 ----");
                            emitter.onNext("---- 3 ----");
                        }
                    })
                    .subscribeOn(Schedulers.trampoline())           // 指明被观察者处理的线程
                    .observeOn(AndroidSchedulers.mainThread())      // 指明观察者线程
                    .subscribe(new Observer<String>() {
    
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
                            Log.d(TAG, "onSubscribe:" + Thread.currentThread().getName());
                        }
    
                        @Override
                        public void onNext(@NonNull String s) {
                            Log.d(TAG, "Item: " + s + " :" + Thread.currentThread().getName());
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            Log.d(TAG, "onError:" + e.getMessage() + " :" + Thread.currentThread().getName());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete:" + Thread.currentThread().getName());
                        }
                    });
        }
    
        /**
         * rajava2 just 和 rajava1 的使用方式一致
         */
        private void rxJava2Just() {
            Observable
                    .just(1, 2, 3, 4, 5, 6, 7)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "integer:" + integer);
                        }
                    });
    
            Observable
                    .just(1, 2, 3, 4, 5, 6, 7, 8)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
                            Log.d(TAG, "onSubscribe");
                        }
    
                        @Override
                        public void onNext(@NonNull Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
        }
    
        /**
         * rajava2 from 和 rajava1 的使用方式一致
         */
        private void rxJava2From() {
            List<String> data = new ArrayList<>();
            data.add("hello world 1");
            data.add("hello world 2");
    
            Observable
                    .fromIterable(data)
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.d(TAG, s);
                        }
                    });
        }
    
        /**
         * rajava2 buff
         * 结果:
         * 02-10 22:32:18.758 21300-21300/pers.bolin.rxjava2demo D/MainActivity: integers:[1, 2, 3, 4]
         * 02-10 22:32:18.759 21300-21300/pers.bolin.rxjava2demo D/MainActivity: integers:[5, 6]
         */
        private void rxJava2Buff() {
            Observable.just(1, 2, 3, 4, 5, 6).buffer(4).subscribe(new Consumer<List<Integer>>() {
                @Override
                public void accept(List<Integer> integers) throws Exception {
                    Log.d(TAG, "integers:" + integers.toString());
                }
            });
        }
    
        /**
         * rajava2 groupBy
         * 分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
         * 结果:
         * 02-10 22:43:28.241 31495-31495/pers.bolin.rxjava2demo D/MainActivity: 奇数:1
         * 02-10 22:43:28.242 31495-31495/pers.bolin.rxjava2demo D/MainActivity: 偶数:2
         * 02-10 22:43:28.242 31495-31495/pers.bolin.rxjava2demo D/MainActivity: 奇数:3
         * 02-10 22:43:28.242 31495-31495/pers.bolin.rxjava2demo D/MainActivity: 偶数:4
         * 02-10 22:43:28.242 31495-31495/pers.bolin.rxjava2demo D/MainActivity: 奇数:5
         * 02-10 22:43:28.242 31495-31495/pers.bolin.rxjava2demo D/MainActivity: 偶数:6
         * 02-10 22:43:28.242 31495-31495/pers.bolin.rxjava2demo D/MainActivity: 奇数:7
         */
        private void raJava2GroupBy() {
            Observable.just(1, 2, 3, 4, 5, 6, 7).groupBy(new Function<Integer, Object>() {
                @Override
                public Object apply(@NonNull Integer integer) throws Exception {
                    if (integer % 2 == 0) {
                        return "偶数";
                    } else {
                        return "奇数";
                    }
                }
            }).subscribe(new Consumer<GroupedObservable<Object, Integer>>() {
                @Override
                public void accept(GroupedObservable<Object, Integer> objectIntegerGroupedObservable) throws Exception {
                    String type = (String) objectIntegerGroupedObservable.getKey();
    
                    if (type.equals("偶数")) {
                        objectIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.d(TAG, "偶数:" + integer);
                            }
                        });
                    } else if (type.equals("奇数")) {
                        objectIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.d(TAG, "奇数:" + integer);
                            }
                        });
                    }
                }
            });
        }
    
        /**
         * rajava2 scan
         * 扫描, 连续地对数据序列的每一项应用一个函数,然后连续发射结果,每一项结果基于之前的结果  --- 可以说Scan是 累加器函数.
         * 结果:
         * 02-10 22:54:18.927 11193-11193/? D/MainActivity: scan:h
         * 02-10 22:54:18.927 11193-11193/? D/MainActivity: scan:he
         * 02-10 22:54:18.927 11193-11193/? D/MainActivity: scan:hel
         * 02-10 22:54:18.928 11193-11193/? D/MainActivity: scan:hell
         * 02-10 22:54:18.928 11193-11193/? D/MainActivity: scan:hell,
         * 02-10 22:54:18.928 11193-11193/? D/MainActivity: scan:hell,w
         * 02-10 22:54:18.928 11193-11193/? D/MainActivity: scan:hell,wo
         * 02-10 22:54:18.928 11193-11193/? D/MainActivity: scan:hell,wor
         * 02-10 22:54:18.928 11193-11193/? D/MainActivity: scan:hell,worl
         * 02-10 22:54:18.928 11193-11193/? D/MainActivity: scan:hell,world
         * 02-10 22:54:18.928 11193-11193/? D/MainActivity: scan:hell,world!
         */
        private void rxJava2Scan() {
            Observable.fromArray("h", "e", "l", "l", ",", "w", "o", "r", "l", "d", "!").scan(new BiFunction<String, String, String>() {
                @Override
                public String apply(@NonNull String s, @NonNull String s2) throws Exception {
                    // Log.d(TAG, "apply s :" + s);
                    return s + s2;
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG, "scan:" + s);
                }
            });
        }
    
    
        /**
         * rajava2 window
         * 窗口,定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些Observable窗口,而不是每次发射一项。
         * 类似于Buffer,但Buffer发射的是数据,Window发射的是Observable,每一个Observable发射原始Observable的数据的一个子集.
         * 结果:
         * 02-10 23:33:44.219 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onSubscribe 1
         * 02-10 23:33:44.221 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onNext 1
         * 02-10 23:33:44.222 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onSubscribe 2
         * 02-10 23:33:44.222 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onNext 2:h
         * 02-10 23:33:44.222 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onNext 2:e
         * 02-10 23:33:44.222 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onNext 2:l
         * 02-10 23:33:44.222 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onComplete 2
         * 02-10 23:33:44.222 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onNext 1
         * 02-10 23:33:44.222 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onSubscribe 2
         * 02-10 23:33:44.223 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onNext 2:l
         * 02-10 23:33:44.223 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onNext 2:,
         * 02-10 23:33:44.223 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onNext 2:w
         * 02-10 23:33:44.223 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onComplete 2
         * 02-10 23:33:44.223 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onNext 1
         * 02-10 23:33:44.223 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onSubscribe 2
         * 02-10 23:33:44.223 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onNext 2:o
         * 02-10 23:33:44.223 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onNext 2:r
         * 02-10 23:33:44.223 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onNext 2:l
         * 02-10 23:33:44.223 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onComplete 2
         * 02-10 23:33:44.223 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onNext 1
         * 02-10 23:33:44.223 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onSubscribe 2
         * 02-10 23:33:44.223 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onNext 2:d
         * 02-10 23:33:44.223 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onNext 2:!
         * 02-10 23:33:44.224 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onComplete 2
         * 02-10 23:33:44.224 18440-18440/pers.bolin.rxjava2demo D/MainActivity: onComplete 1
         */
        private void rxJava2Window() {
            Observable.fromArray("h", "e", "l", "l", ",", "w", "o", "r", "l", "d", "!").window(3).subscribe(new Observer<Observable<String>>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    Log.d(TAG, "onSubscribe 1");
                }
    
                @Override
                public void onNext(@NonNull Observable<String> stringObservable) {
                    Log.d(TAG, "onNext 1");
    
                    stringObservable.subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
                            Log.d(TAG, "onSubscribe 2");
                        }
    
                        @Override
                        public void onNext(@NonNull String s) {
                            Log.d(TAG, "onNext 2:" + s);
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            Log.d(TAG, "onError 2");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete 2");
                        }
                    });
                }
    
    
                @Override
                public void onError(@NonNull Throwable e) {
                    Log.d(TAG, "onError 1");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete 1");
                }
            });
        }
    
        /**
         * rxJava   distinct
         * 去重,过滤掉重复数据项
         * 结果:
         * 02-11 09:04:50.897 21403-21403/pers.bolin.rxjava2demo D/MainActivity: accept:h
         * 02-11 09:04:50.897 21403-21403/pers.bolin.rxjava2demo D/MainActivity: accept:e
         * 02-11 09:04:50.897 21403-21403/pers.bolin.rxjava2demo D/MainActivity: accept:l
         * 02-11 09:04:50.897 21403-21403/pers.bolin.rxjava2demo D/MainActivity: accept:,
         * 02-11 09:04:50.898 21403-21403/pers.bolin.rxjava2demo D/MainActivity: accept:w
         * 02-11 09:04:50.898 21403-21403/pers.bolin.rxjava2demo D/MainActivity: accept:o
         * 02-11 09:04:50.898 21403-21403/pers.bolin.rxjava2demo D/MainActivity: accept:r
         * 02-11 09:04:50.898 21403-21403/pers.bolin.rxjava2demo D/MainActivity: accept:d
         * 02-11 09:04:50.898 21403-21403/pers.bolin.rxjava2demo D/MainActivity: accept:!
         */
        private void rxJava2Distinct() {
            Observable.fromArray("h", "e", "l", "l", ",", "w", "o", "r", "l", "d", "!").distinct().subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG, "accept:" + s);
                }
            });
    
        }
    
        /**
         * rxJava ElementAt
         * 取值,取特定位置的数据项
         * 结果:
         * 02-11 09:07:23.761 24523-24523/pers.bolin.rxjava2demo D/MainActivity: accept:w
         */
        private void rxJava2ElementAt() {
            Observable.fromArray("h", "e", "l", "l", ",", "w", "o", "r", "l", "d", "!").elementAt(5).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG, "accept:" + s);
                }
            });
        }
    
        /**
         * rxJava Filter
         * 过滤,过滤掉没有通过谓词测试的数据项,只发射通过满足条件的
         * 结果:
         * 02-11 09:11:33.980 28931-28931/? D/MainActivity: accept:8
         * 02-11 09:11:33.980 28931-28931/? D/MainActivity: accept:9
         * 02-11 09:11:33.980 28931-28931/? D/MainActivity: accept:10
         */
        private void rxJava2Filter() {
            Observable.fromArray(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).filter(new Predicate<Integer>() {
                @Override
                public boolean test(@NonNull Integer integer) throws Exception {
                    return integer > 7;
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "accept:" + integer);
                }
            });
        }
    
        /**
         * rxJava First
         * 首项,只发射满足条件的第一条数据,first(T defaultItem),参数是默认的item
         * 结果:
         * 02-11 09:17:47.644 3991-3991/pers.bolin.rxjava2demo D/MainActivity: accept:11
         */
        private void rxJava2First() {
            Observable
                    .fromArray(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                    .filter(new Predicate<Integer>() {
                        @Override
                        public boolean test(@NonNull Integer integer) throws Exception {
                            return integer > 10;
                        }
                    })
                    //first(T defaultItem)
                    .first(11).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "accept:" + integer);
                }
            });
        }
    
    
        /**
         * rxJava IgnoreElements
         * 忽略所有的数据,只保留终止通知(onError或onCompleted)
         * 结果:
         * 02-11 09:21:37.641 8822-8822/pers.bolin.rxjava2demo D/MainActivity: onSubscribe
         * 02-11 09:21:37.642 8822-8822/pers.bolin.rxjava2demo D/MainActivity: onError:java.lang.Exception: 抛出错误信息
         */
        private void rxJava2IgnoreElements() {
            Observable
                    .error(new Exception("抛出错误信息"))
                    .ignoreElements()
                    .subscribe(new CompletableObserver() {
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
                            Log.d(TAG, "onSubscribe");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            Log.d(TAG, "onError:" + e);
                        }
                    });
        }
    
        /**
         * rxJava Last
         * 末项,只发射满足条件的最后一条数据
         * 结果:
         * 02-11 09:37:36.570 26512-26512/pers.bolin.rxjava2demo D/MainActivity: accept:8
         */
        private void rxJava2Last() {
            Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
                    .filter(new Predicate<Integer>() {
                        @Override
                        public boolean test(@NonNull Integer integer) throws Exception {
                            return integer > 5;
                        }
                    })
                    .last(0).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "accept:" + integer);
                }
            });
        }
    
        /**
         * rxJava Sample
         * 在周期的时间间隔内,发射最新的数据,等于是数据抽样
         * 结果:
         * 02-11 09:42:44.323 30993-31516/pers.bolin.rxjava2demo D/MainActivity: accept:0
         * 02-11 09:42:46.324 30993-31516/pers.bolin.rxjava2demo D/MainActivity: accept:2
         * 02-11 09:42:48.325 30993-31516/pers.bolin.rxjava2demo D/MainActivity: accept:4
         * 02-11 09:42:50.324 30993-31516/pers.bolin.rxjava2demo D/MainActivity: accept:6
         * 02-11 09:42:52.325 30993-31516/pers.bolin.rxjava2demo D/MainActivity: accept:8
         * 02-11 09:42:54.325 30993-31516/pers.bolin.rxjava2demo D/MainActivity: accept:10
         * 02-11 09:42:56.324 30993-31516/pers.bolin.rxjava2demo D/MainActivity: accept:12
         */
        private void rxJava2Sample() {
            Observable.interval(1, TimeUnit.SECONDS).sample(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG, "accept:" + aLong);
                }
            });
        }
    
        /**
         * rxJava Skip
         * 跳过前面的若干项数据
         * 结果:
         * 02-11 09:47:28.912 4294-4294/pers.bolin.rxjava2demo D/MainActivity: accept:4
         * 02-11 09:47:28.912 4294-4294/pers.bolin.rxjava2demo D/MainActivity: accept:5
         * 02-11 09:47:28.912 4294-4294/pers.bolin.rxjava2demo D/MainActivity: accept:6
         * 02-11 09:47:28.912 4294-4294/pers.bolin.rxjava2demo D/MainActivity: accept:7
         * 02-11 09:47:28.912 4294-4294/pers.bolin.rxjava2demo D/MainActivity: accept:8
         */
        private void rxJava2Skip() {
            Observable.just(1, 2, 3, 4, 5, 6, 7, 8).skip(3).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "accept:" + integer);
                }
            });
        }
    
    
        /**
         * rxJava Take
         * 只保留前面的若干项数据
         * 结果:
         * 02-11 09:50:50.527 8603-8677/pers.bolin.rxjava2demo D/MainActivity: accept:0
         * 02-11 09:50:51.528 8603-8677/pers.bolin.rxjava2demo D/MainActivity: accept:1
         * 02-11 09:50:52.529 8603-8677/pers.bolin.rxjava2demo D/MainActivity: accept:2
         * 02-11 09:50:53.529 8603-8677/pers.bolin.rxjava2demo D/MainActivity: accept:3
         * 02-11 09:50:54.528 8603-8677/pers.bolin.rxjava2demo D/MainActivity: accept:4
         */
        private void rxJava2Take() {
            Observable.interval(1, TimeUnit.SECONDS).take(5).subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG, "accept:" + aLong);
                }
            });
        }
    
        /**
         * rxJava2 CombineLatest
         * 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果
         * 结果:
         * 02-11 11:05:23.292 27905-28474/pers.bolin.rxjava2demo D/MainActivity: s: 0
         * 02-11 11:05:23.293 27905-28474/pers.bolin.rxjava2demo D/MainActivity: s2: 0
         * 02-11 11:05:23.293 27905-28474/pers.bolin.rxjava2demo D/MainActivity: accept: 0
         * 02-11 11:05:23.293 27905-28474/pers.bolin.rxjava2demo D/MainActivity: s: 0
         * 02-11 11:05:23.293 27905-28474/pers.bolin.rxjava2demo D/MainActivity: s2: 1
         * 02-11 11:05:23.293 27905-28474/pers.bolin.rxjava2demo D/MainActivity: accept: 1
         * 02-11 11:05:24.293 27905-28475/pers.bolin.rxjava2demo D/MainActivity: s: 0
         * 02-11 11:05:24.293 27905-28475/pers.bolin.rxjava2demo D/MainActivity: s2: 2
         * 02-11 11:05:24.293 27905-28475/pers.bolin.rxjava2demo D/MainActivity: accept: 2
         * 02-11 11:05:25.294 27905-28474/pers.bolin.rxjava2demo D/MainActivity: s: 1
         * 02-11 11:05:25.295 27905-28474/pers.bolin.rxjava2demo D/MainActivity: s2: 2
         * 02-11 11:05:25.295 27905-28474/pers.bolin.rxjava2demo D/MainActivity: accept: 3
         * 02-11 11:05:25.295 27905-28474/pers.bolin.rxjava2demo D/MainActivity: s: 1
         * 02-11 11:05:25.296 27905-28474/pers.bolin.rxjava2demo D/MainActivity: s2: 3
         * 02-11 11:05:25.296 27905-28474/pers.bolin.rxjava2demo D/MainActivity: accept: 4
         * 02-11 11:05:26.295 27905-28475/pers.bolin.rxjava2demo D/MainActivity: s: 1
         * 02-11 11:05:26.296 27905-28475/pers.bolin.rxjava2demo D/MainActivity: s2: 4
         * 02-11 11:05:26.296 27905-28475/pers.bolin.rxjava2demo D/MainActivity: accept: 5
         * 02-11 11:05:27.295 27905-28474/pers.bolin.rxjava2demo D/MainActivity: s: 2
         * 02-11 11:05:27.295 27905-28474/pers.bolin.rxjava2demo D/MainActivity: s2: 4
         * 02-11 11:05:27.296 27905-28474/pers.bolin.rxjava2demo D/MainActivity: accept: 6
         * 02-11 11:05:29.293 27905-28474/pers.bolin.rxjava2demo D/MainActivity: s: 3
         * 02-11 11:05:29.293 27905-28474/pers.bolin.rxjava2demo D/MainActivity: s2: 4
         * 02-11 11:05:29.293 27905-28474/pers.bolin.rxjava2demo D/MainActivity: accept: 7
         * 02-11 11:05:31.294 27905-28474/pers.bolin.rxjava2demo D/MainActivity: s: 4
         * 02-11 11:05:31.295 27905-28474/pers.bolin.rxjava2demo D/MainActivity: s2: 4
         * 02-11 11:05:31.295 27905-28474/pers.bolin.rxjava2demo D/MainActivity: accept: 8
         */
        private void rxJava2CombineLatest() {
            Observable<Long> o1 = Observable.interval(2, TimeUnit.SECONDS).take(5);
            Observable<Long> o2 = Observable.interval(1, TimeUnit.SECONDS).take(5);
    
            Observable.combineLatest(o1, o2, new BiFunction<Long, Long, Long>() {
                @Override
                public Long apply(Long s, Long s2) throws Exception {
                    Log.d(TAG, "s: " + s);
                    Log.d(TAG, "s2: " + s2);
                    return s + s2;
                }
            }).subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long s) throws Exception {
                    Log.d(TAG, "accept: " + s);
                }
            });
        }
    
        /**
         * rxJava Merge
         * 将两个Observable发射的数据组合并成一个,但顺序不是先A在B
         * 结果:
         * 02-11 11:44:24.402 27774-27822/pers.bolin.rxjava2demo D/MainActivity: accept: 0
         * 02-11 11:44:25.403 27774-27821/pers.bolin.rxjava2demo D/MainActivity: accept: 0
         * 02-11 11:44:25.409 27774-27822/pers.bolin.rxjava2demo D/MainActivity: accept: 1
         * 02-11 11:44:26.404 27774-27822/pers.bolin.rxjava2demo D/MainActivity: accept: 2
         * 02-11 11:44:27.403 27774-27821/pers.bolin.rxjava2demo D/MainActivity: accept: 1
         * 02-11 11:44:29.403 27774-27821/pers.bolin.rxjava2demo D/MainActivity: accept: 2
         * 02-11 11:44:31.402 27774-27821/pers.bolin.rxjava2demo D/MainActivity: accept: 3
         * 02-11 11:44:33.403 27774-27821/pers.bolin.rxjava2demo D/MainActivity: accept: 4
         */
        private void rxJava2Merge() {
            Observable<Long> o1 = Observable.interval(2, TimeUnit.SECONDS).take(5);
            Observable<Long> o2 = Observable.interval(1, TimeUnit.SECONDS).take(3);
    
            Observable.merge(o1, o2)//使用merge操作符将两个被观察者合并
                    //.subscribeOn(Schedulers.newThread())
                    //.observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long s) throws Exception {
                            Log.d(TAG, "accept: " + s);
                        }
                    });
        }
    
        /**
         * rxJava StartWith
         * 在发射原来的Observable的数据序列之前,先发射一个指定的数据序列或数据项
         * 结果:
         * 02-11 11:48:42.394 31994-31994/? D/MainActivity: accept: -1
         * 02-11 11:48:42.394 31994-31994/? D/MainActivity: accept: -2
         * 02-11 11:48:42.394 31994-31994/? D/MainActivity: accept: -3
         * 02-11 11:48:42.394 31994-31994/? D/MainActivity: accept: -4
         * 02-11 11:48:42.394 31994-31994/? D/MainActivity: accept: 1
         * 02-11 11:48:42.395 31994-31994/? D/MainActivity: accept: 2
         * 02-11 11:48:42.395 31994-31994/? D/MainActivity: accept: 3
         * 02-11 11:48:42.395 31994-31994/? D/MainActivity: accept: 4
         * 02-11 11:48:42.395 31994-31994/? D/MainActivity: accept: 5
         */
        private void rxJava2StartWith() {
            Observable.just(1, 2, 3, 4, 5)
                    .startWith(Observable.just(-1, -2, -3, -4))
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "accept: " + integer);
                        }
                    });
        }
    
    
        /**
         * rxJava Switch
         * 结果:
         * 02-11 13:57:11.076 31646-31646/pers.bolin.rxjava2demo D/MainActivity: onSubscribe
         * 02-11 13:57:11.077 31646-31646/pers.bolin.rxjava2demo D/MainActivity: subscribeActual
         * 02-11 13:57:11.077 31646-31646/pers.bolin.rxjava2demo D/MainActivity: onNext:null
         */
        private void rxJava2Switch() {
            Observable.just(1, 2).filter(new Predicate<Integer>() {
                @Override
                public boolean test(@NonNull Integer integer) throws Exception {
                    return integer > 3;
                }
            }).switchIfEmpty(new Observable<Integer>() {
                @Override
                protected void subscribeActual(Observer<? super Integer> observer) {
                    Log.d(TAG, "subscribeActual");
                    observer.onNext(-1);
                }
            }).map(new Function<Integer, String>() {
                @Override
                public String apply(@NonNull Integer integer) throws Exception {
                    if (integer < 0)
                        return "null";
                    return integer + "";
                }
            }).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }
    
                @Override
                public void onNext(@NonNull String integer) {
                    Log.d(TAG, "onNext:" + integer);
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
        }
    
        /**
         * rxJava Zip
         * 打包,使用一个指定的函数将多个Observable发射的数据组合在一起,然后将这个函数的结果作为单项数据发射
         * 结果:
         * 02-11 14:06:16.669 8886-8886/pers.bolin.rxjava2demo D/MainActivity: aLong: 1  aLong2:2
         * 02-11 14:06:16.669 8886-8886/pers.bolin.rxjava2demo D/MainActivity: accept: 3
         * 02-11 14:06:16.669 8886-8886/pers.bolin.rxjava2demo D/MainActivity: aLong: 2  aLong2:4
         * 02-11 14:06:16.669 8886-8886/pers.bolin.rxjava2demo D/MainActivity: accept: 6
         * 02-11 14:06:16.669 8886-8886/pers.bolin.rxjava2demo D/MainActivity: aLong: 3  aLong2:6
         * 02-11 14:06:16.669 8886-8886/pers.bolin.rxjava2demo D/MainActivity: accept: 9
         * 02-11 14:06:16.670 8886-8886/pers.bolin.rxjava2demo D/MainActivity: aLong: 4  aLong2:8
         * 02-11 14:06:16.670 8886-8886/pers.bolin.rxjava2demo D/MainActivity: accept: 12
         * 02-11 14:06:16.670 8886-8886/pers.bolin.rxjava2demo D/MainActivity: aLong: 5  aLong2:10
         * 02-11 14:06:16.670 8886-8886/pers.bolin.rxjava2demo D/MainActivity: accept: 15
         */
        private void rxJava2Zip() {
            Observable<Long> o1 = Observable.just(1L, 2L, 3L, 4L, 5L, 6L);
            Observable<Long> o2 = Observable.just(2L, 4L, 6L, 8L, 10L);
    
    
            Observable.zip(o1, o2, new BiFunction<Long, Long, String>() {
                @Override
                public String apply(@NonNull Long aLong, @NonNull Long aLong2) throws Exception {
                    Log.d(TAG, "aLong: " + aLong + "  aLong2:" + aLong2);
                    return aLong + aLong2 + "";
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG, "accept: " + s);
                }
            });
        }
    
        /**
         * rxJava Catch
         * 捕获,继续序列操作,将错误替换为正常的数据,从onError通知中恢复
         * 结果:
         * 02-11 14:11:23.751 14001-14001/pers.bolin.rxjava2demo D/MainActivity: accept: 0
         */
        private void rxJava2Catch() {
            Observable.error(new Exception("抛出了一个错误")).onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(@NonNull Throwable throwable) throws Exception {
                    return 0;
                }
            }).subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object o) throws Exception {
                    Log.d(TAG, "accept: " + o);
                }
            });
        }
    
    
        /**
         * rxJava Delay
         * 延迟一段时间发射结果数据
         * 结果:
         * 02-11 14:32:25.382 6261-6348/pers.bolin.rxjava2demo D/MainActivity: subscribe currentThread id:2550
         * 02-11 14:32:25.384 6261-6348/pers.bolin.rxjava2demo D/MainActivity: subscribe currentThread id:2550
         * 02-11 14:32:26.384 6261-6261/pers.bolin.rxjava2demo D/MainActivity: accept: 1  currentThread id:2
         * 02-11 14:32:26.384 6261-6261/pers.bolin.rxjava2demo D/MainActivity: accept: 2  currentThread id:2
         * 02-11 14:32:26.384 6261-6261/pers.bolin.rxjava2demo D/MainActivity: accept: 3  currentThread id:2
         * 02-11 14:32:26.384 6261-6261/pers.bolin.rxjava2demo D/MainActivity: accept: 4  currentThread id:2
         * 02-11 14:32:26.385 6261-6261/pers.bolin.rxjava2demo D/MainActivity: accept: 5  currentThread id:2
         */
        private void rxJava2Delay() {
            Observable
                    .create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                            Log.d(TAG, "subscribe currentThread id:" + Thread.currentThread().getId());
                            emitter.onNext(1);
                            emitter.onNext(2);
                            emitter.onNext(3);
                            emitter.onNext(4);
                            emitter.onNext(5);
                            Log.d(TAG, "subscribe currentThread id:" + Thread.currentThread().getId());
                        }
                    })
                    .delay(1, TimeUnit.SECONDS)
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "accept: " + integer + "  currentThread id:" + Thread.currentThread().getId());
                        }
                    });
        }
    
        /**
         * rxJava Do
         * 注册一个操作来执行各种可观察的生命周期事件。观察调用的过程。
         * 结果:
         * 02-11 14:45:57.925 21207-21207/pers.bolin.rxjava2demo D/MainActivity: doOnNext accept: 1
         * 02-11 14:45:57.925 21207-21207/pers.bolin.rxjava2demo D/MainActivity: doOnNext accept: 2
         * 02-11 14:45:57.925 21207-21207/pers.bolin.rxjava2demo D/MainActivity: doOnNext accept: 3
         * 02-11 14:45:57.925 21207-21207/pers.bolin.rxjava2demo D/MainActivity: accept: 3
         * 02-11 14:45:57.925 21207-21207/pers.bolin.rxjava2demo D/MainActivity: doOnNext accept: 4
         * 02-11 14:45:57.925 21207-21207/pers.bolin.rxjava2demo D/MainActivity: accept: 4
         * 02-11 14:45:57.926 21207-21207/pers.bolin.rxjava2demo D/MainActivity: doOnNext accept: 5
         * 02-11 14:45:57.926 21207-21207/pers.bolin.rxjava2demo D/MainActivity: accept: 5
         */
        private void rxJava2Do() {
            Observable
                    .just(1, 2, 3, 4, 5)
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "doOnNext accept: " + integer);
                        }
                    })
                    .filter(new Predicate<Integer>() {
                        @Override
                        public boolean test(@NonNull Integer integer) throws Exception {
                            return integer > 2;
                        }
                    })
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "accept: " + integer);
                        }
                    });
        }
    
        /**
         * rxJava TimeInterval
         * 将一个Observable转换为发射两个数据之间所耗费时间的Observable
         * 结果:
         * 02-11 15:12:28.955 14639-14639/? D/MainActivity: integerTimed:Timed[time=0, unit=MILLISECONDS, value=1]
         * 02-11 15:12:28.955 14639-14639/? D/MainActivity: integerTimed:Timed[time=1, unit=MILLISECONDS, value=2]
         * 02-11 15:12:28.955 14639-14639/? D/MainActivity: integerTimed:Timed[time=0, unit=MILLISECONDS, value=3]
         * 02-11 15:12:28.955 14639-14639/? D/MainActivity: integerTimed:Timed[time=0, unit=MILLISECONDS, value=4]
         * 02-11 15:12:28.956 14639-14639/? D/MainActivity: integerTimed:Timed[time=0, unit=MILLISECONDS, value=5]
         * 02-11 15:12:28.956 14639-14639/? D/MainActivity: integerTimed:Timed[time=1, unit=MILLISECONDS, value=6]
         */
        private void rxJava2TimeInterval() {
            Observable.just(1, 2, 3, 4, 5, 6).timeInterval().subscribe(new Consumer<Timed<Integer>>() {
                @Override
                public void accept(Timed<Integer> integerTimed) throws Exception {
                    Log.d(TAG, "integerTimed:" + integerTimed);
                }
            });
        }
    
        /**
         * rxJava Timeout
         * 添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知
         * 结果:
         * 02-11 15:17:39.709 21248-21248/pers.bolin.rxjava2demo D/MainActivity: onSubscribe
         * 02-11 15:17:44.711 21248-21324/pers.bolin.rxjava2demo D/MainActivity: onError:null
         */
        private void rxJava2Timeout() {
            Observable.interval(6, TimeUnit.SECONDS).timeout(5, TimeUnit.SECONDS)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
                            Log.d(TAG, "onSubscribe");
                        }
    
                        @Override
                        public void onNext(@NonNull Long aLong) {
                            Log.d(TAG, "onNext:" + aLong);
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            Log.d(TAG, "onError:" + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
        }
    
        /**
         * rxJava Timestamp
         * 给Observable发射的每个数据项添加一个时间戳
         * 结果:
         * 02-11 15:19:31.777 23124-23124/? D/MainActivity: integerTimed:Timed[time=1518333571776, unit=MILLISECONDS, value=1]
         * 02-11 15:19:31.777 23124-23124/? D/MainActivity: integerTimed:Timed[time=1518333571777, unit=MILLISECONDS, value=2]
         * 02-11 15:19:31.777 23124-23124/? D/MainActivity: integerTimed:Timed[time=1518333571777, unit=MILLISECONDS, value=3]
         * 02-11 15:19:31.777 23124-23124/? D/MainActivity: integerTimed:Timed[time=1518333571777, unit=MILLISECONDS, value=4]
         * 02-11 15:19:31.777 23124-23124/? D/MainActivity: integerTimed:Timed[time=1518333571777, unit=MILLISECONDS, value=5]
         * 02-11 15:19:31.777 23124-23124/? D/MainActivity: integerTimed:Timed[time=1518333571777, unit=MILLISECONDS, value=6]
         */
        private void rxJava2Timestamp() {
            Observable.just(1, 2, 3, 4, 5, 6).timestamp().subscribe(new Consumer<Timed<Integer>>() {
                @Override
                public void accept(Timed<Integer> integerTimed) throws Exception {
                    Log.d(TAG, "integerTimed:" + integerTimed);
                }
            });
        }
    
    
        /**
         * rxJava All
         * 判断Observable发射的所有的数据项是否都满足某个条件
         * 结果:
         * 02-11 14:22:59.498 26352-26352/pers.bolin.rxjava2demo D/MainActivity: accept: false
         */
        private void rxJava2All() {
            Observable
                    .just(1, 2, 3, 4, 5)
                    .all(new Predicate<Integer>() {
                        @Override
                        public boolean test(@NonNull Integer integer) throws Exception {
                            return integer > 1;
                        }
                    }).subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    Log.d(TAG, "accept: " + aBoolean);
                }
            });
        }
    
        /**
         * rxJava Concat
         * 合并,不交错的连接多个Observable的数据(按顺序)
         * 结果:
         * 02-11 16:10:08.505 26030-26094/pers.bolin.rxjava2demo D/MainActivity: accept: 0
         * 02-11 16:10:09.506 26030-26094/pers.bolin.rxjava2demo D/MainActivity: accept: 1
         * 02-11 16:10:10.505 26030-26094/pers.bolin.rxjava2demo D/MainActivity: accept: 2
         * 02-11 16:10:11.505 26030-26094/pers.bolin.rxjava2demo D/MainActivity: accept: 3
         * 02-11 16:10:12.506 26030-26094/pers.bolin.rxjava2demo D/MainActivity: accept: 4
         * 02-11 16:10:13.510 26030-26333/pers.bolin.rxjava2demo D/MainActivity: accept: 0
         * 02-11 16:10:14.510 26030-26333/pers.bolin.rxjava2demo D/MainActivity: accept: 1
         * 02-11 16:10:15.509 26030-26333/pers.bolin.rxjava2demo D/MainActivity: accept: 2
         * 02-11 16:10:16.508 26030-26333/pers.bolin.rxjava2demo D/MainActivity: accept: 3
         */
        private void rxJava2Concat() {
            Observable.merge(Observable.interval(1, TimeUnit.SECONDS).take(5), Observable.interval(1, TimeUnit.SECONDS).take(4)).subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    Log.d(TAG, "accept: " + aLong);
                }
            });
        }
    
    }

    参考资料:

    https://www.zhihu.com/question/32209660

  • 相关阅读:
    Cocos2dx 3.0 过渡篇(二十五)死不了的贪食蛇(触摸版)
    IBinder对象在进程间传递的形式(一)
    【代码实现】PHP生成各种随机验证码
    win8 metro 调用摄像头拍摄照片并将照片保存在对应的位置
    薏米红豆粥功效及做法介绍
    应用程序无法正常启动0xc0150002 解决方式
    贪心算法
    Linux 进程通信之 ——信号和信号量总结
    关于 ioctl 的 FIONREAD 參数
    google域名邮箱申请 gmail域名邮箱申请(企业应用套件)指南
  • 原文地址:https://www.cnblogs.com/H-BolinBlog/p/8438659.html
Copyright © 2011-2022 走看看