zoukankan      html  css  js  c++  java
  • spring集成ActiveMQ

    在实际的项目中如果使用原生的ActiveMQ API开发会比较麻烦,因为需要创建连接工厂,创建连接等,我们应该使用一个模板来做这些繁琐的事情,Spring帮我们做了!

    Spring提供了对JMS的支持,需要添加Spring支持jms的包,如下:

    复制代码
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>4.0.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.9.0</version>
    </dependency>
    复制代码

    在spring-amq.xml中配置JmsTemplate(这样的配置没啥问题,在实际的项目中就是这样配置的)

    复制代码
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:aop="http://www.springframework.org/schema/aop"
           xmlns:tx="http://www.springframework.org/schema/tx"
           xsi:schemaLocation="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context-3.0.xsd
            http://www.springframework.org/schema/aop
            http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
            http://www.springframework.org/schema/tx
            http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
    
        <context:component-scan base-package="com.winner.spring"/>
    
        <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
              destroy-method="stop">
            <property name="connectionFactory">
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="brokerURL">
                        <value>tcp://192.168.0.129:61616</value>
                    </property>
                </bean>
            </property>
            <property name="maxConnections" value="100"></property>
        </bean>
    
        <!--使用缓存可以提升效率-->
        <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="jmsFactory"/>
            <property name="sessionCacheSize" value="1"/>
        </bean>
    
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="cachingConnectionFactory"/>
            <property name="messageConverter">
                <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
            </property>
        </bean>
    
        <!--测试Queue,队列的名字是spring-queue-->
        <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <!--<constructor-arg index="0" value="spring-queue"/>-->
            <constructor-arg name="name" value="spring-queue"/>
        </bean>
    
        <!--测试Topic-->
        <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg index="0" value="spring-topic"/>
        </bean>
    
    </beans>
    复制代码

    生产者

    复制代码
    @Service
    public class AMQSenderServiceImpl implements AmqSenderService {
    
        private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class);
    
        @Resource(name = "jmsTemplate")
        private JmsTemplate jmsTemplate;
    
        //目的地队列的明证,我们要向这个队列发送消息
        @Resource(name = "destinationQueue")
        private Destination destination;
    
        //向特定的队列发送消息
        @Override
        public void sendMsg(MqParamDto mqParamDto) {
            final String msg = JSON.toJSONString(mqParamDto);
            try {
                logger.info("将要向队列{}发送的消息msg:{}", destination, msg);
                jmsTemplate.send(destination, new MessageCreator() {
                    @Override
                    public Message createMessage(Session session) throws JMSException {
                        return session.createTextMessage(msg);
                    }
                });
    
            } catch (Exception ex) {
                logger.error("向队列{}发送消息失败,消息为:{}", destination, msg);
            }
    
        }
    }
    复制代码

    运行结果:

    如果是Topic的话就换一下,下面的spring-topic是主题的名字。

    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-topic" />
    </bean>

    其他不用改!

    如果想要在Spring中配置消费者的话,就不需要再启动接收的客户端了,这样在测试的时候可以不需要写消费者的代码,因为我们要么是生产者要么是消费者!

    可以通过配置一个listener来实现,实际项目中采用的是这种方式

    复制代码
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:aop="http://www.springframework.org/schema/aop"
           xmlns:tx="http://www.springframework.org/schema/tx"
           xsi:schemaLocation="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context-3.0.xsd
            http://www.springframework.org/schema/aop
            http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
            http://www.springframework.org/schema/tx
            http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
    
        <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
              destroy-method="stop">
            <property name="connectionFactory">
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="brokerURL">
                        <value>tcp://192.168.0.129:61616</value>
                    </property>
                </bean>
            </property>
            <property name="maxConnections" value="100"></property>
        </bean>
    
        <!--使用缓存可以提升效率-->
        <bean id="cachingConnectionFactory"
              class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="jmsFactory"/>
            <property name="sessionCacheSize" value="1"/>
        </bean>
    
        <!--测试Queue,队列的名字是spring-queue-->
        <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg index="0" value="spring-queue"/>
        </bean>
    
        <!--测试Topic-->
        <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg index="0" value="spring-topic"/>
        </bean>
    
        <bean id="jmsContainer"
              class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="cachingConnectionFactory"/>
            <property name="destination" ref="destinationQueue"/>
            <property name="messageListener" ref="messageListener"/>
        </bean>
    
        <!--消息监听器-->
        <bean id="messageListener" class="com.winner.spring.MyMessageListener">
        </bean>
    
    </beans>
    复制代码

     监听器:

    复制代码
    public class MyMessageListener implements MessageListener {
    
        @Override
        public void onMessage(Message msg) {
            if (msg instanceof TextMessage) {
                try {
                    TextMessage txtMsg = (TextMessage) msg;
                    String message = txtMsg.getText();
                    //实际项目中拿到String类型的message(通常是JSON字符串)之后,
                    //会进行反序列化成对象,做进一步的处理
                    System.out.println("receive txt msg===" + message);
                } catch (JMSException e) {
                    throw new RuntimeException(e);
                }
            } else {
                throw new IllegalArgumentException("Message must be of type TextMessage");
            }
        }
    }
    复制代码

    不需要写消费者的代码就可以知道消息有没有推送成功

     

     ActiveMQ结合Spring开发最佳实践和建议:

    1:Camel框架支持大量的企业集成模式,可以大大简化集成组件间的大量服务和复杂的消息流。而Spring框架更注重简单性,仅仅支持基本的最佳实践。
    2:Spring消息发送的核心架构是JmsTemplate,隔离了像打开、关闭Session和Producer的繁琐操作,因此应用开发人员仅仅需要关注实际的业务逻辑。但是
    JmsTemplate损害了ActiveMQ的PooledConnectionFactory对session和消息producer的缓存机制而带来的性能提升。
    3:新的Spring里面,可以设置org.springframework.jms.connection.CachingConnectionFactory的sessionCacheSize,或者干脆使用ActiveMQ的PooledConnectionFactory
    4:不建议使用JmsTemplate的receive()调用,因为在JmsTemplate上的所有调用都是同步的,这意味着调用线程需要被阻塞,直到方法返回,这对性能影响很大
    5:请使用DefaultMessageListenerContainer,它允许异步接收消息并缓存session和消息consumer,而且还可以根据消息数量动态的增加或缩减监听器的数量

  • 相关阅读:
    mysql报错:java.sql.SQLException: The server time zone value 'Öйú±ê׼ʱ¼ä' is unrecognized or represents more than one time zone.
    MD5登陆密码的生成
    15. 3Sum、16. 3Sum Closest和18. 4Sum
    11. Container With Most Water
    8. String to Integer (atoi)
    6. ZigZag Conversion
    5. Longest Palindromic Substring
    几种非线性激活函数介绍
    AI初探1
    AI初探
  • 原文地址:https://www.cnblogs.com/cnndevelop/p/9908263.html
Copyright © 2011-2022 走看看