发布订阅模式和PTP方式的不同之处在于前者依赖于一个Topic话题(而PTP方式依赖于Queue队列):
package com.thunisoft.jms.mine.topic; import java.util.HashMap; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TopicPublisher; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * JMS生产者 * * @author zhangxsh * */ public class Producer { /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //根据Topic创建目标地 Destination destination = session.createTopic("TestTopic"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i = 1; i <= 10; i++) { ObjectMessage message = session.createObjectMessage(); HashMap m = new HashMap(); m.put("key" + i, i); message.setObject(m); // 发送消息到目的地方 // System.out.println("发送消息:" + "ActiveMq 发送的消息" + i); producer.send(message); } session.commit(); } }
订阅者(同样需要注册一个Listener):
package com.thunisoft.jms.mine.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicSubscriber { public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( "tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("TestTopic"); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { ObjectMessage tm = (ObjectMessage) message; try { System.out.println("Received message: " + tm.getObject()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); // session.close(); // connection.stop(); // connection.close(); } }