1.在这个项目中新增两个java类,主题生产者和主题消费者:
2.和点对点的代码差别并不大,所以将消费者和生产者的分别代码拷入新增的java类中,再修改就好了。
appProducerTopic代码:标红字体是做出了修改,由创建队列改为了创建主题。
package com.liu.jms; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class appProducerTopic { private static final String url = "tcp://127.0.0.1:61616";//actvemq的服务器tcp连接方式 private static final String topicName = "topic-test";//定义主题的名称 public static void main(String[] args) throws JMSException { //1.创建connectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //2.创建connection Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.创建session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建destination Destination destination = session.createTopic(topicName); //6.创建生产者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { TextMessage textMessage = session.createTextMessage("test" + i); //7.发送消息 producer.send(textMessage); System.out.println("发送消息" + textMessage.getText()); } //8.关闭连接 connection.close(); } }
appConsumerTopic代码:标红字体是做出了修改,由创建队列改为了创建主题。
package com.liu.jms; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class appConsumerTopic { private static final String url = "tcp://127.0.0.1:61616"; private static final String topicName = "topic-test";//定义主题的名称 public static void main(String[] args) throws JMSException { //1.创建connectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //2.创建connection Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.创建session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建destination Destination destination = session.createTopic(topicName); //6.创建消费者 MessageConsumer consumer = session.createConsumer(destination); //7.创建一个监听器 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("接收到的消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //8.关闭连接(监听器是异步的还没有监听到消息的时候,就关闭连接了) //connection.close(); } }
3.测试
首先启动消费者这个java类,观察控制台,如下图:
接着启动生产者的java类,观察控制台,如下图:生产了一百条消息。
此时切换至消费的控制台,观察控制台,如下图:已经打印出了一百条消息了,说明消费者已经接受到全部一百条消息。
6.打开activemq的控制台查看topics:(http://127.0.0.1:8161/admin/topics.jsp)如下图所示:有一个名字是我们设置的queue-test的主题,消费者也有一个就是我们创建的那个消费者类,主题中有一百条消息,被移除了一百条,也就是上面所说的,消费者接收到了这100条全部的消息。
7.那么如果我启动了两个订阅相同的消费者呢?为了结果能清晰,重启activemq服务,关掉之前的Java类启动,然后启动两边消费者,再启动一个生产者。如下图:生产者生产了100条消息。
8.分别看看两个消费者的接收消息,如下两张图:两个消费者都接受到了一模一样的100条消息。
9.总结:主题订阅发布模式,有多个消费的订阅相同时,消费者不会相互相互影响,都会分别接收到生产者的全部消息。