zoukankan      html  css  js  c++  java
  • QuorumPeer

    QuorumPeer是zookeeper执行同步,选主过程的线程

    主要逻辑:

      1.当自己处于 LOOKING状态:  执行lookForLeader()

      2.当自己处于OBSERVING状态: 建立一个Observer服务和leader进行同步

      3.当自己处于LEADING时: 建立一个Leader服务,处理peer的请求. 

      4.当自己处于FOLLOWING时: 建立一个Follower服务和leader执行同步

    /**
     * Main loop of the quorum peer.
     *
     * <p>After labelling itself via {@code setName(...)}, the method loops while
     * {@code running} is true and dispatches on {@code getPeerState()}:
     * <ul>
     *   <li>{@code LOOKING}: clears the BC vote, runs
     *       {@code makeLEStrategy().lookForLeader()} and stores the result with
     *       {@code setCurrentVote(...)}. When the system property
     *       {@code readonlymode.enabled} is true, a read-only server is also
     *       prepared and started by a helper thread if the peer is still
     *       LOOKING after a grace period.</li>
     *   <li>{@code OBSERVING}: creates an observer and calls
     *       {@code observeLeader()}.</li>
     *   <li>{@code FOLLOWING}: creates a follower and calls
     *       {@code followLeader()}.</li>
     *   <li>{@code LEADING}: creates a leader and calls {@code lead()}.</li>
     * </ul>
     * Every non-LOOKING branch resets the peer state to {@code LOOKING} in its
     * {@code finally} block, so the next iteration starts a new election.
     * When the loop ends (normally or by an escaping exception) all MBeans are
     * unregistered and the JMX bean references are cleared.
     *
     * <p>NOTE: this listing is an excerpt; the {@code ......} marker before the
     * outer {@code try} stands for code that is not shown here.
     */
    @Override
        public void run() {
            // Label this peer with its id and the local address of the connection factory.
            setName("QuorumPeer" + "[myid=" + getId() + "]" +
                    cnxnFactory.getLocalAddress());
    
           // The "......" below marks code elided from this excerpt.
           ......try {
                /*
                 * Main loop
                 */
                while (running) {
                    switch (getPeerState()) {
                    case LOOKING:
                        LOG.info("LOOKING");
    
                        // Boolean.getBoolean reads the JVM system property of that name.
                        if (Boolean.getBoolean("readonlymode.enabled")) {
                            LOG.info("Attempting to start ReadOnlyZooKeeperServer");
    
                            // Create read-only server but don't start it immediately
                            final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                                    logFactory, this,
                                    new ZooKeeperServer.BasicDataTreeBuilder(),
                                    this.zkDb);
        
                            // Instead of starting roZk immediately, wait some grace
                            // period before we decide we're partitioned.
                            //
                            // Thread is used here because otherwise it would require
                            // changes in each of election strategy classes which is
                            // unnecessary code coupling.
                            Thread roZkMgr = new Thread() {
                                public void run() {
                                    try {
                                        // lower-bound grace period to 2 secs
                                        sleep(Math.max(2000, tickTime));
                                        // Start the read-only server only if the peer is
                                        // still LOOKING once the grace period has elapsed.
                                        if (ServerState.LOOKING.equals(getPeerState())) {
                                            roZk.startup();
                                        }
                                    } catch (InterruptedException e) {
                                        // Raised when the finally block below interrupts
                                        // this thread during the grace-period sleep.
                                        LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                    } catch (Exception e) {
                                        LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                    }
                                }
                            };
                            try {
                                roZkMgr.start();
                                setBCVote(null);
                                // Run the election and record the resulting vote.
                                setCurrentVote(makeLEStrategy().lookForLeader());
                            } catch (Exception e) {
                                LOG.warn("Unexpected exception",e);
                                // Stay in (or return to) LOOKING so the election is retried.
                                setPeerState(ServerState.LOOKING);
                            } finally {
                                // If the thread is in the grace period, interrupt
                                // to come out of waiting.
                                roZkMgr.interrupt();
                                // Shut the read-only server down whether or not it was started.
                                roZk.shutdown();
                            }
                        } else {
                            // Same election step as above, without the read-only server.
                            try {
                                setBCVote(null);
                                setCurrentVote(makeLEStrategy().lookForLeader());
                            } catch (Exception e) {
                                LOG.warn("Unexpected exception", e);
                                setPeerState(ServerState.LOOKING);
                            }
                        }
                        break;
                    case OBSERVING: // create the Observer service
                        try {
                            LOG.info("OBSERVING");
                            setObserver(makeObserver(logFactory));
                            observer.observeLeader();
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e );                        
                        } finally {
                            // NOTE(review): unlike the LEADING branch there is no null
                            // guard here; if makeObserver(...) throws, `observer` may
                            // still be null and this call would itself throw - confirm.
                            observer.shutdown();
                            setObserver(null);
                            // Fall back to LOOKING so the next iteration re-elects.
                            setPeerState(ServerState.LOOKING);
                        }
                        break;
                    case FOLLOWING: // create the Follower service
                        try {
                            LOG.info("FOLLOWING");
                            setFollower(makeFollower(logFactory));
                            follower.followLeader();
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                        } finally {
                            // NOTE(review): no null guard here either; if
                            // makeFollower(...) throws, `follower` may be null - confirm.
                            follower.shutdown();
                            setFollower(null);
                            // Fall back to LOOKING so the next iteration re-elects.
                            setPeerState(ServerState.LOOKING);
                        }
                        break;
                    case LEADING:  // create the Leader service
                        LOG.info("LEADING");
                        try {
                            setLeader(makeLeader(logFactory));
                            leader.lead();
                            // Reached only when lead() returns without throwing.
                            setLeader(null);
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                        } finally {
                            // Presumably `leader` is still non-null here only when
                            // lead() threw before the setLeader(null) above ran;
                            // in that case force a shutdown and clear it.
                            if (leader != null) {
                                leader.shutdown("Forcing shutdown");
                                setLeader(null);
                            }
                            // Fall back to LOOKING so the next iteration re-elects.
                            setPeerState(ServerState.LOOKING);
                        }
                        break;
                    }
                }
            } finally {
                // Runs when the main loop exits for any reason.
                LOG.warn("QuorumPeer main thread exited");
                try {
                    MBeanRegistry.getInstance().unregisterAll();
                } catch (Exception e) {
                    // Best effort: a JMX failure is only logged.
                    LOG.warn("Failed to unregister with JMX", e);
                }
                jmxQuorumBean = null;
                jmxLocalPeerBean = null;
            }
        }
  • 相关阅读:
    windows下的mysql备份恢复命令
    sp_addlinkedserver的使用方法 (转)
    一个以pubs数据库为例的SQL SERVER数据库全文索引
    【C#】可空类型(Nullable)
    【C#】 异常处理
    【电脑常识】如何查看电脑是32位(X86)还是64位(X64),如何知道硬件是否支持64位系统
    【C#】委托Delegate
    【工具】VS2010常用调试技巧(1)
    实验1 总结
    用C语言编程自动生成四则运算
  • 原文地址:https://www.cnblogs.com/ironroot/p/7403897.html
Copyright © 2011-2022 走看看