先创建Topic,创建命令是如下(在bin目录下执行)
sh mqadmin updateTopic -t TopicTest -n 192.168.32.128:9876 -b localhost:10911
导入pom包
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
/** * 普通消息消费者 */ public class Consumer { public static final String NAME_SERVER_ADDR = "192.168.32.128:9876"; public static void main(String[] args) throws MQClientException { // 1. 创建消费者(Push)对象 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 consumer.setNamesrvAddr(NAME_SERVER_ADDR); consumer.setMaxReconsumeTimes(-1);// 消费重试次数 -1代表16次 // 3. 订阅对应的主题和Tag consumer.subscribe("TopicTest", "*"); // 4. 注册消息接收到Broker消息后的处理接口 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { MessageExt messageExt = list.get(0); System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); // 5. 启动消费者(必须在注册完消息监听器后启动,否则会报错) consumer.start(); System.out.println("已启动消费者"); } }
/** * 普通消息消费者 */ public class PullConsumer { public static final String NAME_SERVER_ADDR = "192.168.32.128:9876"; public static void main(String[] args) throws Exception { // 1. 创建消费者(Pull)对象 DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 consumer.setNamesrvAddr(NAME_SERVER_ADDR); consumer.start(); // 3. 获取到对于topic的queue列表 Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("TopicTest"); Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { try { // 4. 循环遍历 for (MessageQueue messageQueue : messageQueues) { // 5. 获取读取位置 long offset = consumer.fetchConsumeOffset(messageQueue, true); // 6. 从指定位置取queue中的消息,每次最多10条。 如果没有则阻塞等待 PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, null, offset, 10); // 7. 存储Offset,客户端每隔5s会定时刷新到Broker() System.out.println(pullResult.getNextBeginOffset()); consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); // 8. 遍历结果 if (pullResult.getPullStatus() == PullStatus.FOUND) { List<MessageExt> messageExtList = pullResult.getMsgFoundList(); for (MessageExt messageExt : messageExtList) { System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), messageExt.getTags(), new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET)); } } } } catch (Exception e) { e.printStackTrace(); } }, 1000L, 1000L, TimeUnit.MILLISECONDS); } }
一、异步发送消息
Producer 发出消息后无需等待 MQ 返回 ACK,直接发送下⼀条消息。Producer 通过回调接口接收 MQ 响应,并处理响应结果。该方式的消息可靠性可以得到保障,消息发送效率也可以。
一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
/** * 异步消息 * 一般用来对方法调用响应时间有较严格要求的情况下,异步调用,立即返回 * 不同于同步的唯一在于: send方法调用的时候多携带一个回调接口参数,用来异步处理消息发送结果 */ public class AsyncProducer { public static final String NAME_SERVER_ADDR = "192.168.32.128:9876"; public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException { // 1:创建生产者对象,并指定组名 DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST"); // 2:指定NameServer地址 producer.setNamesrvAddr(NAME_SERVER_ADDR); // 3:启动生产者 producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); // 设置异步发送失败重试次数,默认为2 int count = 10; CountDownLatch cd = new CountDownLatch(count); // 4:循环发送消息 for (int i = 0; i < count; i++) { final int index = i; // ID110:业务数据的ID,比如用户ID、订单编号等等 Message msg = new Message("TopicTest", "TagB", "ID110", ("Hello World " + index).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送异步消息 producer.send(msg, new SendCallback() { /** * 发送成功的回调函数 * 但会结果有多种状态,在SendStatus枚举中定义 * @param sendResult */ @Override public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK MSG_ID:%s %n", index, sendResult.getMsgId()); cd.countDown(); } /** * 发送失败的回调函数 * @param e */ @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); cd.countDown(); } }); } // 确保消息都发送出去了 cd.await(); // 5:关闭生产者 producer.shutdown(); } }
二、单向发送消息
Producer 仅负责发送消息,不等待。该发送方式时 MQ 不返回 ACK。该方式的消息发送效率最高,但消息可靠性较差。此方式发送消息的过程耗时非常短,一般在微秒级别。
一般用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
/** * 单向模式 * 一般用来对可靠性有一定要求的消息发送,例如日志系统 * 不同于同步的唯一之处在于:调用的是sendOneway方法,且方法不返回任何值,即调用者不需要关心成功或失败 */ public class OnewayProducer { public static final String NAME_SERVER_ADDR = "192.168.32.128:9876"; public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException { // 1:创建生产者对象 DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST"); // 2:指定NameServer地址 producer.setNamesrvAddr(NAME_SERVER_ADDR); // 3:启动生产者 producer.start(); // 4:发送消息 for (int i = 0; i < 10; i++) { Message msg = new Message("TopicTest", "TagC", ("Hello OneWay :" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.sendOneway(msg); } System.out.println("消息已发送"); producer.shutdown(); } }
三、同步发送消息
Producer 发出⼀条消息后,会在收到 MQ 返回的 ACK 之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。同步发送应用场景广泛,例如通知邮件、报名短信通知等。
/** * 发送同步消息 * 可靠的同步传输用于广泛的场景,如重要的通知消息,短信通知,短信营销系统等。 */ public class SyncProducer { public static final String NAME_SERVER_ADDR = "192.168.32.128:9876"; public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { // 1. 创建生产者对象 DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST"); // 2. 设置NameServer的地址,如果设置了环境变量NAMESRV_ADDR,可以省略此步 producer.setNamesrvAddr(NAME_SERVER_ADDR); // 3. 启动生产者 producer.start(); // 4. 生产者发送消息 for (int i = 0; i < 10; i++) { Message message = new Message("TopicTest", "TagA", ("Hello MQ:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = producer.send(message); System.out.printf("发送结果:%s%n", result); } // 5. 停止生产者 producer.shutdown(); } }