zoukankan      html  css  js  c++  java
  • [从源码学设计]蚂蚁金服SOFARegistry之延迟操作

    [从源码学设计]蚂蚁金服SOFARegistry之延迟操作

    0x00 摘要

    SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。

    本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让大家借以学习阿里如何设计。

    本文为第十七篇,介绍SOFARegistry的延迟操作。

    0x01 业务领域

    1.1 业务缘由

    为什么要有AfterWorkingProcess?

    AfterWorkingProcess 的作用是延迟操作。猜测大致是因为某些情况下,无法执行业务,只能在后续时机进行弥补。

    在官方博客有类似论述也支持我们的判断 :

    在数据未同步完成之前,所有对新节点的读数据操作,将转发到拥有该数据分片的数据节点。

    在数据未同步完成之前,禁止对新节点的写数据操作,防止在数据同步过程中出现新的数据不一致情况。

    1.2 学习方向

    可以看到类似这种业务上延迟操作应该如何实现。

    0x02 实现

    2.1 定义

    接口定义如下:

    public interface AfterWorkingProcess {
        void afterWorkingProcess();
        int getOrder();
    }
    

    2.2 配置

    这个 afterWorkProcessors 会作为 AfterWorkingProcessHandler 的成员变量进行处理。用于处理一些业务逻辑结束后的处理动作。

            @Bean(name = "afterWorkProcessors")
            public List<AfterWorkingProcess> afterWorkingProcessors() {
                List<AfterWorkingProcess> list = new ArrayList<>();
                list.add(renewDatumHandler());
                list.add(datumLeaseManager());
                list.add(disconnectEventHandler());
                list.add(notifyDataSyncHandler());
                return list;
            }
    
            @Bean
            public AfterWorkingProcessHandler afterWorkingProcessHandler() {
                return new AfterWorkingProcessHandler();
            }
    

    2.3 引擎

    这里用法比较少见。AfterWorkingProcessHandler 也是 AfterWorkingProcess 的实现类

    在其 afterWorkingProcess 函数中,会对 Bean afterWorkingProcessors 中间注册的实现类一一调用其 afterWorkingProcess 业务函数。

    其中,getOrder 会指定执行优先级,这是一个常见套路。

    public class AfterWorkingProcessHandler implements AfterWorkingProcess {
    
        @Resource(name = "afterWorkProcessors")
        private List<AfterWorkingProcess> afterWorkingProcessors;
    
        @Override
        public void afterWorkingProcess() {
    
            if(afterWorkingProcessors != null){
                List<AfterWorkingProcess> list = afterWorkingProcessors.stream().sorted(Comparator.comparing(AfterWorkingProcess::getOrder)).collect(Collectors.toList());
    
                list.forEach(AfterWorkingProcess::afterWorkingProcess);
            }
        }
    
        @Override
        public int getOrder() {
            return 0;
        }
    }
    

    2.4 调用

    只有在 DataServerCache # updateDataServerStatus 函数中有调用:

    afterWorkingProcessHandler.afterWorkingProcess();
    

    而在 DataServerCache 中有如下函数都会调用到 updateDataServerStatus:

    • synced
    • notifiedAll
    • checkAndUpdateStatus
    • addNotWorkingServer

    图示如下:

    +------------------------------------------+
    | DataServerCache                          |                                 +----------------------------------------------+
    |                                          |                                 |   AfterWorkingProcess                        |
    | synced +----------------------+          |                                 |                                              |
    |                               |          | +----------------------------+  | +------------------------------------------+ |
    |                               |          | | AfterWorkingProcessHandler |  | |renewDatumHandler.afterWorkingProcess     | |
    |                               |          | |                            |  | |                                          | |
    |                               v          | |                            |  | |datumLeaseManager.afterWorkingProcess     | |
    | notifiedAll +--->updateDataServerStatus +------> afterWorkingProcess +------>+                                          | |
    |                                 ^   ^    | |                            |  | |disconnectEventHandler.afterWorkingProcess| |
    |                                 |   |    | +----------------------------+  | |                                          | |
    |                                 |   |    |                                 | |notifyDataSyncHandler.afterWorkingProcess | |
    | checkAndUpdateStatus+-----------+   |    |                                 | +------------------------------------------+ |
    |                                     |    |                                 +----------------------------------------------+
    | addNotWorkingServer +---------------+    |
    |                                          |
    +------------------------------------------+
    
    

    手机如下:

    因为是业务关联,所以不需要什么定时,异步之类。

    2.5 业务实现

    2.5.1 DisconnectEventHandler

    public class DisconnectEventHandler implements InitializingBean, AfterWorkingProcess {
        /**
         * a DelayQueue that contains client disconnect events
         */
        private final DelayQueue<DisconnectEvent>           EVENT_QUEUE        = new DelayQueue<>();
    
        @Autowired
        private SessionServerConnectionFactory              sessionServerConnectionFactory;
    
        @Autowired
        private DataChangeEventCenter                       dataChangeEventCenter;
    
        @Autowired
        private DataServerConfig                            dataServerConfig;
    
        @Autowired
        private DataNodeStatus                              dataNodeStatus;
    
        private static final int                            BLOCK_FOR_ALL_SYNC = 5000;
    
        private static final BlockingQueue<DisconnectEvent> noWorkQueue        = new LinkedBlockingQueue<>();
    }
    

    在receive的正常业务操作中,如果发现本身状态不是 WORKING,则把event放入 BlockingQueue 之中。

    public void receive(DisconnectEvent event) {
            if (event.getType() == DisconnectTypeEnum.SESSION_SERVER) {
                SessionServerDisconnectEvent sessionServerDisconnectEvent = (SessionServerDisconnectEvent) event;
                    sessionServerDisconnectEvent.getProcessId());
            } else if (event.getType() == DisconnectTypeEnum.CLIENT) {
                ClientDisconnectEvent clientDisconnectEvent = (ClientDisconnectEvent) event;
            }
    
            if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
                noWorkQueue.add(event);
                return;
            }
            EVENT_QUEUE.add(event);
    }
    

    当时机来到时候,系统再次调用afterWorkingProcess。这里会始终Block在noWorkQueue上,如果不为空,则会执行请求。

    public void afterWorkingProcess() {
        try {
            /*
             * After the snapshot data is synchronized during startup, it is queued and then placed asynchronously into
             * DatumCache. When the notification becomes WORKING, there may be data in the queue that is not executed
             * to DatumCache. So it need to sleep for a while.
             */
            TimeUnit.MILLISECONDS.sleep(BLOCK_FOR_ALL_SYNC);
    
            while (!noWorkQueue.isEmpty()) {
                DisconnectEvent event = noWorkQueue.poll(1, TimeUnit.SECONDS);
                if (event != null) {
                    receive(event);
                }
            }
        } 
    }
    

    图示如下:

    +----------------------------------------------------------+
    |                                  DisconnectEventHandler  |
    |    +-------------------------+                           |
    |    | receive                 |                           |
    |    |                         |  NOT WORKING              |
    |    | dataNodeStatus.getStatus+---------------+           |
    |    |            +            |               |           |
    |    |            | WORKING    |               | add       |
    |    |            |            |               |           |
    |    |            v            |               |           |
    |    |  EVENT_QUEUE.add(event) |               |           |
    |    |                         |           +---v---------+ |
    |    +-------------------------+           |             | |
    |                                          | noWorkQueue | |
    |                                          |             | |
    |    +-----------------------+             +-----+-------+ |
    |    | afterWorkingProcess   |                   |         |
    |    |                       |                   | poll    |
    |    |                       |      NOT isEmpty  |         |
    |    |     receive(event) <----------------------+         |
    |    |                       |                             |
    |    |                       |                             |
    |    +-----------------------+                             |
    +----------------------------------------------------------+
    
    

    2.5.2 NotifyDataSyncHandler

    DisconnectEventHandler 和 NotifyDataSyncHandler 的实现类似。

    依托一个 LinkedBlockingQueue 做缓存queue。

    public class NotifyDataSyncHandler extends AbstractClientHandler<NotifyDataSyncRequest> implements AfterWorkingProcess {
      
      private static final BlockingQueue<SyncDataRequestForWorking> noWorkQueue = new LinkedBlockingQueue<>();
      
    }
    

    在doHandle的正常业务操作中,如果发现本身状态不是 WORKING,则用业务逻辑SyncDataRequestForWorking 构建一个消息 SyncDataRequestForWorking,放入 LinkedBlockingQueue 之中。

    @Override
    public Object doHandle(Channel channel, NotifyDataSyncRequest request) {
            final Connection connection = ((BoltChannel) channel).getConnection();
            if (dataNodeStatus.getStatus() != LocalServerStatusEnum.WORKING) {
                noWorkQueue.add(new SyncDataRequestForWorking(connection, request));
                return CommonResponse.buildSuccessResponse();
            }
            executorRequest(connection, request);
            return CommonResponse.buildSuccessResponse();
    }
    

    当时机来到时候,系统再次调用afterWorkingProcess。这里会始终Block在noWorkQueue上,如果不为空,则会执行请求。

    @Override
    public void afterWorkingProcess() {
                while (!noWorkQueue.isEmpty()) {
                    SyncDataRequestForWorking event = noWorkQueue.poll(1, TimeUnit.SECONDS);
                    if (event != null) {
                        executorRequest(event.getConnection(), event.getRequest());
                    }
                }
            } 
    }
    

    图示如下:

    +----------------------------------------------------------+
    |                                   NotifyDataSyncHandler  |
    |    +-------------------------+                           |
    |    | doHandle                |                           |
    |    |                         |  NOT WORKING              |
    |    | dataNodeStatus.getStatus+---------------+           |
    |    |            +            |               |           |
    |    |            | WORKING    |               | add       |
    |    |            |            |               |           |
    |    |            v            |               |           |
    |    |     executorRequest     |               |           |
    |    |                         |           +---v---------+ |
    |    +-------------------------+           |             | |
    |                                          | noWorkQueue | |
    |                                          |             | |
    |    +-----------------------+             +-----+-------+ |
    |    | afterWorkingProcess   |                   |         |
    |    |                       |                   | poll    |
    |    |                       |      NOT isEmpty  |         |
    |    |   executorRequest  <----------------------+         |
    |    |                       |                             |
    |    |                       |                             |
    |    +-----------------------+                             |
    +----------------------------------------------------------+
    
    

    2.5.3 RenewDatumHandler

    RenewDatumHandler 同 DatumLeaseManager 这两者很类似。并没有使用queue,只是提交一个线程。

    其实现目的在注释中写的很清楚:

    /* * After the snapshot data is synchronized during startup, it is queued and then placed asynchronously into * DatumCache. When the notification becomes WORKING, there may be data in the queue that is not executed * to DatumCache. So it need to sleep for a while. */
    

    但是细节又有所不同,这两个类是同一个作者,怀疑此君在实验比较两种不同实现方式。

    RenewDatumHandler 基于 ThreadPoolExecutorDataServer 来实现。

    public class RenewDatumHandler extends AbstractServerHandler<RenewDatumRequest> implements
                                                                                   AfterWorkingProcess {
    
        @Autowired
        private ThreadPoolExecutor  renewDatumProcessorExecutor;
    
    }
    

    renewDatumProcessorExecutor 是一个Bean,具体代码如下,ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,按FIFO原则进行排序。

    @Bean(name = "renewDatumProcessorExecutor")
    public ThreadPoolExecutor renewDatumProcessorExecutor(DataServerConfig dataServerConfig) {
                return new ThreadPoolExecutorDataServer("RenewDatumProcessorExecutor",
                    dataServerConfig.getRenewDatumExecutorMinPoolSize(),
                    dataServerConfig.getRenewDatumExecutorMaxPoolSize(), 300, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(dataServerConfig.getRenewDatumExecutorQueueSize()),
                    new NamedThreadFactory("DataServer-RenewDatumProcessor-executor", true));
    }
    

    ThreadPoolExecutorDataServer 主要代码如下,就是简单继承了ThreadPoolExecutor,估计这里后续会有新功能添加,现在只是占坑:

    public class ThreadPoolExecutorDataServer extends ThreadPoolExecutor {
        @Override
        public void execute(Runnable command) {
    		super.execute(command);
        }
    }
    

    对于afterWorkingProcess,就是提交了一个线程,其业务是:等待一段时间,然后设置renewEnabled。

    @Override
    public void afterWorkingProcess() {
            renewDatumProcessorExecutor.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(dataServerConfig.getRenewEnableDelaySec());
                renewEnabled.set(true);
            });
    }
    

    0xFF 参考

    蚂蚁金服服务注册中心如何实现 DataServer 平滑扩缩容

    蚂蚁金服服务注册中心 SOFARegistry 解析 | 服务发现优化之路

    服务注册中心 Session 存储策略 | SOFARegistry 解析

    海量数据下的注册中心 - SOFARegistry 架构介绍

    服务注册中心数据分片和同步方案详解 | SOFARegistry 解析

    蚂蚁金服开源通信框架SOFABolt解析之连接管理剖析

    蚂蚁金服开源通信框架SOFABolt解析之超时控制机制及心跳机制

    蚂蚁金服开源通信框架 SOFABolt 协议框架解析

    蚂蚁金服服务注册中心数据一致性方案分析 | SOFARegistry 解析

    蚂蚁通信框架实践

    sofa-bolt 远程调用

    sofa-bolt学习

    SOFABolt 设计总结 - 优雅简洁的设计之道

    SofaBolt源码分析-服务启动到消息处理

    SOFABolt 源码分析

    SOFABolt 源码分析9 - UserProcessor 自定义处理器的设计

    SOFARegistry 介绍

    SOFABolt 源码分析13 - Connection 事件处理机制的设计

  • 相关阅读:
    phpqrcode生成带logo的二维码图片及带文字的二维码图片
    php 文件压缩zip扩展
    js常用的正则表达操作
    WebViewJavascriptBridge详细使用(转载)
    html5的FormData对象和input的file属性以及window.URL.createObjectURL( ) 方法(转载)
    js面向对象的实现(example 二)
    PHP二维数组(或任意维数组)转换成一维数组的方法汇总
    Yii2实现自定义独立验证器的方法
    yii2.0配置以pathinfo的形式访问
    安装 AdminLTE和 yii2-admin
  • 原文地址:https://www.cnblogs.com/rossiXYZ/p/14289232.html
Copyright © 2011-2022 走看看