zoukankan      html  css  js  c++  java
  • ActiveMQ学习(三)

    Topic模型消息的持久化之JDBC

    Activemq.xml配置文件修改和二中一样

    1、Producer中发送消息修改为持久化方式

    messageProducer.send(textMessage, DeliveryMode.PERSISTENT, 4, 5 * 60 * 1000);

    完整代码:

    public class DurableProducer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    
        private static final String TOPIC_NAME = "myDTopic";
    
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //创建连接
            Connection connection = connectionFactory.createConnection();
    
            connection.start();
    
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建目标队列
            Destination destination = session.createTopic(TOPIC_NAME);
    
            MessageProducer messageProducer = session.createProducer(destination);
    
            for (int i = 0; i < 5; i++) {
                TextMessage textMessage = session.createTextMessage("发送Topic持久消息" + i);
                messageProducer.send(textMessage, DeliveryMode.PERSISTENT, 4, 5 * 60 * 1000);
                System.out.println("发送Topic持久消息" + i);
            }
    
            connection.close();
        }
    
    }

    2、Consumer中给连接工厂配置一个属性clientId

     connection.setClientID("client-1");

    创建持久化订阅

     //创建目标队列
            Topic topic = session.createTopic(TOPIC_NAME);
    
            TopicSubscriber consumer = session.createDurableSubscriber(topic, "client-1");

    Consumer完整代码示例

    public class DurableConsumer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    
        private static final String TOPIC_NAME = "myDTopic";
    
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //创建连接
            Connection connection = connectionFactory.createConnection();
    
            connection.setClientID("client-1");
            connection.start();
    
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建目标队列
            Topic topic = session.createTopic(TOPIC_NAME);
    
            TopicSubscriber consumer = session.createDurableSubscriber(topic, "client-1");
    
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("获取持久Topic消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    测试步骤:
    1.开启ActiveMQ的服务

    2.运行DurableConsumer,然后再关闭。目的就是在broker中报名订阅,名字就是subscription+ClientID组成的唯一名字。 

    3.关闭所有的listener之后,运行DurableProducer发布topic消息。(如果是非持久化订阅时,listener是接收不到消息的,持久化topic了之后,就可以接收到了。)

    4、运行DurableConsumer,可以接收到消息,控制台输出

    activemq_msgs表中的信息5分钟后被删除

  • 相关阅读:
    Java线程池之ThreadPoolExecutor
    React Native开发环境的搭建
    Android Lint——内嵌于Android Studio的代码优化工具
    Android异步处理技术
    NavigationView的头部的事件监听
    进程间通信之AIDL
    跨进程通信之Messenger
    Android 进程增加存活率
    android MVP模式思考
    Vim学习
  • 原文地址:https://www.cnblogs.com/keleaiww/p/11170227.html
Copyright © 2011-2022 走看看