zoukankan      html  css  js  c++  java
  • Spring与ActiveMQ整合

    在实际的项目中如果使用原生的ActiveMQ API开发会比较麻烦,因为需要创建连接工厂,创建连接等,我们应该使用一个模板来做这些繁琐的事情,Spring帮我们做了!

    Spring提供了对JMS的支持,需要添加Spring支持jms的包,如下:

    复制代码
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>4.0.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.9.0</version>
    </dependency>
    复制代码

    在spring-amq.xml中配置JmsTemplate(这样的配置没啥问题,在实际的项目中就是这样配置的)

    复制代码
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:aop="http://www.springframework.org/schema/aop"
           xmlns:tx="http://www.springframework.org/schema/tx"
           xsi:schemaLocation="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context-3.0.xsd
            http://www.springframework.org/schema/aop
            http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
            http://www.springframework.org/schema/tx
            http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
    
        <context:component-scan base-package="com.winner.spring"/>
    
        <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
              destroy-method="stop">
            <property name="connectionFactory">
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="brokerURL">
                        <value>tcp://192.168.0.129:61616</value>
                    </property>
                </bean>
            </property>
            <property name="maxConnections" value="100"></property>
        </bean>
    
        <!--使用缓存可以提升效率-->
        <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="jmsFactory"/>
            <property name="sessionCacheSize" value="1"/>
        </bean>
    
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="cachingConnectionFactory"/>
            <property name="messageConverter">
                <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
            </property>
        </bean>
    
        <!--测试Queue,队列的名字是spring-queue-->
        <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <!--<constructor-arg index="0" value="spring-queue"/>-->
            <constructor-arg name="name" value="spring-queue"/>
        </bean>
    
        <!--测试Topic-->
        <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg index="0" value="spring-topic"/>
        </bean>
    
    </beans>
    复制代码

    生产者

    复制代码
    @Service
    public class AMQSenderServiceImpl implements AmqSenderService {
    
        private static final Logger logger = LoggerFactory.getLogger(AMQSenderServiceImpl.class);
    
        @Resource(name = "jmsTemplate")
        private JmsTemplate jmsTemplate;
    
        //目的地队列的明证,我们要向这个队列发送消息
        @Resource(name = "destinationQueue")
        private Destination destination;
    
        //向特定的队列发送消息
        @Override
        public void sendMsg(MqParamDto mqParamDto) {
            final String msg = JSON.toJSONString(mqParamDto);
            try {
                logger.info("将要向队列{}发送的消息msg:{}", destination, msg);
                jmsTemplate.send(destination, new MessageCreator() {
                    @Override
                    public Message createMessage(Session session) throws JMSException {
                        return session.createTextMessage(msg);
                    }
                });
    
            } catch (Exception ex) {
                logger.error("向队列{}发送消息失败,消息为:{}", destination, msg);
            }
    
        }
    }
    复制代码

    运行结果:

    如果是Topic的话就换一下,下面的spring-topic是主题的名字。

    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-topic" />
    </bean>

    其他不用改!

    如果想要在Spring中配置消费者的话,就不需要再启动接收的客户端了,这样在测试的时候可以不需要写消费者的代码,因为我们要么是生产者要么是消费者!

    可以通过配置一个listener来实现,实际项目中采用的是这种方式

    复制代码
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:aop="http://www.springframework.org/schema/aop"
           xmlns:tx="http://www.springframework.org/schema/tx"
           xsi:schemaLocation="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context-3.0.xsd
            http://www.springframework.org/schema/aop
            http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
            http://www.springframework.org/schema/tx
            http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
    
        <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
              destroy-method="stop">
            <property name="connectionFactory">
                <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                    <property name="brokerURL">
                        <value>tcp://192.168.0.129:61616</value>
                    </property>
                </bean>
            </property>
            <property name="maxConnections" value="100"></property>
        </bean>
    
        <!--使用缓存可以提升效率-->
        <bean id="cachingConnectionFactory"
              class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="jmsFactory"/>
            <property name="sessionCacheSize" value="1"/>
        </bean>
    
        <!--测试Queue,队列的名字是spring-queue-->
        <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg index="0" value="spring-queue"/>
        </bean>
    
        <!--测试Topic-->
        <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg index="0" value="spring-topic"/>
        </bean>
    
        <bean id="jmsContainer"
              class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="cachingConnectionFactory"/>
            <property name="destination" ref="destinationQueue"/>
            <property name="messageListener" ref="messageListener"/>
        </bean>
    
        <!--消息监听器-->
        <bean id="messageListener" class="com.winner.spring.MyMessageListener">
        </bean>
    
    </beans>
    复制代码

     监听器:

    复制代码
    public class MyMessageListener implements MessageListener {
    
        @Override
        public void onMessage(Message msg) {
            if (msg instanceof TextMessage) {
                try {
                    TextMessage txtMsg = (TextMessage) msg;
                    String message = txtMsg.getText();
                    //实际项目中拿到String类型的message(通常是JSON字符串)之后,
                    //会进行反序列化成对象,做进一步的处理
                    System.out.println("receive txt msg===" + message);
                } catch (JMSException e) {
                    throw new RuntimeException(e);
                }
            } else {
                throw new IllegalArgumentException("Message must be of type TextMessage");
            }
        }
    }
    复制代码

    不需要写消费者的代码就可以知道消息有没有推送成功

     

     ActiveMQ结合Spring开发最佳实践和建议:

    1:Camel框架支持大量的企业集成模式,可以大大简化集成组件间的大量服务和复杂的消息流。而Spring框架更注重简单性,仅仅支持基本的最佳实践。
    2:Spring消息发送的核心架构是JmsTemplate,隔离了像打开、关闭Session和Producer的繁琐操作,因此应用开发人员仅仅需要关注实际的业务逻辑。但是
    JmsTemplate损害了ActiveMQ的PooledConnectionFactory对session和消息producer的缓存机制而带来的性能提升。
    3:新的Spring里面,可以设置org.springframework.jms.connection.CachingConnectionFactory的sessionCacheSize,或者干脆使用ActiveMQ的PooledConnectionFactory
    4:不建议使用JmsTemplate的receive()调用,因为在JmsTemplate上的所有调用都是同步的,这意味着调用线程需要被阻塞,直到方法返回,这对性能影响很大
    5:请使用DefaultMessageListenerContainer,它允许异步接收消息并缓存session和消息consumer,而且还可以根据消息数量动态的增加或缩减监听器的数量

  • 相关阅读:
    acm寒假特辑1月20日 CodeForces
    acm寒假特辑1月24日 HDU
    acm寒假特辑1月25日HDU
    acm寒假特辑1月26日HDU
    acm寒假特辑1月22日HDU
    acm寒假特辑1月28日HDU
    ubuntu14.04安装notepadqq
    ntpd vs. ntpdate
    centos7 防火墙firewalld
    git 自动补全 (git auto completion)
  • 原文地址:https://www.cnblogs.com/a1304908180/p/10240455.html
Copyright © 2011-2022 走看看