zoukankan      html  css  js  c++  java
  • Android 使用RxJava实现一个发布/订阅事件总线

    1.简单介绍

    1.1.发布/订阅事件主要用于网络请求的回调。

      事件总线可以使Android各组件之间的通信变得简单,而且可以解耦。

      其实RxJava实现事件总线和EventBus比较类似,他们都依据与观察者模式。

      个人比较习惯用RxJava来实现,因为非常简单而清晰。

      

    1.2.当然EventBus实现总线的方式也有很多人用。

      这里给个传送门==>EventBus的github地址:https://github.com/greenrobot/EventBus

      然后Otto实现总线也不错==>Otto的github地址:https://github.com/square/otto

    1.3.使用RxJava的好处以及注意点

      最明显的好处就是:项目体积缩小了。

      注意:使用RxLifecycle来解决RxJava内存泄漏的问题。

      ==>参考我的另一篇博客:RxLifecycle第三方库的使用。

      

    1.4.理解一下观察者模式。

      这是一种行为模式。

      当你的类或者主对象(称为被观察者)的状态发生改变就会通知所有对此感兴趣的类或对象(称为观察者)。

      详情了解请参考这篇文章:观察者模式--千军万马穿云箭。

    1.5.理解一下发布/订阅

      发布/订阅 模式的功能和观察者模式是一样的,都是完成特定事件发生后的消息通知。

      观察者模式和发布/订阅模式之间还是存在了一些差别,在发布/订阅模式中重点是发布消息,然后由调度中心

      统一调度,不需要知道具体有哪些订阅者。(这样就可以匿名)

    为什么要匿名?
    在计算机程序设计中有一个非常棒的思想叫“解耦”。你通常希望在你的设计中保持尽可能低的耦合度。
    通常情况下,你希望消息发布商能够直接了解所有需要接收消息的订阅者,
    这样,一旦“事件”或消息准备好就可以及时通知每一个订阅者。
    但是使用事件总线,发布者可以免除这种职责并实现独立性,
    因为消息发布者和消息订阅者可以相互不知道对方,只关心对应的消息,从而接触两者之间的依赖关系
    怎么实现匿名?
    提到匿名,自然而然你就会问:你是如何真正实现发布者和订阅者之间的匿名? 
    很简单,只要找到一个中间人,让这个中间人负责两方的消息调度。事件总线就是一个这样的中间人。 综上所述,事件总线就是这么简单。

    1.6.使用RxJava实现事件总线的简单案例

      案例来源:用RxJava实现事件总线-RxBus。

      github参考案例地址:https://github.com/kaushikgopal/RxJava-Android-Samples

      如下面的例子:

      我们从顶部片段(绿色部分)发布事件,并从底部片段(蓝色部分)监听点击事件(通过事件总线)。

      

      怎么实现这个功能呢?

      第一步自定义一个事件总线 

    public class RxBus {
     
      private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());
     
      public void send(Object o) {
        _bus.onNext(o);
      }
     
      public Observable<Object> toObserverable() {
        return _bus;
      }
    }

      第二步将事件发布到总线中。

    @OnClick(R.id.btn_demo_rxbus_tap)
    public void onTapButtonClicked() {
     
        _rxBus.send(new TapEvent());
    }

      第三步监听来自其他组件或服务事件

    _rxBus.toObserverable()
        .subscribe(new Action1<Object>() {
          @Override
          public void call(Object event) {
     
            if(event instanceof TapEvent) {
              _showTapText();
     
            }else if(event instanceof SomeOtherEvent) {
              _doSomethingElse();
            }
          }
        });

    1.7.本篇文章的参考文献

      Android RxJava实现RxBus。

      Android基于RxJava、RxAndroid的EventBus实现。

      用RxJava实现事件总线-RxBus。


    2.封装好的总线类

    2.1.RxJava1.x的总线实现方式

    /**
     * desc   : 利用 PublishSubject的特性:与普通的Subject不同,在订阅时并不立即触发订阅事件,
     * 而是允许我们在任意时刻手动调用onNext(),onError(),onCompleted来触发事件。
     */
    public class RxBus {
    
        private ConcurrentHashMap<Object, List<Subject>> subjectMapper = new ConcurrentHashMap<>();
    
        private RxBus() {
    
        }
    
        private static class Holder {
            private static RxBus instance = new RxBus();
        }
    
        public static RxBus getInstance() {
            return Holder.instance;
        }
    
        public <T> Observable<T> register(@NonNull Class<T> clz) {
            return register(clz.getName());
        }
    
        public <T> Observable<T> register(@NonNull Object tag) {
            List<Subject> subjectList = subjectMapper.get(tag);
            if (null == subjectList) {
                subjectList = new ArrayList<>();
                subjectMapper.put(tag, subjectList);
            }
    
            Subject<T, T> subject = PublishSubject.create();
            subjectList.add(subject);
    
            //System.out.println("注册到rxbus");
            return subject;
        }
    
        public <T> void unregister(@NonNull Class<T> clz, @NonNull Observable observable) {
            unregister(clz.getName(), observable);
        }
    
        public void unregister(@NonNull Object tag, @NonNull Observable observable) {
            List<Subject> subjects = subjectMapper.get(tag);
            if (null != subjects) {
                subjects.remove(observable);
                if (subjects.isEmpty()) {
                    subjectMapper.remove(tag);
                    //System.out.println("从rxbus取消注册");
                }
            }
        }
    
        public void post(@NonNull Object content) {
            post(content.getClass().getName(), content);
        }
    
        public void post(@NonNull Object tag, @NonNull Object content) {
            List<Subject> subjects = subjectMapper.get(tag);
            if (!subjects.isEmpty()) {
                for (Subject subject: subjects) {
                    subject.onNext(content);
                }
            }
        }
    }
    几个关键方法: 
    register —— 由tag,生成一个subject List,同时利用PublishSubject创建一个Subject并返回,
    它同时也是Observable的子类。
    unregister —— 移除tag对应subject List 中的Observable。若subject List为空,也将被移除。
    post —— 遍历tag对应subject List 中的Subject,执行onNext()。
    这里实际执行的是观察者Observer的onNext(),
    Subject的定义:
    public abstract class Subject<T, R> extends Observable<R> implements Observer<T>。

      测试代码:

    /*
    rxbus
     */
    Observable<String> observable = RxBus.getInstance().register(String.class);
    observable.map(s -> {
        try {
            int v = Integer.valueOf(s);
            System.out.println("map变换成功, source = " + s);
            return v;
        } catch (Exception e) {
            System.out.println("map变换失败, source = " + s);
            return s;
        }
    }).subscribe(value -> {
        System.out.println("订阅 " + value);
    });
    
    RxBus.getInstance().post("888");
    RxBus.getInstance().post("发发发");
    RxBus.getInstance().unregister(String.class, observable);
    //这里比较有意思的是,使用了lambda表达式。
    //在map变换时,如果将字符串转成Integer,没有问题就返回整型;
    //若报异常,就返回String型。
    //同样的,在最终订阅时,value参数的类型也是由map变换来决定的。

    2.2.RxJava2.0总线实现类

      因为在RxJava2.0之后,io.reactivex.Observable中没有进行背压处理了。

      如果有大量消息堆积在总线中来不及处理会产生OutOfMemoryError。

      有新类io.reactivex.Flowable专门针对背压问题。

      无背压处理的Observable实现,跟RxJava1.0x中一样,使用PublishSubject来实现。

      要实现有背压的2.0x版,使用FlowableProcessor的子类PublishProcessor来产生Flowable。

      

      源代码如下:

    public class RxBus {
    
        private final FlowableProcessor<Object> mBus;
    
        private RxBus() {
            mBus = PublishProcessor.create().toSerialized();
        }
    
        private static class Holder {
            private static RxBus instance = new RxBus();
        }
    
        public static RxBus getInstance() {
            return Holder.instance;
        }
    
        public void post(@NonNull Object obj) {
            mBus.onNext(obj);
        }
    
        public <T> Flowable<T> register(Class<T> clz) {
            return mBus.ofType(clz);
        }
    
        public void unregisterAll() {
            //会将所有由mBus 生成的 Flowable 都置  completed 状态  后续的 所有消息  都收不到了
            mBus.onComplete();
        }
    
        public boolean hasSubscribers() {
            return mBus.hasSubscribers();
        }
    
    }

      测试代码:

    Flowable<Integer> f1 = RxBus.getInstance().register(Integer.class);
    f1.subscribe(value -> System.out.println("订阅f1消息 .. " + value));
    RxBus.getInstance().post(999);


    3.实际项目调用方式

    3.1.首先自定义一个RxBus。

      这个类感觉有点像工具类。和其他函数没有任何耦合关系。

      这个类见在上面2中封装好的RxBus类。

    3.2.在BaseListFragment实现了LazyLoadFragment中的抽象函数。

      这里解释一下:

      BaseListFragment是一个可以刷新可以加载更多的一个碎片。

      LazyLoadFragment是一个懒加载的被BaseListFragmetn继承的一个基类。

      LazyLoadFragment通过判断是否可见的函数setUserVisibleHint执行了一个抽象函数fetchData()。

      adapter是页面内容的一个适配器。

      然后在BaseListFragment中重写这个抽象函数。

     @Override
        public void fetchData() {
            observable = RxBus.getInstance().register(BaseListFragment.TAG);
            observable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    adapter.notifyDataSetChanged();
                }
            });
        }

      observable.subscribe(new Consumer<Integer>)返回的是一个Disposable类型。

      如下面Disposable的简单使用方式。

     Disposable disposable = observable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                      //这里接收数据项
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                  //这里接收onError
                }
            }, new Action() {
                @Override
                public void run() throws Exception {
                  //这里接收onComplete。
                }
            });

    3.3.小贴士

      RxBus的注册与反注册一定要对应出现。

      一般在活动或者Fragment中的onStart中register这个活动或者片段的TAG(也就是一个唯一标识字符串)。

      一般在活动或者Fragment中的onDestroy中ungister这个活动或者片段的TAG。

      post用于传递消息,看情况调用呗。

      



    既然选择了,便不顾风雨兼程。Just follow yourself.
  • 相关阅读:
    堆排序
    剑指 Offer 59
    面试题:happen-before原则和as-if-serial语义
    面试题:Redis的持久化机制是什么?各自的优缺点?
    面试题:单线程redis还这么快
    面试题:微服务理论
    wait和notify
    线程八锁
    面试题:在静态方法和非静态方法上加 Synchronized的区别
    面试题:3种线程阻塞唤醒的对比
  • 原文地址:https://www.cnblogs.com/Jason-Jan/p/8017412.html
Copyright © 2011-2022 走看看