  • RocketMQ Broker Storage Mechanism: HA Data Synchronization

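    The HA mechanism handles data synchronization between slave and master in read/write-splitting mode, so that consumers can pull data from the slave broker when the master broker is under heavy load. The main HA logic lives in the HAService class, and the entry point is the handleHA() method called from putMessage.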

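    HA supports synchronous and asynchronous replication. Synchronous replication works much like synchronous flush: both block the caller until it is notified. After the request has been put (putRequest), the caller invokes the request's waitForFlush method.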

        public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
            if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
                HAService service = this.defaultMessageStore.getHaService();
                if (messageExt.isWaitStoreMsgOK()) {
                    // Determine whether to wait
                    if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                        GroupCommitRequest  request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                        service.putRequest(request);
                        service.getWaitNotifyObject().wakeupAll();
                        boolean flushOK =
                            request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                        if (!flushOK) {
                            log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                                + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                        }
                    }
                    // Slave problem
                    else {
                        // Tell the producer, slave not available
                        putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                    }
                }
            }
    
        }
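
The blocking step in handleHA() (putRequest followed by waitForFlush) can be pictured as a one-shot latch. The following is a minimal sketch, not RocketMQ's actual GroupCommitRequest: the class name SyncRequestSketch and the use of CountDownLatch are assumptions made for illustration. Only the method names waitForFlush, wakeupCustomer and getNextOffset come from the code shown in this article.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for GroupCommitRequest: one waiting thread, one notifying thread. */
final class SyncRequestSketch {
    private final long nextOffset;                         // offset the slave must reach
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean ok = false;

    SyncRequestSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long getNextOffset() {
        return nextOffset;
    }

    /** Called by the transfer thread once the outcome is known. */
    void wakeupCustomer(boolean transferOK) {
        this.ok = transferOK;
        latch.countDown();
    }

    /** Called by the thread handling the put; returns false on timeout or failed transfer. */
    boolean waitForFlush(long timeoutMillis) {
        try {
            boolean released = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return released && ok;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Case 1: another thread confirms the transfer before the timeout.
        SyncRequestSketch acked = new SyncRequestSketch(1024L);
        new Thread(() -> acked.wakeupCustomer(true)).start();
        System.out.println("acked in time: " + acked.waitForFlush(5000));

        // Case 2: nobody confirms, so the wait times out (maps to FLUSH_SLAVE_TIMEOUT above).
        SyncRequestSketch lost = new SyncRequestSketch(2048L);
        System.out.println("acked in time: " + lost.waitForFlush(50));
    }
}
```

A latch fits here because each request is completed exactly once and has exactly one waiter; the timeout keeps a slow slave from blocking the put indefinitely.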

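    The actual processing is in GroupTransferService. Like synchronous flush, it separates reads from writes: incoming requests are queued on one list while another list (requestsRead) is being processed. It decides whether replication has completed by checking whether the offset the slave has acknowledged is >= the offset the request needs in the local commitlog (the write offset plus the bytes written).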

            private void doWaitTransfer() {
                synchronized (this.requestsRead) {
                    if (!this.requestsRead.isEmpty()) {
                        for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                            boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
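                            /**
                             * Re-check up to 5 times, waiting up to 1 s before each check. Once the offset
                             * already synced to the slave reaches the offset this request needs in the
                             * local commitlog, the sync has succeeded.
                             */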
                            for (int i = 0; !transferOK && i < 5; i++) {
                                this.notifyTransferObject.waitForRunning(1000);
                                transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                            }
    
                            if (!transferOK) {
                                log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                            }
    
                            // Notify the caller blocked in waitForFlush
                            req.wakeupCustomer(transferOK);
                        }
    
                        this.requestsRead.clear();
                    }
                }
            }
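
The read/write separation described above can be sketched as two lists that swap roles. This is a simplified illustration under stated assumptions: TransferQueueSketch, onSlaveAck and checkOnce are hypothetical names, requests are reduced to bare offsets, and the waiting loop from doWaitTransfer() is left out so that only the swap and the offset comparison remain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: producers append to one list while a single checker drains the other. */
final class TransferQueueSketch {
    private List<Long> requestsWrite = new ArrayList<>(); // filled by producer threads
    private List<Long> requestsRead = new ArrayList<>();  // drained by the checker thread only
    private final AtomicLong slaveAckOffset = new AtomicLong(0);

    synchronized void putRequest(long nextOffset) {
        requestsWrite.add(nextOffset);
    }

    /** Record the slave's reported progress; the stored value never moves backwards. */
    void onSlaveAck(long offset) {
        slaveAckOffset.accumulateAndGet(offset, Math::max);
    }

    private synchronized void swapRequests() {
        List<Long> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    /** Must be called from one thread only. Returns, per request, whether the slave caught up. */
    List<Boolean> checkOnce() {
        swapRequests();
        List<Boolean> results = new ArrayList<>();
        for (long nextOffset : requestsRead) {
            results.add(slaveAckOffset.get() >= nextOffset);
        }
        requestsRead.clear();
        return results;
    }

    public static void main(String[] args) {
        TransferQueueSketch q = new TransferQueueSketch();
        q.putRequest(100L);
        q.putRequest(300L);
        q.onSlaveAck(200L);
        System.out.println(q.checkOnce()); // prints [true, false]
    }
}
```

Swapping the lists keeps the lock held by producers very short: they only contend for the append and the swap, not for the time spent checking each request.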

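    The actual data transfer is asynchronous and is implemented in HAConnection: the master sends commitlog data and receives the slave's ACK offset, which reports how far the slave has synced. On the slave side, HAClient reports its progress every 5 s, and also right after it receives data from the master and writes it to its commitlog.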

        public void start() throws Exception {
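            // Register the server-side listener: bind the port, open the selector and register the accept event key
            this.acceptSocketService.beginAccept();
            // Start the accept loop; each accepted client channel is wrapped in an HAConnection
            /**
             * HAConnection contains a ReadSocketService and a WriteSocketService
             * ReadSocketService: the master receives the slave's sync progress
             * WriteSocketService: sends commitlog data to the slave
             */
            this.acceptSocketService.start();
    
            // Service that takes the broker's HA requests and notifies the waiting caller whether the sync completed
            this.groupTransferService.start();
    
            // haClient: on a slave, processes the data pushed by the master and reports sync progress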
            this.haClient.start();
        }
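
The exchange described above (master pushes commitlog bytes, slave answers with an ACK offset) can be illustrated with two tiny frames. The layout used here is an assumption for illustration and is not taken from this article: an 8-byte offset as the ACK, and an 8-byte start offset plus a 4-byte length in front of each data body. HaFrameSketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical frame layout for a master/slave replication exchange. */
final class HaFrameSketch {
    static final int HEADER_SIZE = 8 + 4; // assumed: start offset (long) + body length (int)

    /** Slave -> master: "I have everything up to this offset". */
    static ByteBuffer encodeAck(long slaveMaxOffset) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(slaveMaxOffset);
        buf.flip();
        return buf;
    }

    /** Master -> slave: a chunk of commitlog bytes starting at startOffset. */
    static ByteBuffer encodeData(long startOffset, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length);
        buf.putLong(startOffset);
        buf.putInt(body.length);
        buf.put(body);
        buf.flip();
        return buf;
    }

    /**
     * Slave side: consume one complete frame and return the new local max offset.
     * Returns -1 (and consumes nothing) if the frame is not fully received yet.
     */
    static long applyData(ByteBuffer frame, long localMaxOffset) {
        if (frame.remaining() < HEADER_SIZE) {
            return -1;
        }
        long startOffset = frame.getLong(frame.position());
        int size = frame.getInt(frame.position() + 8);
        if (frame.remaining() < HEADER_SIZE + size) {
            return -1; // wait for more bytes
        }
        if (startOffset != localMaxOffset) {
            throw new IllegalStateException("master offset " + startOffset
                + " does not match slave offset " + localMaxOffset);
        }
        frame.position(frame.position() + HEADER_SIZE + size); // a real slave would append the body here
        return localMaxOffset + size;
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        long next = applyData(encodeData(0L, body), 0L);
        System.out.println("slave reports offset " + encodeAck(next).getLong()); // prints 5
    }
}
```

Carrying the start offset in every frame lets the slave detect a gap or overlap before appending, and the ACK is what the master compares against each pending request's offset.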

    First, let's look at what the master's acceptSocketService.start() runs:

            // AcceptSocketService.run()
            /** {@inheritDoc} */
            @Override
            public void run() {
                log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    try {
                        this.selector.select(1000);
                        Set<SelectionKey> selected = this.selector.selectedKeys();
    
                        if (selected != null) {
                            for (SelectionKey k : selected) {
                                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                    // Accept a connection from a slave
                                    SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
    
                                    if (sc != null) {
                                        HAService.log.info("HAService receive new connection, "
                                            + sc.socket().getRemoteSocketAddress());
    
                                        try {
                                            // Each HAConnection manages the reads and writes for one slave
                                            HAConnection conn = new HAConnection(HAService.this, sc);
                                            conn.start();
                                            HAService.this.addConnection(conn);
                                        } catch (Exception e) {
                                            log.error("new HAConnection exception", e);
                                            sc.close();
                                        }
                                    }
                                } else {
                                    log.warn("Unexpected ops in select " + k.readyOps());
                                }
                            }
    
                            selected.clear();
                        }
                    } catch (Exception e) {
                        log.error(this.getServiceName() + " service has exception.", e);
                    }
                }
    
                log.info(this.getServiceName() + " service end");
            }
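
        // HAConnection.start(), called above for each accepted slave connection
        public void start() {
            // Handles the sync progress reported by the slave
            this.readSocketService.start();
            // Sends sync data to the slave
            this.writeSocketService.start();
        }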

    Next, let's look at how the slave handles this:

            @Override
            public void run() {
                log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    try {
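                        // Connect to the master and initialize the socketChannel;
                        // this also registers the read event
                        if (this.connectMaster()) {
                            // Has it been more than 5 s since the slave last sent a request to the master?
                            if (this.isTimeToReportOffset()) {
                                // Report the current offset to the master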
                                boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                                if (!result) {
                                    this.closeMaster();
                                }
                            }
                            // Readiness selection; blocks for up to 1 s
                            this.selector.select(1000);
                            // Data received from the master:
                            // process it and write it to the local commitlog
                            boolean ok = this.processReadEvent();
                            if (!ok) {
                                this.closeMaster();
                            }
    
                            // Once the slave has written the data locally, report progress to the master immediately
                            if (!reportSlaveMaxOffsetPlus()) {
                                continue;
                            }
    
                            long interval =
                                HAService.this.getDefaultMessageStore().getSystemClock().now()
                                    - this.lastWriteTimestamp;
                            // If more than 20 s have passed since data was last received from the master, close the connection
                            if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                                .getHaHousekeepingInterval()) {
                                log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                                    + "] expired, " + interval);
                                this.closeMaster();
                                log.warn("HAClient, master not response some time, so close connection");
                            }
                        } else {
                            this.waitForRunning(1000 * 5);
                        }
                    } catch (Exception e) {
                        log.warn(this.getServiceName() + " service has exception. ", e);
                        this.waitForRunning(1000 * 5);
                    }
                }
    
                log.info(this.getServiceName() + " service end");
            }
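
The two timers in the slave loop above can be isolated as pure functions. This is a minimal sketch: SlaveTimersSketch and isConnectionExpired are hypothetical names, and the 5 s and 20 s constants are taken from the comments in this article (in the broker they come from the message store configuration, for example getHaHousekeepingInterval()). Using one shared timestamp for both checks is a simplifying assumption.

```java
/** Hypothetical sketch of the slave's heartbeat and housekeeping decisions. */
final class SlaveTimersSketch {
    static final long HEARTBEAT_INTERVAL_MS = 5_000L;     // "every 5 s" per the article
    static final long HOUSEKEEPING_INTERVAL_MS = 20_000L; // "20 s" per the article

    /** True when the slave has been silent long enough that it should report its offset. */
    static boolean isTimeToReportOffset(long nowMs, long lastWriteTimestampMs) {
        return nowMs - lastWriteTimestampMs > HEARTBEAT_INTERVAL_MS;
    }

    /** True when nothing has arrived for so long that the connection should be closed. */
    static boolean isConnectionExpired(long nowMs, long lastWriteTimestampMs) {
        return nowMs - lastWriteTimestampMs > HOUSEKEEPING_INTERVAL_MS;
    }

    public static void main(String[] args) {
        long lastWrite = 0L;
        // Walk a fake clock forward in 1 s steps and print each moment a decision flips.
        boolean reported = false;
        boolean expired = false;
        for (long now = 0L; now <= 25_000L; now += 1_000L) {
            if (!reported && isTimeToReportOffset(now, lastWrite)) {
                reported = true;
                System.out.println("t=" + now + " ms: report offset (heartbeat)");
            }
            if (!expired && isConnectionExpired(now, lastWrite)) {
                expired = true;
                System.out.println("t=" + now + " ms: close connection (housekeeping)");
            }
        }
    }
}
```

Passing the clock value in as a parameter keeps both checks deterministic, which makes the interval boundaries easy to verify without sleeping.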
  • Original article: https://www.cnblogs.com/gaojy/p/15087877.html