zoukankan      html  css  js  c++  java
  • rocketmq里面生产者发送一次消息以后commit-log做了哪些事情

    生产者发送一条消息的时候,在主broker里面最终会运行到:

      

    PutMessageResult result = this.commitLog.putMessage(msg); 在这里第一次有了锁,也就是发送一条消息,一路行都没有锁,直到这里涉及mappedfile的时候才有锁。因为要保证消息有序性质,先到的消息的offset更低,所以这里必须要有锁。

    对于commit-log来说,首先要找一个存储介质,也就是从MappedFileQueue里面取出一个Queue,如果没有那么需要构造一个出来,构造MappedFile的过程都在AllocateMappedFileService里面。

    AllocateMappedFileService也是一个单独线程,他在主循环的mmapOperation里面,阻塞在req = this.requestQueue.take();

    private PriorityBlockingQueue<AllocateRequest> requestQueue =    new PriorityBlockingQueue<AllocateRequest>();

    他是一个优先阻塞队列,当外面有任务提交request的时候这个线程才会从阻塞中醒来。通过优先队列可以保证文件创建的先后顺序。

    对于mappedfile的初始化:

        private void init(final String fileName, final int fileSize) throws IOException {
            this.fileName = fileName;
            this.fileSize = fileSize;
            this.file = new File(fileName);
            this.fileFromOffset = Long.parseLong(this.file.getName());
            boolean ok = false;
    
            ensureDirOK(this.file.getParent());
    
            try {
                this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
                this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
    

      

    mappedByteBuffer就是跟磁盘文件共享存储的,并且是基于直接内存的,但是写在内存里面还需要做一次force操作才能刷到磁盘里面去。

    对于一个mappedfile来说,第一个mappedfile的文件名字就叫0000000000000,第二个交0000000001073741824,第一个mappedfile的大小就是0000000001073741824

    fileFromOffSet就是文件名字。wrotePosition是已经写到内存的位置,flushedPosition是已经刷盘到硬盘的位置。后面两个都是以这个mapped为起点的,不是绝对全局0作为起点

    在doAppend方法里面完成了对于生产者发送过来的消息的存储,其中涉及到这条消息的绝对offset,绝对offset其实就是fileFromOffSet+wrotePosition。

    还涉及topicQueueTable:private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);

    也就是一个topic、queue-id作为key,value是从0开始依次递增。

    完成对mappedfile的写入以后,锁就可以释放了,后面还有两个关于可靠性的两个同步问题,磁盘同步跟主备同步。

    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg);

    commit-log里面维护的是mappedfileQueue,它的刷盘的逻辑是:

        public boolean flush(final int flushLeastPages) {
            boolean result = true;
            MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
            if (mappedFile != null) {
                long tmpTimeStamp = mappedFile.getStoreTimestamp();
                int offset = mappedFile.flush(flushLeastPages);
                long where = mappedFile.getFileFromOffset() + offset;
                result = where == this.flushedWhere;
                this.flushedWhere = where;
                if (0 == flushLeastPages) {
                    this.storeTimestamp = tmpTimeStamp;
                }
            }
    
            return result;
        }
    

      根据上次刷盘位置取出最后一次刷盘的mappedFile,执行它的flush方法,其实就是直接执行force即可刷盘。

        public int flush(final int flushLeastPages) {
            if (this.isAbleToFlush(flushLeastPages)) {
                if (this.hold()) {
                    int value = getReadPosition();
    
                    try {
                        //We only append data to fileChannel or mappedByteBuffer, never both.
                        if (writeBuffer != null || this.fileChannel.position() != 0) {
                            this.fileChannel.force(false);
                        } else {
                            this.mappedByteBuffer.force();
                        }
                    } catch (Throwable e) {
                        log.error("Error occurred when force data to disk.", e);
                    }
    
                    this.flushedPosition.set(value);
                    this.release();
                } else {
                    log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                    this.flushedPosition.set(getReadPosition());
                }
            }
            return this.getFlushedPosition();
        }
    

      

    能否刷盘需要看isAbleToFlush:

        private boolean isAbleToFlush(final int flushLeastPages) {
            int flush = this.flushedPosition.get();
            int write = getReadPosition();
    
            if (this.isFull()) {
                return true;
            }
    
            if (flushLeastPages > 0) {
                return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
            }
    
            return write > flush;
        }
    

      也就是刷盘的位置比写入到mappedfile内存的位置小的时候,就需要进行刷盘了。这里面的flushLeastPages其实就是等到数据积攒到比较大的时候再一次性刷盘,这个参数默认是4,只有经过一段时间以后才变成0。也就是平时都是攒着,超过一定时间才不考虑攒不攒的问题。

      

                    int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
    
                    int flushPhysicQueueThoroughInterval =
                        CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
    
                    boolean printFlushProgress = false;
    
                    // Print flush progress
                    long currentTimeMillis = System.currentTimeMillis();
                    if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                        this.lastFlushTimestamp = currentTimeMillis;
                        flushPhysicQueueLeastPages = 0;
                        printFlushProgress = (printTimes++ % 10) == 0;
                    }
    

      

    还有一点就是MappedFile还是继承自ReferenceResource,后者也只有前者这么一个子类。由于mappedfile涉及直接内存,所以需要我们自己去进行释放、维护引用计数,每次要用到mappedfile里面数据的时候,都会hold一次防止被回收。如果计数到达0,那么进入到clean方法操作直接内存的释放。

    后面的handle-ha前面已经提过不说了,还有涉及到reputMessageService关于index文件、consumeQueue的操作这里暂且不提。

  • 相关阅读:
    xml的建模
    P1341 无序字母对
    P1330 封锁阳光大学
    P2661 信息传递
    P1312 Mayan游戏
    P1514 引水入城
    C. Sad powers
    P1195 口袋的天空
    P1821 [USACO07FEB]银牛派对Silver Cow Party
    P1396 营救
  • 原文地址:https://www.cnblogs.com/notlate/p/12007242.html
Copyright © 2011-2022 走看看