RocketMQ入门
源码和应用下载
这里以RocketMQ的4.3.0版本为例,本地环境为windows10,jdk1.8, maven3.2.1.
源码下载地址: http://mirrors.hust.edu.cn/apache/rocketmq/4.3.0/rocketmq-all-4.3.0-source-release.zip
应用下载地址: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.0/rocketmq-all-4.3.0-bin-release.zip
启动
Windows下需要配置环境变量,ROCKETMQ_HOME, 我这里配置为: E:software
ocketmq-all-4.3.0-bin-release
配置完环境变量后,就可以进入到bin目录:
启动server: 直接运行bin目录下的
mqnamesrv.cmd
启动broker: 运行
mqbroker.cmd
,发现一闪而过,查看bin目录下的bk.log日志,发现错误日志如下:错误: 找不到或无法加载主类 FilesJavajdk1.8.0_121lib;C:Program
再查看
mqbroker.cmd
源码,发现其最终调用了runbroker.cmd
。该脚本的倒数第二行为:set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%"
知道问题所在: CLASSPATH的配置中是包含空格的,而空格导致最终解析出来的路径错误。最终我修改倒数第二行为:
set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""
至此可以顺利启动
本以为启动之后就能就行消息收发了,于是我按照官网示例进入RocketMQ的bin目录,并通过命令向broker发送消息:
tools org.apache.rocketmq.example.quickstart.Producer
结果一直报错,搜索得知在windows下需要配置环境变量NAMESRV_ADDR
为127.0.0.1:9876
配置完成之后,再依次启动mqnamesrv和mqbroker,重新测试Producer发现Producer的输出大致如下:
......
SendResult [sendStatus=SEND_OK, msgId=C0A8029D46D461BBE9BA5A115E9C03E5, offsetMsgId=C0A8130100002A9F000000000002BC96, messageQueue=MessageQueue [topic=TopicTest, brokerName=Ziyuqi-0431, queueId=0], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8029D46D461BBE9BA5A115E9E03E6, offsetMsgId=C0A8130100002A9F000000000002BD4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=Ziyuqi-0431, queueId=1], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8029D46D461BBE9BA5A115EA003E7, offsetMsgId=C0A8130100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=Ziyuqi-0431, queueId=2], queueOffset=249]
11:44:47.790 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.19.1:10911] result: true
11:44:47.791 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
11:44:47.793 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.19.1:10909] result: true
在通过命令行运行Consumer:
tools org.apache.rocketmq.example.quickstart.Consumer
发现Consumer的输出为:
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=249, sysFlag=0, bornTimestamp=1537242287776, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287778, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002BDFE, commitLogOffset=179710, bodyCRC=638172955, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409812, UNIQ_KEY=C0A8029D46D461BBE9BA5A115EA003E7, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 57], transactionId='null'}]]
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=248, sysFlag=0, bornTimestamp=1537242287768, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287768, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002BB2E, commitLogOffset=178990, bodyCRC=801108784, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409811, UNIQ_KEY=C0A8029D46D461BBE9BA5A115E9803E3, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 53], transactionId='null'}]]
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=247, sysFlag=0, bornTimestamp=1537242287761, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287761, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002B85E, commitLogOffset=178270, bodyCRC=684865321, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409807, UNIQ_KEY=C0A8029D46D461BBE9BA5A115E9103DF, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 49], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=246, sysFlag=0, bornTimestamp=1537242287753, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287753, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002B58E, commitLogOffset=177550, bodyCRC=1487577949, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409807, UNIQ_KEY=C0A8029D46D461BBE9BA5A115E8903DB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 55], transactionId='null'}]]
关闭
关闭的步骤与启动正好相反
- 关闭broker:
mqshutdown broker
- 关闭namesrv:
mqshutdown namesrv
简单示例
在进行简单的示例之前,我们先要知道为什么会出现RocketMQ,下面一段话摘自RocketMQ官网:
Based on our research, with increased queues and virtual topics in use, ActiveMQ IO module reaches a bottleneck. We tried our best to solve this problem through throttling, circuit breaker or degradation, but it did not work well. So we begin to focus on the popular messaging solution Kafka at that time. Unfortunately, Kafka can not meet our requirements especially in terms of low latency and high reliability, see here for details.
In this context, we decided to invent a new messaging engine to handle a broader set of use cases, ranging from traditional pub/sub scenarios to high volume real-time zero-loss tolerance transaction system. We believe this solution can be beneficial, so we would like to open source it to the community. Today, more than 100 companies are using the open source version of RocketMQ in their business. We also published a commercial distribution based on RocketMQ, a PaaS product called the Alibaba Cloud Platform.
可以知道RocketMQ是阿里在使用ActiveMQ时,出现了IO瓶颈,无法满足阿里业务所需要的低延迟和高可靠性要求时自己研发出来。并且最终捐赠给Apache,成为顶级开源项目的。 high volume real-time zero-loss tolerance transaction system
是其核心特点。
下面通过一个简单的示例,来说明RocketMQ的基本使用:
引入pom依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
Producer
Producer一般分为三种模式: 同步、异步和单向,具体代码如下:
public class SimpleProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException,
MQBrokerException, InterruptedException {
/**
* 同步消息发送: 一般用来进行通知、短信等重要消息的同步
*/
// syncProducer();
/**
* 异步消息发送: 一般用来对方法调用响应时间有较严格要求的情况下,异步调用,立即返回
* 不同于同步的唯一在于: send方法调用的时候多携带一个回调接口参数,用来异步处理消息发送结果
*/
asyncProducer();
/**
* 单向模式: 一般用来对可靠性有一定要求的消息发送,例如日志系统
* 不同于同步的唯一之处在于: 调用的是sendOneway方法,且该方法不会给调用者任何返回值
*/
// oneWayProducer();
}
private static void oneWayProducer() throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException {
// STEP1: 创建Producer并且指定组名
DefaultMQProducer oneWayProducer = new DefaultMQProducer("GroupA");
// STEP2: 指定nameServer地址
oneWayProducer.setNamesrvAddr("localhost:9876");
// STEP3: 启动Producer
oneWayProducer.start();
// STEP4: 循环发送消息
for (int i = 0; i < 50; i++) {
Message message = new Message("OneWayTopic", "TagA",
("OneWayMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
oneWayProducer.sendOneway(message);
}
// STEP5: 关闭Producer
oneWayProducer.shutdown();
}
private static void asyncProducer() throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException {
// STEP1: 创建Producer并且指定组名
DefaultMQProducer asyncProducer = new DefaultMQProducer("GroupA");
// STEP2: 指定nameServer地址
asyncProducer.setNamesrvAddr("localhost:9876");
// STEP3: 启动Producer
asyncProducer.start();
asyncProducer.setRetryTimesWhenSendAsyncFailed(0); // 设置异步发送失败重试次数,默认为2
// STEP4: 循环发送消息
for (int i = 0; i < 50; i++) {
Message message = new Message("AsyncTopic", "TagA",
("AsyncMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 创建回调函数处理发送成功或者异常
asyncProducer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
}
// STEP5: 关闭Producer
TimeUnit.SECONDS.sleep(10); // 睡眠10秒,确保消息都发送出去
asyncProducer.shutdown();
}
private static void syncProducer() throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException {
// STEP1: 创建Producer并且指定组名
DefaultMQProducer syncProducer = new DefaultMQProducer("GroupA");
// STEP2: 指定nameServer地址
syncProducer.setNamesrvAddr("localhost:9876");
// STEP3: 启动Producer
syncProducer.start();
// STEP4: 循环发送消息
for (int i = 0; i < 50; i++) {
Message message = new Message("SyncTopic", "TagA",
("SyncMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = syncProducer.send(message);
System.out.println(sendResult);
}
// STEP5: 关闭Producer
syncProducer.shutdown();
}
}
Consumer
consumer的实现就较为简单了,定义一个事件监听接口即可.
public class SimpleConsumer {
public static void main(String[] args) throws MQClientException {
// STEP1: 创建默认Consumer并指定
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA");
// STEP2: 指定nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// STEP3: 订阅对应主题和tag
consumer.subscribe("AsyncTopic", "*");
// STEP4: 注册接收到broker消息后的处理接口
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
System.out.println(new String(msgs.get(0).getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// STEP5: 启动consumer (必须在注册完消息监听器之后启动,否则会报错)
consumer.start();
System.out.println("Consumer started......");
}
}
总结
- 运行Producer的时候必须保证nameServer和broker都正常运行,否则会报
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic
- 即使先运行Producer只要在运行Consumer之前,未重启broker或者nameServer。Consumer启动时还是能正常收到消息
参考链接