  • Notes on Upgrading Android from RxJava 1.X to RxJava 2.X

    Jianshu link: http://www.jianshu.com/p/2badfbb3a33b

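    Comparison of RxJava 1.X and RxJava 2.X

    Package name
      1.X: rx.xxx
      2.X: io.reactivex.xxx

    Reactive Streams specification
      1.X: Predates the Reactive Streams specification and supports it only partially
      2.X: Fully supported

    Backpressure
      1.X: Incomplete support for backpressure
      2.X: Observable is designed without backpressure support; the new Flowable type supports backpressure

    null values
      1.X: Supported
      2.X: No longer supported; passing null results in a NullPointerException

    Schedulers
      1.X: Schedulers.immediate(), Schedulers.trampoline(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(executor), AndroidSchedulers.mainThread()
      2.X: Schedulers.immediate() removed; Schedulers.single() added; the others are unchanged

    Single
      1.X: Behaves like Observable, but emits only a single onSuccess or onError
      2.X: Redesigned according to the Reactive Streams specification; follows the protocol onSubscribe (onSuccess | onError)

    Completable
      1.X: Behaves like Observable, but emits no items: it either completes successfully or fails
      2.X: Redesigned according to the Reactive Streams specification; follows the protocol onSubscribe (onComplete | onError)

    Maybe
      1.X: Not available
      2.X: New. Behaves like Observable, but may produce one item, an error, or nothing at all. It can be thought of as a method that returns a nullable value: unless it throws, it always returns, but the result may or may not be empty. Designed according to the Reactive Streams specification; follows the protocol onSubscribe (onSuccess | onError | onComplete)

    Flowable
      1.X: Not available
      2.X: New. Behaves like Observable, designed according to the Reactive Streams specification, and supports backpressure

    Subject
      1.X: AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject, UnicastSubject
      2.X: These Subjects keep their existing behavior; the new backpressure-aware counterparts are AsyncProcessor, BehaviorProcessor, PublishProcessor, ReplayProcessor, and UnicastProcessor

    Subscription
      1.X: Subscription
      2.X: Renamed to Disposable, because the name conflicts with the Subscription interface of Reactive Streams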
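
    The Maybe protocol from the comparison above (onSuccess, onError, or onComplete) can be illustrated with a minimal sketch. It assumes RxJava 2.X on the classpath; the class name BaseTypesSketch and the helper describe are hypothetical names used only for this illustration.

```java
import io.reactivex.Maybe;

public class BaseTypesSketch {

    // Hypothetical helper: collapses the three possible Maybe outcomes into a String.
    static String describe(Maybe<String> source) {
        return source
                .map(v -> "success:" + v)                       // onSuccess path
                .defaultIfEmpty("empty")                        // onComplete (no item) path
                .onErrorReturn(e -> "error:" + e.getMessage())  // onError path
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(describe(Maybe.just("a")));
        System.out.println(describe(Maybe.<String>empty()));
        System.out.println(describe(Maybe.<String>error(new IllegalStateException("boom"))));
    }
}
```

    Single has only the first and third outcome, and Completable only the second and third, so the same pattern applies to them with fewer branches.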

    A simple RxJava 2.X + Retrofit + OkHttp example is linked from the original post.
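
    As a rough illustration of the backpressure support that the comparison attributes to Flowable, the sketch below subscribes with a plain Reactive Streams Subscriber and requests items in two batches. It assumes RxJava 2.X; the class name BackpressureSketch and the method requestInTwoBatches are hypothetical.

```java
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class BackpressureSketch {

    // Hypothetical helper: the source can emit 100 items, but the subscriber
    // only receives as many as it has requested.
    static List<Integer> requestInTwoBatches(final int first, final int more) {
        final List<Integer> received = new ArrayList<>();
        Flowable.range(1, 100).subscribe(new Subscriber<Integer>() {
            @Override public void onSubscribe(Subscription s) {
                s.request(first); // demand for the first batch
                s.request(more);  // additional demand
            }
            @Override public void onNext(Integer value) { received.add(value); }
            @Override public void onError(Throwable t) { t.printStackTrace(); }
            @Override public void onComplete() { /* not reached: demand stays below 100 */ }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(requestInTwoBatches(3, 2)); // prints [1, 2, 3, 4, 5]
    }
}
```

    An Observable offers no request(n) channel like this, which is why 2.X steers sources that may outpace their consumers toward Flowable.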

    Library dependency changes

    //1.X
    compile 'io.reactivex:rxjava:1.2.1'
    compile 'io.reactivex:rxandroid:1.2.1'
    
    //2.X
    compile 'io.reactivex.rxjava2:rxjava:2.0.0'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.0'

    Package changes

    The main change is rx.xxx --> io.reactivex.xxx

    //1.X
    import rx.Observable;
    import rx.Subscription;
    import rx.android.schedulers.AndroidSchedulers;
    import rx.schedulers.Schedulers;
    import rx.functions.Action1;
    import rx.functions.Func1;
    import rx.subscriptions.CompositeSubscription;
    
    //2.X
    import io.reactivex.Observable;
    import io.reactivex.ObservableSource;
    import io.reactivex.ObservableTransformer;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.android.schedulers.AndroidSchedulers;
    import io.reactivex.schedulers.Schedulers;
    import io.reactivex.disposables.CompositeDisposable;
    import io.reactivex.functions.Consumer;
    import io.reactivex.functions.Function;

    null

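    RxJava 2.X no longer supports null values. Passing null results in a NullPointerException, either thrown immediately (as with just) or delivered to onError (as with a Callable or mapper that returns null).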

    Observable.just(null);
    
    Single.just(null);
    
    Observable.fromCallable(() -> null)
        .subscribe(System.out::println, Throwable::printStackTrace);
    
    Observable.just(1).map(v -> null)
        .subscribe(System.out::println, Throwable::printStackTrace);
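
    The two failure modes above, and one possible way to model a nullable result in 2.X, can be sketched as follows. This assumes RxJava 2.X; the class NullHandlingSketch and the lookup method (a stand-in for any legacy call that may return null) are hypothetical.

```java
import io.reactivex.Maybe;
import io.reactivex.Observable;

public class NullHandlingSketch {

    // just(null) is rejected eagerly, before anything is subscribed.
    static boolean justNullThrows() {
        try {
            Observable.just((String) null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    // A Callable that returns null surfaces as an onError signal instead.
    static Throwable errorFromNullCallable() {
        final Throwable[] holder = new Throwable[1];
        Observable.<String>fromCallable(() -> null)
                .subscribe(v -> { }, e -> holder[0] = e);
        return holder[0];
    }

    // Hypothetical legacy lookup that returns null for unknown keys.
    static String lookup(String key) {
        return "known".equals(key) ? "value" : null;
    }

    // Maybe.fromCallable treats a null result as "empty" rather than an error.
    static String lookupOrDefault(String key) {
        return Maybe.<String>fromCallable(() -> lookup(key))
                .defaultIfEmpty("<none>")
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(justNullThrows());
        System.out.println(errorFromNullCallable());
        System.out.println(lookupOrDefault("missing"));
    }
}
```

    Wrapping nullable calls in Maybe is only one option; a sentinel object or an Optional-style wrapper would serve the same purpose.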

    Example 1

    //1.X
    public static final Observable.Transformer IO_TRANSFORMER = new Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable) observable).subscribeOn(Schedulers.io())
                                            .unsubscribeOn(Schedulers.io())
                                            .observeOn(Schedulers.io());
        }
    };
    public static final <T> Observable.Transformer<T, T> applySchedulers(Observable.Transformer transformer){
        return (Observable.Transformer<T, T>)transformer;
    }
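    // subscribe(...) rejects a null callback, so a real Action1 is needed here
    Action1<Integer> onNext = new Action1<Integer>() {
        @Override public void call(Integer value) {
            System.out.println(value);
        }
    };
    // numeric strings, so that Integer.valueOf does not throw NumberFormatException
    String[] items = { "1", "2", "3" };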
    Subscription subscription = Observable.from(items)
                                          .compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER))
                                          .map(new Func1<String, Integer>() {
                                                      @Override public Integer call(String s) {
                                                          return Integer.valueOf(s);
                                                      }
                                                  })
                                          .subscribe(onNext);
    //TODO subscription.unsubscribe();   
    
    //2.X
    public static final ObservableTransformer IO_TRANSFORMER = new ObservableTransformer() {
        @Override public ObservableSource apply(Observable upstream) {
            return upstream.subscribeOn(Schedulers.io())
                           .unsubscribeOn(Schedulers.io())
                           .observeOn(Schedulers.io());
        }
    };
    public static final <T> ObservableTransformer<T, T> applySchedulers(ObservableTransformer transformer){
        return (ObservableTransformer<T, T>)transformer;
    }
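    // subscribe(...) rejects a null callback, so a real Consumer is needed here
    Consumer<Integer> onNext = new Consumer<Integer>() {
        @Override public void accept(Integer value) throws Exception {
            System.out.println(value);
        }
    };
    // numeric strings, so that Integer.valueOf does not throw NumberFormatException
    String[] items = { "1", "2", "3" };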
    Disposable disposable = Observable.fromArray(items)
                                      .compose(RxUtil.<String>applySchedulers(IO_TRANSFORMER))
                                      .map(new Function<String, Integer>() {
                                                  @Override public Integer apply(String s) throws Exception {
                                                      return Integer.valueOf(s);
                                                  }
                                              })
                                      .subscribe(onNext);
    //TODO disposable.dispose();
      • Return value of .subscribe(...): Subscription in 1.X, Disposable in 2.X
      • Transformer: in 1.X it is the Transformer interface nested inside rx.Observable, which extends Func1<Observable<T>, Observable<R>>; in 2.X it is io.reactivex.ObservableTransformer<Upstream, Downstream>, a standalone interface
      • AndroidSchedulers: rx.android.schedulers.AndroidSchedulers in 1.X, io.reactivex.android.schedulers.AndroidSchedulers in 2.X
      • Func1: rx.functions.Func1 in 1.X, io.reactivex.functions.Function in 2.X
      • Other overloaded methods: see the 1.X and 2.X screenshots in the original post (images not reproduced here)
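
    Because ObservableTransformer is a standalone generic interface in 2.X, the raw-typed constant and unchecked cast used in Example 1 can be avoided. The following is a minimal sketch, assuming RxJava 2.X; the class RxUtilSketch and the methods applyIoSchedulers and parseAll are hypothetical names, not part of the original RxUtil.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.schedulers.Schedulers;
import java.util.List;

public class RxUtilSketch {

    // Generic factory: returns a correctly typed transformer, so no raw types
    // and no unchecked cast are needed at the call site.
    static <T> ObservableTransformer<T, T> applyIoSchedulers() {
        return upstream -> upstream.subscribeOn(Schedulers.io())
                                   .unsubscribeOn(Schedulers.io())
                                   .observeOn(Schedulers.io());
    }

    // Parses numeric strings on the io scheduler and blocks for the result.
    static List<Integer> parseAll(String... items) {
        return Observable.fromArray(items)
                .compose(RxUtilSketch.<String>applyIoSchedulers())
                .map(Integer::valueOf)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(parseAll("1", "2", "3")); // prints [1, 2, 3]
    }
}
```

    The blocking call is used here only to make the sketch self-contained; on Android the chain would normally end with observeOn(AndroidSchedulers.mainThread()) and a subscribe call.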
    Example 2
    //1.X
    public class AppBaseActivity extends AppCompatActivity {
        ...
        private CompositeSubscription mCompositeSubscription;
    
        protected void addSubscription(Subscription subscription) {
            if (null == mCompositeSubscription) {
                mCompositeSubscription = new CompositeSubscription();
            }
            mCompositeSubscription.add(subscription);
        }
    
        @Override protected void onDestroy() {
            if (null != mCompositeSubscription) {
                mCompositeSubscription.unsubscribe();
            }
            super.onDestroy();
        }
        ...
    }
    
    //2.X
    public class AppBaseActivity extends AppCompatActivity {
        ...
        private   CompositeDisposable mCompositeDisposable;
    
        protected void addDisposable(Disposable disposable) {
            if (null == mCompositeDisposable) {
                mCompositeDisposable = new CompositeDisposable();
            }
            mCompositeDisposable.add(disposable);
        }
    
        @Override protected void onDestroy() {
            if (null != mCompositeDisposable) {
                mCompositeDisposable.clear();
            }
            super.onDestroy();
        }
        ...
    }
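
    The 2.X version above calls clear() where the 1.X version called unsubscribe(). The two are not exact equivalents: the sketch below, assuming RxJava 2.X, contrasts clear() with dispose() on a CompositeDisposable. The class CompositeDisposableSketch and its method names are hypothetical.

```java
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;

public class CompositeDisposableSketch {

    // clear(): disposes the current contents, but the container stays usable.
    static boolean[] afterClear() {
        CompositeDisposable container = new CompositeDisposable();
        Disposable first = Disposables.empty();
        container.add(first);
        container.clear();
        Disposable second = Disposables.empty();
        boolean accepted = container.add(second);
        return new boolean[] { first.isDisposed(), accepted, second.isDisposed() };
    }

    // dispose(): disposes the contents and the container itself; anything
    // added afterwards is disposed immediately and add(...) returns false.
    static boolean[] afterDispose() {
        CompositeDisposable container = new CompositeDisposable();
        Disposable first = Disposables.empty();
        container.add(first);
        container.dispose();
        Disposable second = Disposables.empty();
        boolean accepted = container.add(second);
        return new boolean[] { first.isDisposed(), accepted, second.isDisposed() };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(afterClear()));   // [true, true, false]
        System.out.println(java.util.Arrays.toString(afterDispose())); // [true, false, true]
    }
}
```

    In onDestroy either call releases the pending work; dispose() additionally guards against subscriptions that are added after the Activity has been torn down.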
  • Original post: https://www.cnblogs.com/zhujiabin/p/8183827.html