zoukankan      html  css  js  c++  java
  • RocketMQ源码 — 五、 主要feature及其实现方式

    RocketMQ的主要特点以及实现方式

    单机支持1万以上持久队列

    所有数据单独存储到一个CommitLog,完全顺序写,随机读

    在一个broker上一个DefaultMessageStore管理一个commitLog
    顺序写:在commitLog.putMessage里面获取mapedFile之后进入synchronized块,开始写内存,所以当有新的消息需要保存的时候会等待锁释放,所以写消息的时候就是顺序的

    MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();
    // 给commitLog上锁
    synchronized (this) {
        // 保存消息
        result = mapedFile.appendMessage(msg, this.appendMessageCallback);
    }
    

    随机读:因为在pull Message的时候根据consumeQueue来读取消息的,consumeQueue里面记录了offset,所以读取mapedFile的时候是按照offset随机读取的

    对最终用户展现的是实际只存储了消息在commitLog的位置信息,并串行刷盘

    最终用户接触到的是逻辑队列ComsumeQueue,只存储了topic、offset等信息
    串行刷盘:DefaultMessageStore使用StoreCheckPoint记录当前刷盘的文件,并只将StoreCheckPoint的mapedFile进行刷盘
    PageCache:文件cache是文件数据在内存中的副本,因此文件cache管理与内存管理和文件系统管理相关。文件cache分为两个层面,Page Cache和Buffer Cache,每一个Page Cache包含多个Buffer Cache。linux中文件Cache的操作分为两类,一是在文件Cache与应用程序提供的用户空间buffer拷贝数据(普通的read/wrote操作),二是使用mmap将Cache映射到用户空间,并没有拷贝(所以速度更快),用户空间可以像使用指针一样(普通访问文件是使用流)访问文件

    刷盘策略

    在commitLog.putMessage中决定刷盘方式,在MessageStoreConfig中配置刷盘的方式

    RocketMQ的消息都是持久化的:所有消息保存在commitlog文件夹下的文件中
    先写入系统PageCache:所有commitlog下的文件都是使用的直接内存,采用mmap文件映射的方法,每次接收到消息的时候先把消息写入直接内存PageCache——即mapedFile
    然后刷盘:启动commitLog的时候会启动刷盘的线程(FlushCommitLogService)定时刷盘
    可以保证内存与磁盘都有一份数据,访问消息的时候直接从内存中读取:读取文件的时候直接从mapedFile取

    // 同步刷盘,使用GroupCommitService
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
    	// 是否配置为等待
        if (msg.isWaitStoreMsgOK()) {
            request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
    		// 超时等待
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
                        + " client address: " + msg.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // 异步刷盘,FlushRealTimeService
    else {
        this.flushCommitLogService.wakeup();
    }
    

    消息过滤

    1. 在DefaultMessageStore.getMessage的时候,先根据topic和queueId获取ConsumeQueue,然后读取consumeQueue,一次对比consumeQueue的tag的hashCode,如果匹配才去读取commitLog
    2. 存储tag的hashCode,定长节省空间
    3. 先读取consumeQueue,在消息堆积的情况下也能高效过滤消息

    长轮询pull

    在PullMessageService的run方法中pull message,在获取返回结果的回调中再次发起请求(只是添加pullRequest到pullRequestQueue中)——也就是长轮询

    发送消息负载均衡

    在DefaultMQProducerImpl send message的时候会调用selectOneMessageQueue(MessageQueue包含了topic,broker,queueID等信息,表明消息发送到哪一个broker的哪一个queue),使用递增取模的方法决定使用哪一个messageQueue

    消费消息的负载均衡

    AllocateMessageQueueAveragely.allocate实现了consumer消费的默认负载均衡算法,、

    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
                                           List<String> cidAll) {
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }
    
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", //
                    consumerGroup, //
                    currentCID,//
                    cidAll);
            return result;
        }
    
        // 基本原则,每个队列只能被一个consumer消费
        // 当messageQueue个数小于等于consume的时候,排在前面(在list中的顺序)的consumer消费一个queue,index大于messageQueue之后的consumer消费不到queue,也就是为0
        // 当messageQueue个数大于consumer的时候,分两种情况
        //     当有余数(mod > 0)并且index < mod的时候,当前comsumer可以消费的队列个数是 mqAll.size() / cidAll.size() + 1
        //     可以整除或者index 大于余数的时候,队列数为:mqAll.size() / cidAll.size()
        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
                mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                        + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
    

    该负载平衡算法:
    就是把messageQueue放到一个队列中,consumer放到一个队列中,messageQueue依次分配给consumer,
    如果不够分配,则排在后面的consumer就不能消费messageQueue
    如果给consumer分配完一轮之后,messageQueue还有多余,那么messageQueue接着分配,consumer队列从头开始
    示意图如下:

    HA,同步双写,异步复制

    HAService,RocketMQ的高可用服务
    同步双写:在commitLog.putMessage中进行同步双写,将GroupCommitRequest放进GroupTransferService.requestWrite等待slave主动拉取,master超时等待同步双写完成。所以在写消息的时候是同步等待的,slave从master复制消息的时候是异步的

  • 相关阅读:
    Server 对象
    Response 对象
    bzoj 5252: [2018多省省队联测]林克卡特树
    bzoj 2167: 公交车站
    bzoj 5315: [Jsoi2018]防御网络
    bzoj 5319: [Jsoi2018]军训列队
    bzoj 4161: Shlw loves matrixI
    bzoj 4942: [Noi2017]整数
    bzoj 2648: SJY摆棋子
    kd-tree 小结
  • 原文地址:https://www.cnblogs.com/sunshine-2015/p/6298446.html
Copyright © 2011-2022 走看看