zoukankan      html  css  js  c++  java
  • ActiveMQ消息的延时和定时投递

    ActiveMQ对消息延时和定时投递做了很好的支持,其内部启动Scheduled来对该功能支持,也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,只需要把几个描述消息定时调度方式的参数作为属性添加到消息,broker端的调度器就会按照我们想要的行为去处理消息。

    Property nametypedescription
    AMQ_SCHEDULED_DELAY long 延迟投递的时间
    AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔
    AMQ_SCHEDULED_REPEAT int 重复投递次数
    AMQ_SCHEDULED_CRON String Cron表达式
    下面我们演示一下间隔性重复投递;

    生产者:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    package cn.slimsmart.study.activemq;
     
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
     
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ScheduledMessage;
     
    public class Producer {
     
        public static final String broker_url = "failover:(tcp://10.1.199.169:61616)";
        private static String queue_name = "test.queue";
     
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url);
            // 通过工厂创建一个连接
            Connection connection = factory.createConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话 事务 自动ack
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Destination destination = session.createQueue(queue_name);
            // 创建生产者
            MessageProducer producer = session.createProducer(destination);
            // 消息持久化
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            TextMessage message = session.createTextMessage("test delay message:" + System.currentTimeMillis());
            long time = 60 * 1000;// 延时1min
            long period = 10 * 1000;// 每个10s
            int repeat = 6;// 6次
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
            message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
            // 发送消息
            producer.send(message);
            session.commit();
            producer.close();
            session.close();
            connection.close();
        }
    }
    消费者代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    package cn.slimsmart.study.activemq;
     
    import java.util.concurrent.CountDownLatch;
     
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
     
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
     
    public class Consumer {
     
        public static final String broker_url = "failover:(tcp://10.1.199.169:61616)";
        private static String queue_name = "test.queue";
     
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, broker_url);
            // 通过工厂创建一个连接
            Connection connection = factory.createConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话 事务 自动ack
            Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
            // 创建一个消息队列
            Destination destination = session.createQueue(queue_name);
            // 创建消费者
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("receive message :" + ((TextMessage) message).getText());
                        message.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            new CountDownLatch(1).await();
        }
    }
  • 相关阅读:
    软件测试中桩模块与驱动模块的概念与区别(转载),打桩
    DataFactory使用和注意,排列组合
    SCWS中文分词,功能函数实例应用
    按指定长度截取中英文混合字符串
    CSS截取中英文混合字符串长度
    使DIV相对窗口大小左右拖动始终水平居中
    浮动5-常用列表显示(案例)
    多选项卡切换原理
    使当前对象相对于上层DIV 水平、垂直居中定位
    使图片相对于上层DIV始终水平、垂直都居中
  • 原文地址:https://www.cnblogs.com/wanghuaijun/p/6676513.html
Copyright © 2011-2022 走看看