zoukankan      html  css  js  c++  java
  • MQ任意延时消息(三)基于服务端实现

    启动RocketMQ

    • 启动nameserver

    • 修改broker配置参数,新增

      messageDelayLevel=1s 2s 4s 8s 16s 32s 64s 128s 256s 512s 1024s 2048s 4096s 8192s 16384s 32768s 65536s 131072s
      
    • org.apache.rocketmq.common.message.MessageConst新增常量

    public static final String PROPERTY_MSG_REMAIN_DELAY = "MSG_REMAIN_DELAY_SERVER";
    
    • org.apache.rocketmq.store.util包下新增工具类
    package org.apache.rocketmq.store.util;
    
    /**
     * 任意延时工具类
     * 
     * @author zby
     *
     */
    public class ArbitrarilyDelayUtil {

    	/**
    	 * Delay granted by each level: index {@code i} holds the delay of level {@code i + 1}.
    	 * The values are strictly ascending powers of two. They are presumably expected to mirror
    	 * the broker's delay-level configuration (verify against the broker settings).
    	 */
    	private static final long[] LEVEL_DELAYS = { 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192,
    			16384, 32768, 65536, 131072 };

    	private ArbitrarilyDelayUtil() {
    		// static helpers only
    	}

    	/**
    	 * Looks up the delay that a given delay level stands for.
    	 *
    	 * @param delayLevel delay level, from 0 (no delay) up to the number of configured levels
    	 * @return the delay of that level, or 0 for level 0
    	 * @throws IllegalArgumentException if the level is negative or above the highest level
    	 */
    	public static long getDelayTimeFromDelayLevel(int delayLevel) {
    		if (delayLevel < 0 || delayLevel > LEVEL_DELAYS.length) {
    			throw new IllegalArgumentException("Not supported delay level:" + delayLevel);
    		}
    		return delayLevel == 0 ? 0 : LEVEL_DELAYS[delayLevel - 1];
    	}

    	/**
    	 * Finds the highest delay level whose delay does not exceed the requested delay time.
    	 *
    	 * @param delayTime requested delay, must not be negative
    	 * @return 0 when {@code delayTime} is 0, otherwise the matching level; delays at or beyond
    	 *         the largest configured delay map to the highest level
    	 * @throws IllegalArgumentException if {@code delayTime} is negative
    	 */
    	public static int getLatestDelayLevelFromDelayTime(long delayTime) {
    		if (delayTime < 0) {
    			throw new IllegalArgumentException("Not supported delay time:" + delayTime);
    		}
    		// The table is ascending, so counting the entries that fit yields the level directly.
    		int level = 0;
    		while (level < LEVEL_DELAYS.length && LEVEL_DELAYS[level] <= delayTime) {
    			level++;
    		}
    		return level;
    	}
    }
    
    
    • org.apache.rocketmq.store.CommitLog类新增代码,根据新增代码前的代码找到新增的位置
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    			// Delay Delivery
    			if (msg.getDelayTimeLevel() > 0) {
            省略.....
          }
      		<!-- 新增代码开始-->
    			// Arbitrary-delay hook: taken only when the message carries a remaining-delay user
    			// property whose value parses to a number greater than zero.
    			// NOTE(review): Long.parseLong is applied to the raw property value without a guard; a
    			// non-numeric value would raise NumberFormatException here - confirm senders always
    			// write a numeric value.
    			if (msg.getUserProperty(MessageConst.PROPERTY_MSG_REMAIN_DELAY) != null
    					&& Long.parseLong(msg.getUserProperty(MessageConst.PROPERTY_MSG_REMAIN_DELAY)) > 0) {
    				long remainDelay = Long.parseLong(msg.getUserProperty(MessageConst.PROPERTY_MSG_REMAIN_DELAY));
    				// Choose the delay level for this hop from the remaining delay.
    				int delayTimeLevel = ArbitrarilyDelayUtil.getLatestDelayLevelFromDelayTime(remainDelay);
    				msg.setDelayTimeLevel(delayTimeLevel);
    				// Write back what is still owed after this hop (remaining delay minus the chosen
    				// level's delay) so the next pass through this branch can continue from there.
    				msg.putUserProperty(MessageConst.PROPERTY_MSG_REMAIN_DELAY,
    						Long.toString(remainDelay - ArbitrarilyDelayUtil.getDelayTimeFromDelayLevel(delayTimeLevel)));
    
    				// Redirect the message to the schedule topic; the queue id is derived from the level.
    				topic = ScheduleMessageService.SCHEDULE_TOPIC;
    				queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    				// Backup real topic, queueId
    				// NOTE(review): the values backed up are whatever msg.getTopic()/getQueueId() return
    				// at this point; if the omitted delay-level branch above has already rewritten them,
    				// the rewritten values would be stored - confirm both paths cannot apply at once.
    				MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    				MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    				// Re-serialize the properties so the changes made above are included.
    				msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    
    				msg.setTopic(topic);
    				msg.setQueueId(queueId);
    			}
      		<!-- 新增代码结束-->
    		}
    
    • 启动broker

    公共代码

    消息延时工具类(客户端用于设置任意延时时间)

    package com.zby.server;
    
    import java.util.concurrent.TimeUnit;
    
    import org.apache.rocketmq.common.message.Message;
    
    /**
     * 消息延时工具类
     * 
     * @author mac
     *
     */
    public class DelayUtil {

    	/** User-property key under which the requested delay is attached to a message. */
    	public static final String PROPERTY_MSG_REMAIN_DELAY = "MSG_REMAIN_DELAY_SERVER";

    	private DelayUtil() {
    		// static helpers only
    	}

    	/**
    	 * Marks a message for delayed delivery by attaching the requested delay as a user property.
    	 * The delay is stored as a whole number of seconds; {@link TimeUnit#toSeconds(long)}
    	 * truncates, so any sub-second part of the duration is dropped.
    	 *
    	 * @param msg       message to tag
    	 * @param delayTime length of the delay, expressed in {@code timeUnit}
    	 * @param timeUnit  unit of {@code delayTime}
    	 */
    	public static void delayMsg(Message msg, long delayTime, TimeUnit timeUnit) {
    		long delaySeconds = timeUnit.toSeconds(delayTime);
    		msg.putUserProperty(PROPERTY_MSG_REMAIN_DELAY, String.valueOf(delaySeconds));
    	}
    }
    
    

    生产者

    生产者示例

    package com.zby.server;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    
    public class Producer {

    	/**
    	 * Starts a producer against the local name server, sends one message to {@code TopicTest}
    	 * tagged with a 27-minute delay, prints the send status and shuts the producer down.
    	 *
    	 * @param args unused
    	 * @throws Exception if the producer cannot be started or the send fails
    	 */
    	public static void main(String[] args) throws Exception {

    		DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    		producer.setNamesrvAddr("localhost:9876");
    		producer.start();

    		try {
    			Message msg = new Message("TopicTest", ("消息发送时间 :" + new Date()).getBytes(StandardCharsets.UTF_8));
    			DelayUtil.delayMsg(msg, 27, TimeUnit.MINUTES);
    			SendResult sendResult = producer.send(msg);
    			System.out.println("消息发送结果:" + sendResult.getSendStatus());
    		} finally {
    			// Shut down even when the send throws, so the started producer is always released.
    			producer.shutdown();
    		}
    	}
    }
    
    

    消费者

    消费者示例

    package com.zby.server;
    
    import java.io.UnsupportedEncodingException;
    import java.nio.charset.StandardCharsets;
    import java.util.Date;
    import java.util.List;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class Consumer {

    	/**
    	 * Starts a push consumer on {@code TopicTest} that prints the receive time, the consuming
    	 * thread and the body of every delivered message.
    	 *
    	 * @param args unused
    	 * @throws Exception if the consumer cannot subscribe or start
    	 */
    	public static void main(String[] args) throws Exception {

    		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    		consumer.setNamesrvAddr("localhost:9876");
    		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    		consumer.subscribe("TopicTest", "*");

    		consumer.registerMessageListener(new MessageListenerConcurrently() {

    			@Override
    			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    				// Print every message of the batch: the whole batch is acknowledged below, so
    				// handling only the first element would silently drop the others.
    				for (MessageExt msg : msgs) {
    					// Decode with the charset constant the producer encodes with; unlike a
    					// charset name, it cannot raise UnsupportedEncodingException.
    					System.out.printf("now:%s,%s Receive New Messages: %s %n", new Date(),
    							Thread.currentThread().getName(),
    							new String(msg.getBody(), StandardCharsets.UTF_8));
    				}
    				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    			}
    		});

    		consumer.start();

    		System.out.printf("Consumer Started.%n");
    	}
    }
    
    

    启动测试

    • 启动消费者
    • 启动生产者
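    • 消费者输出结果(注意:下面的输出中消息在发送约 27 秒后被消费,对应的是 DelayUtil.delayMsg(msg, 27, TimeUnit.SECONDS);若按上文生产者示例使用 TimeUnit.MINUTES,则消息会在约 27 分钟后才被消费)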
    Consumer Started.
    now:Wed Jul 22 12:42:45 CST 2020,ConsumeMessageThread_1 Receive New Messages: 消息发送时间 :Wed Jul 22 12:42:18 CST 2020 
    

    优点

    • 客户端唯一需要的就是发送消息前调用DelayUtil.delayMsg(msg, 27, TimeUnit.MINUTES);
    • 生产消费代码都是copy的quickstart的demo,可以看出侵入性很小

    注意

    • 修改了 broker 的 messageDelayLevel(延时等级)配置后,消费失败重试的延时间隔也会随之改变,因为消费重试同样是基于延时等级实现的
  • 相关阅读:
    MATLAB仿真学习笔记(一)
    SolidWorks学习笔记(一)
    机械制造技术学习笔记(七)
    基于MATLAB的多功能语音处理器
    MATLAB图形界面设计(下)
    36、如何避免僵尸进程?
    37、局部性原理你知道吗?主要有哪两大局部性原理?各自是什么?
    35、 守护进程、僵尸进程和孤儿进程
    32、什么是快表,你知道多少关于快表的知识?
    30、终端退出,终端运行的进程会怎样?31、如何让进程后台运行
  • 原文地址:https://www.cnblogs.com/zby9527/p/13359835.html
Copyright © 2011-2022 走看看