zoukankan      html  css  js  c++  java
  • Rocketmq消息持久化

    本文编写,参考:https://my.oschina.net/bieber/blog/725646

    producer Send()的Message最终将由broker处理,处理类为:SendMessageProcessor ,处理方法:processRequet.

    public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    private List<ConsumeMessageHook> consumeMessageHookList;

    public SendMessageProcessor(final BrokerController brokerController) {
    super(brokerController);
    }

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {}
    上述方法,并不是直接处理消息,而是交由MessageStore处理,相关代码如下:
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);
    //......
    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

    然而MessageStore也不直接持久化消息,转交给 CommitLog
    long beginTime = this.getSystemClock().now();
    PutMessageResult result = this.commitLog.putMessages(messageExtBatch);

    从MappedFileQueue中取出最新的一条:
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    //写消息
    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
    //持久化到磁盘,最终通过FileChannel持久化到文件
    handleDiskFlush(result, putMessageResult, messageExtBatch);

    handleHA(result, putMessageResult, messageExtBatch);


    2.cousumer 从broker读消息。
    消费者从broker读取消息经由PullMessageProcessor类处理的,processRequest()方法处理请求:
    RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)

    经过一系列的判断处理,之后交由 MessageStore:
    final GetMessageResult getMessageResult =
    this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
    requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    读取消息。
    之后交由commitLog,读出消息,
    SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
    可以看到是先从ConsumerQueue中获取消息索引,然后再从commitlog中读取消息内容。这些内容也是在存储消息的时候写入的。
    相关也可参考:http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/



  • 相关阅读:
    白色情人节为你身边的程序猿献上一份礼物!
    《大话操作系统——做坚实的project实践派》(3)
    hdu 1085 Holding Bin-Laden Captive!(母函数)
    LeetCode228:Summary Ranges
    android adb端口被占用解决方法
    TortoiseSVN比较工具设置为BeyondCompare 4
    Kotlin Android学习入门
    Android Studio中 ADB WIFI插件进行无线调试实践
    如何离线安装chrome插件
    Androoid studio 2.3 AAPT err(Facade for 596378712): \?C:Users中文文件夹.androiduild-cache
  • 原文地址:https://www.cnblogs.com/itdev/p/7086322.html
Copyright © 2011-2022 走看看