zoukankan      html  css  js  c++  java
  • 「从零单排canal 04」 启动模块deployer源码解析

    基于1.1.5-alpha版本,具体源码笔记可以参考我的github:https://github.com/saigu/JavaKnowledgeGraph/tree/master/code_reading/canal

    本文将对canal的启动模块deployer进行分析。

    Deployer模块(绿色部分)在整个系统中的角色如下图所示,用来启动canal-server.

    「从零单排canal 04」 启动模块deployer源码解析

     

    模块内的类如下:

    「从零单排canal 04」 启动模块deployer源码解析

     

    为了能带着目的看源码,以几个问题开头,带着问题来一起探索deployer模块的源码。

    • CanalServer启动过程中配置如何加载?
    • CanalServer启动过程中涉及哪些组件?
    • 集群模式的canalServer,是如何实现instance的HA呢?
    • 每个canalServer又是怎么获取admin上的配置变更呢?

    1.入口类CanalLauncher


    这个类是整个canal-server的入口类。负责配置加载和启动canal-server。

    主流程如下:

    • 加载canal.properties的配置内容
    • 根据canal.admin.manager是否为空判断是否是admin控制,如果不是admin控制,就直接根据canal.properties的配置来了
    • 如果是admin控制,使用PlainCanalConfigClient获取远程配置 新开一个线程池每隔五秒用http请求去admin上拉配置进行merge(这里依赖了instance模块的相关配置拉取的工具方法) 用md5进行校验,如果canal-server配置有更新,那么就重启canal-server
    • 核心是用canalStarter.start()启动
    • 使用CountDownLatch保持主线程存活
    • 收到关闭信号,CDL-1,然后关闭配置更新线程池,优雅退出
      1 public static void main(String[] args) {
      2 
      3     try {
      4 
      5         //note:设置全局未捕获异常的处理
      6 
      7         setGlobalUncaughtExceptionHandler();
      8 
      9         /**
     10 
     11          * note:
     12 
     13          * 1.读取canal.properties的配置
     14 
     15          * 可以手动指定配置路径名称
     16 
     17          */
     18 
     19         String conf = System.getProperty("canal.conf", "classpath:canal.properties");
     20 
     21         Properties properties = new Properties();
     22 
     23         if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
     24 
     25             conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
     26 
     27             properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
     28 
     29         } else {
     30 
     31             properties.load(new FileInputStream(conf));
     32 
     33         }
     34 
     35         final CanalStarter canalStater = new CanalStarter(properties);
     36 
     37         String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
     38 
     39         /**
     40 
     41          * note:
     42 
     43          * 2.根据canal.admin.manager是否为空判断是否是admin控制,如果不是admin控制,就直接根据canal.properties的配置来了
     44 
     45          */
     46 
     47         if (StringUtils.isNotEmpty(managerAddress)) {
     48 
     49             String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
     50 
     51             //省略一部分。。。。。。
     52           
     53 
     54             /**
     55 
     56              * note:
     57 
     58              * 2.1使用PlainCanalConfigClient获取远程配置
     59 
     60              */
     61 
     62             final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
     63 
     64                     user,
     65 
     66                     passwd,
     67 
     68                     registerIp,
     69 
     70                     Integer.parseInt(adminPort),
     71 
     72                     autoRegister,
     73 
     74                     autoCluster);
     75 
     76             PlainCanal canalConfig = configClient.findServer(null);
     77 
     78             if (canalConfig == null) {
     79 
     80                 throw new IllegalArgumentException("managerAddress:" + managerAddress
     81 
     82                         + " can't not found config for [" + registerIp + ":" + adminPort
     83 
     84                         + "]");
     85 
     86             }
     87 
     88             Properties managerProperties = canalConfig.getProperties();
     89 
     90             // merge local
     91 
     92             managerProperties.putAll(properties);
     93 
     94             int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
     95 
     96                     CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
     97 
     98                     "5"));
     99 
    100             /**
    101 
    102              * note:
    103 
    104              * 2.2 新开一个线程池每隔五秒用http请求去admin上拉配置进行merge(这里依赖了instance模块的相关配置拉取的工具方法)
    105 
    106              */
    107 
    108             executor.scheduleWithFixedDelay(new Runnable() {
    109 
    110                 private PlainCanal lastCanalConfig;
    111 
    112                 public void run() {
    113 
    114                     try {
    115 
    116                         if (lastCanalConfig == null) {
    117 
    118                             lastCanalConfig = configClient.findServer(null);
    119 
    120                         } else {
    121 
    122                             PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
    123 
    124                             /**
    125 
    126                              * note:
    127 
    128                              * 2.3 用md5进行校验,如果canal-server配置有更新,那么就重启canal-server
    129 
    130                              */
    131 
    132                             if (newCanalConfig != null) {
    133 
    134                                 // 远程配置canal.properties修改重新加载整个应用
    135 
    136                                 canalStater.stop();
    137 
    138                                 Properties managerProperties = newCanalConfig.getProperties();
    139 
    140                                 // merge local
    141 
    142                                 managerProperties.putAll(properties);
    143 
    144                                 canalStater.setProperties(managerProperties);
    145 
    146                                 canalStater.start();
    147 
    148                                 lastCanalConfig = newCanalConfig;
    149 
    150                             }
    151 
    152                         }
    153 
    154                     } catch (Throwable e) {
    155 
    156                         logger.error("scan failed", e);
    157 
    158                     }
    159 
    160                 }
    161 
    162             }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
    163 
    164             canalStater.setProperties(managerProperties);
    165 
    166         } else {
    167 
    168             canalStater.setProperties(properties);
    169 
    170         }
    171 
    172         canalStater.start();
    173 
    174         //note: 这样用CDL处理和while(true)有点类似
    175 
    176         runningLatch.await();
    177 
    178         executor.shutdownNow();
    179 
    180     } catch (Throwable e) {
    181 
    182         logger.error("## Something goes wrong when starting up the canal Server:", e);
    183 
    184     }
    185 
    186 }

    2.启动类CanalStarter


    从上面的入口类,我们可以看到canal-server真正的启动逻辑在CanalStarter类的start方法。

    这里先对三个对象进行辨析:

    • CanalController:是canalServer真正的启动控制器
    • canalMQStarter:用来启动mqProducer。如果serverMode选择了mq,那么会用canalMQStarter来管理mqProducer,将canalServer抓取到的实时变更用mqProducer直接投递到mq
    • CanalAdminWithNetty:这个不是admin控制台,而是对本server启动一个netty服务,让admin控制台通过请求获取当前server的信息,比如运行状态、正在本server上运行的instance信息等

    start方法主要逻辑如下:

    • 根据配置的serverMode,决定使用CanalMQProducer或者canalServerWithNetty
    • 启动CanalController
    • 注册shutdownHook
    • 如果CanalMQProducer不为空,启动canalMQStarter(内部使用CanalMQProducer将消息投递给mq)
    • 启动CanalAdminWithNetty做服务器
      1 public synchronized void start() throws Throwable {
      2 
      3     String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
      4 
      5     /**
      6 
      7      * note
      8 
      9      * 1.如果canal.serverMode不是tcp,加载CanalMQProducer,并且启动CanalMQProducer
     10 
     11      * 回头可以深入研究下ExtensionLoader类的相关实现
     12 
     13      */
     14 
     15     if (!"tcp".equalsIgnoreCase(serverMode)) {
     16 
     17         ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
     18 
     19         canalMQProducer = loader
     20 
     21                 .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
     22 
     23         if (canalMQProducer != null) {
     24 
     25             ClassLoader cl = Thread.currentThread().getContextClassLoader();
     26 
     27             Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());
     28 
     29             canalMQProducer.init(properties);
     30 
     31             Thread.currentThread().setContextClassLoader(cl);
     32 
     33         }
     34 
     35     }
     36 
     37     //note 如果启动了canalMQProducer,就不使用canalWithNetty(这里的netty是用在哪里的?)
     38 
     39     if (canalMQProducer != null) {
     40 
     41         MQProperties mqProperties = canalMQProducer.getMqProperties();
     42 
     43         // disable netty
     44 
     45         System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
     46 
     47         if (mqProperties.isFlatMessage()) {
     48 
     49             // 设置为raw避免ByteString->Entry的二次解析
     50 
     51             System.setProperty("canal.instance.memory.rawEntry", "false");
     52 
     53         }
     54 
     55     }
     56 
     57     controller = new CanalController(properties);
     58 
     59     //note 2.启动canalController
     60 
     61     controller.start();
     62 
     63     //note 3.注册了一个shutdownHook,系统退出时执行相关逻辑
     64 
     65     shutdownThread = new Thread() {
     66 
     67         public void run() {
     68 
     69             try {
     70 
     71                 controller.stop();
     72 
     73                 //note 主线程退出
     74 
     75                 CanalLauncher.runningLatch.countDown();
     76 
     77             } catch (Throwable e) {
     78 
     79 
     80             } finally {
     81 
     82             }
     83 
     84         }
     85 
     86     };
     87 
     88     Runtime.getRuntime().addShutdownHook(shutdownThread);
     89 
     90     //note 4.启动canalMQStarter,集群版的话,没有预先配置destinations。
     91 
     92     if (canalMQProducer != null) {
     93 
     94         canalMQStarter = new CanalMQStarter(canalMQProducer);
     95 
     96         String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
     97 
     98         canalMQStarter.start(destinations);
     99 
    100         controller.setCanalMQStarter(canalMQStarter);
    101 
    102     }
    103 
    104     // start canalAdmin
    105 
    106     String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
    107 
    108     //note 5.根据填写的canalAdmin的ip和port,启动canalAdmin,用netty做服务器
    109 
    110     if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
    111 
    112         String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
    113 
    114         String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
    115 
    116         CanalAdminController canalAdmin = new CanalAdminController(this);
    117 
    118         canalAdmin.setUser(user);
    119 
    120         canalAdmin.setPasswd(passwd);
    121 
    122         String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);
    123 
    124         CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
    125 
    126         canalAdminWithNetty.setCanalAdmin(canalAdmin);
    127 
    128         canalAdminWithNetty.setPort(Integer.parseInt(port));
    129 
    130         canalAdminWithNetty.setIp(ip);
    131 
    132         canalAdminWithNetty.start();
    133 
    134         this.canalAdmin = canalAdminWithNetty;
    135 
    136     }
    137 
    138     running = true;
    139 
    140 }

    3.CanalController


    前面两个类都是比较清晰的,一个是入口类,一个是启动类,下面来看看核心逻辑所在的CanalController。

    这里用了大量的匿名内部类实现接口,看起来有点头大,耐心慢慢剖析一下。

    3.1 从构造器开始了解

    整体初始化的顺序如下:

    • 构建PlainCanalConfigClient,用于用户远程配置的获取
    • 初始化全局配置,顺便把instance相关的全局配置初始化一下
    • 准备一下canal-server,核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq是不需要这个netty的)
    • 初始化zkClient
    • 初始化ServerRunningMonitors,作为instance 运行节点控制
    • 初始化InstanceAction,完成monitor机制。(监控instance配置变化然后调用ServerRunningMonitor进行处理)

    这里有几个机制要详细介绍一下。

    3.1.1 CanalServer两种模式

    canalServer支持两种模式,CanalServerWithEmbedded和CanalServerWithNetty。

    在构造器中初始化代码部分如下:

     1 // 3.准备canal server
     2 
     3 //note: 核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq
     4 
     5 // 是不需要这个netty的)
     6 
     7 ip = getProperty(properties, CanalConstants.CANAL_IP);
     8 
     9 //省略一部分。。。
    10 
    11 embededCanalServer = CanalServerWithEmbedded.instance();
    12 
    13 embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
    14 
    15 int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));
    16 
    17 //省略一部分。。。
    18 
    19 String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
    20 
    21 if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
    22 
    23     canalServer = CanalServerWithNetty.instance();
    24 
    25     canalServer.setIp(ip);
    26 
    27     canalServer.setPort(port);
    28 
    29 }

    embededCanalServer:类型为CanalServerWithEmbedded

    canalServer:类型为CanalServerWithNetty

    二者有什么区别呢?

    都实现了CanalServer接口,且都实现了单例模式,通过静态方法instance获取实例。

    关于这两种类型的实现,canal官方文档有以下描述:

    「从零单排canal 04」 启动模块deployer源码解析

     

    说白了,就是我们可以不必独立部署canal server。在应用直接使用CanalServerWithEmbedded直连mysql数据库进行订阅。

    如果觉得自己的技术hold不住相关代码,就独立部署一个canal server,使用canal提供的客户端,连接canal server获取binlog解析后数据。而CanalServerWithNetty是在CanalServerWithEmbedded的基础上做的一层封装,用于与客户端通信。

    在独立部署canal server时,Canal客户端发送的所有请求都交给CanalServerWithNetty处理解析,解析完成之后委派给了交给CanalServerWithEmbedded进行处理。因此CanalServerWithNetty就是一个马甲而已。CanalServerWithEmbedded才是核心。

    因此,在构造器中,我们看到,

    用于生成CanalInstance实例的instanceGenerator被设置到了CanalServerWithEmbedded中,

    而ip和port被设置到CanalServerWithNetty中。

    关于CanalServerWithNetty如何将客户端的请求委派给CanalServerWithEmbedded进行处理,我们将在server模块源码分析中进行讲解。

    3.1.2 ServerRunningMonitor

    在CanalController的构造器中,canal会为每一个destination创建一个Instance,每个Instance都会由一个ServerRunningMonitor来进行控制。而ServerRunningMonitor统一由ServerRunningMonitors进行管理。

    ServerRunningMonitor是做什么的呢?

    我们看下它的属性就了解了。它主要用来记录每个instance的运行状态数据的。

     1 /**
     2 
     3 * 针对server的running节点控制
     4 
     5 */
     6 
     7 public class ServerRunningMonitor extends AbstractCanalLifeCycle {
     8 
     9     private static final Logger        logger       = LoggerFactory.getLogger(ServerRunningMonitor.class);
    10 
    11     private ZkClientx                  zkClient;
    12 
    13     private String                     destination;
    14 
    15     private IZkDataListener            dataListener;
    16 
    17     private BooleanMutex               mutex        = new BooleanMutex(false);
    18 
    19     private volatile boolean           release      = false;
    20 
    21     // 当前服务节点状态信息
    22 
    23     private ServerRunningData          serverData;
    24 
    25     // 当前实际运行的节点状态信息
    26 
    27     private volatile ServerRunningData activeData;
    28 
    29     private ScheduledExecutorService   delayExector = Executors.newScheduledThreadPool(1);
    30 
    31     private int                        delayTime    = 5;
    32 
    33     private ServerRunningListener      listener;
    34 
    35     public ServerRunningMonitor(ServerRunningData serverData){
    36 
    37         this();
    38 
    39         this.serverData = serverData;
    40 
    41     }
    42         //。。。。。
    43 
    44 }

    在创建ServerRunningMonitor对象时,首先根据ServerRunningData创建ServerRunningMonitor实例,之后设置了destination和ServerRunningListener。

    ServerRunningListener是个接口,这里采用了匿名内部类的形式构建,实现了各个接口的方法。

    主要为instance在当前server上的状态发生变化时调用。比如要在当前server上启动这个instance了,就调用相关启动方法,如果在这个server上关闭instance,就调用相关关闭方法。

    具体的调用逻辑我们后面在启动过程中分析,这里大概知道下构造器中做了些什么就行了,主要就是一些启动、关闭的逻辑。

      1 new Function<String, ServerRunningMonitor>() {
      2 
      3     public ServerRunningMonitor apply(final String destination) {
      4 
      5         ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
      6 
      7         runningMonitor.setDestination(destination);
      8 
      9         runningMonitor.setListener(new ServerRunningListener() {
     10 
     11             /**
     12 
     13              * note
     14 
     15              * 1.内部调用了embededCanalServer的start(destination)方法。
     16 
     17              * 这里很关键,说明每个destination对应的CanalInstance是通过embededCanalServer的start方法启动的,
     18 
     19              * 这样我们就能理解,为什么之前构造器中会把instanceGenerator设置到embededCanalServer中了。
     20 
     21              * embededCanalServer负责调用instanceGenerator生成CanalInstance实例,并负责其启动。
     22 
     23              *
     24 
     25              * 2.如果投递mq,还会直接调用canalMQStarter来启动一个destination
     26 
     27              */
     28 
     29             public void processActiveEnter() {
     30 
     31                //省略具体内容。。。
     32             }
     33 
     34             /**
     35 
     36              * note
     37 
     38              * 1.与开始顺序相反,如果有mqStarter,先停止mqStarter的destination
     39 
     40              * 2.停止embedeCanalServer的destination
     41 
     42              */
     43 
     44             public void processActiveExit() {
     45 
     46                 //省略具体内容。。。
     47 
     48             }
     49 
     50             /**
     51 
     52              * note
     53 
     54              * 在Canalinstance启动之前,destination注册到ZK上,创建节点
     55 
     56              * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。
     57 
     58              * 此方法会在processActiveEnter()之前被调用
     59 
     60              */
     61 
     62             public void processStart() {
     63 
     64                 //省略具体内容。。。
     65 
     66             }
     67 
     68             /**
     69 
     70              * note
     71 
     72              * 在Canalinstance停止前,把ZK上节点删除掉
     73 
     74              * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。
     75 
     76              * 此方法会在processActiveExit()之前被调用
     77 
     78              */
     79 
     80             public void processStop() {
     81 
     82                 //省略具体内容。。。
     83             }
     84 
     85         });
     86 
     87         if (zkclientx != null) {
     88 
     89             runningMonitor.setZkClient(zkclientx);
     90 
     91         }
     92 
     93         // 触发创建一下cid节点
     94 
     95         runningMonitor.init();
     96 
     97         return runningMonitor;
     98 
     99     }
    100 
    101 }

    3.2 canalController的start方法

    具体运行逻辑如下:

    • 在zk的/otter/canal/cluster目录下根据ip:port创建server的临时节点,注册zk监听器
    • 先启动embededCanalServer(会启动对应的监控)
    • 根据配置的instance的destination,调用runningMonitor.start() 逐个启动instance
    • 如果cannalServer不为空,启动canServer (canalServerWithNetty)

    这里需要注意,canalServer什么时候为空?

    如果用户选择了serverMode为mq,那么就不会启动canalServerWithNetty,采用mqStarter来作为server,直接跟mq集群交互。canalServerWithNetty只有在serverMode为tcp时才启动,用来跟canal-client做交互。

    所以如果以后想把embeddedCanal嵌入自己的应用,可以考虑参考mqStarter的写法。后面我们在server模块中会做详细解析。

      1 public void start() throws Throwable {
      2 
      3     // 创建整个canal的工作节点
      4 
      5     final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
      6 
      7     initCid(path);
      8 
      9     if (zkclientx != null) {
     10 
     11         this.zkclientx.subscribeStateChanges(new IZkStateListener() {
     12 
     13             public void handleStateChanged(KeeperState state) throws Exception {
     14 
     15             }
     16 
     17             public void handleNewSession() throws Exception {
     18 
     19                 initCid(path);
     20 
     21             }
     22 
     23             @Override
     24 
     25             public void handleSessionEstablishmentError(Throwable error) throws Exception{
     26 
     27                 logger.error("failed to connect to zookeeper", error);
     28 
     29             }
     30 
     31         });
     32 
     33     }
     34 
     35     // 先启动embeded服务
     36 
     37     embededCanalServer.start();
     38 
     39     // 尝试启动一下非lazy状态的通道
     40 
     41     for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
     42 
     43         final String destination = entry.getKey();
     44 
     45         InstanceConfig config = entry.getValue();
     46 
     47         // 创建destination的工作节点
     48 
     49         if (!embededCanalServer.isStart(destination)) {
     50 
     51             // HA机制启动
     52 
     53             ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
     54 
     55             if (!config.getLazy() && !runningMonitor.isStart()) {
     56 
     57                 runningMonitor.start();
     58 
     59             }
     60 
     61         }
     62 
     63         //note:为每个instance注册一个配置监视器
     64 
     65         if (autoScan) {
     66 
     67             instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
     68 
     69         }
     70 
     71     }
     72 
     73     if (autoScan) {
     74 
     75         //note:启动线程定时去扫描配置
     76 
     77         instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
     78 
     79         //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一
     80 
     81         for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
     82 
     83             if (!monitor.isStart()) {
     84 
     85                 monitor.start();
     86 
     87             }
     88 
     89         }
     90 
     91     }
     92 
     93     // 启动网络接口
     94 
     95     if (canalServer != null) {
     96 
     97         canalServer.start();
     98 
     99     }
    100 
    101 }

    我们重点关注启动instance的过程,也就是ServerRunningMonitor的运行机制,也就是HA启动的关键。

    入口在runningMonitor.start()。

    • 如果zkClient != null,就用zk进行HA启动
    • 否则,就直接processActiveEnter启动,这个我们前面已经分析过了
     1 public synchronized void start() {
     2 
     3     super.start();
     4 
     5     try {
     6 
     7         /**
     8 
     9          * note
    10 
    11          * 内部会调用ServerRunningListener的processStart()方法
    12 
    13          */
    14 
    15         processStart();
    16 
    17         if (zkClient != null) {
    18 
    19             // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
    20 
    21             String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
    22 
    23             zkClient.subscribeDataChanges(path, dataListener);
    24 
    25             initRunning();
    26 
    27         } else {
    28 
    29             /**
    30 
    31              * note
    32 
    33              * 内部直接调用ServerRunningListener的processActiveEnter()方法
    34 
    35              */
    36 
    37             processActiveEnter();// 没有zk,直接启动
    38 
    39         }
    40 
    41     } catch (Exception e) {
    42 
    43         logger.error("start failed", e);
    44 
    45         // 没有正常启动,重置一下状态,避免干扰下一次start
    46 
    47         stop();
    48 
    49     }
    50 
    51 }

    重点关注下HA启动方式,一般 我们都采用这种模式进行。

    在集群模式下,可能会有多个canal server共同处理同一个destination,

    在某一时刻,只能由一个canal server进行处理,处理这个destination的canal server进入running状态,其他canal server进入standby状态。

    同时,通过监听对应的path节点,一旦发生变化,出现异常,可以立刻尝试自己进入running,保证了instace的 高可用!!

    启动的重点还是在initRuning()。

    利用zk来保证集群中有且只有 一个instance任务在运行。

    • 还构建一个临时节点的路径:/otter/canal/destinations/{0}/running
    • 尝试创建临时节点。
    • 如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。此时会抛出ZkNodeExistsException,进入catch代码块。
    • 如果创建成功,就说明没有其他server启动这个instance,可以创建
     1 private void initRunning() {
     2     if (!isStart()) {
     3         return;
     4     }
     5 
     6 
     7     //note: 还是一样构建一个临时节点的路径:/otter/canal/destinations/{0}/running
     8     String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
     9     // 序列化
    10     byte[] bytes = JsonUtils.marshalToByte(serverData);
    11     try {
    12         mutex.set(false);
    13         /**
    14          * note:
    15          * 尝试创建临时节点。如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。
    16          * 此时会抛出ZkNodeExistsException,进入catch代码块。
    17          */
    18         zkClient.create(path, bytes, CreateMode.EPHEMERAL);
    19         /**
    20          * note:
    21          * 如果创建成功,就开始触发启动事件
    22          */
    23         activeData = serverData;
    24         processActiveEnter();// 触发一下事件
    25         mutex.set(true);
    26         release = false;
    27     } catch (ZkNodeExistsException e) {
    28         /**
    29          * note:
    30          * 如果捕获异常,表示创建失败。
    31          * 就根据临时节点路径查一下是哪个canal-sever创建了。
    32          * 如果没有相关信息,马上重新尝试一下。
    33          * 如果确实存在,就把相关信息保存下来
    34          */
    35         bytes = zkClient.readData(path, true);
    36         if (bytes == null) {// 如果不存在节点,立即尝试一次
    37             initRunning();
    38         } else {
    39             activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
    40         }
    41     } catch (ZkNoNodeException e) {
    42         /**
    43          * note:
    44          * 如果是父节点不存在,那么就尝试创建一下父节点,然后再初始化。
    45          */
    46         zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
    47         initRunning();
    48     }
    49 }

    那运行中的HA是如何实现的呢,我们回头看一下

    zkClient.subscribeDataChanges(path, dataListener);
    对destination对应的running节点进行监听,一旦发生了变化,则说明可能其他处理相同destination的canal server可能出现了异常,此时需要尝试自己进入running状态。

    dataListener是在ServerRunningMonitor的构造方法中初始化的,

    包括节点发生变化、节点被删两种变化情况以及相对应的处理逻辑,如下 :

     
     1 public ServerRunningMonitor(){
     2     // 创建父节点
     3     dataListener = new IZkDataListener() {
     4         /**
     5          * note:
     6          * 当注册节点发生变化时,会自动回调这个方法。
     7          * 我们回想一下使用过程中,什么时候可能 改变节点当状态呢?
     8          * 大概是在控制台中,对canal-server中正在运行的 instance做"停止"操作时,改变了isActive。
     9          * 可以 触发 HA。
    10          */
    11         public void handleDataChange(String dataPath, Object data) throws Exception {
    12             MDC.put("destination", destination);
    13             ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
    14             if (!isMine(runningData.getAddress())) {
    15                 mutex.set(false);
    16             }
    17 
    18             if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
    19                 releaseRunning();// 彻底释放mainstem
    20             }
    21 
    22             activeData = (ServerRunningData) runningData;
    23         }
    24 
    25 
    26         /**
    27          * note:
    28          * 如果其他canal instance出现异常,临时节点数据被删除时,会自动回调这个方法,此时当前canal instance要顶上去
    29          */
    30         public void handleDataDeleted(String dataPath) throws Exception {
    31             MDC.put("destination", destination);
    32             mutex.set(false);
    33             if (!release && activeData != null && isMine(activeData.getAddress())) {
    34                 // 如果上一次active的状态就是本机,则即时触发一下active抢占
    35                 initRunning();
    36             } else {
    37                 // 否则就是等待delayTime,避免因网络异常或者zk异常,导致出现频繁的切换操作
    38                 delayExector.schedule(new Runnable() {
    39                     public void run() {
    40                         initRunning();
    41                     }
    42                 }, delayTime, TimeUnit.SECONDS);
    43             }
    44         }
    45     };
    46 }
     

    当注册节点发生变化时,会自动回调zkListener的handleDataChange方法。

    我们回想一下使用过程中,什么时候可能 改变节点当状态呢?

    就是在控制台中,对canal-server中正在运行的 instance做"停止"操作时,改变了isActive,可以 触发 HA。

    如下图所示

    「从零单排canal 04」 启动模块deployer源码解析

     

    4.admin的配置监控原理

    我们现在采用admin做全局的配置控制。

    那么每个canalServer是怎么监控配置的变化呢?

    还记得上吗cananlController的start方法中对配置监视器的启动吗?

     1 if (autoScan) {
     2         //note:启动线程定时去扫描配置
     3         instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
     4         //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一
     5         for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
     6             if (!monitor.isStart()) {
     7                 monitor.start();
     8             }
     9         }
    10     }

    这个就是关键的配置监控。

    我们来看deployer模块中的monitor包了。

    「从零单排canal 04」 启动模块deployer源码解析

     

    4.1 InstanceAction

    是一个接口,有四个方法,用来获取配置后,对具体instance采取动作。

     1 /**
     2 * config配置变化后的动作
     3 *
     4 */
     5 public interface InstanceAction {
     6 
     7 
     8     /**
     9      * 启动destination
    10      */
    11     void start(String destination);
    12 
    13 
    14     /**
    15      * 主动释放destination运行
    16      */
    17     void release(String destination);
    18 
    19 
    20     /**
    21      * 停止destination
    22      */
    23     void stop(String destination);
    24 
    25 
    26     /**
    27      * 重载destination,可能需要stop,start操作,或者只是更新下内存配置
    28      */
    29     void reload(String destination);
    30 }

    具体实现在canalController的构造器中实现了匿名类。

    4.2 InstanceConfigMonitor

    这个接口有两个实现,一个是基于spring的,一个基于manager(就是admin)。

    我们看下基于manager配置的实现的ManagerInstanceConfigMonitor即可。

    原理很简单。

    • 采用一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
    • 然后通过defaultAction去start
    • 这个start在canalController的构造器的匿名类中实现,会使用instance对应的runningMonitor做HA启动。具体逻辑上一小节已经详细介绍过了。
     1 /**
     2 * 基于manager配置的实现
     3 *
     4 */
     5 public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle implements InstanceConfigMonitor, CanalLifeCycle {
     6 
     7 
     8     private static final Logger         logger               = LoggerFactory.getLogger(ManagerInstanceConfigMonitor.class);
     9     private long                        scanIntervalInSecond = 5;
    10     private InstanceAction              defaultAction        = null;
    11     /**
    12      * note:
    13      * 每个instance对应的instanceAction,实际上我们看代码发现都是用的同一个defaultAction
    14      */
    15     private Map<String, InstanceAction> actions              = new MapMaker().makeMap();
    16     /**
    17      * note:
    18      * 每个instance对应的远程配置
    19      */
    20     private Map<String, PlainCanal>     configs              = MigrateMap.makeComputingMap(new Function<String, PlainCanal>() {
    21                                                                  public PlainCanal apply(String destination) {
    22                                                                      return new PlainCanal();
    23                                                                  }
    24                                                              });
    25     /**
    26      * note:
    27      * 一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
    28      */
    29     private ScheduledExecutorService    executor             = Executors.newScheduledThreadPool(1,
    30                                                                  new NamedThreadFactory("canal-instance-scan"));
    31 
    32     private volatile boolean            isFirst              = true;
    33     /**
    34      * note:
    35      * 拉取admin配置的client
    36      */
    37     private PlainCanalConfigClient      configClient;
    38 //
    39 }

    5.总结

    deployer模块的主要作用:

    1)读取canal.properties,确定canal instance的配置加载方式。如果使用了admin,那么还会定时拉取admin上的配置更新。

    2)确定canal-server的启动方式:独立启动或者集群方式启动

    3)利用zkClient监听canal instance在zookeeper上的状态变化,动态停止、启动或新增,实现了instance的HA

    4)利用InstanceConfigMonitor,采用固定线程定时轮训admin,获取instance的最新配置

    5)启动canal server,监听客户端请求

    这里还有个非常有意思的问题没有展开说明,那就是CanalStarter里面的配置加载,通过ExtensionLoader类的相关实现,如何通过不同的类加载器,实现SPI,后面再分析吧。

    都看到最后了,原创不易,点个关注,点个赞吧~
    文章持续更新,可以微信搜索「阿丸笔记 」第一时间阅读,回复关键字【学习】有我准备的一线大厂面试资料。
    知识碎片重新梳理,构建Java知识图谱:github.com/saigu/JavaK…(历史文章查阅非常方便)
  • 相关阅读:
    SAP 多料号展BOM
    SAP QM 检验批可用库存回转为待检验库存
    SAP QM UD检验批回转为REL待检验状态
    在ABAP中获取应用程序服务器的IP地址
    SAP连接外部数据库后批量写入数据
    Java调用Axis2用SAP WSDL生成的Stub文件
    用最新的采购信息记录更新采购单的价格——BAPI_PO_CHANGE
    SAP QM——QA01、QA02、QA03屏幕增强
    Java递归实现一、二、三级菜单查询
    ABAP——查询
  • 原文地址:https://www.cnblogs.com/awan-note/p/13253534.html
Copyright © 2011-2022 走看看