zoukankan      html  css  js  c++  java
  • rocketmq源码分析2-broker的消息接收

    broker消息接收,假设接收的是一个普通消息(即没有事务),此处分析也只分析master上动作逻辑,不涉及ha。

    1. 如何找到消息接收处理入口

    可以通过broker的监听端口10911顺藤摸瓜式的找到 NettyClientConfig.setListenPort-->BrokerStartup-->BrokerController-->NettyRemotingServer
    com.alibaba.rocketmq.remoting.netty.NettyDecoder com.alibaba.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler.channelRead0(ChannelHandlerContext, RemotingCommand)

    也可以顺着broker的启动脚本找到
    BrokerStartup-->BrokerController-->NettyRemotingServer

    因为rocketmq连接层默认使用netty开发,如果熟悉netty的话,
    可以直接查找ChannelInitializer或者pipeline().addLast等

    2. 调试NettyDecoder

    com.alibaba.rocketmq.remoting.netty.NettyDecoder.decode(ChannelHandlerContext, ByteBuf)
    会调用RemotingCommand.decode(byteBuffer)组装RemotingCommand
    在com.alibaba.rocketmq.remoting.protocol.RemotingCommand.decode(ByteBuffer) 中断点 排除如下code

    1 cmd.getCode() != 0 && cmd.getCode() != 34 && cmd.getCode() != 15 
    2 && cmd.getCode() != 38 && cmd.getCode() != 11

    用producer的client发一条消息到broker
    发现RemotingCommand.decode解析出来的RemotingCommand的code为310 310对应RequestCode.SEND_MESSAGE_V2 发送消息

    此时处理线程是:Thread [NettyServerWorkerThread_2]
    RemotingCommand主要描述了操作code、opaque号、body即消息体。

    3. 调试NettyServerHandler

    对com.alibaba.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler.channelRead0(ChannelHandlerContext, RemotingCommand)断点
    断点条件属性是msg.getCode() == 310
    此时处理线程是:Thread [NettyServerWorkerThread_2]

    4. NettyRemotingAbstract

    com.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(ChannelHandlerContext, RemotingCommand)
    方法负责分发给相应的NettyRequestProcessor实例
    负责处理的是com.alibaba.rocketmq.broker.processor.SendMessageProcessor@17fb0278
    处理线程是:Thread [SendMessageThread_1],跟上面的处理已经不在一个线程上了,异步

    5. SendMessageProcessor

    SendMessageProcessor.processRequest(ChannelHandlerContext, RemotingCommand)介入处理
    构建SendMessageRequestHeader
    进入SendMessageProcessor.sendMessage(ChannelHandlerContext, RemotingCommand, SendMessageContext, SendMessageRequestHeader)
    int queueIdInt = requestHeader.getQueueId(); 作用是什么 什么时候确定的id
    构建MessageExtBrokerInner实例 tags转tagsCode 就是tags的String的hashcode

    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

    交给MessageStore进行put(store,进行存放)

    6. DefaultMessageStore

    com.alibaba.rocketmq.store.DefaultMessageStore.putMessage(MessageExtBrokerInner) 消息topic的长度校验 不能超过127
    消息properties转成字符串后,长度不能超过32767
    下面交个CommitLog进行putMessage

    7. CommitLog

    com.alibaba.rocketmq.store.CommitLog.putMessage(MessageExtBrokerInner)
    设置存储时间
    设置消息体CRC校验值
    交给MapedFile进行存储mapedFile.appendMessage(msg, this.appendMessageCallback)

    8. MapedFile

    mapedFile.appendMessage(msg, this.appendMessageCallback)
    获取当前文件已经写到什么位置了
    文件已经做了map map到mappedByteBuffer 讲当前的mappedByteBuffer 割出来并设置position为当前写位置
    交由callback做append
    callback是CommitLog$DefaultAppendMessageCallback
    传给callback的参数有

    • 1. 这个文件的offset,意思是:一个broker要存储很多消息,那么一个文件肯定不够存,当存到第二个文件的时候,这个文件里的第一条消息相对于整个broker中的消息有个offset,此值是文件名。
    • 2. 从store文件映射的buffer中割出来的byteBuffer
    • 3. 文件的剩余空间
    • 4. 消息体

    8.1 callback中append逻辑:

    计算wroteOffset 即整个broker中的offset, 代码注释中称之为物理offset,计算逻辑也很简单即上面参数1+这条消息在这个文件中的position
    计算消息id,逻辑使用主机物理IP加上一步的wroteOffset计算,用了ByteBuffer处理,比较高效,待完善验证用例。
    获取这个topic的这个队列的offset(一个topic写多个队列),这个offset有啥用?估计是用于快速查找
    将消息bean对象用msgStoreItemMemory进行以约定消息体的形式进行byte的put
    将msgStoreItemMemory put到上面传进来的参数2的buffer 即写入磁盘
    构建append结果,没啥逻辑,主要是一些信息 AppendMessageResult
    将消息在这个队列中的offset加1
    更新MapedFile的wrotePostion,就是将原来的wrotePostion加上这次写入的字节数
    此时订阅断已经收到消息,(应该更早,待确认,待分析消息如何送到consume的)

    8.2 回CommitLog--:

    构建 PutMessageResult对象。

    8.3 处理同异步刷盘逻辑

    同步刷盘 用GroupCommitService, 其中使用了多条消息一起刷的设计,并且设计了刷盘如果超时的异常场景的反馈
    异步刷盘 用CommitLog$FlushRealTimeService(是一个线程,此时会叫醒他)

    8.4 处理主从双机同异步同步逻辑

    同步形式同步从节点 交由HAService 实时同步,并等待同步结果。
    异步不用管。

    9. 收集统计数据

    耗时等等

    10. 对非oneway的消息做response处理

  • 相关阅读:
    关于celery踩坑
    关于git的分批提交pull requests流程
    SymGAN—Exploiting Images for Video Recognition: Heterogeneous Feature Augmentation via Symmetric Adversarial Learning学习笔记
    AFN—Larger Norm More Transferable: An Adaptive Feature Norm Approach for Unsupervised Domain Adaptation学习笔记
    Learning to Transfer Examples for Partial Domain Adaptation学习笔记
    Partial Adversarial Domain Adaptation学习笔记
    Partial Transfer Learning with Selective Adversarial Networks学习笔记
    Importance Weighted Adversarial Nets for Partial Domain Adaptation学习笔记
    Exploiting Images for Video Recognition with Hierarchical Generative Adversarial Networks学习笔记
    improved open set domain adaptation with backpropagation 学习笔记
  • 原文地址:https://www.cnblogs.com/simoncook/p/6368398.html
Copyright © 2011-2022 走看看