zoukankan      html  css  js  c++  java
  • RxJava部分操作符

    package qianxingzhe.rxjava_learning;
    
    import org.junit.Test;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import rx.Observable;
    import rx.Observer;
    import rx.Scheduler;
    import rx.Subscriber;
    import rx.Subscription;
    import rx.android.schedulers.AndroidSchedulers;
    import rx.functions.Action1;
    import rx.functions.Func1;
    import rx.schedulers.Schedulers;
    
    /**
     * Created by lunyi.yly on 16/8/6.
     */
    
    public class RxJavaText {
    
        @Test
        public void hello_world_01() {
            Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("Hello, World!");
                    subscriber.onCompleted();
                }
            });
    
            Subscriber<String> subscriber = new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted()");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("onError()");
                }
    
                @Override
                public void onNext(String s) {
                    System.out.println("onNext()");
                    System.out.println(s);
                }
            };
    
            observable.subscribe(subscriber);
        }
    
        @Test
        public void hello_world_02() {
            Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("Hello, World!");
                    subscriber.onCompleted();
                }
            });
    
            Observer<String> observer = new Observer<String>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted()");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("onError()");
                }
    
                @Override
                public void onNext(String s) {
                    System.out.println("onNext()");
                    System.out.println("s");
                }
            };
    
            observable.subscribe(observer);
        }
    
        @Test
        public void just_01() {
            Observable<String> observable = Observable.just("hello world");
    
            Action1<String> action1 = new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println(s);
                }
            };
    
            observable.subscribe(action1);
        }
    
        /**
         * just 用来创建只发出一个事件就结束的Observable对象
         */
        @Test
        public void just_02() {
            Observable.just("hello world").subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println(s);
                }
            });
        }
    
        /**
         * map 把一个事件转换为另一个事件
         */
        @Test
        public void map() {
            Observable.just("hello world")
                    .map(new Func1<String, Integer>() {
                        @Override
                        public Integer call(String s) {
                            return s.hashCode();
                        }
                    })
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.println(integer);
                        }
                    });
        }
    
        /**
         * from 接收一个集合作为输入,然后每次输出一个元素给subscriber
         */
        @Test
        public void from() {
            Observable.from(new String[]{"hello", "world"})
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println(s);
                        }
                    });
        }
    
        /**
         * flatMap 接收一个Observable的输出作为输入,同时输出另外一个Observable
         */
        @Test
        public void flatmap() {
            List<List<String>> lists = new ArrayList<>();
            Observable.from(lists)
                    .flatMap(new Func1<List<String>, Observable<String>>() {
                        @Override
                        public Observable<String> call(List<String> strings) {
                            return Observable.from(strings);
                        }
                    })
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println(s);
                        }
                    });
        }
    
        /**
         * filter 输出和输入相同的元素,并且会过滤掉那些不满足检查条件的
         */
        @Test
        public void filter() {
            Observable.from(new String[]{"hello", "world"})
                    .flatMap(new Func1<String, Observable<String>>() {
                        @Override
                        public Observable<String> call(String s) {
                            return Observable.just(s);
                        }
                    })
                    .filter(new Func1<String, Boolean>() {
                        @Override
                        public Boolean call(String s) {
                            return s.equals("hello");
                        }
                    })
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println(s);
                        }
                    });
        }
    
        /**
         * take 输出最多指定数量的结果
         */
        @Test
        public void take() {
            Observable.from(new Integer[]{1, 2, 3, 4, 5,})
                    .take(3)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.println(integer);
                        }
                    });
        }
    
        /**
         * doOnNext 允许我们在每次输出一个元素之前做一些额外的事情,比如这里的保存标题。
         */
        @Test
        public void doOnNext() {
            Observable.just("hello world")
                    .doOnNext(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println(s);
                        }
                    })
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println(s);
                        }
                    });
        }
    
        /**
         * subscribeOn 指定被观察者代码运行的线程
         * ObserverOn 指定观察者运行的线程
         */
        @Test
        public void subscribeOn_ObserverOn() {
            Observable.just("http://www.baidu.com")
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println("在UI线程执行");
                        }
                    });
    
        }
    
        /**
         * unsubscribe 在他当前执行的地方终止
         */
        @Test
        public void unsubscribe() {
            Subscription subscribe = Observable.just("hello world")
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println(s);
                        }
                    });
            subscribe.unsubscribe();
            System.out.println(subscribe.isUnsubscribed());
    
        }
    }
    
    
  • 相关阅读:
    java中Executor、ExecutorService、ThreadPoolExecutor介绍
    JAVA多线程实现的四种方式
    JVM内存结构
    Synchronized修饰静态变量和普通变量的区别
    tcpkill工作原理分析
    数据库路由中间件MyCat
    数据库路由中间件MyCat
    数据库路由中间件MyCat
    数据库路由中间件MyCat
    数据库路由中间件MyCat
  • 原文地址:https://www.cnblogs.com/FlySheep/p/5745514.html
Copyright © 2011-2022 走看看