zoukankan      html  css  js  c++  java
  • Rxjava

    操作符是用来干什么的?Rxjava中的每一个操作符基本都是用来创建Observable,也就是被订阅者。RxJava中常用的操作符包括:创建操作符,连接操作符,工具操作符,变换操作符,过滤操作符,条件操作符,布尔操作符,合并操作符。本次着重了解创建操作符的用法。

    创建操作符

    10种常用的操作符定义

    摘自《RxJava实战》

    
    just:将一个或多个对象转换成发射这个或这些对象的一个Observable;
    
    from:将一个Interable,一个Future或者一个数组转换成一个Observable;
    
    create:使用一个函数从头创建一个Observable;
    
    defer:只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable;
    
    range:创建一个发射指定范围的整数序列的Observable;
    
    interval:创建一个按照给定的时间间隔发射整数序列的Observable;
    
    timer:创建一个在给定的延时之后发射单个数据的Observable;
    
    empty:创建一个什么都不做直接通知完成的Observable;
    
    error:创建一个什么都不做直接通知错误的Observable;
    
    never:创建一个不发射任何数据的Observable。
    
    

    下面做几个操作符的demo演示

    create

    此处模拟create中报错的场景

    
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> observableEmitter) throws Exception {
            try {
                if (!observableEmitter.isDisposed()) {
                    for (int i = 0; i < 10; i ++) {
                        observableEmitter.onNext(i);
                        String str = null;
                        str.length();
                    }
                    observableEmitter.onComplete();
                }
            } catch (Exception e) {
                observableEmitter.onError(e);
            }
    
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("Next: " + integer);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            System.out.println("Error: " + throwable.getMessage());
        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("Sequence complete. ");
        }
    });
    

    运行结果:

    just就不写了,之前写hello world的时候就用过了。

    from

    发射Iterable或者数组的每一项数据

    
    Observable.fromArray("hello", "from").subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });
    

    运行结果:

    repeat

    重复的发射原始Observable的数据序列,次数通过repeat(n)指定

    
    Observable.just("hello repeat")
            .repeat(3)
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println(s);
                }
            });
    

    运行结果:

    defer

    这里我们在observable订阅前睡眠1秒,我们发现只有当observable被订阅了,发射的“hello defer”这条消息才被打印出来

    Observable observable = Observable.defer(new Callable<ObservableSource<?>>() {
        @Override
        public ObservableSource<?> call() throws Exception {
            return Observable.just("hello defer");
        }
    });
    TimeUnit.SECONDS.sleep(1);
    observable.subscribe(new Consumer<String>() {
        @Override
        public void accept(String str) throws Exception {
            System.out.println(str);
        }
    });
    

    运行结果:

    interval

    interval是按照固定的时间发射一个无限递增的整数序列

    Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println(aLong);
                }
            });
    TimeUnit.SECONDS.sleep(10);
    

    运行结果:

    Scheduler

    什么是RxJava线程?RxJava是一个为异步线程而实现的库,所以RxJava的特点就是异步,一个通过异步编程合理提高系统处理速度的。在默认情况下,RxJava是单线程的。用Observable发射数据,Observe接受和响应数据,各种操作符来加工处理数据流,都是在同一个线程中运行的,实现出来的就是一个同步的函数响应式。其实在Observer中接受和响应数据会牵涉到多线程来操作RxJava,这些多线程我们通过调度器(Scheduler)来实现。上述总结于《RxJava实战》这本书中。

    什么是Scheduler?

    Scheduler是RxJava对线程控制器的一个抽象,RxJava内置了多个Scheduler的实现。常用的调度器有single,newThread,computation,io,trampoline,Schedulers.from这五个。当然如果自带的调度器不能满足需求,我们是可以自己定义Executor来作为调度器的。

    如何使用Scheduler

    • 切换newThread线程
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> observableEmitter) throws Exception {
            observableEmitter.onNext("hello");
            observableEmitter.onNext("world");
        }
    }).observeOn(Schedulers.newThread())
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });
    
    • 线程调度的两种方法,也就是线程的切换
    通过observeOn或者subscribeOn方法
    Observable.just("hello", "world")
            .observeOn(Schedulers.newThread())
            .map(new Function<String, String>() {
                @Override
                public String apply(@NonNull String s) throws Exception {
                    return s.toUpperCase();
                }
            })
            .subscribeOn(Schedulers.single())
            .observeOn(Schedulers.io())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    System.out.println(s);
                }
            });
    

    不同线程调度器的使用场景

    • computation()
    用于CPU密集型的计算任务,不适合I/O操作
    
    • io()
    用于I/O密集型任务,支持异步阻塞I/O操作,这个调度器的线程池会根据需要增长,对于普通的计算任务,还是用Schedulers.computation()
    
    • newThread()
    为每个任务创建一个新的线程。
    
    • single()
    single拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务执行时,它的任务将会按照先进先出的顺序依次执行。
    
    • 几种线程测试demo
    Map map = new HashMap();
    Observable.just("hello world")
            .subscribeOn(Schedulers.single())
            .map(new Function<String, String>() {
                @Override
                public String apply(@NonNull String s) throws Exception {
                    s = s.toUpperCase();
                    map.put("map1", s);
                    return s;
                }
            })
            .observeOn(Schedulers.io())
            .map(new Function<String, String>() {
                @Override
                public String apply(@NonNull String s) throws Exception {
                    s = s + " leo.";
                    map.put("map2", s);
                    return s;
                }
            })
            .subscribeOn(Schedulers.computation())
            .map(new Function<String, String>() {
                @Override
                public String apply(@NonNull String s) throws Exception {
                    s = s + "it is a test";
                    map.put("map3", s);
                    return s;
                }
            })
            .observeOn(Schedulers.newThread())
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    map.put("subscribe", s);
                    System.out.println(s);
                }
            });
    

    总结:操作符,线程操作的使用,大家可以多看看相关书籍,自己多敲代码,带着不懂去看源码或者相关文档,作为初学者,现在也只是看着书上的皮毛,自己敲着书上的demo来理解。学习之路慢慢,共勉。。。

  • 相关阅读:
    C# 装箱原型
    C# 反射浅谈(一)
    javascript介绍(二)
    javascript介绍(一)
    C#中 托管资源和非托管资源
    varchar && nvarchar 闲谈
    高内聚&&低耦合
    【android】移植IOS视图响应陀螺仪交互行为
    【android】如何实现猿题库题目的排版
    开心工作标准的硬件环境
  • 原文地址:https://www.cnblogs.com/levcon/p/9942151.html
Copyright © 2011-2022 走看看