启动RocketMQ
-
启动nameserver
-
修改broker配置参数,新增
messageDelayLevel=1s 2s 4s 8s 16s 32s 64s 128s 256s 512s 1024s 2048s 4096s 8192s 16384s 32768s 65536s 131072s
-
启动broker
公共代码
常量类
package com.zby.client;
/**
* 任意延时常量类
*
* @author zby
*
*/
public class ArbitrarityDelayConstants {
public static final String NAME_SERVER_ADDRESS = "localhost:9876";
public static final String ARBITRARITY_DELAY_TOPIC = "arbitrarity_delay_topic";
public static final String ARBITRARITY_DELAY_PRODUCER_GROUP = "arbitrarity_delay_producer_group";
public static final String ARBITRARITY_DELAY_CONSUMER_GROUP = "arbitrarity_delay_consumer_group";
public static final String REMAIN_DELAY_KEY = "MSG_REMAIN_DELAY_CLIENT";
}
延时等级转换工具类
package com.zby.client;
/**
* 任意延时工具类
*
* @author zby
*
*/
public class ArbitrarilyDelayUtil {
/**
* 延时时间
*/
private static final long[] delayArray = new long[] { 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192,
16384, 32768, 65536, 131072 };
private ArbitrarilyDelayUtil() {
}
/**
* 根据延时等级获取延时时间
*
* @param delayLevel 延时等级
* @return 延时时间
*/
public static long getDelayTimeFromDelayLevel(int delayLevel) {
validDelayLevel(delayLevel);
if (delayLevel == 0) {
return 0;
}
return delayArray[delayLevel - 1];
}
/**
* 根据延时时间获取最近的延时等级
*
* @param delayTime
* @return
*/
public static int getLatestDelayLevelFromDelayTime(long delayTime) {
validDelayTime(delayTime);
if (delayTime == 0) {
return 0;
}
for (int i = 0; i < delayArray.length - 1; i++) {
if (delayTime >= delayArray[i] && delayTime < delayArray[i + 1]) {
return i + 1;
}
}
return delayArray.length;
}
/**
* 校验延时时间
*/
private static void validDelayTime(long delayTime) {
if (delayTime < 0) {
throw new IllegalArgumentException("Not supported delay time:" + delayTime);
}
}
/**
* 校验延时等级
*/
private static void validDelayLevel(int delayLevel) {
if (delayLevel < 0 || delayLevel > delayArray.length) {
throw new IllegalArgumentException("Not supported delay level:" + delayLevel);
}
}
}
生产者
生产者简单封装
- 提供延时消息发送能力
package com.zby.client;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
* 任意延时生产者
*
* @author zby
*
*/
public class ArbitrarilyDelayProducer extends DefaultMQProducer {
public ArbitrarilyDelayProducer(String producerGroup) {
super(producerGroup);
}
public SendResult send(final Message msg, long delay, TimeUnit timeUnit)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
int delayTimeLevel = ArbitrarilyDelayUtil.getLatestDelayLevelFromDelayTime(timeUnit.toSeconds(delay));
msg.setDelayTimeLevel(delayTimeLevel);
msg.putUserProperty(ArbitrarityDelayConstants.REMAIN_DELAY_KEY, Long
.toString(timeUnit.toSeconds(delay) - ArbitrarilyDelayUtil.getDelayTimeFromDelayLevel(delayTimeLevel)));
return send(msg);
}
}
生产者示例
package com.zby.client;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class ArbitrarilyDelayProducerDemo {
public static void main(String[] args) throws Exception {
ArbitrarilyDelayProducer arbitrarilyDelayProducer = new ArbitrarilyDelayProducer(
ArbitrarityDelayConstants.ARBITRARITY_DELAY_PRODUCER_GROUP);
arbitrarilyDelayProducer.setNamesrvAddr(ArbitrarityDelayConstants.NAME_SERVER_ADDRESS);
arbitrarilyDelayProducer.start();
Message msg = new Message(ArbitrarityDelayConstants.ARBITRARITY_DELAY_TOPIC,
("消息发送时间 :" + new Date()).getBytes(StandardCharsets.UTF_8));
SendResult sendResult = arbitrarilyDelayProducer.send(msg, 0, TimeUnit.SECONDS);
System.out.println("消息发送结果:" + sendResult.getSendStatus());
arbitrarilyDelayProducer.shutdown();
}
}
消费者
封装消费者监听器
package com.zby.client;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
public class ArbitrarilyDelayMessageListenerConcurrently implements MessageListenerConcurrently {
private ArbitrarilyDelayProducer arbitrarilyDelayProducer;
private MessageListenerConcurrently proxyListener;
public ArbitrarilyDelayMessageListenerConcurrently(ArbitrarilyDelayProducer arbitrarilyDelayProducer,
MessageListenerConcurrently proxyListener) {
this.arbitrarilyDelayProducer = arbitrarilyDelayProducer;
this.proxyListener = proxyListener;
}
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0);
String remainDelay = messageExt.getUserProperty(ArbitrarityDelayConstants.REMAIN_DELAY_KEY);
if (remainDelay != null && Long.parseLong(remainDelay) > 0) {
try {
System.out.println("msgId:" + messageExt.getMsgId() + ",remainDelay:" + remainDelay);
arbitrarilyDelayProducer.send(messageExt, Long.parseLong(remainDelay), TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return proxyListener.consumeMessage(msgs, context);
}
}
消费者示例
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zby.client;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
public class ArbitrarilyDelayConsumerDemo {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
ArbitrarityDelayConstants.ARBITRARITY_DELAY_CONSUMER_GROUP);
consumer.setNamesrvAddr(ArbitrarityDelayConstants.NAME_SERVER_ADDRESS);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(ArbitrarityDelayConstants.ARBITRARITY_DELAY_TOPIC, "*");
ArbitrarilyDelayProducer arbitrarilyDelayProducer = new ArbitrarilyDelayProducer(
ArbitrarityDelayConstants.ARBITRARITY_DELAY_PRODUCER_GROUP);
arbitrarilyDelayProducer.setNamesrvAddr(ArbitrarityDelayConstants.NAME_SERVER_ADDRESS);
arbitrarilyDelayProducer.start();
ArbitrarilyDelayMessageListenerConcurrently arbitrarilyDelayMessageListenerConcurrently = new ArbitrarilyDelayMessageListenerConcurrently(
arbitrarilyDelayProducer, (msgs, context) -> {
System.out.printf("消费消息了,当前时间 %s:%s Receive New Messages: %s %n", new Date(),
Thread.currentThread().getName(),
new String(msgs.get(0).getBody(), StandardCharsets.UTF_8));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.registerMessageListener(arbitrarilyDelayMessageListenerConcurrently);
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
启动测试
- 启动消费者
- 启动生产者
- 消费者输出结果
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
Consumer Started.
msgId:C0A83EF0A7082A139A556ECFF5E10000,remainDelay:11
msgId:C0A83EF0A7082A139A556ECFF5E10000,remainDelay:3
msgId:C0A83EF0A7082A139A556ECFF5E10000,remainDelay:1
消费消息了,当前时间 Wed Jul 22 12:25:49 CST 2020:ConsumeMessageThread_4 Receive New Messages: 消息发送时间 :Wed Jul 22 12:25:22 CST 2020
优点
- 仅客户端代码修改,实现难度小
缺点
- 时间轮降级浪费一定网络资源