  • ZooKeeper two-phase commit (work in progress)

    A zk node is a QuorumPeer. Once the election finishes, the leader and the followers each run their own logic:

    org.apache.zookeeper.server.quorum.QuorumPeer#run
    
    org.apache.zookeeper.server.quorum.QuorumPeer#setLeader
    org.apache.zookeeper.server.quorum.Leader#lead
    
    org.apache.zookeeper.server.quorum.QuorumPeer#setFollower
    org.apache.zookeeper.server.quorum.Follower#followLeader

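    Both the leader and the follower wrap a QuorumPeer object, and the QuorumPeer's ServerCnxnFactory listens on the port and handles client I/O events.
    Entry point for request processing: the request is handed to firstProcessor, the head of the processor chain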

    org.apache.zookeeper.server.ZooKeeperServer#processPacket

    leader

    Building the processor chain

    // org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#setupRequestProcessors
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
                finalProcessor, getLeader().toBeApplied);
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        proposalProcessor.initialize();
        firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
        ((PrepRequestProcessor)firstProcessor).start();
    }

    Request processing chain: PrepRequestProcessor, ProposalRequestProcessor, CommitProcessor, ToBeAppliedRequestProcessor, FinalRequestProcessor
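
    To make the wiring order concrete, here is a minimal sketch of such a chain. The Processor interface and the Stage class are hypothetical stand-ins, not ZooKeeper's classes, and the sketch is synchronous, whereas several of the real processors run on their own threads with queues.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a request processor chain; not ZooKeeper's real classes.
class ChainSketch {
    interface Processor {
        void processRequest(String request);
    }

    // A named stage that records its visit and forwards the request to the next stage.
    static final class Stage implements Processor {
        private final String name;
        private final Processor next; // null marks the end of the chain
        private final List<String> trace;

        Stage(String name, Processor next, List<String> trace) {
            this.name = name;
            this.next = next;
            this.trace = trace;
        }

        @Override
        public void processRequest(String request) {
            trace.add(name);
            if (next != null) {
                next.processRequest(request);
            }
        }
    }

    // Builds the chain back to front, mirroring the order used in setupRequestProcessors,
    // then pushes one request through it and returns the stages it visited.
    static List<String> runLeaderChain(String request) {
        List<String> trace = new ArrayList<>();
        Processor finalStage = new Stage("Final", null, trace);
        Processor toBeApplied = new Stage("ToBeApplied", finalStage, trace);
        Processor commit = new Stage("Commit", toBeApplied, trace);
        Processor proposal = new Stage("Proposal", commit, trace);
        Processor first = new Stage("Prep", proposal, trace);
        first.processRequest(request);
        return trace;
    }

    public static void main(String[] args) {
        // prints [Prep, Proposal, Commit, ToBeApplied, Final]
        System.out.println(runLeaderChain("create /a"));
    }
}
```

    Because each stage needs a reference to its successor, the last stage has to be constructed first, which is why the real setup method starts with FinalRequestProcessor.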

    The ProposalRequestProcessor constructor shows that there is a second chain: SyncRequestProcessor, AckRequestProcessor

    public ProposalRequestProcessor(LeaderZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
        syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
    }

    follower

    Building the processor chain

    org.apache.zookeeper.server.quorum.FollowerZooKeeperServer#setupRequestProcessors

    Chain 1: FollowerRequestProcessor, CommitProcessor, FinalRequestProcessor

    Chain 2: SyncRequestProcessor, SendAckRequestProcessor

    Two-phase write

    The leader receives the write request and deserializes the request body

    org.apache.zookeeper.server.PrepRequestProcessor#processRequest

    The leader sends the proposal to the followers

    // org.apache.zookeeper.server.quorum.ProposalRequestProcessor#processRequest
    zks.getLeader().propose(request);

    The leader sends the proposal to itself

    // the leader's SyncRequestProcessor writes the request to the transaction log
    syncProcessor.processRequest(request);

    The leader's AckRequestProcessor returns an ack to the leader itself

    org.apache.zookeeper.server.quorum.AckRequestProcessor#processRequest

    The follower receives and handles the proposal sent by the leader

    org.apache.zookeeper.server.quorum.Follower#processPacket

    The follower's SyncRequestProcessor writes the proposal to the transaction log

    org.apache.zookeeper.server.SyncRequestProcessor#run

    The follower's SendAckRequestProcessor sends an ACK to the leader

    org.apache.zookeeper.server.quorum.SendAckRequestProcessor#processRequest

    The leader receives and handles the follower's ack

    org.apache.zookeeper.server.quorum.LearnerHandler#run  
    org.apache.zookeeper.server.quorum.Leader#processAck

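    Once more than half of the nodes have acked, the leader sends a commit to the followers (by placing the packet on the send queue) and applies the write to its DataTree

    The follower commits the write and writes the data into its DataTree

    CommitProcessor and FinalRequestProcessor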
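
    The majority rule can be sketched as follows. This is an illustrative model only: QuorumAckSketch and its fields are hypothetical names, and the real Leader#processAck tracks acks per proposal and per zxid, which this sketch leaves out.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of ack counting for a single proposal; not ZooKeeper's Leader class.
class QuorumAckSketch {
    private final int ensembleSize;
    private final Set<Long> acks = new HashSet<>(); // server ids that have acked
    private boolean committed = false;

    QuorumAckSketch(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    // Records an ack from server sid. Returns true only for the ack that first
    // forms a strict majority, which is the moment a commit would be sent.
    boolean processAck(long sid) {
        acks.add(sid); // a duplicate ack from the same server does not grow the set
        if (!committed && acks.size() > ensembleSize / 2) {
            committed = true;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        QuorumAckSketch proposal = new QuorumAckSketch(3);
        System.out.println(proposal.processAck(1)); // leader's own ack: false
        System.out.println(proposal.processAck(2)); // first follower ack: true
        System.out.println(proposal.processAck(3)); // already committed: false
    }
}
```

    Note that the leader's ack to itself counts toward the majority, so in a three-node ensemble a single follower ack is enough to commit.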

    The 2 key moments at which data is written
    Writing the transaction log

    // org.apache.zookeeper.server.SyncRequestProcessor
    zks.getZKDatabase().append(si)

    Writing the DataTree

    // org.apache.zookeeper.server.FinalRequestProcessor
    getZKDatabase().processTxn(hdr, txn)
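
    The ordering of these two writes is the core of the protocol: a proposal is logged before it is acked, and it only becomes visible in the tree after the commit. A minimal in-memory sketch, where the list and the map are hypothetical stand-ins for the on-disk log and the DataTree:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the two write moments; not ZooKeeper's ZKDatabase.
class TwoWritesSketch {
    final List<String> txnLog = new ArrayList<>();         // stands in for the transaction log
    final Map<String, String> dataTree = new HashMap<>();  // stands in for the DataTree

    // Phase 1: record the proposal; the tree is not touched yet.
    void logProposal(long zxid, String path, String value) {
        txnLog.add(zxid + " set " + path + "=" + value);
    }

    // Phase 2: after the commit, apply the write so that reads can see it.
    void applyCommit(String path, String value) {
        dataTree.put(path, value);
    }

    public static void main(String[] args) {
        TwoWritesSketch node = new TwoWritesSketch();
        node.logProposal(1L, "/a", "v1");
        System.out.println(node.dataTree.containsKey("/a")); // false: logged but not committed
        node.applyCommit("/a", "v1");
        System.out.println(node.dataTree.get("/a"));         // v1
    }
}
```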

    Failure case 1: a follower times out while handling a proposal. How does the leader react?
    The leader periodically sends a ping to each follower

    // org.apache.zookeeper.server.quorum.LearnerHandler#ping
    public void ping() {
        long id;
        if (syncLimitCheck.check(System.nanoTime())) {
            synchronized(leader) {
                id = leader.lastProposed;
            }
            QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
            queuePacket(ping);
        } else {
            LOG.warn("Closing connection to peer due to transaction timeout.");
            shutdown();
        }
    }

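    If the leader finds that the previous proposal has timed out, it closes the connection to the follower. The follower then shuts down, recreates its objects, reconnects to the leader, and sends an ack message

    Inspecting the transaction log: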
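
    The check that guards the ping can be modelled roughly as below. This is a simplified sketch, not the real SyncLimitCheck: the class name, the single-proposal tracking, and the 4 second limit are assumptions made for illustration (in a real deployment the limit would presumably be derived from the tickTime and syncLimit settings).

```java
// Hypothetical, simplified model of a sync-limit check; not ZooKeeper's SyncLimitCheck class.
class SyncLimitSketch {
    private final long limitNanos;
    private boolean outstanding = false; // true while a proposal awaits its ack
    private long sentAtNanos;            // send time of that proposal

    SyncLimitSketch(long limitMillis) {
        this.limitNanos = limitMillis * 1_000_000L;
    }

    // Remember when the oldest unacknowledged proposal was sent.
    void proposalSent(long nowNanos) {
        if (!outstanding) {
            outstanding = true;
            sentAtNanos = nowNanos;
        }
    }

    // An ack clears the pending proposal (the sketch tracks only one).
    void ackReceived() {
        outstanding = false;
    }

    // True if the follower is still within the limit, so a ping may be sent;
    // false means the caller should close the connection.
    boolean check(long nowNanos) {
        return !outstanding || nowNanos - sentAtNanos < limitNanos;
    }

    public static void main(String[] args) {
        SyncLimitSketch limit = new SyncLimitSketch(4000); // assumed limit of 4 s
        long t0 = System.nanoTime();
        limit.proposalSent(t0);
        System.out.println(limit.check(t0 + 1_000_000_000L)); // 1 s later: true
        System.out.println(limit.check(t0 + 5_000_000_000L)); // 5 s later: false
    }
}
```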

    org.apache.zookeeper.server.LogFormatter#main 

    Argument: D:/zk_test/data1/version-2/log.100000001

  • Original article: https://www.cnblogs.com/allenwas3/p/11845549.html