zoukankan      html  css  js  c++  java
  • RocketMQ(三)——————javaAPI (9.延迟消息)

    RocketMQ使用 messageDelayLevel 可以设置延迟投递:

    现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,

    从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,

    消息发送时间与设置的延时等级和重试次数有关


    默认配置为:

    messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h


    更改配置:

    在`broker.conf `中添加配置

    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h


    这个配置项配置了从1级开始,各级延时的时间,可以修改这个指定级别的延时时间;

    时间单位支持:s、m、h、d,分别表示秒、分、时;

    java使用:

    发送消息时 可设置 延时等级

    message.setDelayTimeLevel(1);

    1、生产者样例

    发送延时消息
    
        //官网示例
       public static void main(String[] args) throws Exception {
          // 实例化一个生产者来产生延时消息
          DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
          // 启动生产者
          producer.start();
          int totalMessagesToSend = 100;
          for (int i = 0; i < totalMessagesToSend; i++) {
              Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
              // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
              message.setDelayTimeLevel(3);
              // 发送消息
              producer.send(message);
          }
           // 关闭生产者
          producer.shutdown();
      }

    2、消费者样例

    启动消费者等待传入订阅消息
     
        //官网示例
       public static void main(String[] args) throws Exception {
          // 实例化消费者
          DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
          // 订阅Topics
          consumer.subscribe("TestTopic", "*");
          // 注册消息监听者
          consumer.registerMessageListener(new MessageListenerConcurrently() {
              @Override
              public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                  for (MessageExt message : messages) {
                      // Print approximate delay time period
                      System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                  }
                  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
              }
          });
          // 启动消费者
          consumer.start();
      }
  • 相关阅读:
    Hook钩子程序
    KMeans笔记 K值以及初始类簇中心点的选取
    自己用C#写的一个俄罗斯方块的小程序(附源代码)。
    那些帮助你成为优秀前端工程师的讲座——《JavaScript篇》
    Mac技巧合集第二期
    WCF增加UDP绑定(应用篇)
    第一个MVC4 Web应用程序
    jQuery的页面加载事件
    通过网页进行 iOS 应用内部分发
    sql count效率
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14598023.html
Copyright © 2011-2022 走看看