zoukankan      html  css  js  c++  java
  • [从源码学设计]蚂蚁金服SOFARegistry之消息总线

    [从源码学设计]蚂蚁金服SOFARegistry之消息总线

    0x00 摘要

    SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。

    本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。

    本文为第四篇,介绍SOFARegistry之消息总线。

    0x01 相关概念

    1.1 事件驱动模型

    事件驱动模型,也即是我们通常说的观察者。基于发布-订阅模式的编程模型。

    1.1.1 概念

    定义对象间的一种一对多的依赖关系,当一个对象的状态发生变化时,所有依赖它的对象都得到通知并自动更新。

    从程序设计的角度来看,事件驱动模型的核心构件通常包含以下几个:

    • 事件源:负责产生事件的对象。比如我们常见的按钮,按钮就是一个事件源,能够产生“点击”这个事件
    • 事件监听器(事件处理器):负责处理事件的对象
    • 事件:或者称为事件对象,是事件源和事件监听器之间的信息桥梁。是整个事件模型驱动的核心

    1.1.2 应用环境

    当我们面对如下的环境时,事件驱动模型通常是一个好的选择:

    • 程序中有许多任务;
    • 任务之间高度独立(因此它们不需要互相通信,或者等待彼此);
    • 在等待事件到来时,某些任务会阻塞;

    1.2 消息总线

    总线(Bus)一般指计算机各种功能部件之间传送信息的公共通信干线,而EventBus则是事件源(publisher)向订阅方(subscriber)发送订阅事件的总线,它解耦了观察者模式中订阅方和事件源之间的强依赖关系

    消息总线扮演着一种消息路由的角色,拥有一套完备的路由机制来决定消息传输方向。发送端只需要向消息总线发出消息而不用管消息被如何转发,为了避免消息丢失,部分消息总线提供了一定的持久化存储和灾备的机制

    消息总线简单理解就是一个消息中心,众多微服务实例可以连接到总线上,实例可以往消息中心发送或接收信息(通过监听)。

    一般的应用的场景就是在用观察者模式的地方就可以用EventBus进行替代。

    img

    0x02 业务领域

    2.1 业务范畴

    DataServer 本质上是一个网络应用程序,所以有如下特点:

    • 需要处理各个方面发送来的消息;
    • 程序中任务繁多,任务之间独立,大多数任务不存在互斥通讯等操作;在等待事件到来时,某些任务会阻塞;
    • 某一个消息往往有多个投递源;

    因此天然适合用事件驱动机制来实现。

    2.2 问题点

    能够想到的问题点如下:

    • 因为一个事件往往会有多个投递源,如何解耦事件投递和事件处理之间的逻辑?
    • 怎样实现Listener一次注册,就能够知道Listener对那些事件感兴趣的,进而在有某类事件发生时通知到Listener的呢?
    • 如何使得一个Listener可以处理多个事件?
    • 如何使得一个事件被多个Listener处理?
    • 可否简化注册流程?
    • 是否需要维护消息顺序?
    • 处理消息方式是异步还是同步?
    • 多个同样消息是否要归并?

    具体我们在后文会详述阿里的思路。

    2.3 解决方案

    DataServer 内部逻辑主要是通过事件驱动机制来实现的,下图列举了部分事件在事件中心的交互流程,从图中可以看到,一个事件往往会有多个投递源,非常适合用 EventCenter 来解耦事件投递和事件处理之间的逻辑;

    0x03 EventCenter

    业界消息总线有很多,比如 Android EventBus是一个发布/订阅事件总线框架,基于观察者模式,将事件的接收者和发送者分开,简化了组件之间的通信。

    而SOFARegistry EventCenter 的作用也类似:从逻辑上解耦,将事件的接收者和发送者分开,简化组件之间通信。阿里的实现有自己的特点,开发者可以借鉴这里的使用技巧和思路。

    3.1 目录结构

    ├── event
    │   ├── AfterWorkingProcess.java
    │   ├── DataServerChangeEvent.java
    │   ├── Event.java
    │   ├── EventCenter.java
    │   ├── LocalDataServerChangeEvent.java
    │   ├── MetaServerChangeEvent.java
    │   ├── RemoteDataServerChangeEvent.java
    │   ├── StartTaskEvent.java
    │   ├── StartTaskTypeEnum.java
    │   └── handler
    │       ├── AbstractEventHandler.java
    │       ├── AfterWorkingProcessHandler.java
    │       ├── DataServerChangeEventHandler.java
    │       ├── LocalDataServerChangeEventHandler.java
    │       ├── MetaServerChangeEventHandler.java
    │       └── StartTaskEventHandler.java
    

    3.2 类定义

    类定义如下:

    public class EventCenter {
    
        private Multimap<Class<? extends Event>, AbstractEventHandler> MAP = ArrayListMultimap.create();
    
        /**
         * eventHandler register
         * @param handler
         */
        public void register(AbstractEventHandler handler) {
            List<Class<? extends Event>> interests = handler.interest();
            for (Class<? extends Event> interest : interests) {
                MAP.put(interest, handler);
            }
        }
    
        /**
         * event handler handle process
         * @param event
         */
        public void post(Event event) {
            Class clazz = event.getClass();
            if (MAP.containsKey(clazz)) {
                Collection<AbstractEventHandler> handlers = MAP.get(clazz);
                if (handlers != null) {
                    for (AbstractEventHandler handler : handlers) {
                        handler.handle(event);
                    }
                }
            } else {
                throw new RuntimeException("no suitable handler was found:" + clazz);
            }
        }
    }
    

    3.2.1 操作

    普通 EventBus 大多有三个操作:

    • 注册 Listener--register (Object Listener);
    • 注销 Listener--unregister (Object Listener);
    • 发布 Event--post (Object event);

    但是阿里的EventCenter并没有注销操作,因为业务上不需要,所以只有如下接口。

    • register(AbstractEventHandler handler) 的工作就是找出这个Listener对哪些事件感兴趣,然后把这种事件类型和对应的Listener注册到 EventCenter;
    • post一个event时候,会遍历这个消息的处理函数列表,逐一调用处理函数,其实就是同步执行了,当然也许 EventHandler 内部自己实现了异步;因为是同步执行,所以不需要维持消息的有序性,否则需要使用queue来实现每个线程post的Event是有序的;

    具体使用举例如下:在MetaServerChangeEventHandler中有如下代码投放消息。

    eventCenter.post(new StartTaskEvent(set));
    
    eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap,
            DataServerChangeEvent.FromType.REGISTER_META));
    

    3.2.2 执行 & 解耦

    handler中声明了自己支持什么种类的event,当register时候,会以event为key,把自己注册到eventCenter的map中,在 post 函数中,根据event的class,取出了handler,从而执行,也做到了解耦。

    3.2.3 Listener列表

    在观察者模式中,事件源中会维护一个Listener的列表,而且向这个事件源注册的Listener一般只会收到一类事件的通知,如果Listener对多个不同类的事件感兴趣,则需要向多个事件源注册。

    EventCenter 是怎样实现Listener一次注册,能够知道Listener对那些事件感兴趣的,进而在有某类事件发生时通知到Listener的呢

    答案在ArrayListMultimap,其key是Event,其 Value 就是 AbstractEventHandler。这个 map 就是 Event 事件类型 和对其感兴趣的处理函数的列表,一个 Event 可能有多个处理函数。

    3.2.4 ArrayListMultimap

    顾名思义,com.google.common.collect.ArrayListMultimap 可以在key对应的value中设置一个ArrayList。这样就保证了一个事件可以有多个处理函数

    具体可以见下例子。

    import com.google.common.collect.ArrayListMultimap;
    import com.google.common.collect.Multimap;
    
    import java.util.Collection;
    
    public class testArrayListMultimap {
         static void main() {
            Multimap<String, String> multimap = ArrayListMultimap.create();
            multimap.put("fruit", "banana");
            multimap.put("fruit", "apple");
            multimap.put("fruit", "apple");
            multimap.put("fruit", "peach");
            multimap.put("fish","crucian");
            multimap.put("fish","carp");
    
            System.err.println(multimap.size());//6
            Collection<String> fruits = multimap.get("fruit");
            System.err.println(fruits);//[bannana, apple, apple, peach]
        }
    }
    
    

    3.3 Listener

    Listener 是由 AbstractEventHandler 的派生类实现的。

    3.3.1 基类

    EventHandler基类AbstractEventHandler定义具体如下:

    public abstract class AbstractEventHandler<Event> implements InitializingBean {
    
        @Autowired
        private EventCenter         eventCenter;
    
        @Override
        public void afterPropertiesSet() throws Exception {
            eventCenter.register(this);
        }
    
        /**
         * event handle func
         * @param event
         */
        public void handle(Event event) {
                doHandle(event);
        }
    
        public abstract List<Class<? extends Event>> interest();
    
        public abstract void doHandle(Event event);
    }
    

    其主要作用为三点:

    • 派生类必须实现interest来声明自己想处理什么Event,而且Event是配置在一个数组中,这样就使得一个函数可以处理多个事件
    @Override
    public List<Class<? extends LocalDataServerChangeEvent>> interest() {
        return Lists.newArrayList(LocalDataServerChangeEvent.class);
    }
    
    • 派生类实现doHandle来处理消息;

    • 因为afterPropertiesSet中做了设定,所以每一个继承此类的Handler都会自动注册到EventCenter之中

    3.3.2 派生类

    以MetaServerChangeEventHandler为例,只要在interest函数中声明自己对哪些消息感兴趣,在doHandle函数中实现业务即可。

    public class MetaServerChangeEventHandler extends AbstractEventHandler<MetaServerChangeEvent> {
      
        @Override
        public List<Class<? extends MetaServerChangeEvent>> interest() {
            return Lists.newArrayList(MetaServerChangeEvent.class);
        }
      
    		@Override
        public void doHandle(MetaServerChangeEvent event) {
           ......
        }
    }
    

    3.3.2 自动注册

    这里需要专门说一下自动注册,因为初接触者很容易疏漏从而感到奇怪。

    自动注册使用的是Spring的afterPropertiesSet方法完成

    afterPropertiesSet方法可以针对某个具体的bean进行配置,其将在Bean所有的属性被初始化后调用,但是会在init前调用。afterPropertiesSet 必须实现 InitializingBean接口。

    package org.springframework.beans.factory;
    
    public interface InitializingBean {
        void afterPropertiesSet() throws Exception;
    }
    

    基类AbstractEventHandler实现InitializingBean接口。

    public abstract class AbstractEventHandler<Event> implements InitializingBean
    

    而每一个派生类就注册了派生类本身到eventCenter。

    @Override
    public void afterPropertiesSet() throws Exception {
        eventCenter.register(this);
    }
    

    3.4 核心消息

    具体涉及到业务,EventCenter主要处理三种消息:

    • DataServerChangeEvent,是其他Data Server的节点变化消息;
    • MetaServerChangeEvent,是Meta Sever的变化消息;
    • StartTaskEvent:;

    分别对应三个消息处理handler:

    • public class DataServerChangeEventHandler extends AbstractEventHandler

    • public class MetaServerChangeEventHandler extends AbstractEventHandler

    • public class StartTaskEventHandler extends AbstractEventHandler

    我们用 StartTaskEvent 举例,具体消息内容根据具体业务设置。

    public class StartTaskEvent implements Event {
        private final Set<StartTaskTypeEnum> suitableTypes;
    
        public StartTaskEvent(Set<StartTaskTypeEnum> suitableTypes) {
            this.suitableTypes = suitableTypes;
        }
    
        public Set<StartTaskTypeEnum> getSuitableTypes() {
            return suitableTypes;
        }
    }
    

    3.5 主要逻辑

    EventCenter主要逻辑如下图所示:

              +------------------------------+
              | MetaServerChangeEventHandler |
              +-----------+------------------+
                          |
                          |  post(new StartTaskEvent)
                          |
                          |
                          |                                      +------------------------+
                          v                                      |  StartTaskEventHandler |
    +---------------------+-----------------------+              |                        |
    |                EventCenter                  |              | +--------------------+ |
    |                                             |              | |                    | |
    | +-----------------------------------------+ +---------------------> doHandle      | |
    | |Multimap< <Event>, AbstractEventHandler> | |              | |                    | |
    | +-----------------------------------------+ | <--------------+ afterPropertiesSet | |
    |                                             |  register    | |                    | |
    +---------------------------------------------+              | |      interest      | |
                                                                 | |                    | |
                                                                 | +--------------------+ |
                                                                 +------------------------+
    

    手机如下图:

    0x04 总结

    SOFARegistry EventCenter 的作用与业界大多总线类似:从逻辑上解耦,将事件的接收者和发送者分开,简化组件之间通信。但是阿里的实现有自己的特点,开发者可以借鉴这里的使用技巧和思路。

    针对我们前面提出的问题,现在回答如下:

    • 因为一个事件往往会有多个投递源,如何解耦事件投递和事件处理之间的逻辑?
      • 答案:handler中声明了自己支持什么种类的event,当register时候会以event为key,把自己注册到eventCenter的map中;在 post 函数中,根据event的class,取出了handler从而执行,也做到了解耦。
    • 怎样实现Listener一次注册,就能够知道Listener对那些事件感兴趣的,进而在有某类事件发生时通知到Listener的呢?
      • 答案:派生类必须实现interest来声明自己想处理什么Event;
    • 如何使得一个Listener可以处理多个事件?
      • 答案:接上问题,Event是配置在一个数组中,这样就使得一个函数可以处理多个事件
    • 如何使得一个事件被多个Listener处理?
      • 答案:采用ArrayListMultimap实现listener列表;
    • 可否简化注册流程?
      • 答案:自动注册,派生类不需要操心。afterPropertiesSet中做了设定,所以每一个继承此类的Handler都会自动注册到EventCenter之中
    • 是否需要维护消息顺序?
      • 答案:不需要,因为是同步处理;
    • 处理消息方式是异步还是同步?
      • 答案:这里是同步;
    • 多个同样消息是否要归并?
      • 答案:这里不需要归并,没有业务需求;

    0xFF 参考

    Guava中EventBus分析

    蚂蚁金服服务注册中心如何实现 DataServer 平滑扩缩容

    蚂蚁金服服务注册中心 SOFARegistry 解析 | 服务发现优化之路

    服务注册中心 Session 存储策略 | SOFARegistry 解析

    海量数据下的注册中心 - SOFARegistry 架构介绍

  • 相关阅读:
    STM32中管脚利用
    阻抗匹配
    无功功率与补偿
    宏的灵活应用例子
    串口接入检测与串口命令解析
    16 款最流行的JavaScript 框架
    不可错过的10个超棒jQuery表单操作代码片段
    20 个用于处理页面滚动效果的 jQuery 插件
    11 个最佳 jQuery 滚动条插件
    jQuery的搜索关键词自动匹配插件
  • 原文地址:https://www.cnblogs.com/rossiXYZ/p/14058197.html
Copyright © 2011-2022 走看看