  • Fault-Tolerance Analysis of DolphinScheduler

    This post walks through how fault tolerance (failover) works in DolphinScheduler.

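    Fault tolerance covers two cases. The first is the startup of the first master: that master node scans the processInstance table for processInstance records that are still in an executing state, regenerates a command for each of them, and sets the host of each such processInstance record to null.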

    "Executing state" means one of the following:

        private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
                ExecutionStatus.RUNNING_EXECUTION.ordinal(),
                ExecutionStatus.READY_PAUSE.ordinal(),
                ExecutionStatus.READY_STOP.ordinal()};
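
    To make the role of this array concrete, here is a minimal sketch of a membership check against it. The `ExecutionStatus` enum below is a simplified stand-in written for this example (its constant list and order are assumptions, not DolphinScheduler's real definition), and `needsFailover` is a hypothetical helper name.

```java
import java.util.Arrays;

// Simplified stand-in for DolphinScheduler's ExecutionStatus.
// Only a few constants are listed and their order is an assumption.
enum ExecutionStatus {
    SUBMITTED_SUCCESS, RUNNING_EXECUTION, READY_PAUSE, PAUSE,
    READY_STOP, STOP, FAILURE, SUCCESS
}

class ExecutingStates {
    // The same four states as the stateArray above, stored as ordinals.
    static final int[] STATE_ARRAY = {
        ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
        ExecutionStatus.RUNNING_EXECUTION.ordinal(),
        ExecutionStatus.READY_PAUSE.ordinal(),
        ExecutionStatus.READY_STOP.ordinal()
    };

    // Hypothetical helper: true when the status is one that failover picks up.
    static boolean needsFailover(ExecutionStatus status) {
        return Arrays.stream(STATE_ARRAY).anyMatch(code -> code == status.ordinal());
    }

    public static void main(String[] args) {
        for (ExecutionStatus status : ExecutionStatus.values()) {
            System.out.println(status + " -> " + needsFailover(status));
        }
    }
}
```

    Finished or paused states such as SUCCESS and PAUSE are not in the array, so instances in those states are left alone.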
    

    The fault-tolerance code that runs at startup when only one master node is active:

        // startup tolerant
        if (getActiveMasterNum() == 1) {
            removeZKNodePath(null, ZKNodeType.MASTER, true);
            removeZKNodePath(null, ZKNodeType.WORKER, true);
        }
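
    The scan described for the startup case can be sketched in memory as follows. This is an illustration of the described behavior only, not DolphinScheduler's implementation: `ProcessInstance`, `Status`, `failover`, and the command string format are all hypothetical stand-ins, and a plain list plays the role of the processInstance table.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

class FailoverScanSketch {
    // Hypothetical, shortened status enum for this example.
    enum Status { SUBMITTED_SUCCESS, RUNNING_EXECUTION, READY_PAUSE, READY_STOP, SUCCESS }

    // The four executing states named in the article.
    static final Set<Status> EXECUTING = EnumSet.of(
        Status.SUBMITTED_SUCCESS, Status.RUNNING_EXECUTION,
        Status.READY_PAUSE, Status.READY_STOP);

    // Hypothetical stand-in for one row of the processInstance table.
    static class ProcessInstance {
        final int id;
        final Status status;
        String host;

        ProcessInstance(int id, Status status, String host) {
            this.id = id;
            this.status = status;
            this.host = host;
        }
    }

    // Scans the table, regenerates one command per executing instance,
    // and clears that instance's host. Returns the generated commands.
    static List<String> failover(List<ProcessInstance> table) {
        List<String> commands = new ArrayList<>();
        for (ProcessInstance pi : table) {
            if (EXECUTING.contains(pi.status)) {
                commands.add("RECOVER processInstance " + pi.id); // hypothetical command format
                pi.host = null;
            }
        }
        return commands;
    }

    public static void main(String[] args) {
        List<ProcessInstance> table = new ArrayList<>();
        table.add(new ProcessInstance(1, Status.RUNNING_EXECUTION, "192.168.0.1"));
        table.add(new ProcessInstance(2, Status.SUCCESS, "192.168.0.1"));
        table.add(new ProcessInstance(3, Status.READY_STOP, "192.168.0.2"));
        failover(table).forEach(System.out::println);
    }
}
```

    In the real code the equivalent work is reached through `removeZKNodePath(null, ...)`, which passes a null `serverHost` on to `failoverServerWhenDown`.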
    

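    The second case arises when several nodes are registered in ZooKeeper. Take three nodes A, B, and C as an example: if C suddenly goes offline, A and B are both notified. The processInstance records under C's IP that are in one of the executing states listed above each get a regenerated command, and the host of each such record is set to null. Which of A and B does this work? Whichever one acquires the ZooKeeper lock first.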

        @Override
        protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
            //monitor master
            if (path.startsWith(getZNodeParentPath(ZKNodeType.MASTER) + Constants.SINGLE_SLASH)) {
                handleMasterEvent(event, path);
            } else if (path.startsWith(getZNodeParentPath(ZKNodeType.WORKER) + Constants.SINGLE_SLASH)) {
                //monitor worker
                handleWorkerEvent(event, path);
            }
        }
    
        public void handleMasterEvent(TreeCacheEvent event, String path) {
            switch (event.getType()) {
                case NODE_ADDED:
                    logger.info("master node added : {}", path);
                    break;
                case NODE_REMOVED:
                    removeZKNodePath(path, ZKNodeType.MASTER, true);
                    break;
                default:
                    break;
            }
        }
    
        /**
         * remove zookeeper node path
         *
         * @param path zookeeper node path
         * @param zkNodeType zookeeper node type
         * @param failover is failover
         */
        private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
            logger.info("{} node deleted : {}", zkNodeType, path);
            InterProcessMutex mutex = null;
            try {
                String failoverPath = getFailoverLockPath(zkNodeType);
                // create a distributed lock
                mutex = new InterProcessMutex(getZkClient(), failoverPath);
                mutex.acquire();

                String serverHost = null;
                if (StringUtils.isNotEmpty(path)) {
                    serverHost = getHostByEventDataPath(path);
                    if (StringUtils.isEmpty(serverHost)) {
                        logger.error("server down error: unknown path: {}", path);
                        return;
                    }
                    // handle dead server
                    handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
                }
                //failover server
                if (failover) {
                    failoverServerWhenDown(serverHost, zkNodeType);
                }
            } catch (Exception e) {
                logger.error("{} server failover failed.", zkNodeType);
                logger.error("failover exception ", e);
            } finally {
                releaseMutex(mutex);
            }
        }
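
    The "whoever grabs the lock first does the work" behavior can be sketched locally. This is an illustration under assumptions, not the project's code: a `ReentrantLock` stands in for the ZooKeeper-backed `InterProcessMutex`, two threads stand in for masters A and B, and `ProcessInstance`, `onMasterRemoved`, and `simulate` are hypothetical names. Every instance in the list is assumed to be in an executing state. In this sketch both survivors take the lock in turn, but the second one finds no record still pointing at C because the first already cleared the host.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class LockedFailoverSketch {
    // Hypothetical stand-in for a processInstance row (assumed to be executing).
    static class ProcessInstance {
        final int id;
        String host;

        ProcessInstance(int id, String host) {
            this.id = id;
            this.host = host;
        }
    }

    // Local stand-in for the distributed failover lock.
    private final ReentrantLock failoverLock = new ReentrantLock();
    private final List<ProcessInstance> table = new ArrayList<>();
    private final List<String> log = new ArrayList<>();

    // Called on each surviving master when it is notified that deadHost went away.
    void onMasterRemoved(String survivor, String deadHost) {
        failoverLock.lock();
        try {
            for (ProcessInstance pi : table) {
                if (deadHost.equals(pi.host)) {
                    pi.host = null; // later lock holders no longer match this record
                    log.add(survivor + " recovered processInstance " + pi.id);
                }
            }
        } finally {
            failoverLock.unlock();
        }
    }

    // Runs A and B concurrently after C goes offline and returns the recovery log.
    static List<String> simulate() {
        LockedFailoverSketch sketch = new LockedFailoverSketch();
        sketch.table.add(new ProcessInstance(1, "C"));
        sketch.table.add(new ProcessInstance(2, "C"));
        sketch.table.add(new ProcessInstance(3, "A"));
        Thread a = new Thread(() -> sketch.onMasterRemoved("A", "C"));
        Thread b = new Thread(() -> sketch.onMasterRemoved("B", "C"));
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.log;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

    Which of A or B appears in the log varies between runs, but each of C's instances is recovered exactly once and by the same survivor.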
    
  • Original article: https://www.cnblogs.com/d9e84208/p/15265544.html