zoukankan      html  css  js  c++  java
  • rocketmq的broker如何同步信息的?

    public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
    this.defaultMessageStore = defaultMessageStore;
    this.acceptSocketService =
    new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
    this.groupTransferService = new GroupTransferService();
    this.haClient = new HAClient();
    }


    一个haservice下面有accpet和haclient分别对应客户端和服务端,grouptranserservie用来控制消息是否获取到,下面具体讲。

    拿haclient举例子,在主线程做的事情

    public void run() {
                log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    try {
                        if (this.connectMaster()) {
    
                            if (this.isTimeToReportOffset()) {
                                boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                                if (!result) {
                                    this.closeMaster();
                                }
                            }
    
                            this.selector.select(1000);
    
                            boolean ok = this.processReadEvent();
                           
                        } else {
                            this.waitForRunning(1000 * 5);
                        }
    

    也就是在rocketmq里面,一个具体的任务就是单独分配一个线程,从而发挥多线程优势,在主线程上面休眠等待唤醒或者超时唤醒然后执行io动作。

    一个典型的基于bytebuffer的写操作,通过positon、limit来判断是否数据写完:

    private boolean reportSlaveMaxOffset(final long maxOffset) {
                this.reportOffset.position(0);
                this.reportOffset.limit(8);
                this.reportOffset.putLong(maxOffset);
                this.reportOffset.position(0);
                this.reportOffset.limit(8);
    
                for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
                    try {
                        this.socketChannel.write(this.reportOffset);
                    } catch (IOException e) {
                        log.error(this.getServiceName()
                            + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                        return false;
                    }
                }
    
                return !this.reportOffset.hasRemaining();
            }
    

      

    haservice里面所有的io没有走netty,全部使用原始select做异步io,然后直接使用nio的bytebuff做read和write操作

    另外rocketmq里面的每个线程实现都有一个特别的标志位:

    public abstract class ServiceThread implements Runnable {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
    
        private static final long JOIN_TIME = 90 * 1000;
    
        protected final Thread thread;
        protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
        protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
        protected volatile boolean stopped = false;
    

      

    这个hasnotified和countdownlatch是配合一起使用的。如果一个线程被countdown过、唤醒过,那么hasnotified就通过cas被设置成true,下一个循环进入wait的时候,不用等待超时也不用等待下一次唤醒,直接通过hasnotified这个标志位可以直接唤醒,相当于第一次唤醒我的时候 我当时没有在阻塞,那么第一次唤醒我的时候 先设置一个标示hasnotified,下次进入阻塞的时候可以直接走唤醒流程,不用等待。

    下面具体讲下每个模块

    HAclient:

    干了两个事情:

    1 备broker去nameserv注册的时候,可以从nameserv拿到master-broker的ha-address,拿到这个地址以后,通过haclient去连接master-broker。定期给主机broker上报自己的currentReportedOffset,也就是备机broker自己当前的commit-log在什么地方了

    2 在channel上面尝试读取数据,这个就是主机broker发过来的具体数据提交到自己的commit-log里面。

    也就是对于一个备机broker而言,发布自己的ack-offset和接收主机broker的实际数据都在ha-client一个线程完成的:

    3 ha-client用到了双缓冲reallocateByteBuffer,因为主机broker发过来的数据有可能备机broker的bytebuffer已经存不下了,只能存一半,这时候需要把已经落盘的数据从bytebuffer清理掉,然后写了一半的bytebuffer从后半部分移动到前半部分,那么需要有一个第三者tmp做swap,bytebufferbackup就是这个tmp,大小跟bytebufferread一样,防止极端情况:

     private void reallocateByteBuffer() {
                int remain = READ_MAX_BUFFER_SIZE - this.dispatchPostion;
                if (remain > 0) {
                    this.byteBufferRead.position(this.dispatchPostion);
    
                    this.byteBufferBackup.position(0);
                    this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
                    this.byteBufferBackup.put(this.byteBufferRead);
                }
    
                this.swapByteBuffer();
    
                this.byteBufferRead.position(remain);
                this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
                this.dispatchPostion = 0;
            }
    
            private void swapByteBuffer() {
                ByteBuffer tmp = this.byteBufferRead;
                this.byteBufferRead = this.byteBufferBackup;
                this.byteBufferBackup = tmp;
            }
    

      

    
    

    AcceptSocketService:

    干一件事:绑定端口以后,作为accpet,在主循环的select里面,监听accpet事件,如果有客户端连接进来,那么生成一个haconnection。

    所以看得出来,只有主机broker才有这个accpetsocketservice和haconnection。下面具体说下HaConnection

    public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
            this.haService = haService;
            this.socketChannel = socketChannel;
            this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
            this.socketChannel.configureBlocking(false);
            this.socketChannel.socket().setSoLinger(false, -1);
            this.socketChannel.socket().setTcpNoDelay(true);
            this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
            this.socketChannel.socket().setSendBufferSize(1024 * 64);
            this.writeSocketService = new WriteSocketService(this.socketChannel);
            this.readSocketService = new ReadSocketService(this.socketChannel);
            this.haService.getConnectionCount().incrementAndGet();
        }
    

     socketchannel是服务端accept后拿到的、跟客户端通信的channel。

       SO_LINGER选项,使用默认的

       最主要的是有两个线程,一个是writesocketService一个是readSocketService

       writesocketservice:当主机写入commit-log以后offset肯定会长,但是备机传过来的ack-offset没有增长。通过这种方式主机知道此时需要把什么数据传给备机。

       这个线程没有双缓冲,也没有swap-bytebuffer,全部数据通过网络io写出去即可,不涉及磁盘io。这个线程平时不需要工作,只有在有新数据的时候才需要工作,啥时候被唤醒的呢?

       是在service.getWaitNotifyObject().wakeupAll()业务线程进行唤醒的

       readSocketService: 专门接收备机发过来的ack-offset的,收到新的ack以后,通过HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset)唤醒GroupTransferService,后者专门处理消息是否真的已经被接收。

      GroupTransferService:

      双缓冲逻辑,在主循环的waitforEnding结束后的onWaitEnd中,执行swapRequests,把requestsWrite和requestsRead互换,因为这个线程在处理的时候需要用synchronnized锁整个requestsRead,别人无法put了,所以弄一个requestsWrite出来,其他线程可以在这个里面put,跟自己的线程锁住的requestsRead不冲突。

      

      这个模块本质上就是一个thread,干两个事情:

      1 在waitfoRunning中等待

      2 等待超时或者被唤醒的话,那么针对requestRead里面所有request,push2SlaveMaxOffset(这个就是备机的ack-offset)大于request的offset的话,那么说明备机当前已经有这个数据了,那么wakeupCustomer把在request上的CountdownLatch去掉,并且把GroupCommitRequest的flushOK=ture。

    如果备机的ack-offset比GroupCommitRequest小的话,那么循环5次等待,阻塞在notifyTransferObject,尝试等待5次看看备机的ack-offset 也就是push2SlaveMaxOffset能不能追上来,从而也让这个GroupCommitRequest的flushOK=true。

    private void doWaitTransfer() {
                synchronized (this.requestsRead) {
                    if (!this.requestsRead.isEmpty()) {
                        for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                            boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                            for (int i = 0; !transferOK && i < 5; i++) {
                                this.notifyTransferObject.waitForRunning(1000);
                                transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                            }
    
                            if (!transferOK) {
                                log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                            }
    
                            req.wakeupCustomer(transferOK);
                        }
    
                        this.requestsRead.clear();
                    }
                }
            }
    

      

    那么谁在request上的countdownlatch等待呢?flushok啥含义

    如果是同步master的话,两个地方:handleHA和handleDiskFlush

    以前者举例子,handleHA里面:

     public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
            if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
                HAService service = this.defaultMessageStore.getHaService();
                if (messageExt.isWaitStoreMsgOK()) {
                    // Determine whether to wait
                    if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                        GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                        service.putRequest(request);
                        service.getWaitNotifyObject().wakeupAll();
                        boolean flushOK =
                            request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                        if (!flushOK) {
                            log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                                + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                        }
                    }
                    // Slave problem
                    else {
                        // Tell the producer, slave not available
                        putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                    }
                }
            }
    

      

    主机在sendmessage以后,执行service.putRequest

    1 会在GroupTransferService的requestWrite里面放入新的request,让GroupTransferService去检查这个request是否已经被备机同步了

    2 对GroupTransferService做waitpoint.countDown,让GroupTransferService干活必须先要唤醒他。

    public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
                synchronized (this.requestsWrite) {
                    this.requestsWrite.add(request);
                }
                if (hasNotified.compareAndSet(false, true)) {
                    waitPoint.countDown(); // notify
                }
            }
    

      

    GroupTransferService每次被唤醒的时候,首先把requestWrite放入到requestRead里面,然后检查request的offset和备机ackoffset是否ok。

    service.getWaitNotifyObject().wakeupAll(); 这个是唤醒writeSockeService,新的数据来了,那么写线程需要工作了。

    4 在每个request上面做waitforFlush,也就是在request上面countdown等待,然后检查flushok。

    所以主机上面新数据来了以后,业务线程唤醒writeSocketService去发数据给备机broker(writeSocketService通过检查commit-log的offset感知主机数据offset增长了),然后唤醒grouptransferservice去检查每个request是否已经ok。

    grouptransferservice的工作没有放在writeSocketService,而是单独一个线程来做,还是利用多核并发处理。

    grouptransferservice即使被业务线程在putRequest中waitPoint.countDown();被唤醒还会被this.notifyTransferObject.waitForRunning(1000)阻塞,因为被业务线程唤醒也不能表示立马可以更新request和ack-offset的关系,比较备机新的ack-offset还没来,所以还需要readSocketService在拿到新的ackoffset以后,通过this.groupTransferService.notifyTransferSome();进一步唤醒groupTransferService,此时才能真正更新request是状态flushok状态

    private void doWaitTransfer() {
                synchronized (this.requestsRead) {
                    if (!this.requestsRead.isEmpty()) {
                        for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                            boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                            for (int i = 0; !transferOK && i < 5; i++) {
                                this.notifyTransferObject.waitForRunning(1000);
                                transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                            }
    
                            if (!transferOK) {
                                log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                            }
    
                            req.wakeupCustomer(transferOK);
                        }
    
                        this.requestsRead.clear();
                    }
                }
            }
  • 相关阅读:
    python 线程之 数据同步 Queue
    python 线程之threading(五)
    python 线程之 threading(四)
    python 线程之 threading(三)
    php-属性和方法的重载
    wordpress-4.7.2-zh_CN页面加载慢
    php-__autoload()
    php-_toString()方法
    php-final
    php-parent::和self::
  • 原文地址:https://www.cnblogs.com/notlate/p/10326365.html
Copyright © 2011-2022 走看看