zoukankan      html  css  js  c++  java
  • QuorumPeer

    QuorumPeer是zookeeper执行同步,选主过程的线程

    主要逻辑:

      1.当自己处于 LOOKING状态:  执行lookForLeader()

      2.当自己处于OBSERVING状态: 建立一个Observer服务和leader进行同步

      3.当自己处于LEADING时: 建立一个Leader服务,处理peer的请求. 

      4.当自己处于FOLLOWING时:建立一个Follwer服务和leader执行同步

    @Override
        public void run() {
            setName("QuorumPeer" + "[myid=" + getId() + "]" +
                    cnxnFactory.getLocalAddress());
    
           ......try {
                /*
                 * Main loop
                 */
                while (running) {
                    switch (getPeerState()) {
                    case LOOKING:
                        LOG.info("LOOKING");
    
                        if (Boolean.getBoolean("readonlymode.enabled")) {
                            LOG.info("Attempting to start ReadOnlyZooKeeperServer");
    
                            // Create read-only server but don't start it immediately
                            final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                                    logFactory, this,
                                    new ZooKeeperServer.BasicDataTreeBuilder(),
                                    this.zkDb);
        
                            // Instead of starting roZk immediately, wait some grace
                            // period before we decide we're partitioned.
                            //
                            // Thread is used here because otherwise it would require
                            // changes in each of election strategy classes which is
                            // unnecessary code coupling.
                            Thread roZkMgr = new Thread() {
                                public void run() {
                                    try {
                                        // lower-bound grace period to 2 secs
                                        sleep(Math.max(2000, tickTime));
                                        if (ServerState.LOOKING.equals(getPeerState())) {
                                            roZk.startup();
                                        }
                                    } catch (InterruptedException e) {
                                        LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                    } catch (Exception e) {
                                        LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                    }
                                }
                            };
                            try {
                                roZkMgr.start();
                                setBCVote(null);
                                setCurrentVote(makeLEStrategy().lookForLeader());
                            } catch (Exception e) {
                                LOG.warn("Unexpected exception",e);
                                setPeerState(ServerState.LOOKING);
                            } finally {
                                // If the thread is in the the grace period, interrupt
                                // to come out of waiting.
                                roZkMgr.interrupt();
                                roZk.shutdown();
                            }
                        } else {
                            try {
                                setBCVote(null);
                                setCurrentVote(makeLEStrategy().lookForLeader());
                            } catch (Exception e) {
                                LOG.warn("Unexpected exception", e);
                                setPeerState(ServerState.LOOKING);
                            }
                        }
                        break;
                    case OBSERVING: //建立obser服务
                        try {
                            LOG.info("OBSERVING");
                            setObserver(makeObserver(logFactory));
                            observer.observeLeader();
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e );                        
                        } finally {
                            observer.shutdown();
                            setObserver(null);
                            setPeerState(ServerState.LOOKING);
                        }
                        break;
                    case FOLLOWING: //建立 Follower服务
                        try {
                            LOG.info("FOLLOWING");
                            setFollower(makeFollower(logFactory));
                            follower.followLeader();
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                        } finally {
                            follower.shutdown();
                            setFollower(null);
                            setPeerState(ServerState.LOOKING);
                        }
                        break;
                    case LEADING:  //建立 Leader服务
                        LOG.info("LEADING");
                        try {
                            setLeader(makeLeader(logFactory));
                            leader.lead();
                            setLeader(null);
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                        } finally {
                            if (leader != null) {
                                leader.shutdown("Forcing shutdown");
                                setLeader(null);
                            }
                            setPeerState(ServerState.LOOKING);
                        }
                        break;
                    }
                }
            } finally {
                LOG.warn("QuorumPeer main thread exited");
                try {
                    MBeanRegistry.getInstance().unregisterAll();
                } catch (Exception e) {
                    LOG.warn("Failed to unregister with JMX", e);
                }
                jmxQuorumBean = null;
                jmxLocalPeerBean = null;
            }
        }
  • 相关阅读:
    Oracle11g 修改内存配置
    七.从按键输入到GPIO通用驱动
    三.C语言版本的LED驱动试验
    五.NXP恩智浦官方SDK使用
    前期准备——1.Makefile的使用及基本语法
    八.主频及时钟配置
    四.指针形式对寄存器进行操作(类似STM32效果)
    二.I.MX6U的启动方式
    六.蜂鸣器驱动
    六.项目的BSP工程管理
  • 原文地址:https://www.cnblogs.com/ironroot/p/7403897.html
Copyright © 2011-2022 走看看