org.apache.rocketmq.example.quickstart.Producer
- 创建一个消息的生产者,且指定一个组
- 设置namesrv地址,可以从此地址获取topic的队列信息
- 启动生产者实例
- 循环中创建消息对象,并指定topic、tag和消息体
- 在循环中发送消息,采用默认的负载策略,
- 调用org.apache.rocketmq.client.producer.DefaultMQProducer#send
- ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send
- ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl:
- Message:发送消息
- CommunicationMode:发送方式
- SendCallback:异步消息发送回调函数
- timeout:消息发送超时时间
- ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo:获取topic的路由信息( Broker负载消息存储,一个topic可以利用负载均衡分布在多台broker上,每个broker包含多个Queue:每个QueueData包含BrokerName,读队列和写队列个数,权限?、同步或异步)
- 先从本地缓存中 ConcurrentMap<String/* topic */, TopicPublishInfo>中尝试获取
- ->org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:尝试从Nameserver中获取Topic路由信息,并更新本地缓存
- 为了避免重复从 NameServer 获取配置信息,添加了锁
- 从默认的Topic或者指定的Topic中获取配置信息(从Nameserver获取)
- 获取到最新的Topic信息后,与本地缓存进行对比,有变化的话,需要同步更新消费者、生产者关于该Topic的缓存,更新前是先复制一份信息
- ->org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:尝试从Nameserver中获取Topic路由信息,并更新本地缓存
- 如果未找到路由信息,则从默认的Topic中寻找路由配置
- 先从本地缓存中 ConcurrentMap<String/* topic */, TopicPublishInfo>中尝试获取
- ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue:根据Topic路由负载算法选择一个消息队列进行消息发送
- ->org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
- 如果开启了消息延时规避
- 首先对Topic所有队列进行验证,因为加入了发送异常延时,确保消息队列(MessageQueue)所在的Broker是正常的
- 关于消息延时机制
- 没有开启的话,就循环向下一个消息队列发送
- 如果开启了消息延时规避
- ->org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue
- ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl:向MessageQueue消息发送
- 通过Product与Broker的长连接将消息发送给Broker,然后Broker将消息存储,并返回生产者
- ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem如果失败就更新下容错策略,主要用来规避发生故障的broker
- 如果是同步调用方式(SYNC),则执行失败重试策略,默认重试两次
主要分析的是RocketMQ 以同步方式发送消息的过程,异步模式与单向模式实现原理基本一样,异步只是增加了发送成功或失败的回掉方法。
- ->org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send
- 调用org.apache.rocketmq.client.producer.DefaultMQProducer#send