zoukankan      html  css  js  c++  java
  • RxJava 2.x 理解-1

    在RxJava 1.x 系列中,讲解了RxJava的大致用法,因为现在都用RxJava 2了,所以Rxjava 1就不细讲,主要来学习RxJava 2。

    基本使用:

        /**
         * rajava2 的基本使用
         */
        private void rxJava2BaseUser() {
            Observable
                    .create(new ObservableOnSubscribe<String>() {
                        @Override
                        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                            emitter.onNext("1");
                            emitter.onNext("2");
                            emitter.onNext("3");
                            //throw new Exception("发生了错误");
                        }
                    })
                    .subscribe(new Observer<String>() {
    
                        Disposable disposable;
    
                        // 新增该方法
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
                            Log.d(TAG, "onSubscribe");
                            disposable = d;
                        }
    
                        @Override
                        public void onNext(@NonNull String s) {
                            Log.d(TAG, "Item: " + s);
                            if (s.equals("4"))
                                disposable.dispose(); // 在RxJava 2.x 中,新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            Log.d(TAG, "onError:" + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    
            Observable
                    .create(new ObservableOnSubscribe<String>() {
                        @Override
                        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                            emitter.onNext("----- 01 -----");
                            emitter.onNext("----- 02 -----");
                            emitter.onNext("----- 03 -----");
                        }
                    })
                    // Consumer 和 RxJava 1 中的 Action1 类似
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.d(TAG, "Item: " + s);
                        }
                    });
        }

    基本使用和RxJava 1没有什么区别。

    1.新增了onSubscribe方法,onSubscribe方法会在事件开始的时候,触发。

    2.新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件。

    3.Action1  --> Consumer 只接收onNext方法。

    4.那如果有多个Disposable 该怎么办呢, RxJava中已经内置了一个容器CompositeDisposable, 每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的水管.

    线程切换:

    /**
         * rajava2 线程
         */
        private void rxJava2Thread() {
            Observable
                    .create(new ObservableOnSubscribe<String>() {
                        @Override
                        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                            Log.d(TAG, "事件处理线程:" + Thread.currentThread().getName());
                            emitter.onNext("---- 1 ----");
                            emitter.onNext("---- 2 ----");
                            emitter.onNext("---- 3 ----");
                        }
                    })
                    .subscribeOn(Schedulers.newThread())        // 指明被观察者处理的线程
                    .observeOn(AndroidSchedulers.mainThread())  // 指明观察者线程
                    .subscribe(new Observer<String>() {
    
                        @Override
                        public void onSubscribe(@NonNull Disposable d) {
                            Log.d(TAG, "onSubscribe:" + Thread.currentThread().getName());
                        }
    
                        @Override
                        public void onNext(@NonNull String s) {
                            Log.d(TAG, "Item: " + s + " :" + Thread.currentThread().getName());
                        }
    
                        @Override
                        public void onError(@NonNull Throwable e) {
                            Log.d(TAG, "onError:" + e.getMessage() + " :" + Thread.currentThread().getName());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete:" + Thread.currentThread().getName());
                        }
                    });
        }

    结果:

    02-10 10:02:31.007 25414-25414/pers.bolin.rxjava2demo D/MainActivity: onSubscribe:main
    02-10 10:02:31.009 25414-25970/pers.bolin.rxjava2demo D/MainActivity: 事件处理线程:RxNewThreadScheduler-1
    02-10 10:02:31.047 25414-25414/pers.bolin.rxjava2demo D/MainActivity: Item: ---- 1 ---- :main
    02-10 10:02:31.048 25414-25414/pers.bolin.rxjava2demo D/MainActivity: Item: ---- 2 ---- :main
    02-10 10:02:31.048 25414-25414/pers.bolin.rxjava2demo D/MainActivity: Item: ---- 3 ---- :main

    可以看出事件已经被分到不同的线程去处理了。

                    .subscribeOn(Schedulers.newThread())        // 指明被观察者处理的线程
                    .observeOn(AndroidSchedulers.mainThread())  // 指明观察者线程

    需要注意的是subscribeOn 只在第一次切换有效,observeOn每次切换都是有效的

    看下线程的参数有哪些:

    • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
    • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
    • Schedulers.newThread() 代表一个常规的新线程
    • AndroidSchedulers.mainThread() 代表Android的主线程
    • Schedulers.single() 代表一个默认的、共享的、单线程支持的调度器实例,用于在相同的后台线程上执行强顺序执行。
    • Schedulers.trampoline()代表当其它排队的任务完成后,在当前线程排队开始执行

    变换/操作符:

     just;from ;map ;flatMap  和RxJava使用一致:RxJava 1.x 理解-3

    更多的操作符使用:

    官方:http://reactivex.io/documentation/operators.html

    RxJava 知识梳理(2) - RxJava2 操作符实战

    RxJava2-Android-Samples

     

     

  • 相关阅读:
    树莓派添加桌面快捷方式
    计算机网络
    django-auth2
    令牌桶算法-python
    linux centos-7 添加开机自启动脚本
    pymongodb-explain
    哈希表
    tcp/udp
    jemeter之jmeter+ant+jenkins搭建接口自动化测试环境
    jmeter之jmeter + ant + jenkins(二)Jenkins安装
  • 原文地址:https://www.cnblogs.com/H-BolinBlog/p/8438287.html
Copyright © 2011-2022 走看看