zoukankan      html  css  js  c++  java
  • RocketMQ 主从同步

    RocketMQ 的主和从一直在使用 nio 进行数据同步:

    master

    master 监听端口
    org.apache.rocketmq.store.ha.HAService.AcceptSocketService#beginAccept
    
    master 建立连接
    org.apache.rocketmq.store.ha.HAService.AcceptSocketService#run
    
    master 读取 slave 上报的 maxOffset
    org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#run
    
    master 传输数据给 slave
    org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run

    slave

    slave 连接 master
    org.apache.rocketmq.store.ha.HAService.HAClient#connectMaster
    
    slave 定时报告 maxOffset 给 master
    org.apache.rocketmq.store.ha.HAService.HAClient#run
    
    slave 接收 master 传输来的数据
    org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent

    这里的同步,暂时只涉及到 commitLog。

    同步双写的本质,master 先写磁盘,然后等待 slave 同步消息成功。

    写磁盘:

    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // Synchronization flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
        }
        // Asynchronous flush
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else {
                commitLogService.wakeup();
            }
        }
    }

    同步到 slave:

    // org.apache.rocketmq.store.CommitLog#handleHA
    public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // SYNC_MASTER 则执行逻辑
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (messageExt.isWaitStoreMsgOK()) {
                // Determine whether to wait
                // slave 没有落后 master 太多
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    // 创建 GroupCommitRequest 放入 GroupTransferService 的 requestsWrite 中
                    // GroupTransferService.run 会一直比较 GroupCommitRequest#nextOffset 和 slave 已提交的位移
                    service.putRequest(request);
                    service.getWaitNotifyObject().wakeupAll();
                    // 等待 5 秒,检查 slave 的同步结果
                    boolean flushOK =
                        request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                            + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }
    }
  • 相关阅读:
    pandas 读取excle ,迭代
    prettytable模块(格式化打印内容)
    Python开发丨这些面试题会不会难倒你
    python开发最受欢迎的十款工具
    语言组成
    运算符优先级
    Python 位运算符 逻辑运算符 成员运算符
    **算术运算符
    **Python数据类型转换
    字符串-数字-列表(转换)
  • 原文地址:https://www.cnblogs.com/allenwas3/p/11733334.html
Copyright © 2011-2022 走看看