zoukankan      html  css  js  c++  java
  • ActiveMQ学习(三)

    Topic模型消息的持久化之JDBC

    Activemq.xml配置文件修改和二中一样

    1、Producer中发送消息修改为持久化方式

    messageProducer.send(textMessage, DeliveryMode.PERSISTENT, 4, 5 * 60 * 1000);

    完整代码:

    public class DurableProducer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    
        private static final String TOPIC_NAME = "myDTopic";
    
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //创建连接
            Connection connection = connectionFactory.createConnection();
    
            connection.start();
    
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建目标队列
            Destination destination = session.createTopic(TOPIC_NAME);
    
            MessageProducer messageProducer = session.createProducer(destination);
    
            for (int i = 0; i < 5; i++) {
                TextMessage textMessage = session.createTextMessage("发送Topic持久消息" + i);
                messageProducer.send(textMessage, DeliveryMode.PERSISTENT, 4, 5 * 60 * 1000);
                System.out.println("发送Topic持久消息" + i);
            }
    
            connection.close();
        }
    
    }

    2、Consumer中给连接工厂配置一个属性clientId

     connection.setClientID("client-1");

    创建持久化订阅

     //创建目标队列
            Topic topic = session.createTopic(TOPIC_NAME);
    
            TopicSubscriber consumer = session.createDurableSubscriber(topic, "client-1");

    Consumer完整代码示例

    public class DurableConsumer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    
        private static final String TOPIC_NAME = "myDTopic";
    
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //创建连接
            Connection connection = connectionFactory.createConnection();
    
            connection.setClientID("client-1");
            connection.start();
    
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建目标队列
            Topic topic = session.createTopic(TOPIC_NAME);
    
            TopicSubscriber consumer = session.createDurableSubscriber(topic, "client-1");
    
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("获取持久Topic消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    测试步骤:
    1.开启ActiveMQ的服务

    2.运行DurableConsumer,然后再关闭。目的就是在broker中报名订阅,名字就是subscription+ClientID组成的唯一名字。 

    3.关闭所有的listener之后,运行DurableProducer发布topic消息。(如果是非持久化订阅时,listener是接收不到消息的,持久化topic了之后,就可以接收到了。)

    4、运行DurableConsumer,可以接收到消息,控制台输出

    activemq_msgs表中的信息5分钟后被删除

  • 相关阅读:
    Python for Data Science
    Python for Data Science
    Python for Data Science
    Python for Data Science
    Python for Data Science
    Python for Data Science
    Python for Data Science
    Python for Data Science
    Python for Data Science
    软件工程实践总结
  • 原文地址:https://www.cnblogs.com/keleaiww/p/11170227.html
Copyright © 2011-2022 走看看