zoukankan      html  css  js  c++  java
  • RxJava漫谈-RxAndroid使用

    RxJava在github上的地址:https://github.com/ReactiveX/RxJava

    RxAndroid在github上的地址:https://github.com/ReactiveX/RxAndroid

    本文主要介绍RxAndroid的使用,如果对于RxJava还不熟悉的可以先看一下RxJava的介绍文章。

    Android的程序是用Java书写的,Android也有一些自己的线程模型,例如AsyncTask和Handler等。RxJava正是结合了前面的这几项,在此基础上推出了RxAndroid。下面介绍使用。

    首先,我们在项目中引入RxAndroid,主要是在gradle脚本中引入下面两句话即可。

    dependencies {
        compile fileTree(dir: 'libs', include: ['*.jar'])
        testCompile 'junit:junit:4.12'
        compile 'com.android.support:appcompat-v7:23.1.1'
        compile 'com.android.support:design:23.1.1'
        //引入RxAndroid----begin
        compile 'io.reactivex:rxandroid:1.1.0'
        compile 'io.reactivex:rxjava:1.1.0'
        //引入RxAndroid----end
    }

    这样就可以在Android代码中使用RxAndroid了,下面的例子展示了一段使用RxAndroid书写的代码:

    Observable.just("one", "two", "three", "four", "five")
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(/* an Observer */);

    这一段代码是在RxAndroid的官网上写的一段示例的代码。这里可以看出来,与RxJava相比较,其主要增加了如下的几个java文件:

        AndroidSchedulers
        BuildConfig
        HandlerScheduler
        MainThreadSubscription
        RxAndroidPlugins
        RxAndroidSchedulersHook

    下面分别对这几个文件的源码看一下:

    1.AndroidSchedulers类的源码

    public final class AndroidSchedulers {
        private AndroidSchedulers() {
            throw new AssertionError("No instances");
        }
    
        private static class MainThreadSchedulerHolder {
            static final Scheduler MAIN_THREAD_SCHEDULER =
                    new HandlerScheduler(new Handler(Looper.getMainLooper()));
        }
    
        public static Scheduler mainThread() {
            Scheduler scheduler =
                    RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler();
            return scheduler != null ? scheduler : MainThreadSchedulerHolder.MAIN_THREAD_SCHEDULER;
        }
    }

    这个类主要提供了MainThread调度器,在RxAndroid中需要在主线程中处理的食物都需要用到这个类的mainThread。

    2.BuildConfig类的源码:

    public final class BuildConfig {
      public static final boolean DEBUG = false;
      public static final String APPLICATION_ID = "rx.android";
      public static final String BUILD_TYPE = "release";
      public static final String FLAVOR = "";
      public static final int VERSION_CODE = -1;
      public static final String VERSION_NAME = "";
    }

    主要是一些常量配置。

    3.HandlerScheduler类的源码:

    public final class HandlerScheduler extends Scheduler {
    
        public static HandlerScheduler from(Handler handler) { //从一个Handler中创建一个Scheduler
            if (handler == null) throw new NullPointerException("handler == null");
            return new HandlerScheduler(handler);
        }
    
        private final Handler handler;
    
        HandlerScheduler(Handler handler) {
            this.handler = handler;
        }
    
        @Override
        public Worker createWorker() {//覆盖Scheduler的createWorker函数,创建基于Handler的Worker
            return new HandlerWorker(handler);
        }
    
        static class HandlerWorker extends Worker {
    
            private final Handler handler;
    
            private final CompositeSubscription compositeSubscription = new CompositeSubscription();
    
            HandlerWorker(Handler handler) {
                this.handler = handler;
            }
    
            @Override
            public void unsubscribe() {
                compositeSubscription.unsubscribe();
            }
    
            @Override
            public boolean isUnsubscribed() {
                return compositeSubscription.isUnsubscribed();
            }
    
            @Override
            public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {//覆盖Worker的调度函数schedule
                if (compositeSubscription.isUnsubscribed()) {
                    return Subscriptions.unsubscribed();
                }
    
                action = RxAndroidPlugins.getInstance().getSchedulersHook().onSchedule(action);
    
                final ScheduledAction scheduledAction = new ScheduledAction(action);
                scheduledAction.addParent(compositeSubscription);
                compositeSubscription.add(scheduledAction);
    
                handler.postDelayed(scheduledAction, unit.toMillis(delayTime));//使用Handler处理这个调度动作ScheduleAction
    
                scheduledAction.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        handler.removeCallbacks(scheduledAction);//这句话保证当调度动作被取消的时候,能够及时把这个action从Handler中移除
                    }
                }));
    
                return scheduledAction;
            }
    
            @Override
            public Subscription schedule(final Action0 action) {
                return schedule(action, 0, TimeUnit.MILLISECONDS);
            }
        }
    }
    HandlerScheduler 类就是使用Handler作为处理核心的Scheduler类。

    4.MainThreadSubscription类的源码:

    public abstract class MainThreadSubscription implements Subscription {
    
      public static void verifyMainThread() { //静态方法,判断当前线程是否是主线程
        if (Looper.myLooper() != Looper.getMainLooper()) {
          throw new IllegalStateException(
              "Expected to be called on the main thread but was " + Thread.currentThread().getName());
        }
      }
    
      private final AtomicBoolean unsubscribed = new AtomicBoolean();
    
      @Override public final boolean isUnsubscribed() {
        return unsubscribed.get();
      }
    
      @Override public final void unsubscribe() {//主线程的取消订阅
        if (unsubscribed.compareAndSet(false, true)) {
          if (Looper.myLooper() == Looper.getMainLooper()) {//如果是主线程直接进行
            onUnsubscribe();
          } else {
            AndroidSchedulers.mainThread().createWorker().schedule(new Action0() {//如果不是主线程,就创建创建一个Action放到主线程中去执行
              @Override public void call() {
                onUnsubscribe();
              }
            });
          }
        }
      }
    
      protected abstract void onUnsubscribe();
    }
    MainThreadSubscription类主要在意的就是unsubscribe的执行线程,这里采取一切方式保证其在主线程中执行。

    5.RxAndroidPlugins类的源码:

    public final class RxAndroidPlugins {//这个类的主要作用就是维护了一个RxAndroidSchedulersHook
        private static final RxAndroidPlugins INSTANCE = new RxAndroidPlugins();
    
        public static RxAndroidPlugins getInstance() {
            return INSTANCE;
        }
    
        private final AtomicReference<RxAndroidSchedulersHook> schedulersHook =
                new AtomicReference<RxAndroidSchedulersHook>();
    
        RxAndroidPlugins() {
        }
    
        @Beta
        public void reset() {
            schedulersHook.set(null);
        }
    
        public RxAndroidSchedulersHook getSchedulersHook() {
            if (schedulersHook.get() == null) {
                schedulersHook.compareAndSet(null, RxAndroidSchedulersHook.getDefaultInstance());//如果原来是null,就设置一个RxAndroidSchedulersHook
                // We don't return from here but call get() again in case of thread-race so the winner will
                // always get returned.
            }
            return schedulersHook.get();
        }
    
        public void registerSchedulersHook(RxAndroidSchedulersHook impl) {
            if (!schedulersHook.compareAndSet(null, impl)) {//如果原来的RxAndroidSchedulerHook是空,则直接持有,否则抛出异常
                throw new IllegalStateException(
                        "Another strategy was already registered: " + schedulersHook.get());
            }
        }
    }

    可以看出RxAndroidSchedulerHook必须在使用前注册,一旦使用就不能再注册了。

    6.RxAndroidSchedulersHook类的源码:

    public class RxAndroidSchedulersHook {
        private static final RxAndroidSchedulersHook DEFAULT_INSTANCE = new RxAndroidSchedulersHook();
    
        public static RxAndroidSchedulersHook getDefaultInstance() {
          return DEFAULT_INSTANCE;
        }
    
        public Scheduler getMainThreadScheduler() {
            return null;
        }
    
        public Action0 onSchedule(Action0 action) {
            return action;
        }
    }

    这是RxAndroid提供的一个默认的RxAndroidSchedulerHook类,程序员也可以自己定义一个这样的类注册到RxAndroidPlugins中,但是必须在使用RxAndroidPlugins之前注册。

    自定义的RxAndroidSchedulerHook类可以覆盖onSchedule函数,在这里进行一些处理,例如日志记录等。

    以上只是说明了RxAndroid与RxJava中不一样的地方,并没有尝试说明RxJava是什么,在阅读本文之前,读者应该先弄明白这个问题。

    现在再来看之前的两个例子就很明白了:

    public class ReactiveFragment extends Fragment {//在UI线程中的例子
        @Override
        public void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            Observable.just("one", "two", "three", "four", "five")
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(/* an Observer */);
        }
    new Thread(new Runnable() {//在其他线程中的例子
        @Override
        public void run() {
            final Handler handler = new Handler(); //绑定到这个线程的Handler
            Observable.just("one", "two", "three", "four", "five")
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(HandlerScheduler.from(handler))
                    .subscribe(/* an Observer */)
    
            // perform work, ...
        }
    }, "custom-thread-1").start();
  • 相关阅读:
    HDU 4396
    Vijos1603 迷宫
    BZOJ1087 [SCOI2005] 互不侵犯King
    BZOJ2208 [JSOI2010] 连通数
    BZOJ1051 [HAOI2006] 受欢迎的牛
    BZOJ2751 [HAOI2012] 容易题(easy)
    BZOJ1015 [JSOI2008] 星球大战starwar
    BZOJ1012 [JSOI2008] 最大数maxnumber
    BZOJ1050 [HAOI2006] 旅行comf
    BZOJ2761 [JLOI2011] 不重复数字
  • 原文地址:https://www.cnblogs.com/yakovchang/p/rxjava_rxandroid.html
Copyright © 2011-2022 走看看