一、引言
本文只站在一个java程序员角度上,去了解Rocketmq和具体使用,不讲搭建,这是运维人的事情。
二、介绍
(1)RocketMQ 不遵循任何规范,但是参考了各种规范不同类产品的设计思想。
(2)RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:
- 具有高性能、高可靠、高实时、分布式特点;
- RocketMQ 的Producer、Consumer、队列都可以分布式;
- 能够保证严格的消息顺序;
- 提供丰富的消息拉取模式;
- 高效的订阅者水平扩展能力;
- 实时的消息订阅机制;
- 亿级消息堆积能力;
- 较少的依赖
(3)选用理由:
- 强调集群无单点,可扩展,任意一点高可用,水平可扩展。
- 海量消息堆积能力,消息堆积后,写入低延迟。
- 支持上万个队列
- 消息失败重试机制
- 消息可查询
- 成熟度(经过双十一考验)
三、Rocketmq关键概念
1、主题与标签
-
主题Tpoic:第一级消息类型,用来标识一类消息 , (书的标题)
-
标签Tags:第二级消息类型,区分一个 Topic 下的多种消息 , 可以基于Tag做消息过滤 , (书的目录)
-------举例-------
主题: 订单交易
签: 订单交易-创建 、订单交易-付款、 订单交易-完成
2、发送与订阅群组
-
消息生产者Producer : 消息生产者 , 负责生产消息,一般由业务系统负责产生消息。
-
消息消费者Consumer :消息消费者,负责消费消息,一般是后台系统负责异步消费。
分类:
(1) Pull Consumer:消息消费者应用主动调用 Consumer 的拉消息方法从 Broker拉消息,主动权由消息消费者应用控制。
(2) Push Consumer:消息消费者应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。
-
生产组Producer Group:一类 Producer 的集合的名称, 消息的发送。
可以通过运维工具查询返个发送消息应用下有几个 Producer 实例
发送分布式事务消息时,如果 Producer 中途意外宕机,Broker 会主动回调 Producer Group 内的任意一台机器来确认事务状态。
-
消费组Consumer Group:一类 Consumer 的集合的名称, 消息的订阅处理。
一个 Consumer Group 下包含多个 Consumer 实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个 Consumer 对象。
3、Broker与NameServer
NameServer:在系统中是做命名服务,更新和发现 broker 消息服务。
Broker-Master:broker 消息主机服务器。
Broker-Slave:broker 消息从机服务器。
关联关系:
- Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
- Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。
- 每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 Name Server
- Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并想提供 Topic 服务的 Master 建立长连接,且定时向 Master发収送心跳。Producer 完全无状态。
- Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
4、广播消费与集群消费
- 广播消费: 一条消息被多个 Consumer 消费。
- 集群消费 : 一个 Consumer Group 中的 Consumer 平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个 Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。
5、消息队列
-
消息队列Message Queue : 在 RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构。
所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100 年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。
也可以认为 Message Queue 是一个长度无限的数组,offset 就是下标。
6、集群方式
这里的Slave都不可写,但可读,类似于 Mysql 主备方式。
-
单个 Master
(1)定义:只有一个Master
(2)缺:这种方式风险较大,一旦Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。
-
多 Master 模式
(1)定义:一个集群无 Slave,全是 Master
(2)优点:配置简单,单个Master 宕机或重启维护对应用无影响。性能最高。异步刷盘丢失少量消息,同步刷盘一条不丢 。
(3)缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订 阅,消息实时性会受到受到影响。
(4)启动顺序: 先启动 NameServer,再在机器 A启动第一个 Master 再在机器 B启动第二个 Master。
-
多 Master 多 Slave 模式,异步复制
(1)定义: 每个 Master 配置一个 Slave,有多对Master-Slave,HA (高可用性集群)异步复制方式,主备有短暂消息延迟,毫秒级。
(2)优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
(3)缺点:Master 宕机,磁盘损坏情况,会丢失少量消息。
(4)启动顺序: 先启动 NameServer ,再在机器 A启动第一个 Master,再在机器 B启动第二个 Master,再在机器 C启动第一个 Slave,再在机器 D启动第二个 Slave。
-
多 Master 多 Slave 模式,同步双写
(1)定义:每个 Master 配置一个 Slave,有多对Master-Slave,HA(高可用性集群) 采用同步双写方式,主备都写成功,向应用返回成功。
(2)优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高。
(3 缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 响应时间会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能 。
(4)启动顺序:先启动 NameServer,再在机器 A启动第一个 Master,再在机器 B启动第二个 Master ,再在机器 C启动第一个 Slave ,再在机器 D启动第二个 Slave。
7、顺序消息
消费消息的顺序要同发送消息的顺序一致,在 RocketMQ 中,主要指的是局部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序发送,且发送到同一个队列,返样 Consumer 就可以按照 Producer 发送 的顺序去消费消息。分类:
(1) 普通顺序消息: 正常情况下 RocketMQ 可以保证完全的顺序消息,但是一旦发生通信异常,Broker 重启,由于队列总数发生发化,哈希取模后定位的队列会发化,产生短暂的消息顺序不一致。
如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。
(2)严格顺序消息: 无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover(故障转移)特性,即 Broker 集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。
如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主来避免,不过仍然会存在几分钟的服务不可用。
目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。
8、数据存储结构
commit Log: 操作日志,也可以理解为一个存数据的地方
消息队列服务:存储所有消息。
消息索引服务:存储 offset和消息的匹配表。
事务状态服务:存储 每条消息的状态
定时消息服务:管理 需要定时投递的消息
四、所有消息中间件 涉及的业务问题(随便看看)
这里内容很多,但是我希望大家尽量逼着自己读完,有个印象。非常重点的内容这节后面会再次叙述。并且用代码实际应用出来。
1、Publish/Subscribe 发布订阅
发布订阅是消息中间件的最基本功能,也是相对与传统 RPC 通信而言。这里不再叙述。
2、Message Priority 消息优先级
在一个消息队列中,每条消息都有不同的优先级,一般用整数来描述,优先级高的消息先投递,如果消息完全在一个内存队列中,那么在投递前可以按照优先级排序,令优先级高的先投递。
由于 RocketMQ 所有消息都是持久化的,所以如果按照优先级来排序,开销会非常大,因此 RocketMQ 没有特意支持消息优先级,但是可以通过变通的方式实现类似功能,即单独配置一个优先级高的队列,和一个普通优先级的队列, 将不同优先级发送到不同队列即可。
对于优先级问题,可以归纳为 2 类 :
(1) 只要达到优先级目的即可,不是严格意义上的优先级,通常将优先级划分为高、中、低,戒者再多几个级别。每个优先级可以用不同的 topic 表示,发消息时,指定不同的 topic 来表示优先级,这种方式可以解决绝大部分的优先级问题,但是对业务的优先级精确性做了妥协。
(2) 严格的优先级,优先级用整数表示,例如 0 ~ 65535,这种优先级问题一般使用不同 topic 解决就非常不合适。
如果要让 MQ 解决此问题,会对 MQ 的性能造成非常大的影响。
3、Message Order 消息有序
一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创建,订单付款,订单完成。消费时,要按照返个顺序消费才能有意义。但是同时订单之间是可以并行消费的。 RocketMQ 可以严格的保证消息有序 。
4、Message Filter 消息过滤
-
Broker 端消息过滤:在 Broker 中,按照 Consumer 的要求做过滤。
优点:减少了对于 Consumer 无用消息的网络传输。
缺点:增加了 Broker 的负担,实现相对复杂。(1) 淘宝 Notify 支持多种过滤方式,包含直接按照消息类型过滤,灵活的语法表达式过滤,几乎可以满足最苛刻的过滤需求。
(2) 淘宝 RocketMQ 支持按照简单的 Message Tag 过滤,也支持按照 Message Header、body 进行过滤。
(3) CORBA Notification 规范中也支持灵活的诧法表达式过滤。
- Consumer 端消息过滤:消费者应用自己过滤。
优点:过滤方式可由消费者应用完全自定义实现,
缺点:很多无用的消息要传输到 Consumer 端
5、Message Persistence 消息持久化
消息中间件通常采用的几种持久化方式:
(1) 持久化到数据库,例如 Mysql。
(2) 持久化到 KV 存储,例如 levelDB、伯克利 DB 等 KV 存储系统。
(3) 文件记录形式持久化,例如 RocketMQ, Kafka。
(4) 对内存数据做一个持久化镜像,例如 beanstalkd,VisiNotify 。
(1)、(2)、(3)三种持久化方式都具有将内存队列 Buffer(缓存) 进行扩展的能力,(4)只是一个内存的镜像,作用是当 Broker 挂掉重启后仍然能将之前内存的数据恢复出来。
JMS(Java消息服务应用程序接口) 和 CORBA Notification 规范没有明确说明如何持久化,但是持久化部分的性能直接决定了整个消息中间件的性能。
RocketMQ 参考了 Kafka 的持久化方式,充分利用 Linux 文件系统内存 cache 来提高性能。
6、Message Reliablity 消息可靠性
影响消息可靠性的几种情框:
(1) Broker 正常关闭
(2) Broker 异常 Crash
(3) OS Crash
(4) 机器掉电,但是能立即恢复供电情框。
(5) 机器无法开机(可能是 cpu、主板、内存等关键设备损坏)
(6) 磁盘设备损坏。
(1)、(2)、(3)、(4)四种情框都属于硬件资源可立即恢复情况。RocketMQ 在返四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
(5)、(6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ 在这两种情框下,通过异步复制,可保证 99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免丢失, 同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与 Money 相关的应用。
7、Low Latency Messaging 低延迟消息传递
在消息不堆积情框下,消息到达 Broker 后,能立刻到达 Consumer。 RocketMQ 如果使用长轮询 Pull 方式,可保证消息非常实时,消息实时性不低 Push。
8、At least Once 至少回一次
生产者消息投递后,如果未能收到ack,则再次投递。
消费者先pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,RocketMQ可以很好的支持此特性。
9、Exactly Only Once 不重复
(1) 发送消息阶段,不允许发送重复的消息。
(2) 消费消息阶段,不允许消费重复的消息。
只有以上两个条件都满足情框下,才能人为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以 RocketMQ 为了追求高性能,并不保证此特性,要求在业务上进行去重, 也就是说消费消息要做到幂等性。
RocketMQ 虽然不能严格保证不重复,但是正常情框下很少会出现重复发送、消费情况,只有网络异常,Consumer 启停等异常情况下会出现消息重复。 此问题的本质原因是网络调用存在不确定性,即既不成功也不失败的第三种状态,所以才产生了消息重复性问题。
10、Broker 的 Buffer 满了怎么办?
Broker 的 Buffer 通常指的是 Broker 中一个队列的内存 Buffer 大小,这类 Buffer 通常大小有限,如果 Buffer 满 了以后怎么办?
-
CORBA Notification 规范中处理方式:
(1)拒绝新来的消息,向 Producer 返回 RejectNewEvents 错误码。
(2)按照特定策略丢弃已有消息 -
RocketMQ 没有内存 Buffer 概念,RocketMQ 的队列都是持久化磁盘,数据定期清除。 对于此问题的解决思路,RocketMQ 同其他 MQ 有非常显著的区别,RocketMQ 的内存 Buffer 抽象成一个无限长度的队列,不管有多少数据进来都能装得下,这个无限是有前提的,Broker 会定期删除过期的数据,例如 Broker 只保存 3 天的消,那么这个 Buffer 虽然长度无限,但是 3 天前的数据会被从队尾删除。
11、回溯消费
回溯消费:Consumer 已经消费成功的消息,由于业务上需求需要重新消费。
为了支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度。
例如由于 Consumer 系统故障, 恢复后需要重新消费 1 小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。
RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。
12、消息堆积
(1) 消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的消息堆积能力。
(2)消息堆积分以下两种情况:
-
消息堆积在内存 Buffer,一旦超过内存 Buffer,可以根据一定的丢弃策略来丢弃消息,如 CORBA Notification 规范中描述。适合能容忍丢弃消息的业务,这种情
况消息的堆积能力主要在于内存 Buffer 大小,而且消息堆积后,性能下降不会太大,因为内存中数据多少对于对外提供的访问能力影响有限。 -
消息堆积到持久化存储系统中,例如 DB,KV 存储,文件记录形式。 当消息不能在内存 Cache 命中时,就不可避免的访问磁盘,会产生大量读 IO,读 IO 的吞吐量直接决定了消息堆积后的访问能力。
评估消息堆积能力主要有以下四点:
(1) 消息能堆积多少条,多少字节?即消息的堆积容量。
(2) 消息堆积后,发消息的吞吐量大小,是否会受堆积影响?
(3) 消息堆积后,正常消费的 Consumer 是否会受影响?
(4) 消息堆积后,访问堆积在磁盘的消息时,吞吐量有多大?
13、分布式事务
(1)已知的几个分布式事务规范,如 XA,JTA 等。
(2)分布式事务涉及到两阶段提交问题,在数据存储方面的方面必然需要 KV 存储的支持,因为第二阶段的提交回滚需要修改消息状态,一定涉及到根据 Key 去查找 Message 的动作。
(3)RocketMQ 在第二阶段绕过了根据 Key 去查找 Message 的问题,采用第一阶段发送 Prepared 消息时,拿到了消息的 Offset,第二阶段通过 Offset 去访问消息, 并修改状态,Offset 就是数据的地址。
(4)RocketMQ 返种实现事务方式,没有通过 KV 存储做,而是通过 Offset 方式,存在一个显著缺陷,即通过 Offset 更改数据,会令系统的脏页过多,需要特别关注。
14、定时消息
(1)定时消息是指消息收到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。 允许消息生产者指定消息进行定时(延时)投递,最长支持 40 天。
(2)如果要支持任意的时间精度,在 Broker 局面,必须要做消息排序,如果再涉及到持丽化,那举消息排序要不可避免的产生巨大性能开销。
(3)RocketMQ 支持定时消息,但是不支持任意时间精度,支持特定的 level,例如定时 5s,10s,1m 等。
15、消息重试
(1)Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。
(2)Consumer 消费消息失败通常有以下几种情况:
-
由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。
返种错误通常需要跳过这条消息,再消费其他消息。因为这条失败的消息即使立刻重试消费,99%也不成功, 所以最好提供一种定时重试机制,即过 10s 秒后再重试。
-
由于消费者应用服务不可用,例如 db 连接不可用,外系统网络不可达等。
遇到返种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s,再消费下一条消息,返样可以减轻 Broker 重试消息的压力。
四、具体代码使用demo
(1)新建两个springboot项目,分别叫做producer和consumer,这是一个非常简单的hello初体验项目,先运行consumer项目,再运行producer项目。
(2)修改producer的application.java
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(MyproducerApplication.class, args);
//1.创建一个生产者,需要指定Producer的分组,
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Group-Producer-1");
//2.设置命名服务的地址,默认是去读取conf文件下的配置文件 rocketmq.namesrv.addr
defaultMQProducer.setNamesrvAddr("200.200.*.*:端口号");
try{
//3.启动生产者
defaultMQProducer.start();
//循环发十条消息
for(int i=0;i<10;i++) {
String text = "this is my hello content "+i;
//4.最基本的生产模式 topic+文本信息
Message msg = new Message("topic_hello", "Tag-"+i, text.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置代表消息的业务关键属性,请尽可能全局唯一
// 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发
msg.setKey("mykey");
//5.发送无序同步消息,发一个就立即获取发送响应,然后继续发
SendResult sendResult = defaultMQProducer.send(msg);
System.out.println("发送结果为:" + JSON.toJSONString(sendResult));
}
}catch (Exception e){
e.printStackTrace();
}finally {
//6.释放生产者
defaultMQProducer.shutdown();
System.out.println("生产者释放了");
}
}
}
SendResult sendResult值如下:
↓ 里面包含的内容包括 broker信息、消息队列的信息、发送结果信息等
{
"messageQueue": {
"brokerName": "broker-16",
"queueId": 1,
"topic": "topic_orderCreate"
},
"msgId": "C8C8060A1C9018B4AAC27C008DB90000",
"offsetMsgId": "C8C8061000002A9F000000CBD7063EFA",
"queueOffset": 3,
"regionId": "DefaultRegion",
"sendStatus": "SEND_OK",
"traceOn": true
}
(3)修改consumerr的application.java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.List;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
//这里是push消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group-Consumer-1");
consumer.setNamesrvAddr("200.200.6.16:9876");
//CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
//CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
//CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
try {
//第二个参数表示消费匹配的tag * 表示topic所有的tag
// Tag1 || Tag2 || Tag3 表示订阅 Tag1 或 Tag2 或 Tag3 的消息
consumer.subscribe("topic_hello", "*");
//2. 注册消费者监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* msgs 表示消息体
* @param msgs
* @param context
* @return
*/
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
try {
System.out.println( new String(messageExt.getBody(), "UTF-8"));
} catch (Exception e) {
e.printStackTrace();
}
}
//返回消费状态
//CONSUME_SUCCESS 消费成功
//RECONSUME_LATER 消费失败,需要稍后重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//3.consumer 启动
consumer.start();
System.out.println("消费端起来了哈.........");
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
收到的某一条消息举例如下,类型为MessageExt:
↓ 里面包含的内容有消息信息、队列信息等
messageExt=[
queueId = 0
storeSize = 333
queueOffset = 650
sysFlag = 0
bornTimestamp = 1556174674734
bornHost = {InetSocketAddress@4406} "/生产者host:54192"
storeTimestamp = 1556174720581
storeHost = {InetSocketAddress@4407} "/mqhost:10911"
msgId = "C8C8061000002A9F000000CBD769DB5C"
commitLogOffset = 875492399964
bodyCRC = 1420010275
reconsumeTimes = 2
preparedTransactionOffset = 0
topic = "topic_hello"
flag = 0
properties = {HashMap@4410} size = 11
body = {byte[26]@4412}
]
运行结果:
(1)producer项目结果如下:
- 注意看图片,queueId总共是0~3,而且一直是按顺序循环交替存储,由此可得默认有四条消息队列提供存储消息,顺序循环交替存储,具体存在队列的哪个位置是随机的
- 会自动给每条消息一个msgId
- offsetMsgId就是这个消息队列当前的游标位置
(2)consumer项目结果如下:
注意看图片,顺序是6879,所以说明不是按照顺序的
五、发送的消息分类
1、普通消息
普通消息也叫做无序消息,简单来说就是没有顺序的消息,producer 只管发送消息,consumer 只管接收消息,至于消息和消息之间的顺序并没有保证,可能先发送的消息先消费,也可能先发送的消息后消费。
分类:可靠同步消息,可靠异步消息,单向发送。
1.1可靠同步发送
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。
应用场景:此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
上面第四节的demo例子就是普通消息,同步发送消息。
1.2可靠异步发送
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 如果发送方通过回调接口接收到了服务器响应,就对响应结果进行处理。
consumer类不变,改producer类为下面:
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MyproducerApplication {
public static void main(String[] args) {
SpringApplication.run(MyproducerApplication.class, args);
//1.创建一个生产者,需要指定Producer的分组,
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Group-Producer-1");
//2.设置命名服务的地址,默认是去读取conf文件下的配置文件 rocketmq.namesrv.addr
defaultMQProducer.setNamesrvAddr("200.200.6.16:9876");
try{
//3.启动生产者
defaultMQProducer.start();
for(int i=0;i<100;i++) {
String text = "this is my hello content "+i;
//4.最基本的生产模式 topic+文本信息
Message msg = new Message("topic_hello", "Tag-"+i, text.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送消息, 发送结果通过 callback 返回给客户端。
defaultMQProducer.send(msg,new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送结果为:" + JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable throwable) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
throwable.printStackTrace();
}
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
//6.释放生产者
defaultMQProducer.shutdown();
System.out.println("生产者释放了");
}
}
}
1.3单向(Oneway)发送
发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发
consumer类不变,改producer类为下面:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MyproducerApplication {
public static void main(String[] args) {
SpringApplication.run(MyproducerApplication.class, args);
//1.创建一个生产者,需要指定Producer的分组,
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Group-Producer-1");
//2.设置命名服务的地址,默认是去读取conf文件下的配置文件 rocketmq.namesrv.addr
defaultMQProducer.setNamesrvAddr("200.200.*.*:端口号");
try{
//3.启动生产者
defaultMQProducer.start();
for(int i=0;i<100;i++) {
String text = "this is my hello content "+i;
//4.最基本的生产模式 topic+文本信息
Message msg = new Message("topic_hello", "Tag-"+i, text.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。
defaultMQProducer.sendOneway(msg);
}
}catch (Exception e){
e.printStackTrace();
}finally {
//6.释放生产者
defaultMQProducer.shutdown();
System.out.println("生产者释放了");
}
}
}
2、有序消息
有序消息就是按照一定的先后顺序的消息类型。也就是说生成者按什么顺序发送,消费者就按什么顺序消费。分类方法一:全局有序消息、局部有序消息;
2.1全局有序消息
所有消息都存在一个队列里,那肯定就先进先出了。但是效率太低。很少用到,我就不举例了。
2.2局部有序消息
比如一个订单的顺序必须是订单创建、订单付款、订单完成,但是可以多个订单同时进行,所以就同个orderid的顺序放同一个队列。效率高很多。
consumer类不变,改producer类为下面:
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.List;
@SpringBootApplication
public class MyproducerApplication {
public static void main(String[] args) {
SpringApplication.run(MyproducerApplication.class, args);
//1.创建一个生产者,需要指定Producer的分组,
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Group-Producer-1");
//2.设置命名服务的地址,默认是去读取conf文件下的配置文件 rocketmq.namesrv.addr
defaultMQProducer.setNamesrvAddr("200.200.6.16:9876");
try{
//3.启动生产者
defaultMQProducer.start();
int orderId =0;
for(int i=0;i<99;i++) {
//这里的意思是每三条的orderId是一样的
if(i%3==0){
orderId++;
}
String text = "the orderId is order"+orderId;
//每三条的后缀分别是create,pay,finish
if(i%3==0){
text+="-create";
}else if(i%3==1){
text+="-pay";
}else if(i%3==2){
text+="-finish";
}
//4.最基本的生产模式 topic+文本信息
Message msg = new Message("topic_hello", "Tag-"+i, text.getBytes(RemotingHelper.DEFAULT_CHARSET));
//5.获取发送响应
SendResult sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() {
// 选择发送消息的队列
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// arg的值其实就是orderId
Integer id = (Integer) arg;
// mqs是队列集合,也就是topic所对应的所有队列
int index = id % mqs.size();
// 这里根据前面的id对队列集合大小求余来返回所对应的队列
return mqs.get(index);
}
}, orderId);
System.out.println("发送结果为:" + JSON.toJSONString(sendResult));
}
}catch (Exception e){
e.printStackTrace();
}finally {
//6.释放生产者
defaultMQProducer.shutdown();
System.out.println("生产者释放了");
}
}
}
消费者结果如下:
注意看图,每个order都是先create再pay再finish,虽然可能各个order交替消费,比如order4和order5
3、延时消息和定时消息
3.1延时消息
延时消息,简单来说就是当 producer 将消息发送到 broker 后,会延时一定时间后才投递给 consumer 进行消费。RcoketMQ的延时等级为:1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。level=0,表示不延时。level=1,表示 1 级延时,对应延时 1s。level=2 表示 2 级延时,对应5s,以此类推。这种消息一般适用于消息生产和消费之间有时间窗口要求的场景。比如说我们网购时,下单之后是有一个支付时间,超过这个时间未支付,系统就应该自动关闭该笔订单。那么在订单创建的时候就会就需要发送一条延时消息(延时15分钟)后投递给 consumer,consumer 接收消息后再对订单的支付状态进行判断是否关闭订单。设置延时非常简单,只需要在Message设置对应的延时级别即可:
consumer类不变,改producer类的发送部分为下面:
Message msg = new Message("topic_hello", "Tag",
text.getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setDelayTimeLevel(1);//1代表等级,不是1秒
SendResult sendResult = defaultMQProducer.send(msg);
3.2定时消息
定时消息可以做到在指定时间戳之后才可被消费者消费,适用于对消息生产和消费有时间窗口要求,或者利用消息出发定时任务的场景。
代码示例:
java apache的rocketmq不支持,但是人家阿里云rocketmq可以支持。
3.3区别和注意事项
定时消息和延时消息的使用在代码编写上存在略微的区别:
- 发送定时消息需要明确指定消息发送时间点之后的某一时间点作为消息投递的时间点。
- 发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
注意事项:
- 定时和延时消息的 msg.setStartDeliverTime 参数需要设置成当前时间戳之后的某个时刻(单位毫秒)。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
- 定时和延时消息的 msg.setStartDeliverTime 参数可设置40天内的任何时刻(单位毫秒),超过40天消息发送将失败。
- StartDeliverTime 是服务端开始向消费端投递的时间。 如果消费者当前有消息堆积,那么定时和延时消息会排在堆积消息后面,将不能严格按照配置的时间进行投递。
- 由于客户端和服务端可能存在时间差,消息的实际投递时间与客户端设置的投递时间之间可能存在偏差。
- 设置定时和延时消息的投递时间后,依然受 3 天的消息保存时长限制。例如,设置定时消息 5 天后才能被消费,如果第 5 天后一直没被消费,那么这条消息将在第8天被删除。
- 除 Java 语言支持延时消息外,其他语言都不支持延时消息。
五、push和pull消费者
1、push消费
Push Consumer:消息消费者应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。
push-优点:及时性、服务端统一处理实现方便
push-缺点:容易造成堆积、负载性能不可控
上面所有例子都是push消费,我就再不举例了
2、pull消费
Pull Consumer:消息消费者应用主动调用 Consumer 的拉消息方法从 Broker拉消息,主动权由消息消费者应用控制。
pull-优点:获得消息状态方便、负载均衡性能可控
pull-缺点:及时性差
举例:
package com.rocket.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("Group-Consumer-1");
//默认CLUSTERING,所以不写也可以
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setNamesrvAddr("200.200.6.16:9876");
try {
consumer.start();
System.out.println("消费端起来了哈.........");
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("topic_hello");
for (MessageQueue mq : mqs) {
System.out.println("Consume from the queue: " + mq);
//开始拉取,offset是指多个队列指向哪一个的游标;初始队列offerset为0
PullResultExt pullResult = (PullResultExt) consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
//获取下一个队列offset
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
//开始拿消息
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for (MessageExt m : messageExtList) {
System.out.println(new String(m.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break;
case OFFSET_ILLEGAL:
break;
default:
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static final Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offsetTable.put(mq, offset);
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offsetTable.get(mq);
if (offset != null)
return offset;
return 0;
}
}
六、消费者的消息过滤
看到这里你肯定知道消息过滤是什么啦,就是消费者依靠那个topic和tages来过滤消息,完全依赖consumer的subscribe(String topic, String tags, byte[] body)这个方法
举例如下:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group-Consumer-1");
//第二个参数表示消费匹配的tag * 表示topic所有的tag
// Tag1 || Tag2 || Tag3 表示订阅 Tag1 或 Tag2 或 Tag3 的消息
// Tag1 表示订阅Tag1的消息
consumer.subscribe("topic_hello", "*");
七、消费者的广播消费与集群消费
消费者订阅消息的方式分广播消费与集群消费,前面三、4节里面有讲过广播消费与集群消费,
1、广播消费
(1)广播消费模式:消息队列 RocketMQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。
(2)注意事项:
- 广播消费模式下不支持顺序消息。
- 消费进度在客户端维护,出现重复的概率稍大于集群模式。
- 广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
- 广播模式下,客户端第一次启动时默认从最新消息消费。
- 客户端的消费进度是被持久化在客户端本地的隐藏文件中,因此不建议删除该隐藏文件,否则会丢失部分消息。
- 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。目前仅 Java 客户端支持广播模式。
- 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
(3)代码举例:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group-Consumer-1");
consumer.setMessageModel(MessageModel.BROADCASTING);
2、集群消费
(1)集群消费:消息队列 RocketMQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。
(2)注意事项:
由于消费进度在服务端维护,可靠性更高。
集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
(3)代码举例:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group-Consumer-1");
//默认CLUSTERING,所以不写也可以
consumer.setMessageModel(MessageModel.CLUSTERING);
八、rocketmq使用中指标限制
消息队列 RocketMQ 对某些具体指标进行了约束和规范,使用时注意不要超过相应的限制值,以免程序出现异常。具体的限制项和限制值请参见下表:
九、真正项目开发中的规范化使用方法
前面所有例子都是为了了解和认识,但是真正项目是要很规范使用的,我们来一步一步看。目前现在很多还不是springboot项目,所以下面例子配置文件我都写到xml里面,如果你们是spingboot项目,就自己改成对应的配置
1、生产者
(1)ProducerUtil工具类:这里面包含了初始化生产者、关闭生产者、常用发送方法(顺序异步发送、顺序超时异步发送、乱序超时异步发送,乱序发送)
package com.test.service.impl.mq;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 生产者工具
* 此生产者只是包含了四种常用发送方法,其他自己加
*/
public class ProducerUtil {
private static final Logger logger = LoggerFactory.getLogger(ProducerUtil.class);
//默认生产者
private DefaultMQProducer defaultMQProducer;
//namesrv 命名服务地址
private String namesrvAddr;
// 分组名称
private String producerGroupName;
// 实例名称
private String instanceName;
/**
* 初始化 生产者相关参数
* @throws MQClientException
*/
public void init() throws MQClientException {
// 参数信息
//logger.info("DefaultMQProducer initialize!");
logger.info("producerGroupName=" + producerGroupName + " &namesrvAddr=" + namesrvAddr + " &instanceName=" + instanceName);
// 初始化 设置相关参数
defaultMQProducer = new DefaultMQProducer(producerGroupName);
defaultMQProducer.setNamesrvAddr(namesrvAddr);
defaultMQProducer.setInstanceName(instanceName);
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(10);
defaultMQProducer.setRetryTimesWhenSendFailed(10);
defaultMQProducer.setRetryAnotherBrokerWhenNotStoreOK(true);
defaultMQProducer.setSendMsgTimeout(5000);
defaultMQProducer.start();
logger.info("[DefaultMQProudcer 生产者初始化成功!]");
}
/**
* 关闭生产者
*/
public void destroy() {
defaultMQProducer.shutdown();
logger.info("DefaultMQProudcer has stop success!");
}
/**
* 顺序发送
* @param msg
* @param selector
* @param arg
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
return defaultMQProducer.send(msg, selector, arg);
}
/**
* 顺序超时发送
* @param msg
* @param selector
* @param arg
* @param timeout
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducer.send(msg, selector, arg, timeout);
}
/**
* 超时发送
* @param msg
* @param timeout
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public SendResult send(Message msg, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducer.send(msg, timeout);
}
/**
* 发送消息
* @param msg
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public SendResult send(Message msg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducer.send(msg);
}
/***get and set***/
public DefaultMQProducer getDefaultMQProducer() {
return defaultMQProducer;
}
public void setDefaultMQProducer(DefaultMQProducer defaultMQProducer) {
this.defaultMQProducer = defaultMQProducer;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getProducerGroupName() {
return producerGroupName;
}
public void setProducerGroupName(String producerGroupName) {
this.producerGroupName = producerGroupName;
}
public String getInstanceName() {
return instanceName;
}
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
}
(2)我们在使用这个producerUtil类时,要先初始化好一些配置,所以创建一个rocketmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:p="http://www.springframework.org/schema/p" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd">
<context:property-placeholder location="classpath:rockertmq-producer.properties"/>
<!-- 生产者工具 -->
<bean id="producerUtil" class="com.test.service.impl.mq.ProducerUtil"
init-method="init"
destroy-method="destroy"
scope="singleton">
<property name="producerGroupName" value="#{producerGroupName}" />
<property name="namesrvAddr" value="#{namesrvAddr}" />
<property name="instanceName" value="#{instanceName}" />
</bean>
</beans>
(3)rockertmq-producer.properties文件,内容如下:
#namesrvAddr
namesrvAddr =写自己的
#producerGroupName
producerGroupName = 写自己的
#instanceName
instanceName = 写自己的
# topic - name
busTopic =写自己的
(4)使用的时候直接用注解初始化ProducerUtil producerUti ,然后调用send方法即可。
2、Push消费者
(1)这里用的是push消费者工具类
package com.test.service.impl.mq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* 推模式下的mq消费服务
*/
public class PushConsumer {
private static Logger logger = LoggerFactory.getLogger("PushConsumer");
private String topic;
private String consumerGroupName;
private String namesrvAddr;
private String instanceName;
private MessageListenerConcurrently messageListenerConcurrently;
private DefaultMQPushConsumer consumer;
public void init() throws MQClientException {
//先把任务队列清空
logger.info("[PushConsumer 开始初始化消费者]");
//updateOffsetByTime();
consumer = new DefaultMQPushConsumer(consumerGroupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumerGroup(consumerGroupName);
consumer.setInstanceName(instanceName);
consumer.subscribe(topic, "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.registerMessageListener(messageListenerConcurrently);
consumer.start();
logger.info("[PushConsumer 初始化消费者成功]");
}
private void updateOffsetByTime(){
long currentTime = System.currentTimeMillis();
MqAdminExecutor.resetOffsetByTime(namesrvAddr, consumerGroupName, topic, currentTime);
}
public void destroy() throws MQClientException{
consumer.shutdown();
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public MessageListenerConcurrently getMessageListenerConcurrently() {
return messageListenerConcurrently;
}
public void setMessageListenerConcurrently(MessageListenerConcurrently messageListenerConcurrently) {
this.messageListenerConcurrently = messageListenerConcurrently;
}
public String getConsumerGroupName() {
return consumerGroupName;
}
public void setConsumerGroupName(String consumerGroupName) {
this.consumerGroupName = consumerGroupName;
}
public DefaultMQPushConsumer getConsumer() {
return consumer;
}
public void setConsumer(DefaultMQPushConsumer consumer) {
this.consumer = consumer;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getInstanceName() {
return instanceName;
}
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
}
(2)我们在使用这个PushConsumer类时,要先初始化好一些配置,所以创建一个rocketmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:p="http://www.springframework.org/schema/p" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd">
<context:property-placeholder location="classpath:rockertmq-comsumer.properties"/>
<!-- 拉取消息后的处理 -->
<bean id="pushConsumerHandler" class="com.test.service.impl.mq.PushConsumerHandler">
</bean>
<!-- 推送消费 -->
<bean id="pushConsumer" class="com.test.service.impl.mq.PushConsumer"
init-method="init"
destroy-method="destroy"
scope="singleton">
<property name="consumerGroupName" value="#{consumerGroupName}" />
<property name="namesrvAddr" value="#{namesrvAddr}" />
<property name="instanceName" value="#{instanceName}" />
<property name="topic" value="#{topic}" />
<property name="messageListenerConcurrently" ref="pushConsumerHandler" />
</bean>
</beans>
(3)rockertmq-consumer.properties文件,内容如下:
#namesrvAddr
namesrvAddr=写自己的
#consumerGroupName
consumerGroupName=写自己的
#instanceName
instanceName=写自己的
#topic
topic=写自己的
#pullThreadNums
pullThreadNums=25
#pullNextDelayTimeMillis
pullNextDelayTimeMillis=1000
(4)获取到消息的结果处理器
public class PushConsumerHandler implements MessageListenerConcurrently {
@Overridepublic
ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//自己的消息结果处理
}
}
2、定时Pull消费者
(1)这里用的是push消费者工具类
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullTaskCallback;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 定时拉取服务
*/
public class PullSchduleConsumer {
//日志
private static final Logger logger = LoggerFactory.getLogger(PullSchduleConsumer.class);
//定时拉取消费者服务
private MQPullConsumerScheduleService mqPullConsumerScheduleService;
//命名服务地址
private String namesrvAddr;
//消费组组名
private String consumerGroupName;
//实例名称
private String instanceName;
//消费主题
private String topic;
//拉取线程数量
private Integer pullThreadNums;
//消费处理
private PullTaskCallback pullTaskCallbackHandler;
/**
* 初始化定时
* @throws MQClientException
*/
public void init() throws MQClientException{
logger.info("PullSchduleConsumer initialize!");
logger.info("consumerGroupName="+consumerGroupName);
logger.info("namesrvAddr="+namesrvAddr);
logger.info("instanceName="+instanceName);
logger.info("topic="+topic);
mqPullConsumerScheduleService = new MQPullConsumerScheduleService(consumerGroupName);
mqPullConsumerScheduleService.getDefaultMQPullConsumer().setNamesrvAddr(namesrvAddr);
mqPullConsumerScheduleService.getDefaultMQPullConsumer().setInstanceName(instanceName);
mqPullConsumerScheduleService.setMessageModel(MessageModel.CLUSTERING);//默认使用集群模式
mqPullConsumerScheduleService.registerPullTaskCallback(topic, pullTaskCallbackHandler);
mqPullConsumerScheduleService.setPullThreadNums(pullThreadNums);
mqPullConsumerScheduleService.start();
logger.info("----PullSchduleConsumer has start successfully!----");
}
/**
* 停止定时消费服务
*/
public void destroy(){
mqPullConsumerScheduleService.shutdown();
logger.info("----PullSchduleConsumer has stop successfully!----");
}
/*****get and set*******/
public MQPullConsumerScheduleService getMqPullConsumerScheduleService() {
return mqPullConsumerScheduleService;
}
public void setMqPullConsumerScheduleService(
MQPullConsumerScheduleService mqPullConsumerScheduleService) {
this.mqPullConsumerScheduleService = mqPullConsumerScheduleService;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getConsumerGroupName() {
return consumerGroupName;
}
public void setConsumerGroupName(String consumerGroupName) {
this.consumerGroupName = consumerGroupName;
}
public String getInstanceName() {
return instanceName;
}
public void setInstanceName(String instanceName) {
this.instanceName = instanceName;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public PullTaskCallback getPullTaskCallbackHandler() {
return pullTaskCallbackHandler;
}
public void setPullTaskCallbackHandler(PullTaskCallback pullTaskCallbackHandler) {
this.pullTaskCallbackHandler = pullTaskCallbackHandler;
}
public void setPullThreadNums(Integer pullThreadNums) {
this.pullThreadNums = pullThreadNums;
}
}
(2)我们在使用这个PullConsumer类时,要先初始化好一些配置,所以创建一个rocketmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:p="http://www.springframework.org/schema/p" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd">
<!-- 引入config -->
<context:property-placeholder location="classpath*:properties/rocketmq-consumer.properties" />
<!-- 拉取消息后的处理 -->
<bean id="pullSchduleConsumerHandler" class="com.test.service.impl.mq.PullSchduleConsumerHandler">
</bean>
<!-- 定时拉取消费者 -->
<bean id="pullSchduleConsumer" class="com.test.service.impl.mq.PullSchduleConsumer"
init-method="init"
destroy-method="destroy"
scope="singleton">
<property name="consumerGroupName" value="#{consumerGroupName}" />
<property name="namesrvAddr" value="#{namesrvAddr}" />
<property name="instanceName" value="#{nstanceName}" />
<property name="topic" value="#{topic}" />
<property name="pullThreadNums" value="#{pullThreadNums}" />
<property name="pullTaskCallbackHandler" ref="pullSchduleConsumerHandler" />
</bean>
</beans>
(3)rockertmq-consumer.properties文件,内容如下:
#namesrvAddr
namesrvAddr=写自己的
#consumerGroupName
consumerGroupName=写自己的
#instanceName
instanceName=写自己的
#topic
topic=写自己的
#pullThreadNums
pullThreadNums=25
#pullNextDelayTimeMillis
pullNextDelayTimeMillis=1000
(4)获取到消息的结果处理器
public class PullSchduleConsumerHandler implements
PullTaskCallback {
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
//自己的消息结果处理
}
}