Apache ActiveMQ是最流行和最强大的开源消息集成模式服务器。
Apache ActiveMQ是速度快,支持多跨语言的客户端和协议,带有易于使用企业集成模式和许多先进的功能在充分支持JMS 1.1和J2EE 1.4。ActiveMQ是Apache下发布Apache 2许可证。
Apache ActiveMQ主要用于模块应用数据交互和分布式应用,支持消息队列,消息发布/订阅,用于异步和服务器交换数据
code案例:
properties 文件配置
1 # activeMq 地址 端口 2 jms_url = tcp://127.0.0.1:61616 3 4 #队列名称 5 jms_test_monitor_data_queue = test_queue
spring xml 配置
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:amq="http://activemq.apache.org/schema/core" 5 xmlns:jms="http://www.springframework.org/schema/jms" 6 xsi:schemaLocation="http://www.springframework.org/schema/beans 7 http://www.springframework.org/schema/beans/spring-beans-4.1.xsd 8 http://www.springframework.org/schema/jms 9 http://www.springframework.org/schema/jms/spring-jms-4.1.xsd 10 http://activemq.apache.org/schema/core 11 http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd 12 "> 13 <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> 14 <property name="locations"> 15 <value>classpath:propertiesConfig/test.properties</value> 16 </property> 17 </bean> 18 19 <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 20 <constructor-arg index="0" value="${jms_url}"/> 21 <property name="useAsyncSend" value="true"/> 22 </bean> 23 24 <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 25 <constructor-arg ref="amqConnectionFactory"/> 26 <property name="sessionCacheSize" value="20" /> 27 </bean> 28 29 <!-- 定义JmsTemplate的Queue类型 --> 30 <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> 31 <constructor-arg ref="connectionFactory" /> 32 <!-- 非pub/sub模型(发布/订阅),即队列模式 --> 33 <property name="pubSubDomain" value="false" /> 34 </bean> 35 36 <!-- 定义JmsTemplate的Topic类型 --> 37 <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> 38 <constructor-arg ref="connectionFactory" /> 39 <!-- pub/sub模型(发布/订阅) --> 40 <property name="pubSubDomain" value="true" /> 41 </bean> 42 43 <!-- 消息消费者 start--> 44 45 <!-- 定义Queue监听器 --> 46 <!--<bean id="testMessageReceiver" class="com.maven.project.web.jmsMessageOper.TestMonitorQueue"/> 47 <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> 48 <jms:listener destination="${jms_test_monitor_data_queue}" ref="testMessageReceiver"/> 49 </jms:listener-container> --> 50 51 <!-- 定义Topic监听器 --> 52 <!--<jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> 53 <jms:listener destination="test.topic" ref="topicReceiver1"/> 54 <jms:listener destination="test.topic" ref="topicReceiver2"/> 55 </jms:listener-container> --> 56 57 <!-- 定义activme 队列监听 --> 58 <!-- 对应的监听类, com.maven.project.web.jmsMessageOper.TestMonitorQueue --> 60 <bean id="testMessageReceiver" class="com.maven.project.web.jmsMessageOper.TestMonitorQueue"/> 61 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 62 <property name="connectionFactory" ref="connectionFactory"/> 63 <property name="destinationName" value="${jms_test_monitor_data_queue}"/> 64 <property name="messageListener" ref="testMessageReceiver"/> 65 </bean> 66 67 <!-- 消息消费者 end --> 68 </beans>
activemq 监听类
1 package com.maven.project.web.jmsMessageOper; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.MessageListener; 6 import javax.jms.TextMessage; 7 8 public class TestMonitorQueue implements MessageListener { 9 10 public void onMessage(Message message) { 11 if (message instanceof TextMessage) { 12 TextMessage textMessage = (TextMessage) message; 13 try { 14 System.out.println("============"+textMessage.getText()); 15 } catch (JMSException e) { 16 e.printStackTrace(); 17 } 18 } 19 } 20 }
activemq 消息发送类
1 package com.maven.project.web.jmsMessageOper; 2 3 import java.io.Serializable; 4 import java.util.Map; 5 6 import javax.jms.BytesMessage; 7 import javax.jms.JMSException; 8 import javax.jms.MapMessage; 9 import javax.jms.Message; 10 import javax.jms.Session; 11 import javax.jms.StreamMessage; 12 13 import org.springframework.beans.factory.annotation.Autowired; 14 import org.springframework.jms.core.JmsTemplate; 15 import org.springframework.jms.core.MessageCreator; 16 import org.springframework.stereotype.Controller; 17 18 @Controller 19 public class MessageSender { 20 21 @Autowired 22 private JmsTemplate jmsQueueTemplate; 23 24 public void sendTextMessage(final String queueName, final String txtMessage) { 25 jmsQueueTemplate.send(queueName, new MessageCreator() { 26 public Message createMessage(Session session) throws JMSException { 27 return session.createTextMessage(txtMessage); 28 } 29 }); 30 } 31 32 public void sendObjectMessage(final String queueName, final Object objectMessage) { 33 jmsQueueTemplate.send(queueName, new MessageCreator() { 34 public Message createMessage(Session session) throws JMSException { 35 return session.createObjectMessage((Serializable) objectMessage); 36 } 37 }); 38 } 39 40 public void sendMapMessage(final String queueName, final Map<String, Object> mapMessage) { 41 jmsQueueTemplate.send(queueName, new MessageCreator() { 42 @SuppressWarnings("unchecked") 43 public Message createMessage(Session session) throws JMSException { 44 MapMessage mapMessage = session.createMapMessage(); 45 for (Map.Entry<String, Object> entry : ((Map<String, Object>) mapMessage).entrySet()) { 46 mapMessage.setObject(entry.getKey(), entry.getValue()); 47 } 48 return mapMessage; 49 } 50 }); 51 } 52 53 public void sendByteMessage(final String queueName, final byte[] message) { 54 jmsQueueTemplate.send(queueName, new MessageCreator() { 55 public Message createMessage(Session session) throws JMSException { 56 BytesMessage bytesMessage = session.createBytesMessage(); 57 bytesMessage.writeBytes(message); 58 return bytesMessage; 59 } 60 }); 61 } 62 63 public void sendStreamMessage(final String queueName, final Object message) { 64 jmsQueueTemplate.send(queueName, new MessageCreator() { 65 public Message createMessage(Session session) throws JMSException { 66 StreamMessage streamMessage = session.createStreamMessage(); 67 streamMessage.writeObject(message); 68 return streamMessage; 69 } 70 }); 71 } 72 }
消息发送调用
1 package com.maven.project.web.action; 2 3 import javax.servlet.http.HttpServletRequest; 4 import javax.servlet.http.HttpServletResponse; 5 6 import org.springframework.beans.factory.annotation.Autowired; 7 import org.springframework.stereotype.Controller; 8 import org.springframework.web.bind.annotation.RequestMapping; 9 13 import com.maven.project.tools.utils.SenderMessageQueueName; 14 import com.maven.project.web.jmsMessageOper.MessageSender; 15 16 @Controller 17 @RequestMapping("/user") 18 public class UserLoginAction { 19 20 @Autowired 21 private MessageSender messageSender; 22 32 @RequestMapping("/login") 33 public void login(HttpServletRequest request,HttpServletResponse response){ 35 messageSender.sendTextMessage("test_queue","消息发送内容"); // test_queue 消息队列名称 37 } 38 }
maven pom 配置
1 <!-- activemq相关依赖 --> 2 <dependency> 3 <groupId>org.apache.activemq</groupId> 4 <artifactId>activemq-core</artifactId> 5 <version>5.7.0</version> 6 </dependency>