zoukankan      html  css  js  c++  java
  • RxJava2学习笔记(1)

    作为github上star数极高的响应式编程java扩展类库,rxjava是啥就不多说了,网上能查到一堆介绍,下面是一些学习记录:

    前提依赖:

    compile 'io.reactivex.rxjava2:rxjava:2.1.9'
    

    一、Observable

    1.1 hello world

    rxjava中的核心思路是“生产者-消费者”模型,生产者的java类通常用xxxEmitter命名,字面意思:发射器,可以想象为一个机关枪,一直biu biu biu的向外发射信息,另一端则是靶子(也就是消费者),在不停的接收。不过要注意的是:rxjava中,能接收子弹的靶子,可以同时有多个。

            Observable<String> observable = Observable.create(emitter -> {
                emitter.onNext("a");
                emitter.onNext("b");
                emitter.onNext("c");
                emitter.onComplete();
            });
    
            Observer observer1 = new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    System.out.println("subscribe=>");
                }
    
                @Override
                public void onNext(@NonNull String s) {
                    System.out.println(s + " ");
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    System.out.println(e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    System.out.println("complete");
                }
            };
    
            observable.subscribe(observer1);
    

     输出:

    subscribe=>
    a 
    b 
    c 
    complete
    

    注:最后一行,也可以改成

    observable.subscribe(observer1);
    observable.subscribe(observer1);
    

    这样就相当于2个靶子在接子弹了。 上面这是最正统的写法,官方推荐使用链式编程写法:

            Observable.create((ObservableOnSubscribe<String>) emitter -> {
                emitter.onNext("a");
                emitter.onNext("b");
                emitter.onNext("c");
                emitter.onComplete();
            }).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    System.out.println("subscribe=>");
                }
    
                @Override
                public void onNext(@NonNull String s) {
                    System.out.println(s + " ");
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    System.out.println(e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    System.out.println("complete");
                }
            });
    

    1.2 onComplete事件

    emitter发送onComplete消息后,挨打的靶子(消费者),就不再继续处理了,不管后面emitter是否还继续发送。

    Observable.create((ObservableOnSubscribe<String>) emitter -> {
        emitter.onNext("a");
        emitter.onNext("b");
        emitter.onNext("c");
        emitter.onComplete(); //这里主动通知消费者complete
        System.out.println("complete后,emitter还继续发射...");
        emitter.onNext("d");
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            System.out.println("subscribe=>");
        }
    
        @Override
        public void onNext(@NonNull String s) {
            System.out.println(s + " ");
        }
    
        @Override
        public void onError(@NonNull Throwable e) {
            System.out.println(e.getMessage());
        }
    
        @Override
        public void onComplete() {
            System.out.println("complete");
        }
    });
    

     输出:

    subscribe=>
    a 
    b 
    c 
    complete
    complete后,emitter还继续发射...
    

    注:onComplete之后,emitter再次发送的"d",消费者已经不再处理了。

    1.3 onError事件

    onError即可以在emitter(生产者)端报错,也可以在靶子(消费者)上报错,不管哪一端发生error,消费者就停止处理了。

    Observable.create((ObservableOnSubscribe<String>) emitter -> {
        emitter.onNext("a");
        emitter.onError(new Throwable("emitter报了个错!"));
        System.out.println("准备发送c");
        emitter.onNext("c");
        emitter.onComplete();
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            System.out.println("subscribe=>");
        }
    
        @Override
        public void onNext(@NonNull String s) {
            System.out.println(s + " ");
        }
    
        @Override
        public void onError(@NonNull Throwable e) {
            System.out.println(e.getMessage());
        }
    
        @Override
        public void onComplete() {
            System.out.println("complete");
        }
    });
    

     输出:

    subscribe=>
    a 
    emitter报了个错!
    准备发送c
    

    下面模拟下消费者处理时,发生异常:

    Observable.create((ObservableOnSubscribe<String>) emitter -> {
        emitter.onNext("a");
        emitter.onNext("b");
        emitter.onNext("c");
        emitter.onComplete();
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            System.out.println("subscribe=>");
        }
    
        @Override
        public void onNext(@NonNull String s) {
            if (s.equals("b")) {
                int a = 0;
                int b = 1;
                System.out.println((b / a));
            }
            System.out.println(s + " ");
        }
    
        @Override
        public void onError(@NonNull Throwable e) {
            System.out.println("error:" + e.getMessage());
        }
    
        @Override
        public void onComplete() {
            System.out.println("complete");
        }
    });
    

    输出:

    subscribe=>
    a
    error:/ by zero
    

    1.4 disposable

    如果消费者主动dispose()后,相当于就解除了生产者-消费者的关系。

    Observable.create((ObservableOnSubscribe<String>) emitter -> {
        emitter.onNext("a");
        emitter.onNext("b");
        System.out.println("准备发送c");
        emitter.onNext("c");
        emitter.onComplete();
    }).subscribe(new Observer<String>() {
    
        Disposable disposable;
    
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            disposable = d;
            System.out.println("subscribe=>");
        }
    
        @Override
        public void onNext(@NonNull String s) {
            if (s.equals("b")) {
                disposable.dispose();
            }
            System.out.println(s + " ");
        }
    
        @Override
        public void onError(@NonNull Throwable e) {
            System.out.println("error:" + e.getMessage());
        }
    
        @Override
        public void onComplete() {
            System.out.println("complete");
        }
    });
    

     上面的代码,消费者在遇到b时,主动切断了与生产者的关联,emitter再发送的d,消费者就不处理了,输出:

    subscribe=>
    a 
    b 
    准备发送c
    

    1.5 大道至简

    如果消费者只关心onNext的处理部分,其它无所谓,上面这一堆代码,可以简化为一行:

    Observable.fromArray("a", "b", "c").subscribe(c -> System.out.println(c + " "));
    

     输出:

    a 
    b 
    c 
    

    最后再来一个示例:把3个单词拼成一句话,而且每个单词处理成“首字母大写”的风格。

    Observable.fromArray("I", "AM", "CHINESE")
            .map(c -> c.substring(0, 1).toUpperCase() + c.substring(1).toLowerCase())
            .subscribe(c -> System.out.print(c + " "));
    

    输出:

    I Am Chinese 
    

    参考:

    http://www.vogella.com/tutorials/RxJava/article.html

    http://www.cnblogs.com/aademeng/articles/7462540.html

    https://www.jianshu.com/c/299d0a51fdd4

  • 相关阅读:
    Codeforces 950E Data Center Maintenance 强连通分量
    Codeforces Round #469 Div. 2 A B C D E
    Codeforces Round #391 A B C D E
    bzoj 4569 [Scoi2016]萌萌哒 并查集 + ST表
    Codeforces 940F Machine Learning 带修改莫队
    How to read the SQL Server Database Transaction Log
    Mysql 主从复制触发器问题
    Git出现 SSL certificate的处理方法
    mac磁盘启动
    发件人地址重写的几种方法记录
  • 原文地址:https://www.cnblogs.com/yjmyzz/p/rx-java-2-tutorial-1.html
Copyright © 2011-2022 走看看