一、本文章包含的内容
1、列举了ActiveMQ中通过Topic方式发送、消费消息的代码(监听者有两个,分别是topicMessageListener1、topicMessageListener2)
2、spring+activemq方式
二、配置信息
1、activemq的pom.xml信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
<!-- activemq Begin -->
<!-- Spring JMS support (JmsTemplate and the message listener containers) -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring.version}</version>
</dependency>
<!--
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-messaging</artifactId>
    <version>${spring.version}</version>
</dependency>
-->
<!-- ActiveMQ classes (connection factory and destination types) -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.14.0</version>
</dependency>
<!-- activemq End -->
2、activemq的配置文件:spring-jms.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
<!-- Scan this package for annotated components (for example the @Component classes of this article) -->
<context:component-scan base-package="org.soa.test.activemq" />

<!-- JMS connection factory -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <!--
        Works around the receive-side error
        "javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker".
        NOTE(review): trusting all packages allows ObjectMessage payloads of any class to be deserialized.
        The listeners in this article only read TextMessage, so consider restricting this with
        the trustedPackages property instead.
    -->
    <property name="trustAllPackages" value="true" />
    <!-- Send messages asynchronously -->
    <property name="useAsyncSend" value="true" />
</bean>

<!-- Topic mode: Begin -->
<!-- Destination: the topic named "topic1" -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg>
        <value>topic1</value>
    </constructor-arg>
</bean>

<!-- JmsTemplate: Spring's JMS helper class used to send and receive messages (Topic) -->
<bean id="jmsTemplateTopic" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="topicDestination" />
    <!-- Publish/subscribe domain -->
    <property name="pubSubDomain" value="true" />
</bean>

<!-- Several topic listeners and listener containers may be configured: one pair per subscriber -->
<!-- Topic message listeners -->
<bean id="topicMessageListener1" class="org.soa.test.activemq.topics.TopicMessageListener1" />
<bean id="topicMessageListener2" class="org.soa.test.activemq.topics.TopicMessageListener2" />

<!-- Listener container for the first subscriber of the topic -->
<bean id="topicJmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="topicDestination" />
    <property name="messageListener" ref="topicMessageListener1" />
</bean>

<!-- Listener container for the second subscriber of the topic -->
<bean id="topicJmsContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="topicDestination" />
    <property name="messageListener" ref="topicMessageListener2" />
</bean>
<!-- Topic mode: End -->
三、消息发送端、监听端及测试程序
1、发送代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package org.soa.test.activemq.topics; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; /** * Created by JamesC on 16-9-22. */ @Component public class TopicProvider { @Autowired @Qualifier ( "jmsTemplateTopic" ) private JmsTemplate topicJmsTemplate; /** * 向指定的topic发布消息 * * @param topic * @param msg */ public void publish( final Destination topic, final String msg) { topicJmsTemplate.send(topic, new MessageCreator() { public Message createMessage(Session session) throws JMSException { System.out.println( "topic name 是" + topic.toString() + ",发布消息内容为: " + msg); return session.createTextMessage(msg); } }); } } |
2、监听代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
//TopicMessageListener1 package org.soa.test.activemq.topics; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * Created by JamesC on 16-9-22. */ @Component public class TopicMessageListener1 implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println( "TopicMessageListener_1 " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } } //TopicMessageListener2 package org.soa.test.activemq.topics; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * Created by JamesC on 16-9-22. */ @Component public class TopicMessageListener2 implements MessageListener { public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println( "TopicMessageListener_2 " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } } |
3、测试程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package org.soa.test.activemq.topics; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import javax.jms.Destination; /** * Created by JamesC on 16-9-22. */ @RunWith (SpringJUnit4ClassRunner. class ) @ContextConfiguration ( "/spring-jms.xml" ) public class TopicTest { @Autowired private Destination queueDestination; @Autowired private TopicProvider provider; //向默认Topic发消息 @Test public void send() { //坑爹的是:这里不要用ActiveMQQueue,会默认按Queue发送;要使用ActiveMQTopic,按Topic发送 //ActiveMQQueue des = new ActiveMQQueue("topic1"); ActiveMQTopic des = new ActiveMQTopic( "topic1" ); provider.publish(des, "topic消息示例" ); } } |