zoukankan      html  css  js  c++  java
  • MQ任意延时消息(二)基于客户端实现

    启动RocketMQ

    • 启动nameserver

    • 修改broker配置参数,新增

      messageDelayLevel=1s 2s 4s 8s 16s 32s 64s 128s 256s 512s 1024s 2048s 4096s 8192s 16384s 32768s 65536s 131072s
      
    • 启动broker

    公共代码

    常量类

    package com.zby.client;
    
    /**
     * 任意延时常量类
     * 
     * @author zby
     *
     */
    public class ArbitrarityDelayConstants {
    
    	public static final String NAME_SERVER_ADDRESS = "localhost:9876";
    	public static final String ARBITRARITY_DELAY_TOPIC = "arbitrarity_delay_topic";
    	public static final String ARBITRARITY_DELAY_PRODUCER_GROUP = "arbitrarity_delay_producer_group";
    	public static final String ARBITRARITY_DELAY_CONSUMER_GROUP = "arbitrarity_delay_consumer_group";
    	public static final String REMAIN_DELAY_KEY = "MSG_REMAIN_DELAY_CLIENT";
    }
    
    

    延时等级转换工具类

    package com.zby.client;
    
    /**
     * 任意延时工具类
     * 
     * @author zby
     *
     */
    public class ArbitrarilyDelayUtil {
    
    	/**
    	 * 延时时间
    	 */
    	private static final long[] delayArray = new long[] { 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192,
    			16384, 32768, 65536, 131072 };
    
    	private ArbitrarilyDelayUtil() {
    	}
    
    	/**
    	 * 根据延时等级获取延时时间
    	 * 
    	 * @param delayLevel 延时等级
    	 * @return 延时时间
    	 */
    	public static long getDelayTimeFromDelayLevel(int delayLevel) {
    		validDelayLevel(delayLevel);
    		if (delayLevel == 0) {
    			return 0;
    		}
    		return delayArray[delayLevel - 1];
    
    	}
    
    	/**
    	 * 根据延时时间获取最近的延时等级
    	 * 
    	 * @param delayTime
    	 * @return
    	 */
    	public static int getLatestDelayLevelFromDelayTime(long delayTime) {
    		validDelayTime(delayTime);
    		if (delayTime == 0) {
    			return 0;
    		}
    		for (int i = 0; i < delayArray.length - 1; i++) {
    			if (delayTime >= delayArray[i] && delayTime < delayArray[i + 1]) {
    				return i + 1;
    			}
    		}
    		return delayArray.length;
    	}
    
    	/**
    	 * 校验延时时间
    	 */
    	private static void validDelayTime(long delayTime) {
    		if (delayTime < 0) {
    			throw new IllegalArgumentException("Not supported delay time:" + delayTime);
    		}
    	}
    
    	/**
    	 * 校验延时等级
    	 */
    	private static void validDelayLevel(int delayLevel) {
    		if (delayLevel < 0 || delayLevel > delayArray.length) {
    			throw new IllegalArgumentException("Not supported delay level:" + delayLevel);
    		}
    	}
    }
    
    

    生产者

    生产者简单封装

    • 提供延时消息发送能力
    package com.zby.client;
    
    import java.util.concurrent.TimeUnit;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    /**
     * 任意延时生产者
     * 
     * @author zby
     *
     */
    public class ArbitrarilyDelayProducer extends DefaultMQProducer {
    
    	public ArbitrarilyDelayProducer(String producerGroup) {
    		super(producerGroup);
    	}
    
    	public SendResult send(final Message msg, long delay, TimeUnit timeUnit)
    			throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    		int delayTimeLevel = ArbitrarilyDelayUtil.getLatestDelayLevelFromDelayTime(timeUnit.toSeconds(delay));
    		msg.setDelayTimeLevel(delayTimeLevel);
    		msg.putUserProperty(ArbitrarityDelayConstants.REMAIN_DELAY_KEY, Long
    				.toString(timeUnit.toSeconds(delay) - ArbitrarilyDelayUtil.getDelayTimeFromDelayLevel(delayTimeLevel)));
    		return send(msg);
    	}
    
    }
    
    

    生产者示例

    package com.zby.client;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    
    public class ArbitrarilyDelayProducerDemo {
    
    	public static void main(String[] args) throws Exception {
    		ArbitrarilyDelayProducer arbitrarilyDelayProducer = new ArbitrarilyDelayProducer(
    				ArbitrarityDelayConstants.ARBITRARITY_DELAY_PRODUCER_GROUP);
    		arbitrarilyDelayProducer.setNamesrvAddr(ArbitrarityDelayConstants.NAME_SERVER_ADDRESS);
    		arbitrarilyDelayProducer.start();
    		Message msg = new Message(ArbitrarityDelayConstants.ARBITRARITY_DELAY_TOPIC,
    				("消息发送时间 :" + new Date()).getBytes(StandardCharsets.UTF_8));
    		SendResult sendResult = arbitrarilyDelayProducer.send(msg, 0, TimeUnit.SECONDS);
    		System.out.println("消息发送结果:" + sendResult.getSendStatus());
    		arbitrarilyDelayProducer.shutdown();
    	}
    }
    
    

    消费者

    封装消费者监听器

    package com.zby.client;
    
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    
    public class ArbitrarilyDelayMessageListenerConcurrently implements MessageListenerConcurrently {
    
    	private ArbitrarilyDelayProducer arbitrarilyDelayProducer;
    
    	private MessageListenerConcurrently proxyListener;
    
    	public ArbitrarilyDelayMessageListenerConcurrently(ArbitrarilyDelayProducer arbitrarilyDelayProducer,
    			MessageListenerConcurrently proxyListener) {
    		this.arbitrarilyDelayProducer = arbitrarilyDelayProducer;
    		this.proxyListener = proxyListener;
    	}
    
    	@Override
    	public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    		MessageExt messageExt = msgs.get(0);
    		String remainDelay = messageExt.getUserProperty(ArbitrarityDelayConstants.REMAIN_DELAY_KEY);
    		if (remainDelay != null && Long.parseLong(remainDelay) > 0) {
    			try {
    				System.out.println("msgId:" + messageExt.getMsgId() + ",remainDelay:" + remainDelay);
    				arbitrarilyDelayProducer.send(messageExt, Long.parseLong(remainDelay), TimeUnit.SECONDS);
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    		}
    		return proxyListener.consumeMessage(msgs, context);
    	}
    
    }
    
    

    消费者示例

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.zby.client;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Date;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    
    public class ArbitrarilyDelayConsumerDemo {
    
    	public static void main(String[] args) throws MQClientException {
    
    		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
    				ArbitrarityDelayConstants.ARBITRARITY_DELAY_CONSUMER_GROUP);
    		consumer.setNamesrvAddr(ArbitrarityDelayConstants.NAME_SERVER_ADDRESS);
    		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    		consumer.subscribe(ArbitrarityDelayConstants.ARBITRARITY_DELAY_TOPIC, "*");
    
    		ArbitrarilyDelayProducer arbitrarilyDelayProducer = new ArbitrarilyDelayProducer(
    				ArbitrarityDelayConstants.ARBITRARITY_DELAY_PRODUCER_GROUP);
    		arbitrarilyDelayProducer.setNamesrvAddr(ArbitrarityDelayConstants.NAME_SERVER_ADDRESS);
    		arbitrarilyDelayProducer.start();
    
    		ArbitrarilyDelayMessageListenerConcurrently arbitrarilyDelayMessageListenerConcurrently = new ArbitrarilyDelayMessageListenerConcurrently(
    				arbitrarilyDelayProducer, (msgs, context) -> {
    					System.out.printf("消费消息了,当前时间 %s:%s Receive New Messages: %s %n", new Date(),
    							Thread.currentThread().getName(),
    							new String(msgs.get(0).getBody(), StandardCharsets.UTF_8));
    					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    				});
    		consumer.registerMessageListener(arbitrarilyDelayMessageListenerConcurrently);
    
    		consumer.start();
    		System.out.printf("Consumer Started.%n");
    	}
    }
    
    

    启动测试

    • 启动消费者
    • 启动生产者
    • 消费者输出结果
    RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
    RocketMQLog:WARN Please initialize the logger system properly.
    Consumer Started.
    msgId:C0A83EF0A7082A139A556ECFF5E10000,remainDelay:11
    msgId:C0A83EF0A7082A139A556ECFF5E10000,remainDelay:3
    msgId:C0A83EF0A7082A139A556ECFF5E10000,remainDelay:1
    消费消息了,当前时间 Wed Jul 22 12:25:49 CST 2020:ConsumeMessageThread_4 Receive New Messages: 消息发送时间 :Wed Jul 22 12:25:22 CST 2020 
    

    优点

    • 仅客户端代码修改,实现难度小

    缺点

    • 时间轮降级浪费一定网络资源
  • 相关阅读:
    顺序表代码(指针实现)
    顺序表代码
    爬虫问题之Unknown command: crawl
    MongoDB的启动
    python复制文件到文件夹中
    .content和.text的区别
    ip协议,IP,子网掩码,ping命令是什么
    网络通信流程
    tcp和udp得区别
    flask中的目录解析
  • 原文地址:https://www.cnblogs.com/zby9527/p/13359828.html
Copyright © 2011-2022 走看看