zoukankan      html  css  js  c++  java
  • Spring整合ActiveMQ持久化到Mysql数据库

    一、ActiveMQ为什么需要持久化

      消息持久化就是将消息保存到磁盘上,这样的好处就是即使Activemq服务挂了,消息还保存在磁盘不会丢失,服务重新启动之后还能找到消息并再次发送,消息的持久化和消息的发送模型是没有关系的.

    二、队列的持久化(队列默认是持久化的)

      1、Spring的配置文件(配置文件路径和名称 resources/conf/spring/applicationContext.xml)

    <!--配置包扫描,spring扫描该包下面除了@Controller之外的所有注解-->
        <context:component-scan base-package="com.bocom">
            <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
        </context:component-scan>
    
        <!--ActiveMQ与Spring整合相关配置-->
        <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
            <property name="connectionFactory">
                <!--真正可以生产Connection的ConnectionFactory,由对应的JMS服务商提供-->
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="brokerURL" value="tcp://192.168.229.129:61618"/>
                </bean>
            </property>
            <property name="maxConnections" value="100"/>
        </bean>
    
        <!--这个是队列目的地,点对点的Queue-->
        <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <!--通过构造注入Queue名-->
            <constructor-arg index="0" value="spring-active-queue"/>
        </bean>
    
        <!--Spring提供的JMS工具类,他可以进行消息发送,接收等-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!--传入连接工厂-->
            <property name="connectionFactory" ref="connectionFactory"/>
            <!--传入目的地-->
            <property name="defaultDestination" ref="destinationQueue"/>
            <!--消息自动转换器-->
            <property name="messageConverter">
                <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
            </property>
        </bean>
    

      2、队列消息生产者

    @Service
    public class JmsProducer {
        @Autowired
        private JmsTemplate jmsTemplate;
    
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("/conf/spring/applicationContext.xml");
            JmsProducer jmsProducer = (JmsProducer) context.getBean("jmsProducer");
    
            for (int i = 1; i < 7; i++) {
                jmsProducer.jmsTemplate.send(new MessageCreator() {
                    @Override
                    public Message createMessage(Session session) throws JMSException {
                        TextMessage textMessage = session.createTextMessage(
                                "queue:::" + UUID.randomUUID().toString().replace("-", ""));
                        return textMessage;
                    }
                });
            }
            System.out.println("JmsProducer send message ok!!!");
        }
    }
    

      3、执行生产者代码之后,打开ActiveMQ的后台管理界面,可以看到消息已经发送到了ActiveMQ的队列中

      4、由于队列默认是持久化的,所以当消息发送成功之后,Mysql中会保存队列消息的相关记录.

      5、队列消息消费者

    @Service
    public class JmsConsumer {
        @Autowired
        private JmsTemplate jmsTemplate;
    
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("/conf/spring/applicationContext.xml");
            JmsConsumer jmsConsumer = (JmsConsumer) context.getBean("jmsConsumer");
            while (true) {
                String textMessage = (String) jmsConsumer.jmsTemplate.receiveAndConvert();
                System.out.println("接收到的消息是:" + textMessage);
            }
        }
    }
    

      6、执行完消费者代码之后,刷新ActiveMQ后台管理页面,可以看到已经没有待处理的消息了

      7、消费者消费完了消息之后,Mysql将不会保存队列的消息记录

      8、控制台

      

    三、主题的持久化(主题默认是非持久化的,并且需要先启动消费者注册)

      这里设置一个生产者,两个消费者,并且使用自定义消息监听类的方式监听消息.生产者(JmsPorducer01)、消费者1(JmsConsumer01)、消费者2(JmsConsumer02),他们对应的配置文件分别是applicationContext.xml、applicationContext01.xml、applicationContext02.xml

      1、自定义监听类MyMessageListerer(定义了监听之后,消费者只需要开始注册一下,然后就不需要启动消费者了)

    @Service
    public class MyMessageListerer implements MessageListener {
        @Override
        public void onMessage(Message message) {
            if (message != null && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("自定义消息监听接收到的消息是:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

      2、消费者1代码和配置文件(applicationContext01.xml)

      代码:

    @Service
    public class JmsConsumer01 {
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext01.xml");
        }
    }
    

      配置文件:

    <!--配置包扫描,spring扫描该包下面除了@Controller之外的所有注解-->
        <context:component-scan base-package="com">
            <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
        </context:component-scan>
        <!--ActiveMQ与Spring整合相关配置-->
        <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
            <property name="connectionFactory">
                <!--真正可以生产Connection的ConnectionFactory,由对应的JMS服务商提供-->
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="brokerURL" value="tcp://192.168.229.129:61618"/>
                </bean>
            </property>
            <property name="maxConnections" value="100"/>
        </bean>
    
        <!--Spring提供的JMS工具类,他可以进行消息发送,接收等-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!--传入连接工厂-->
            <property name="connectionFactory" ref="connectionFactory"/>
            <!--传入目的地-->
            <property name="defaultDestination" ref="destinationTopic"/>
            <!--消息自动转换器-->
            <property name="messageConverter">
                <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
            </property>
        </bean>
    
        <!--这个是主题的目的地,发布订阅的主题Topic-->
        <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <!--通过构造注入Topic名-->
            <constructor-arg index="0" value="spring-active-topic"/>
        </bean>
    
        <!--配置监听程序-->
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"></property>
            <!--监听的目的地-->
            <property name="destination" ref="destinationTopic"></property>
            <!--自定义消息监听类MyMessageListener implements MessageListener-->
            <property name="messageListener" ref="myMessageListerer"></property>
            <!--发布订阅模式-->
            <property name="pubSubDomain" value="true"/>
            <!--消息持久化值设置为true-->
            <property name="subscriptionDurable" value="true"/>
            <!--消息接收超时-->
            <property name="receiveTimeout" value="10000"/>
            <!-- 接收者ID -->
            <property name="clientId" value="clientId_001"/>
            <property name="durableSubscriptionName" value="clientId_001"/>
        </bean>

      3、消费者2代码和配置文件(applicationContext02.xml)

      代码

    @Service
    public class JmsConsumer02 {
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext02.xml");
        }
    }

      配置文件

    <!--配置包扫描,spring扫描该包下面除了@Controller之外的所有注解-->
        <context:component-scan base-package="com">
            <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
        </context:component-scan>
    
        <!--ActiveMQ与Spring整合相关配置-->
        <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
            <property name="connectionFactory">
                <!--真正可以生产Connection的ConnectionFactory,由对应的JMS服务商提供-->
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="brokerURL" value="tcp://192.168.229.129:61618"/>
                </bean>
            </property>
            <property name="maxConnections" value="100"/>
        </bean>
    
        <!--Spring提供的JMS工具类,他可以进行消息发送,接收等-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!--传入连接工厂-->
            <property name="connectionFactory" ref="connectionFactory"/>
            <!--传入目的地-->
            <property name="defaultDestination" ref="destinationTopic"/>
            <!--消息自动转换器-->
            <property name="messageConverter">
                <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
            </property>
        </bean>
    
        <!--这个是队列目的地,发布订阅的主题Topic-->
        <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <!--通过构造注入Topic名-->
            <constructor-arg index="0" value="spring-active-topic"/>
        </bean>
    
        <!--配置监听程序-->
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"></property>
            <!--监听的目的地-->
            <property name="destination" ref="destinationTopic"></property>
            <!--自定义消息监听类MyMessageListener implements MessageListener-->
            <property name="messageListener" ref="myMessageListerer"></property>
            <!--发布订阅模式-->
            <property name="pubSubDomain" value="true"/>
            <!--消息持久化值设置为true-->
            <property name="subscriptionDurable" value="true"/>
            <!--消息接收超时-->
            <property name="receiveTimeout" value="10000"/>
            <!-- 接收者ID -->
            <property name="clientId" value="clientId_002"/>
            <property name="durableSubscriptionName" value="clientId_002"/>
        </bean>

      4、生产者代码和配置文件

      代码

    @Service
    public class JmsProducer01 {
        @Autowired
        private JmsTemplate jmsTemplate;
    
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
            JmsProducer01 jmsProducer = (JmsProducer01) context.getBean("jmsProducer01");
    
            for (int i = 1; i < 7; i++) {
                jmsProducer.jmsTemplate.send(new MessageCreator() {
                    @Override
                    public Message createMessage(Session session) throws JMSException {
                        TextMessage textMessage = session.createTextMessage(
                                "topic:::" + UUID.randomUUID().toString().replace("-", ""));
                        return textMessage;
                    }
                });
            }
            System.out.println("JmsProducer01 send message ok!!!");
        }
    }
    

      配置文件

     <!--配置包扫描,spring扫描该包下面除了@Controller之外的所有注解-->
        <context:component-scan base-package="com">
            <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
        </context:component-scan>
    
        <!--ActiveMQ与Spring整合相关配置-->
        <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
            <property name="connectionFactory">
                <!--真正可以生产Connection的ConnectionFactory,由对应的JMS服务商提供-->
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="brokerURL" value="tcp://192.168.229.129:61618"/>
                </bean>
            </property>
            <property name="maxConnections" value="100"/>
        </bean>
    
        <!--Spring提供的JMS工具类,他可以进行消息发送,接收等-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!--传入连接工厂-->
            <property name="connectionFactory" ref="connectionFactory"/>
            <!--传入目的地-->
            <property name="defaultDestination" ref="destinationTopic"/>
            <!--消息自动转换器-->
            <property name="messageConverter">
                <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
            </property>
            <!--开启订阅模式-->
            <property name="pubSubDomain" value="false"/>
            <property name="sessionAcknowledgeMode" value="1"/>
            <!-- deliveryMode, priority, timeToLive 的开关,要生效,必须配置explicitQosEnabled为true,默认false-->
            <property name="explicitQosEnabled" value="true"/>
            <!-- 发送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->
            <property name="deliveryMode" value="2"/>
        </bean>
    
        <!--这个是主题的目的地,发布订阅的主题Topic-->
        <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <!--通过构造注入Topic名-->
            <constructor-arg index="0" value="spring-active-topic"/>
        </bean>

      5、ActiveMQ后台管理页面

      6、Mysql中activemq_msgs

      我们启动持久化生产者发布3个数据,ACTIVEMQ_MSGS数据表新增3条数据,消费者消费所有的数据后,ACTIVEMQ_MSGS数据表的数据并没有消失.持久化topic的消息不管是否被消费,是否有消费者,产生的数据永远都存在,且只存储一条.这个是要注意的,持久化的topic大量数据后可能导致性能下降.这里就像公众号一样,消费者消费完后,消息还会保留.

      7、Mysql中activemq_acks(ACTIVEMQ_ACKS数据表,多了一个消费者的身份信息.一条记录代表:一个持久化topic消费者)

     

    四、总结

      

  • 相关阅读:
    Android Studio 2.3.1导出jar文件不能生成release解决办法
    AndroidStudio 3.0 生成jar包的方法
    Android Studio如何打jar包
    Android Studio 如何打JAR包(修订版)
    6款程序员必备的开源中文处理工具
    Qt5.8 下链接 Mysql 错误以及解决方法(无论 Mysql 是什么版本的,64 位 Qt 要用 64 位的 Mysql 驱动,32 位的 Qt 要用 32 位的Mysql 驱动)
    Go 语言如果按这样改进,能不能火过 Java?
    基于 CSP 的设计思想和 OOP 设计思想的异同
    DELPHI下多线程编程的几个思维误区(QDAC)
    如何使用表单
  • 原文地址:https://www.cnblogs.com/xiaomaomao/p/13741357.html
Copyright © 2011-2022 走看看