zoukankan      html  css  js  c++  java
  • RxJava部分操作符

    package qianxingzhe.rxjava_learning;
    
    import org.junit.Test;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import rx.Observable;
    import rx.Observer;
    import rx.Scheduler;
    import rx.Subscriber;
    import rx.Subscription;
    import rx.android.schedulers.AndroidSchedulers;
    import rx.functions.Action1;
    import rx.functions.Func1;
    import rx.schedulers.Schedulers;
    
    /**
     * Created by lunyi.yly on 16/8/6.
     */
    
    // Collection of small JUnit tests, each demonstrating one RxJava 1.x operator
    // by subscribing to a tiny Observable and printing what it receives.
    public class RxJavaText {

        /**
         * Builds an Observable by hand with {@code Observable.create} and consumes it
         * with a full {@link Subscriber}, printing every callback that fires.
         */
        @Test
        public void hello_world_01() {
            Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("Hello, World!");
                    subscriber.onCompleted();
                }
            });

            Subscriber<String> subscriber = new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted()");
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("onError()");
                }

                @Override
                public void onNext(String s) {
                    System.out.println("onNext()");
                    System.out.println(s);
                }
            };

            observable.subscribe(subscriber);
        }

        /**
         * Same as {@link #hello_world_01()}, but the consumer is a plain
         * {@link Observer} instead of a {@link Subscriber}.
         */
        @Test
        public void hello_world_02() {
            Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("Hello, World!");
                    subscriber.onCompleted();
                }
            });

            Observer<String> observer = new Observer<String>() {
                @Override
                public void onCompleted() {
                    System.out.println("onCompleted()");
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("onError()");
                }

                @Override
                public void onNext(String s) {
                    System.out.println("onNext()");
                    // Print the emitted value (not a string literal), matching hello_world_01.
                    System.out.println(s);
                }
            };

            observable.subscribe(observer);
        }

        /**
         * Creates an Observable with {@code just} and consumes it with a named
         * {@link Action1} that prints the received item.
         */
        @Test
        public void just_01() {
            Observable<String> observable = Observable.just("hello world");

            Action1<String> action1 = new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println(s);
                }
            };

            observable.subscribe(action1);
        }

        /**
         * {@code just} creates an Observable that emits a single item and then finishes.
         * Same as {@link #just_01()}, written as one chained expression.
         */
        @Test
        public void just_02() {
            Observable.just("hello world").subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println(s);
                }
            });
        }

        /**
         * {@code map} converts one item into another; here a String is replaced by
         * its {@code hashCode()} before being printed.
         */
        @Test
        public void map() {
            Observable.just("hello world")
                    .map(new Func1<String, Integer>() {
                        @Override
                        public Integer call(String s) {
                            return s.hashCode();
                        }
                    })
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.println(integer);
                        }
                    });
        }

        /**
         * {@code from} takes a collection (here an array) as input and hands its
         * elements to the subscriber one at a time.
         */
        @Test
        public void from() {
            Observable.from(new String[]{"hello", "world"})
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println(s);
                        }
                    });
        }

        /**
         * {@code flatMap} takes each item of one Observable as input and returns
         * another Observable for it; here every inner list is expanded into its strings.
         */
        @Test
        public void flatmap() {
            // NOTE(review): the outer list is never populated, so presumably nothing
            // is printed when this runs - confirm whether sample data was intended.
            List<List<String>> lists = new ArrayList<>();
            Observable.from(lists)
                    .flatMap(new Func1<List<String>, Observable<String>>() {
                        @Override
                        public Observable<String> call(List<String> strings) {
                            return Observable.from(strings);
                        }
                    })
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println(s);
                        }
                    });
        }

        /**
         * {@code filter} passes items through unchanged and drops those that fail
         * the check; here only items equal to {@code "hello"} are kept.
         */
        @Test
        public void filter() {
            Observable.from(new String[]{"hello", "world"})
                    // Wraps each item in its own single-item Observable before filtering.
                    .flatMap(new Func1<String, Observable<String>>() {
                        @Override
                        public Observable<String> call(String s) {
                            return Observable.just(s);
                        }
                    })
                    .filter(new Func1<String, Boolean>() {
                        @Override
                        public Boolean call(String s) {
                            return s.equals("hello");
                        }
                    })
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println(s);
                        }
                    });
        }

        /**
         * {@code take} emits at most the given number of results; here the first
         * three of five integers.
         */
        @Test
        public void take() {
            Observable.from(new Integer[]{1, 2, 3, 4, 5})
                    .take(3)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.println(integer);
                        }
                    });
        }

        /**
         * {@code doOnNext} lets extra work run for each item before it is delivered
         * to the subscriber; here both the hook and the subscriber print the item.
         */
        @Test
        public void doOnNext() {
            Observable.just("hello world")
                    .doOnNext(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println(s);
                        }
                    })
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println(s);
                        }
                    });
        }

        /**
         * {@code subscribeOn} selects the thread the observable's code runs on;
         * {@code observeOn} selects the thread the observer's callbacks run on.
         */
        @Test
        public void subscribeOn_ObserverOn() {
            // NOTE(review): AndroidSchedulers.mainThread() presumably needs a real Android
            // main Looper, which a plain JVM unit test may not provide - confirm where this
            // test is executed. The method also returns right after subscribe() without
            // waiting, so the callback may not have run by then - verify.
            Observable.just("http://www.baidu.com")
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println("在UI线程执行");
                        }
                    });

        }

        /**
         * {@code unsubscribe} stops the stream at the point it has currently reached.
         * Subscribes, unsubscribes, then prints what {@code isUnsubscribed()} reports.
         */
        @Test
        public void unsubscribe() {
            // NOTE(review): just() presumably delivers its item synchronously inside
            // subscribe(), so the item is likely printed before unsubscribe() runs - confirm.
            Subscription subscribe = Observable.just("hello world")
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            System.out.println(s);
                        }
                    });
            subscribe.unsubscribe();
            System.out.println(subscribe.isUnsubscribed());

        }
    }
    
    
  • 相关阅读:
    .NET实现Excel文件的读写 未测试
    权限管理设计
    struts1中配置应用
    POJ 2139 Six Degrees of Cowvin Bacon(floyd)
    POJ 1751 Highways
    POJ 1698 Alice's Chance
    POJ 1018 Communication System
    POJ 1050 To the Max
    POJ 1002 4873279
    POJ 3084 Panic Room
  • 原文地址:https://www.cnblogs.com/FlySheep/p/5745514.html
Copyright © 2011-2022 走看看