zoukankan      html  css  js  c++  java
  • ActiveMQ学习(三)

    Topic模型消息的持久化之JDBC

    Activemq.xml配置文件修改和二中一样

    1、Producer中发送消息修改为持久化方式

    messageProducer.send(textMessage, DeliveryMode.PERSISTENT, 4, 5 * 60 * 1000);

    完整代码:

    public class DurableProducer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    
        private static final String TOPIC_NAME = "myDTopic";
    
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //创建连接
            Connection connection = connectionFactory.createConnection();
    
            connection.start();
    
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建目标队列
            Destination destination = session.createTopic(TOPIC_NAME);
    
            MessageProducer messageProducer = session.createProducer(destination);
    
            for (int i = 0; i < 5; i++) {
                TextMessage textMessage = session.createTextMessage("发送Topic持久消息" + i);
                messageProducer.send(textMessage, DeliveryMode.PERSISTENT, 4, 5 * 60 * 1000);
                System.out.println("发送Topic持久消息" + i);
            }
    
            connection.close();
        }
    
    }

    2、Consumer中给连接工厂配置一个属性clientId

     connection.setClientID("client-1");

    创建持久化订阅

     //创建目标队列
            Topic topic = session.createTopic(TOPIC_NAME);
    
            TopicSubscriber consumer = session.createDurableSubscriber(topic, "client-1");

    Consumer完整代码示例

    public class DurableConsumer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    
        private static final String TOPIC_NAME = "myDTopic";
    
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //创建连接
            Connection connection = connectionFactory.createConnection();
    
            connection.setClientID("client-1");
            connection.start();
    
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建目标队列
            Topic topic = session.createTopic(TOPIC_NAME);
    
            TopicSubscriber consumer = session.createDurableSubscriber(topic, "client-1");
    
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("获取持久Topic消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    测试步骤:
    1.开启ActiveMQ的服务

    2.运行DurableConsumer,然后再关闭。目的就是在broker中报名订阅,名字就是subscription+ClientID组成的唯一名字。 

    3.关闭所有的listener之后,运行DurableProducer发布topic消息。(如果是非持久化订阅时,listener是接收不到消息的,持久化topic了之后,就可以接收到了。)

    4、运行DurableConsumer,可以接收到消息,控制台输出

    activemq_msgs表中的信息5分钟后被删除

  • 相关阅读:
    Asp.net中文本框全选的实现
    如何在Sql Server中读取最近一段时间的记录,比如取最近3天的或最近3个月的记录。
    asp.net存储过程分页+GridView控件 几百万数据 超快
    Gridview各种功能+AspNetPager+Ajax实现无刷新存储过程分页 (留着用)
    asp.net导出EXCEL的好方法!(好用,导出全部数据)
    GridView分页的实现 ASP.NET c#(转)特好用
    asp.net c# repeater或gridview导出EXCEL的详细代码。
    c#实现word,excel转pdf代码及部分Office 2007文件格式转换为xps和pdf代码整理
    C# WORD操作实现代码(转载)
    如何提高程序员的生产率 (2)
  • 原文地址:https://www.cnblogs.com/keleaiww/p/11170227.html
Copyright © 2011-2022 走看看