zoukankan      html  css  js  c++  java
  • hadoop-3

    结合https://blog.csdn.net/zhangjun5965/article/details/76796998,自己过一遍感受下

    public class DFSZKFailoverController extends ZKFailoverController 模板调用

      /**
       * Command-line entry point for the HDFS ZK failover controller.
       * <p>
       * Emits the startup/shutdown banner, exits with status 0 when
       * {@code DFSUtil.parseHelpArgument} reports true for the arguments,
       * then builds the controller from the parsed configuration and exits
       * with the status returned by {@code run}. Any {@code Throwable}
       * escaping that step is logged and handed to {@code terminate(1, t)}.
       *
       * @param args raw command-line arguments
       * @throws Exception if setup before the guarded run step fails
       */
      public static void main(String args[])
          throws Exception {
        StringUtils.startupShutdownMessage(
            DFSZKFailoverController.class, args, LOG);

        boolean helpHandled = DFSUtil.parseHelpArgument(
            args, ZKFailoverController.USAGE, System.out, true);
        if (helpHandled) {
          System.exit(0);
        }

        GenericOptionsParser optionsParser =
            new GenericOptionsParser(new HdfsConfiguration(), args);
        DFSZKFailoverController controller =
            DFSZKFailoverController.create(optionsParser.getConfiguration());
        try {
          String[] remainingArgs = optionsParser.getRemainingArgs();
          int exitCode = controller.run(remainingArgs);
          System.exit(exitCode);
        } catch (Throwable t) {
          String failure =
              "DFSZKFailOverController exiting due to earlier exception " + t;
          LOG.fatal(failure);
          terminate(1, t);
        }
      }

    抽象类中run方法

      /**
       * Runs the failover controller for the local target.
       * <p>
       * Returns {@code ERR_CODE_AUTO_FAILOVER_NOT_ENABLED} right away when
       * automatic failover is not enabled for {@code localTarget}. Otherwise
       * it calls {@code loginAsFCUser()} and executes {@code doRun(args)}
       * through {@code SecurityUtil.doAsLoginUserOrFatal}. The elector
       * connection, if an elector exists, is terminated once {@code doRun}
       * finishes, whether normally or with an exception.
       *
       * @param args arguments forwarded unchanged to {@code doRun}
       * @return the value returned by {@code doRun}, or
       *         {@code ERR_CODE_AUTO_FAILOVER_NOT_ENABLED}
       * @throws Exception the exception thrown by {@code doRun} (unwrapped
       *         from the carrier {@code RuntimeException}), or the
       *         {@code RuntimeException} itself when it carries no
       *         {@code Exception} cause
       */
      public int run(final String[] args) throws Exception {
        if (!localTarget.isAutoFailoverEnabled()) {
          LOG.error("Automatic failover is not enabled for " + localTarget + "." +
              " Please ensure that automatic failover is enabled in the " +
              "configuration before running the ZK failover controller.");
          return ERR_CODE_AUTO_FAILOVER_NOT_ENABLED;
        }
        loginAsFCUser();
        try {
          return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
            @Override
            public Integer run() {
              try {
                return doRun(args);
              } catch (Exception t) {
                // PrivilegedAction.run() cannot throw checked exceptions, so
                // tunnel the failure out inside a RuntimeException.
                throw new RuntimeException(t);
              } finally {
                if (elector != null) {
                  elector.terminateConnection();
                }
              }
            }
          });
        } catch (RuntimeException rte) {
          // Unwrap only when there really is an Exception to unwrap. A
          // RuntimeException that was not created by the wrapper above (for
          // example one raised from the finally block) may have no cause, and
          // blindly casting getCause() would then throw a NullPointerException
          // (or a ClassCastException for an Error cause) and hide the real
          // failure.
          final Throwable cause = rte.getCause();
          if (cause instanceof Exception) {
            throw (Exception) cause;
          }
          throw rte;
        }
      }

    下面这个注释说明了选举的机制,就是利用创建zk临时节点机制, atomically creating an ephemeral lock file (znode) on Zookeeper. The service instance that successfully creates the znode becomes active and the rest become standbys,其中还定义了回调接口,包含当节点变为主节点或者从节点的通知,这里注意,存在一个防止脑裂的enterNeutralMode,在状态不确定的情况下通知到节点

    public class ActiveStandbyElector implements StatCallback, StringCallback {
    
      /**
       * Callback interface to interact with the ActiveStandbyElector object. <br>
       * The application will be notified with a callback only on state changes
       * (i.e. there will never be successive calls to becomeActive without an
       * intermediate call to enterNeutralMode). <br>
       * The callbacks will be running on Zookeeper client library threads. The
       * application should return from these callbacks quickly so as not to impede
       * Zookeeper client library performance and notifications. The app will
       * typically remember the state change and return from the callback. It will
       * then proceed with implementing actions around that state change. It is
       * possible to be called back again while these actions are in flight and the
       * app should handle this scenario.
       */
      public interface ActiveStandbyElectorCallback {
        /**
         * This method is called when the app becomes the active leader.
         * If the service fails to become active, it should throw
         * ServiceFailedException. This will cause the elector to
         * sleep for a short period, then re-join the election.
         * 
         * Callback implementations are expected to manage their own
         * timeouts (e.g. when making an RPC to a remote node).
         *
         * @throws ServiceFailedException if the service could not become active
         */
        void becomeActive() throws ServiceFailedException;

        /**
         * This method is called when the app becomes a standby.
         */
        void becomeStandby();

        /**
         * If the elector gets disconnected from Zookeeper and does not know about
         * the lock state, then it will notify the service via the enterNeutralMode
         * interface. The service may choose to ignore this or stop doing state
         * changing operations. Upon reconnection, the elector verifies the leader
         * status and calls back on the becomeActive and becomeStandby app
         * interfaces. <br>
         * Zookeeper disconnects can happen due to network issues or loss of
         * Zookeeper quorum. Thus enterNeutralMode can be used to guard against
         * split-brain issues. In such situations it might be prudent to call
         * becomeStandby too. However, such state change operations might be
         * expensive and enterNeutralMode can help guard against doing that for
         * transient issues.
         */
        void enterNeutralMode();

        /**
         * NOTE(review): no contract is documented in this excerpt. Presumably
         * called when the elector hits an error it cannot recover from, so that
         * the application can react -- confirm against the elector's
         * fatal-error path.
         *
         * @param errorMessage text describing the error
         */
        void notifyFatalError(String errorMessage);

        /**
         * NOTE(review): no contract is documented in this excerpt. Presumably
         * asks the application to fence a previous active instance before this
         * one takes over -- confirm against the elector's become-active path.
         *
         * @param oldActiveData data associated with the previous active;
         *        presumably the bytes that instance stored in the lock znode
         *        -- TODO confirm
         */
        void fenceOldActive(byte[] oldActiveData);
      }

    定义了一个watcher,监听zk相关事件

      /**
       * A {@link Watcher} that remembers the ZooKeeper client it was registered
       * with and hands that client to the elector together with every event it
       * receives.
       */
      private final class WatcherWithClientRef implements Watcher {
        private ZooKeeper zk;

        /**
         * Released on the first event of any kind. Lets the creator of the
         * client block until the initial Connected event has been delivered.
         */
        private CountDownLatch hasReceivedEvent = new CountDownLatch(1);

        /**
         * Released once {@link #setZooKeeperRef(ZooKeeper)} has stored the
         * client reference.
         */
        private CountDownLatch hasSetZooKeeper = new CountDownLatch(1);

        /**
         * Blocks until an event has arrived from ZooKeeper or the timeout
         * elapses. On timeout the client is closed before the exception is
         * raised.
         *
         * @param connectionTimeoutMs zookeeper connection timeout in milliseconds
         * @throws KeeperException with the ConnectionLoss code if no event
         * arrives within the timeout
         * @throws IOException if the thread is interrupted while waiting; the
         * interrupt flag is restored first
         */
        private void waitForZKConnectionEvent(int connectionTimeoutMs)
            throws KeeperException, IOException {
          try {
            boolean eventArrived =
                hasReceivedEvent.await(connectionTimeoutMs, TimeUnit.MILLISECONDS);
            if (eventArrived) {
              return;
            }
            LOG.error("Connection timed out: couldn't connect to ZooKeeper in " +
                "{} milliseconds", connectionTimeoutMs);
            zk.close();
            throw KeeperException.create(Code.CONNECTIONLOSS);
          } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new IOException(
                "Interrupted when connecting to zookeeper server", interrupted);
          }
        }

        /**
         * Stores the client reference and releases anyone waiting for it.
         * May be called only once.
         *
         * @param zk the client this watcher belongs to
         */
        private void setZooKeeperRef(ZooKeeper zk) {
          boolean notYetSet = (this.zk == null);
          Preconditions.checkState(notYetSet,
              "zk already set -- must be set exactly once");
          this.zk = zk;
          hasSetZooKeeper.countDown();
        }

        /**
         * Forwards the event, together with the stored client, to the
         * enclosing elector. Waits up to {@code zkSessionTimeout} ms for the
         * client reference to be set; if it is still unset the event is
         * forwarded anyway after a debug message. Any failure is routed to
         * {@code fatalError}.
         */
        @Override
        public void process(WatchedEvent event) {
          hasReceivedEvent.countDown();
          try {
            boolean refAvailable =
                hasSetZooKeeper.await(zkSessionTimeout, TimeUnit.MILLISECONDS);
            if (!refAvailable) {
              LOG.debug("Event received with stale zk");
            }
            ActiveStandbyElector.this.processWatchEvent(zk, event);
          } catch (Throwable failure) {
            fatalError("Failed to process watcher event " + event + ": "
                + StringUtils.stringifyException(failure));
          }
        }
      }

    具体处理逻辑

      /**
       * Handles one watch event delivered for the given ZooKeeper client.
       * <p>
       * Events for which {@code isStaleClient(zk)} returns true are dropped.
       * An event of type {@code None} is treated as a connection state change
       * and dispatched on {@code event.getState()}. Any other event that
       * carries a path is treated as a change to the lock znode. An event that
       * is neither is reported through {@code fatalError}.
       *
       * @param zk the client handed in together with the event
       * @param event the watch event to handle
       */
      synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
        Event.EventType eventType = event.getType();
        // drop events for a client that isStaleClient() rejects
        if (isStaleClient(zk)) return;
        if (LOG.isDebugEnabled()) {
          LOG.debug("Watcher event type: " + eventType + " with state:"
              + event.getState() + " for path:" + event.getPath()
              + " connectionState: " + zkConnectionState
              + " for " + this);
        }

        if (eventType == Event.EventType.None) {
          // the connection state has changed
          switch (event.getState()) {
          case SyncConnected:
            LOG.info("Session connected.");
            // if the listener was asked to move to safe state then it needs to
            // be undone
            ConnectionState prevConnectionState = zkConnectionState;
            zkConnectionState = ConnectionState.CONNECTED;
            // only act when this is a reconnect after a recorded disconnect
            // and the elector still wants to take part in the election
            if (prevConnectionState == ConnectionState.DISCONNECTED &&
                wantToBeInElection) {
              monitorActiveStatus();
            }
            break;
          case Disconnected:
            LOG.info("Session disconnected. Entering neutral mode...");

            // ask the app to move to safe state because zookeeper connection
            // is not active and we don't know our state
            zkConnectionState = ConnectionState.DISCONNECTED;
            enterNeutralMode();
            break;
          case Expired:
            // the connection got terminated because of session timeout
            // call listener to reconnect
            LOG.info("Session expired. Entering neutral mode and rejoining...");
            enterNeutralMode();
            reJoinElection(0);
            break;
          case SaslAuthenticated:
            // informational only; no state is changed
            LOG.info("Successfully authenticated to ZooKeeper using SASL.");
            break;
          default:
            fatalError("Unexpected Zookeeper watch event state: "
                + event.getState());
            break;
          }

          // connection-state events never fall through to the znode handling
          return;
        }

        // a watch on lock path in zookeeper has fired. so something has changed on
        // the lock. ideally we should check that the path is the same as the lock
        // path but trusting zookeeper for now
        String path = event.getPath();
        if (path != null) {
          switch (eventType) {
          case NodeDeleted:
            // the lock znode is gone: if this instance was the active one it
            // goes neutral first, then in every case it tries to join again
            if (state == State.ACTIVE) {
              enterNeutralMode();
            }
            joinElectionInternal();
            break;
          case NodeDataChanged:
            monitorActiveStatus();
            break;
          default:
            // any other node event is logged at debug level and handled the
            // same way as a data change
            if (LOG.isDebugEnabled()) {
              LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
            }
            monitorActiveStatus();
          }

          return;
        }

        // some unexpected error has occurred
        fatalError("Unexpected watch error from Zookeeper");
      }

    看下加入选举,其实非常简单,就是创建节点

      /**
       * Enters the election by requesting creation of the lock znode.
       * <p>
       * Requires {@code appData} to be set. When there is no ZooKeeper client
       * yet, a session is re-established first; if that fails, the failure is
       * reported through {@code fatalError} and nothing else happens.
       */
      private void joinElectionInternal() {
        Preconditions.checkState(appData != null,
            "trying to join election without any app data");

        // reEstablishSession() is attempted only when no client exists
        boolean sessionReady = (zkClient != null) || reEstablishSession();
        if (!sessionReady) {
          fatalError("Failed to reEstablish connection with ZooKeeper");
          return;
        }

        createRetryCount = 0;
        wantToBeInElection = true;
        createLockNodeAsync();
      }
      /**
       * Requests creation of the znode at {@code zkLockFilePath} in
       * {@code CreateMode.EPHEMERAL} mode, with {@code appData} as its data and
       * {@code zkAcl} as its ACL. This elector is passed as the callback and
       * the current {@code zkClient} as the callback context, so the result is
       * delivered through the callback rather than returned here.
       */
      private void createLockNodeAsync() {
        zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
            this, zkClient);
      }

    一个变为不确定状态的代码(注意这段是回调接口的一个实现,从 rm.getRMContext() 可以看出它在 ResourceManager 一侧,而不在 ActiveStandbyElector 里)

      /**
       * Reacts to losing contact with ZooKeeper: logs a warning and, unless a
       * disconnect timer is already pending, starts one that calls
       * {@link #becomeStandby()} after {@code zkSessionTimeout} milliseconds.
       * The timer task does nothing if {@code zkDisconnectTimer} has been
       * cleared by the time it fires.
       */
      @Override
      public void enterNeutralMode() {
        LOG.warn("Lost contact with Zookeeper. Transitioning to standby in "
            + zkSessionTimeout + " ms if connection is not reestablished.");

        synchronized (zkDisconnectLock) {
          if (zkDisconnectTimer != null) {
            // A countdown is already running; do not start a second one.
            return;
          }

          // First disconnect notification: arm the countdown to standby.
          zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
          TimerTask standbyTask = new TimerTask() {
            @Override
            public void run() {
              synchronized (zkDisconnectLock) {
                // A cleared timer reference means the countdown was cancelled.
                if (zkDisconnectTimer == null) {
                  return;
                }
                becomeStandby();
              }
            }
          };
          zkDisconnectTimer.schedule(standbyTask, zkSessionTimeout);
        }
      }
      /**
       * Moves the RM to standby: calls {@code cancelDisconnectTimer()} and then
       * asks the RM admin service to transition to standby using {@code req}.
       * A failure of that transition is logged and not propagated.
       */
      @Override
      public void becomeStandby() {
        // NOTE(review): presumably clears zkDisconnectTimer so that a pending
        // disconnect countdown no longer fires -- confirm in cancelDisconnectTimer()
        cancelDisconnectTimer();

        try {
          rm.getRMContext().getRMAdminService().transitionToStandby(req);
        } catch (Exception e) {
          // best effort: the error is only logged
          LOG.error("RM could not transition to Standby", e);
        }
      }

    线索非常清晰

  • 相关阅读:
    shell-变量的数值运算let内置命令
    shell-变量的数值运算符-计算双括号(())的使用
    shell-批量修改文件名及扩展名多案例
    shell-变量的字串应用技术
    一个MVVM前端扩展器
    测试一个mysql 悲观锁
    spring rest项目提示Request method 'PUT' not supported Method Not Allowed 405 错误
    Mysql 使用sql语句添加列,修改列默认值,添加类注释
    理解java泛型中的 上界extend 下界super
    mysql存储过程游标循环装载字符串SQL语句示例
  • 原文地址:https://www.cnblogs.com/it-worker365/p/9928673.html
Copyright © 2011-2022 走看看