zoukankan      html  css  js  c++  java
  • RocketMQ定时(延迟)消息

    RocketMQ 不支持任意时间自定义的延迟消息,仅支持内置预设值的延迟时间间隔的延迟消息。

    预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h

    延时消息的使用场景

    比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

    生产

    package com.xin.rocketmq.demo.testrun;
    
    import com.xin.rocketmq.demo.config.JmsConfig;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    
    public class ProducerDelay {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    
            producer.setNamesrvAddr("192.168.10.11:9876");
    
            producer.start();
            Message msg1 = new Message(
                    JmsConfig.TOPIC,
                    "订单001".getBytes());
    
            msg1.setDelayTimeLevel(2);//延迟5秒
    
            Message msg2 = new Message(
                    JmsConfig.TOPIC,
                    "订单001".getBytes());
    
            msg2.setDelayTimeLevel(4);//延迟30秒
    
            SendResult sendResult1 = producer.send(msg1);
            SendResult sendResult2 = producer.send(msg2);
            System.out.println("Product1-同步发送-Product信息={}" + sendResult1);
            System.out.println("Product2-同步发送-Product信息={}" + sendResult2);
            producer.shutdown();
        }
    }

    消费

    package com.xin.rocketmq.demo.testrun;
    
    import com.xin.rocketmq.demo.config.JmsConfig;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class ConsumerDelay {
    
        public static void main(String[] args) throws Exception {
            // 实例化消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    
            // 设置NameServer的地址
            consumer.setNamesrvAddr("192.168.10.11:9876");
    
            // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
            consumer.subscribe(JmsConfig.TOPIC, "*");
            // 注册消息监听者
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                    for (MessageExt message : messages) {
                        // Print approximate delay time period
                        System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 启动消费者
            consumer.start();
        }
    }
  • 相关阅读:
    数据库优化
    Oracle语句集锦
    MVC Razor标签
    转载 操作MyBatis基础
    mysql sqlserver Oracle字符串连接
    Word
    部署IIS错误
    => 朗姆达表达式带入符号
    wcf例子01
    idea通过springboot初始化器新建项目
  • 原文地址:https://www.cnblogs.com/starcrm/p/13061971.html
Copyright © 2011-2022 走看看