zoukankan      html  css  js  c++  java
  • 观察者模式与Guava EventBus

    观察者模式

    结构图

    代码实现

    public abstract class Subject {
    
        private List<Observer> observerList = new ArrayList<Observer>();
    
        /**
         * 注册观察者
         * @param observer
         */
        public void register(Observer observer) {
            observerList.add(observer);
        }
    
        /**
         * 注销观察者
         *
         * @param observer
         */
        public void unregister(Observer observer) {
            observerList.remove(observer);
        }
    
        /**
         * 通知观察者更新
         */
        public void post() {
            for (Observer observer : observerList) {
                observer.update();
            }
        }
    
        /**
         * 获取被通知事件
         *
         * @return
         */
        public abstract Object getEvent();
    }
    
    
    public class ConcreteSubject1 extends Subject {
    
        /** 个性化的定制内容 */
        private String subjectState;
    
        public ConcreteSubject1(String subjectState) {
            this.subjectState = subjectState;
        }
    
        @Override
        public Object getEvent() {
            System.out.println("Custom ConcreteSubject1");
            return subjectState;
        }
    }
    
    
    public class ConcreteSubject2 extends Subject {
    
        private int subjectState;
    
        public ConcreteSubject2(int subjectState) {
            this.subjectState = subjectState;
        }
    
        @Override
        public Object getEvent() {
            System.out.println("Custom ConcreteSubject2");
            return subjectState;
        }
    }
    
    
    public abstract class Observer {
    
        /** 用于观察者获取被通知的事件 */
        protected Subject subject;
    
        /**
         * 用于给Subject通知时调用的更新方法
         */
        public abstract void update();
    }
    
    
    public class ConcreteObserver1 extends Observer {
    
        public ConcreteObserver1(Subject subject) {
            this.subject = subject;
        }
    
        @Override
        public void update() {
            System.out.println("Subject " + subject.getEvent() + " ConcreteObserver1");
        }
    }
    
    
    public class ConcreteObserver1 extends Observer {
    
        public ConcreteObserver1(Subject subject) {
            this.subject = subject;
        }
    
        @Override
        public void update() {
            System.out.println("Subject " + subject.getEvent() + " ConcreteObserver1");
        }
    }
    
    
    public class ConcreteObserver1 extends Observer {
    
        public ConcreteObserver1(Subject subject) {
            this.subject = subject;
        }
    
        @Override
        public void update() {
            System.out.println("Subject " + subject.getEvent() + " ConcreteObserver1");
        }
    }

    输出:

    Custom ConcreteSubject1
    Subject Sub1 ConcreteObserver1
    Custom ConcreteSubject1
    Subject Sub1 ConcreteObserver2

    EventBus简单示例

    EventBus是Guava提供的消息发布-订阅类库,它的工作机制类似于观察者模式,通过通知者去注册观察者,最后由通知者想观察者发布消息,示例代码

    public class MsgCenter {
    
        /** EventBus的定位跟接近于消息中心,而他的post()方法跟接近于一个自定义的Subject */
        public static EventBus eventBus = new EventBus();
    }
    
    
    public class Observer1 {
    
        /**
         * 只有通过@Subscribe注解的方法才会被注册进EventBus
         * 而且方法有且只能有1个参数
         * @param msg
         */
        @Subscribe
        public void ob1Mthod1(String msg) {
            System.out.println(msg + " test1!");
        }
    
        @Subscribe
        public void ob1Method2(String msg) {
            System.out.println(msg + " test2!");
        }
    }
    
    
    public class Observer2 {
        @Subscribe
        public void ob2Method1(String msg) {
            System.out.println(msg + " test3!");
        }
    
        // 错误的基本型参数
        // @Subscribe
        // public void ob2Method2(int msg) {
        // System.out.println(msg + " test4!");
        // }
        /**
         * post() 不支持自动装箱功能,只能使用Integer,不能使用int,否则handlersByType的Class会是int而不是Intege
         * 而传入的int msg参数在post(int msg)的时候会被包装成Integer,导致无法匹配到
         */
        @Subscribe
        public void ob2Method2(Integer msg) {
            System.out.println(msg + " test4!");
        }
    }
    
    
    
    public class Test {
    
        public static void main(String[] args) throws InterruptedException {
            EventBus eventBus = new EventBus();
            Observer1 observer1 = new Observer1();
            Observer2 observer2 = new Observer2();
    
            eventBus.register(observer1);
            eventBus.register(observer2);
    
            // 只有注册的参数类型为String的方法会被调用
            eventBus.post("post string method");
    
            // 注销observer2
            eventBus.unregister(observer2);
            eventBus.post("post string method after unregister");
        }
    }

    输出

    post string method test2!
    post string method test1!
    post string method test3!
    post string method after unregister test2!
    post string method after unregister test1!

    实际上EventBus要表达的意图很简单,就是将post(Object arg)这里的arg当做参数传入到已注册的方法(被@Subscribe)的方法里,并调用该方法,所以当post(String)的时候,调用的参数类型为String的注册方法,当post(int)的时候,调用则是参数类型为Integer的注册方法

    EventBus的实现方式

    eventbus的实现方式实际上类似于上例写的简单的观察者模式,不同点在于它实现了泛化的注册方法以及泛化的方法调用,另外还考虑到了多线程的问题,对多线程使用时做了一些优化

    register(Object listener)

    register()方法注册一个任意类型的实例并将其使用了@Subscribe的方法注册到一个Map中,这个Map使用方法的参数类型Class为Key,值为一个Set,这个Set包含了所有参数类型为Key的EventHandler,Eventhandler是EventBus定义的一个数据结构,由listener(方法拥有者instance,例如上例中的observer1)和这个listener的@Subscribe方法构成

    ]这个Map的结构示意图如下

    其实现代码如下

      /**
       * Registers all handler methods on {@code object} to receive events.
       * Handler methods are selected and classified using this EventBus's
       * {@link HandlerFindingStrategy}; the default strategy is the
       * {@link AnnotatedHandlerFinder}.
       *
       * @param object  object whose handler methods should be registered.
       */
      public void register(Object object) {
        handlersByType.putAll(finder.findAllHandlers(object));
      }
    
    
    
      /**
       * {@inheritDoc}
       *
       * This implementation finds all methods marked with a {@link Subscribe} annotation.
       */
      @Override
      public Multimap<Class<?>, EventHandler> findAllHandlers(Object listener) {
        Multimap<Class<?>, EventHandler> methodsInListener = HashMultimap.create();
        Class<?> clazz = listener.getClass();
        Set<? extends Class<?>> supers = TypeToken.of(clazz).getTypes().rawTypes();
    
        for (Method method : clazz.getMethods()) {
          /*
           * Iterate over each distinct method of {@code clazz}, checking if it is annotated with
           * @Subscribe by any of the superclasses or superinterfaces that declare it.
           */
          for (Class<?> c : supers) {
            try {
              Method m = c.getMethod(method.getName(), method.getParameterTypes());
              if (m.isAnnotationPresent(Subscribe.class)) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length != 1) {
                  throw new IllegalArgumentException("Method " + method
                      + " has @Subscribe annotation, but requires " + parameterTypes.length
                      + " arguments.  Event handler methods must require a single argument.");
                }
                Class<?> eventType = parameterTypes[0];
                EventHandler handler = makeHandler(listener, method);
    
                methodsInListener.put(eventType, handler);
                break;
              }
            } catch (NoSuchMethodException ignored) {
              // Move on.
            }
          }
        }
        return methodsInListener;
      }

    post(Object event)

    post(Object event)方法用于向已注册的方法传递一个参数,并以此调用参数类型为event.class的所有方法,调用的时候会使用一个ThreadLocal的Queue来进行任务分发,这样的结果就是在多线程情况下,线程间共享注册方法的Map(上面提到那个),当时在发送消息时线程会保有自己独立的一个Post任务的Queue,保证了线程执行post()方法时候的独立性而不会相互影响,下面是多线程执行post()时的示意图

    真正在执行post(Object event)的时候,会将msg与所有event.class对应的set里的所有method组合成一个EventBus.EventWithHandler对象并加入到ThreadLocal的Queue中,最后再将Queue出队依次执行这些方法,最后清空ThreadLocal的Queue,EventBus的实现代码如下

        /**
         * Posts an event to all registered handlers.  This method will return
         * successfully after the event has been posted to all handlers, and
         * regardless of any exceptions thrown by handlers.
         *
         * <p>If no handlers have been subscribed for {@code event}'s class, and
         * {@code event} is not already a {@link com.google.common.eventbus.DeadEvent}, it will be wrapped in a
         * DeadEvent and reposted.
         *
         * @param event  event to post.
         */
        @SuppressWarnings("deprecation") // only deprecated for external subclasses
        public void post(Object event) {
            Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());
    
            boolean dispatched = false;
            // 将event和所有event.class对应的方法组合成EventWithHandler并入队
            for (Class<?> eventType : dispatchTypes) {
                Set<EventHandler> wrappers = getHandlersForEventType(eventType);
    
                if (wrappers != null && !wrappers.isEmpty()) {
                    dispatched = true;
                    for (EventHandler wrapper : wrappers) {
                        enqueueEvent(event, wrapper);
                    }
                }
            }
    
            if (!dispatched && !(event instanceof DeadEvent)) {
                post(new DeadEvent(this, event));
            }
    
            dispatchQueuedEvents();
        }
    
        /**
         * Queue the {@code event} for dispatch during
         * {@link #dispatchQueuedEvents()}. Events are queued in-order of occurrence
         * so they can be dispatched in the same order.
         */
        void enqueueEvent(Object event, EventHandler handler) {
            eventsToDispatch.get().offer(new EventWithHandler(event, handler));
        }
    
        /**
         * Drain the queue of events to be dispatched. As the queue is being drained,
         * new events may be posted to the end of the queue.
         *
         * @deprecated This method should not be overridden outside of the eventbus package. It is
         *     scheduled for removal in Guava 14.0.
         */
        @Deprecated
        protected void dispatchQueuedEvents() {
            // don't dispatch if we're already dispatching, that would allow reentrancy
            // and out-of-order events. Instead, leave the events to be dispatched
            // after the in-progress dispatch is complete.
            if (isDispatching.get()) {
                return;
            }
    
            isDispatching.set(true);
            try {
                while (true) {
                    EventWithHandler eventWithHandler = eventsToDispatch.get().poll();
                    if (eventWithHandler == null) {
                        break;
                    }
    
                    dispatch(eventWithHandler.event, eventWithHandler.handler);
                }
            } finally {
                isDispatching.set(false);
            }
        }

    斜体加粗部分即为关键部分

    EventBus多线程使用示例

    public class Test2 {
    
        public static void main(String[] args) throws InterruptedException {
    
            Thread t1 = new Thread() {
    
                @Override
                public void run() {
                    System.out.println(Thread.currentThread());
                    System.out.println("start");
                    Observer1 observer1 = new Observer1();
                    MsgCenter.eventBus.register(observer1);
                    MsgCenter.eventBus.post("post string");
                    System.out.println("end");
                    System.out.println();
                }
            };
    
            Thread t2 = new Thread() {
    
                @Override
                public void run() {
                    System.out.println(Thread.currentThread());
                    System.out.println("start");
                    Observer2 observer2 = new Observer2();
                    MsgCenter.eventBus.register(observer2);
                    MsgCenter.eventBus.post("post string2");
                    System.out.println("end");
                    System.out.println();
                }
            };
    
            ExecutorService exec = Executors.newFixedThreadPool(2);
    
            exec.execute(t1);
    
            // 为何忽略多线程是run()方法的线程安全问题,让两个任务分开执行
            TimeUnit.MILLISECONDS.sleep(500);
    
            exec.execute(t2);
    
            exec.shutdown();
        }
    }

    输出

    Thread[pool-1-thread-1,5,main]
    start
    post string test1!
    post string test2!
    end

    Thread[pool-1-thread-2,5,main]
    start
    post string2 test1!
    post string2 test2!
    post string2 test3!
    end

    这里为了让执行结果更清晰,并没有让两个线程并发执行,但可以清楚看到他们是共享同一个注册方法的Map的,而由于post()分发消息时间的Queue是ThreadLocal的,这个Queue由每个线程独有

    对于这里使用ThreadLocal的Queue的个人理解就是假如不使用ThreadLocal,共享同一个队列,就有可能由Thread1入队的EventWithHandler会由Thread2来执行,而由Thread2入队的EventWithHandler又有可能会由Thread1来执行,而造成执行秩序混乱.

    而使用ThreadLocal的Queue,则这些EventWithHandler示例是由哪个线程入队就由哪个线程执行,同时不需要去考虑共享队列的入队和出队时的线程安全问题,也可以提升效率

  • 相关阅读:
    Redis学习
    MySQL索引
    细数 Java 线程池的原理
    红黑树学习
    HashMap学习
    Java集合框架
    Java性能优化的45个细节
    MyBatis理解
    jenkins+git+maven+tomcat+jdk本地部署windows版
    windows版docker安装nginx,并设置目录挂载
  • 原文地址:https://www.cnblogs.com/zemliu/p/3313782.html
Copyright © 2011-2022 走看看