zoukankan      html  css  js  c++  java
  • Follower

    Follower是 follower节点启动的和leader进行同步的功能类

    主要逻辑如下:

      1.和leader建立链接

      2.向leader发送自己的epoch和zxid.

    void observeLeader() throws InterruptedException {
            zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
    
            try {
                InetSocketAddress addr = findLeader();
                LOG.info("Observing " + addr);
                try {
              //建立连接 connectToLeader(addr);
              //向leader发送自己的epoch和zxid,通过发送类型为Leader.OBSERVERINFO的packet,并阻塞到收到
    long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);           
              //和leader执行日志同步,并接收UPTODATE命令开始接收client的连接请求 syncWithLeader(newLeaderZxid); QuorumPacket qp
    = new QuorumPacket(); while (this.isRunning()) { readPacket(qp);
                //读取Packet并处理 processPacket(qp); } }
    catch (Exception e) { LOG.warn("Exception when observing the leader", e); try { sock.close(); } catch (IOException e1) { e1.printStackTrace(); } // clear pending revalidations pendingRevalidations.clear(); } } finally { zk.unregisterJMX(this); } }

    处理来自leader的数据包

    protected void processPacket(QuorumPacket qp) throws IOException{
            switch (qp.getType()) {
            case Leader.PING:      //心跳      
                ping(qp);            
                break;
            case Leader.PROPOSAL:     //写入提议       
                TxnHeader hdr = new TxnHeader();
                Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
                if (hdr.getZxid() != lastQueued + 1) {
                    LOG.warn("Got zxid 0x"
                            + Long.toHexString(hdr.getZxid())
                            + " expected 0x"
                            + Long.toHexString(lastQueued + 1));
                }
                lastQueued = hdr.getZxid();
                fzk.logRequest(hdr, txn);
                break;
            case Leader.COMMIT:  //提交提议
                fzk.commit(qp.getZxid());
                break;
            case Leader.UPTODATE:  //此时follower已经在上面的syncWithLeader执行过了,所以忽略.
                LOG.error("Received an UPTODATE message after Follower started");
                break;
            case Leader.REVALIDATE:
                revalidate(qp);
                break;
            case Leader.SYNC:
                fzk.sync();
                break;
            }
        }
  • 相关阅读:
    ZooKeeper实现配置中心的实例(原生API实现)(转)
    com.101tec.ZKClient实现中的subscribeDataChanges设置的监听器事件不回调的问题研究
    Spring Framework体系结构简介
    Spring MVC中@RequestMapping注解使用技巧(转)
    Tomcat配置文件server.xml(转)
    Ubuntu 16.04配置VNC进行远程桌面连接
    Eclipse查看方法/类调用的方法
    MySQL Workbench查看和修改表字段的Comment值
    MySQL常用函数(转)
    MySQL大小写问题的简单说明(关键字/函数/表名)(转)
  • 原文地址:https://www.cnblogs.com/ironroot/p/7403941.html
Copyright © 2011-2022 走看看