zoukankan      html  css  js  c++  java
  • zk 两阶段提交(待完善)

    zk 节点是一个 QuorumPeer,选举结束后,leader 和 follower 各自执行自己的逻辑:

    org.apache.zookeeper.server.quorum.QuorumPeer#run
    
    org.apache.zookeeper.server.quorum.QuorumPeer#setLeader
    org.apache.zookeeper.server.quorum.Leader#lead
    
    org.apache.zookeeper.server.quorum.QuorumPeer#setFollower
    org.apache.zookeeper.server.quorum.Follower#followLeader

    不管是 leader 还是 follower,都封装了一个 QuorumPeer 对象,QuorumPeer.ServerCnxnFactory 监听端口,处理客户端 io 事件。
    请求处理的入口:把请求交给 procesor 链的 firstProcessor

    org.apache.zookeeper.server.ZooKeeperServer#processPacket

    leader

    构建 processor 链

    // org.apache.zookeeper.server.quorum.LeaderZooKeeperServer#setupRequestProcessors
    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
                finalProcessor, getLeader().toBeApplied);
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        proposalProcessor.initialize();
        firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
        ((PrepRequestProcessor)firstProcessor).start();
    }

    请求处理链:PrepRequestProcessor, ProposalRequestProcessor, CommitProcessor, ToBeAppliedRequestProcessor, FinalRequestProcessor

    由 ProposalRequestProcessor 构造函数可,还有一条链:SyncRequestProcessor, AckRequestProcessor

    public ProposalRequestProcessor(LeaderZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        this.zks = zks;
        this.nextProcessor = nextProcessor;
        AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
        syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
    }

    follower

    构建 processor 链

    org.apache.zookeeper.server.quorum.FollowerZooKeeperServer#setupRequestProcessors

    链1:FollowerRequestProcessor, CommitProcessor, FinalRequestProcessor

    链2:SyncRequestProcessor, SendAckRequestProcessor

    两阶段写

    leader 接收写请求,反序列化请求体

    org.apache.zookeeper.server.PrepRequestProcessor#processRequest

    leader 发送建议给 follower

    // org.apache.zookeeper.server.quorum.ProposalRequestProcessor#processRequest
    zks.getLeader().propose(request);

    leader 发送建议给自身

    // 由 leader 的 SyncRequestProcessor 把请求写入事务日志
    syncProcessor.processRequest(request);

    leader 的 AckRequestProcessor 返回 ack 给自身

    org.apache.zookeeper.server.quorum.AckRequestProcessor#processRequest

    follower 接收并处理 leader 发送的建议

    org.apache.zookeeper.server.quorum.Follower#processPacket

    follower 的 SyncRequestProcessor 写入事务日志

    org.apache.zookeeper.server.SyncRequestProcessor#run

    follower 的 SendAckRequestProcessor 发送 ACK 给 leader

    org.apache.zookeeper.server.quorum.SendAckRequestProcessor#processRequest

    leader 接收并处理 follower 的 ack

    org.apache.zookeeper.server.quorum.LearnerHandler#run  
    org.apache.zookeeper.server.quorum.Leader#processAck

    ack 超过半数,向 follower 发送提交事件(把数据放入发送队列),并把写操作应用到 DataTree

    follower 提交写操作,把数据写入 DataTree

    CommitProcessor 和 FinalRequestProcessor

    数据写入的 2 个关键时机
    写事务日志

    // org.apache.zookeeper.server.SyncRequestProcessor
    zks.getZKDatabase().append(si)

    写 DataTree

    // org.apache.zookeeper.server.FinalRequestProcessor
    getZKDatabase().processTxn(hdr, txn)

    异常情况一:follower 处理 propose 超时,leader 如何处理?
    leader 定期发送 ping 给 follower

    // org.apache.zookeeper.server.quorum.LearnerHandler#ping
    public void ping() {
        long id;
        if (syncLimitCheck.check(System.nanoTime())) {
            synchronized(leader) {
                id = leader.lastProposed;
            }
            QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
            queuePacket(ping);
        } else {
            LOG.warn("Closing connection to peer due to transaction timeout.");
            shutdown();
        }
    }

    如果发现上一条 propose 超时,则断开与 follower 的连接,follower 会 shutdown,然后重新创建对象,重连 leader,会发送 ack 消息

    查看事务日志:

    org.apache.zookeeper.server.LogFormatter#main 

    参数 D:/zk_test/data1/version-2/log.100000001

  • 相关阅读:
    我今天能懂
    SpringMVC之RequestContextHolder分析
    idea只导入部分依赖
    idea中GitPush失败问题
    SpringBoot常用配置,引入外部配置文件信息,热加载
    idea的yml文件不识别问题
    SpringBoot介绍,快速入门小例子,目录结构,不同的启动方式,SpringBoot常用注解
    Java连接Redis,存储对象获取对象()byte和json),连接池
    Nginx的反向代理
    Nginx介绍,安装,配置
  • 原文地址:https://www.cnblogs.com/allenwas3/p/11845549.html
Copyright © 2011-2022 走看看