zoukankan      html  css  js  c++  java
  • RockerMQ源码分析——Broker消息发送流程

    org.apache.rocketmq.example.quickstart.Producer

    • 创建一个消息的生产者,且指定一个组
    • 设置namesrv地址,可以从此地址获取topic的队列信息
    • 启动生产者实例
    • 循环中创建消息对象,并指定topic、tag和消息体
    • 在循环中发送消息,采用默认的负载策略,
      • 调用org.apache.rocketmq.client.producer.DefaultMQProducer#send
        • ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send
          • ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl:
          • Message:发送消息
          • CommunicationMode:发送方式
          • SendCallback:异步消息发送回调函数
          • timeout:消息发送超时时间
          • ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo:获取topic的路由信息( Broker负载消息存储,一个topic可以利用负载均衡分布在多台broker上,每个broker包含多个Queue:每个QueueData包含BrokerName,读队列和写队列个数,权限?、同步或异步)
            • 先从本地缓存中 ConcurrentMap<String/* topic */, TopicPublishInfo>中尝试获取
              • ->org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:尝试从Nameserver中获取Topic路由信息,并更新本地缓存
                • 为了避免重复从 NameServer 获取配置信息,添加了锁
                • 从默认的Topic或者指定的Topic中获取配置信息(从Nameserver获取)
                • 获取到最新的Topic信息后,与本地缓存进行对比,有变化的话,需要同步更新消费者、生产者关于该Topic的缓存,更新前是先复制一份信息
            • 如果未找到路由信息,则从默认的Topic中寻找路由配置
          • ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue:根据Topic路由负载算法选择一个消息队列进行消息发送
            • ->org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
              • 如果开启了消息延时规避
                • 首先对Topic所有队列进行验证,因为加入了发送异常延时,确保消息队列(MessageQueue)所在的Broker是正常的
                • 关于消息延时机制
              • 没有开启的话,就循环向下一个消息队列发送
          • ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl:向MessageQueue消息发送
            • 通过Product与Broker的长连接将消息发送给Broker,然后Broker将消息存储,并返回生产者
          • ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem如果失败就更新下容错策略,主要用来规避发生故障的broker
          • 如果是同步调用方式(SYNC),则执行失败重试策略,默认重试两次
            主要分析的是RocketMQ 以同步方式发送消息的过程,异步模式与单向模式实现原理基本一样,异步只是增加了发送成功或失败的回掉方法。
  • 相关阅读:
    protobuf lib库的使用
    protobuf的下载、编译和使用
    使用python和pygame绘制繁花曲线
    经典方块游戏-俄罗斯方块
    经典方块游戏-贪吃蛇
    经典方块游戏-基础
    经典方块游戏一
    Python脚本管理
    SublimeText3设置显示空格及Tab显示为4个空格
    域名解析记录类型
  • 原文地址:https://www.cnblogs.com/fyusac/p/14581186.html
Copyright © 2011-2022 走看看