zoukankan      html  css  js  c++  java
  • zookeeper源码分析:选举流程和请求处理

    集群启动:

    1.   QuorumPeerMain. runFromConfig()
    2.   quorumPeer.start();
      • loadDataBase();
      • cnxnFactory.start();          //网络通信交互
      • startLeaderElection();//启动选举类
      • super.start();
    3.   quorumPeer.run()
      • case LOOKING:   
        • new ReadOnlyZooKeeperServer()   //选举期间启用只读服务器,
        • FastLeaderElection.start()  //设置启动投票的后台线程
        • setCurrentVote(makeLEStrategy().lookForLeader()); while循环
      • case LEADING:        new LeaderZooKeeperServer().lead();

    case LOOKING:快速选举

    1.   FastLeaderElection.start()
      • sendqueue = new LinkedBlockingQueue<ToSend>();//发出的投票队列
      • recvqueue = new LinkedBlockingQueue<Notification>();//收到的投票对列
      • this.messenger = new Messenger(manager);//投票消息后台服务线程
        • new WorkerSender(manager); //开启投票的发送线程
        • new WorkerReceiver(manager);//开启投票的接收线程
    2.  FastLeaderElection.lookForLeader():
      •   sendNotifications
      •   比较epoch
      •   totalOrderPredicate//对两个投票进行比较,规则是:选举期数 > zxid > serverid
      •   recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));/收集选票
      •   termPredicate//判断是否已经选出leader
    3. case LEADING: termPredicate
    4. ReadOnlyZooKeeperServer.startup()
      • Zookeeper.startup
      • createSessionTracker();//session管理类
      • setupRequestProcessors();//子类重写的方法,设置request处理责任链

    case LEADING:

    请求处理

    1.   NIOServerCnxnFactory.start()
    2.   setupRequestProcessors
      1. PrepRequestProcessor
      2. ProposalRequestProcessor
      3. CommitProcessor
      4. ToBeAppliedRequestProcessor
      5. FinalRequestProcessor
    3.   ProposalRequestProcessor.run()
      1. LearnerHandler.run();//leader初始化的时候,每accept一个connection,就会生成一个follower专门的LearnerHandler用于处理事务
      2. CommitProcessor.processRequest(request);// LinkedList queuedRequests,是已经发出的提议,要等待收到过半服务器ack的请求队列
      3. Leader.propose(request);//向所有的folllower发送提案
      4. Leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
      5. p.ackSet.add(sid); //对于一个特定的提案,每个follower反馈的ACK信息
      6. if (self.getQuorumVerifier().containsQuorum(p.ackSet)){ //超过半数

    toBeApplied.add(p);//用于   ToBeAppliedRequestProcessor处理

    commit(zxid);//提交给集群中所有成员

    zk.commitProcessor.commit(p.request);//提交给commitProcessor处理,LinkedList committedRequests,是已经收到过半服务器ack的请求队列,以为着该请求可以被提交了。

    }

    4.  CommitProcessor.run();//匹配queuedRequests和committedRequests中的请求,并交给nextProcessor处理。

    5.  ToBeAppliedRequestProcessor.processRequest(request);//交给nextProcessor处理,如果toBeApplied中的第一个元素与request的zxid相等,则可以移除toBeApplied队首元素

  • 相关阅读:
    ansible4:playbook介绍及使用
    ansible3:模块介绍
    Rabin加密算法
    基础业务:图片懒加载
    基础业务:滚动到指定位置导航固定(CSS实现)
    数据库事务处理的并发控制技术(二):事务模型
    详解HTTP缓存
    数据库事务处理的并发控制技术(一):并发控制概述
    二叉树的深度优先遍历和广度优先遍历
    Virtual DOM的简单实现
  • 原文地址:https://www.cnblogs.com/nazhizq/p/8267695.html
Copyright © 2011-2022 走看看