zoukankan      html  css  js  c++  java
  • ActiveMQ学习总结(10)——ActiveMQ采用Spring注解方式发送和监听

    对于ActiveMQ消息的发送,原声的api操作繁琐,而且如果不进行二次封装,打开关闭会话以及各种创建操作也是够够的了。那么,Spring提供了一个很方便的去收发消息的框架,spring jms。整合Spring后,代码不仅变得非常优雅,而且易用性和扩展性更好。

    1. maven依赖

    <!-- activemq -->
    <dependency>
    	<groupId>org.apache.xbean</groupId>
    	<artifactId>xbean-spring</artifactId>
    	<version>3.16</version>
    </dependency>
    <dependency>
    	<groupId>org.springframework</groupId>
    	<artifactId>spring-jms</artifactId>
    	<version>${springframework.version}</version>
    </dependency>
    <dependency>
    	<groupId>org.apache.activemq</groupId>
    	<artifactId>activemq-all</artifactId>
    	<version>${activemq.version}</version>
    </dependency>
    

    2.命名空间引入

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- 查找最新的schemaLocation 访问 http://www.springframework.org/schema/ -->
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    	xmlns:jms="http://www.springframework.org/schema/jms"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans   
    		http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
    		http://www.springframework.org/schema/context   
    		http://www.springframework.org/schema/context/spring-context-3.2.xsd
    		http://www.springframework.org/schema/jms
    		http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
    		http://activemq.apache.org/schema/core
    		http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
    

    3. Xml配置

    	<amq:connectionFactory id="jmsConnectionFactory" brokerURL="tcp://${activemq.ip}:61616" userName="${activemq.username}" password="${activemq.password}" />
    <bean id="jmsConnectionFactoryExtend" class="org.springframework.jms.connection.CachingConnectionFactory">
    	<constructor-arg ref="jmsConnectionFactory" />
    	<property name="sessionCacheSize" value="100" />
    </bean>
    <!-- 消息处理器 -->
    <bean id="jmsMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter" />
    <!-- ====Producer side start==== -->
    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
    	<constructor-arg ref="jmsConnectionFactoryExtend" />
    	<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
    	<property name="pubSubDomain" value="false" />
    	<property name="messageConverter" ref="jmsMessageConverter"></property>
    </bean>
    <!-- 定义JmsTemplate的Topic类型 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
    	<constructor-arg ref="jmsConnectionFactoryExtend" />
    	<!-- pub/sub模型(发布/订阅) -->
    	<property name="pubSubDomain" value="true" />
    	<property name="messageConverter" ref="jmsMessageConverter"></property>
    </bean>
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="jmsConnectionFactoryExtend" acknowledge="auto" concurrency="5-10">
    	<jms:listener destination="testqueue" ref="queueReciver" />
    </jms:listener-container>
    
    
    

    第一个是配置我们的mq连接,ip+端口号,帐号密码的信息。

    第二个是引入spring的mq连接池。可以配置缓存的连接数。

    第三个是消息处理器,Spring默认提供了基于Jdk Serializable的消息处理和MappingJackson2MessageConventer,其实这两个挺常用,在Spring Redis中,在Spring MVC中,都有着这几种conventer的身影。

    下面是两个发送消息的模版类,类似于之前讲过的RedisTemplate。向其注入上面定义的消息处理器,代码中我们会用到。(其实类中已经判断如果不进行注入就设置一个默认的,但是自己注入的话,方便我们控制)

    listener-container是Spring提供的一个监听器容器,用于统一控制我们的监听类来接收处理消息。这里面有一些配置,schema有说明。可以配置响应模式,消费者数量等。开启多消费者,有助于加快队列处理速度。

    4.注解方式的实现

    如果要用注解的方式,就不需要在xml中自己定义消息监听容器了。只需要加入以下的代码:

    <bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
            <property name="connectionFactory" ref="jmsConnectionFactoryExtend"/>
        </bean>
        
        <!-- 监听注解支持 -->
        <jms:annotation-driven/>

    这样,配置我们消费处理类上的@listener注解,即可监听对应的queue或者topic消息。

    5.生产者代码

    队列消息:

    @Resource
    @Component("queueSender")
    public class QueueSender {
    	@Resource(name = "jmsQueueTemplate")
    	private JmsTemplate jmsQueueTemplate;// 通过@Qualifier修饰符来注入对应的bean
    	public void send(String destination, final Object message) {
    		jmsQueueTemplate.send(destination, new MessageCreator() {
    			@Override
    			public Message createMessage(Session session) throws JMSException {
    				return jmsQueueTemplate.getMessageConverter().toMessage(message, session);
    			}
    		});
    	}
    }
    

    订阅消息:

    @Component
    public class TopicSender {
    	@Resource(name="jmsTopicTemplate")
    	private JmsTemplate jmsTemplate;
    	/**
    	 * 发送一条消息到指定的队列(目标)
    	 * @param queueName 队列名称
    	 * @param message 消息内容
    	 */
    	public void publish(String destination,final Object message){
    		jmsTemplate.send(destination, new MessageCreator() {
    			@Override
    			public Message createMessage(Session session) throws JMSException {
    				return jmsTemplate.getMessageConverter().toMessage(message, session);
    			}
    		});
    	}
    }
    

    6.消费者代码

    package cn.test.activemq.consumer.queue;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.jms.listener.adapter.MessageListenerAdapter;
    import org.springframework.jms.support.converter.MessageConversionException;
    import org.springframework.stereotype.Component;
    import cn.test.MqBean;
    import cn.test.activemq.message.types.QueueDefination;
    /**
     * @author Han
     */
    @Component("spqueueconsumertest")
    public class SpringQueueReciverTest extends MessageListenerAdapter{
    	private static final Logger log = LoggerFactory.getLogger(SpringQueueReciverTest.class);
    	@JmsListener(destination=QueueDefination.TEST_QUEUE,concurrency="5-10")
    	public void onMessagehehe(Message message, Session session) throws JMSException {
    		try {
    			MqBean bean = (MqBean) getMessageConverter().fromMessage(message);
    			System.out.println(bean.getName());
    			System.out.println(session);
    			message.acknowledge();
    			message.acknowledge();
    		} catch (MessageConversionException | JMSException e) {
    			e.printStackTrace();
    		}
    	}
    }
    

    上面的@JmsListener(destination=QueueDefination.TEST_QUEUE,concurrency="5-10")是在用注解方式监听的时候加入。如果用xml配置容易,可以忽略。

    附上MqBean

    public class MqBean implements Serializable{
    	private Integer age;
    	private String name;
    	public Integer getAge() {
    		return age;
    	}
    	public void setAge(Integer age) {
    		this.age = age;
    	}
    	public String getName() {
    		return name;
    	}
    	public void setName(String name) {
    		this.name = name;
    	}
    }
    

    运行效果截图:

  • 相关阅读:
    Leetcode Binary Tree Level Order Traversal
    Leetcode Symmetric Tree
    Leetcode Same Tree
    Leetcode Unique Paths
    Leetcode Populating Next Right Pointers in Each Node
    Leetcode Maximum Depth of Binary Tree
    Leetcode Minimum Path Sum
    Leetcode Merge Two Sorted Lists
    Leetcode Climbing Stairs
    Leetcode Triangle
  • 原文地址:https://www.cnblogs.com/zhanghaiyang/p/7212787.html
Copyright © 2011-2022 走看看