稍微改一下代码就好啦
package activeMQ2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /* * 消息生产者-发布者 */ public class JMSProducer { static String userName=ActiveMQConnection.DEFAULT_USER; static String password=ActiveMQConnection.DEFAULT_PASSWORD; static String brokerURL=ActiveMQConnection.DEFAULT_BROKER_URL; static int sendnum=10; public static void main(String[] args) { // TODO Auto-generated method stub ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(userName, password, brokerURL); Connection connection = null ; Session session;// 会话,接受或发送消息的线程 Destination destination; MessageProducer messageProducer; try { connection=connectionFactory.createConnection(); connection.start(); /* * Session.AUTO_ACKNOWLEDGE 用户成功从receive方法返回的时候,或从MessageListener.onMessage方法 * 成功返回时,会话会自动确认客户收到消息 * Session.CLIENT_ACKNOWLEDGE 客户通过消息的acknowledge方法确认,注意一个被消费的消息将自动确认所 * 有已被会话消费的消息。 * Session.SESSION_TRANSACTED 会话迟钝的确认消息的提交 */ session=connection.createSession(true, Session.AUTO_ACKNOWLEDGE); destination=session.createTopic("FirstTopic1");//创建消息 messageProducer=session.createProducer(destination); sendMessage(session, messageProducer);//发送消息 session.commit();//提交 } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ if(connection!=null){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } /* * 产生消息并发消息 */ public static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException { for(int i=0;i<sendnum;i++){ TextMessage textMessage=session.createTextMessage("Message"+i); messageProducer.send(textMessage); System.out.println("开始发布消息Message"+i); } // TODO Auto-generated method stub } }
package activeMQ2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /* * 消息消费-订阅者1 */ public class JMSConsumer { static String userName=ActiveMQConnection.DEFAULT_USER; static String password=ActiveMQConnection.DEFAULT_PASSWORD; static String brokerURL=ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(userName, password, brokerURL); Connection connection = null ; Session session;// 会话,接受或发送消息的线程 Destination destination; MessageConsumer messageConsumer; try { connection=connectionFactory.createConnection(); connection.start(); session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination=session.createTopic("FirstTopic1");//创建连接的消息队列 messageConsumer=session.createConsumer(destination); messageConsumer.setMessageListener(new Listener());//注册消息监听 } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } // while(true){ // // } } }
package activeMQ2; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /* * 消息监听-订阅者1 */ public class Listener implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("订阅者1收到消息"+((TextMessage) message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
再来一个
package activeMQ2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /* * 消息消费-订阅者2 */ public class JMSConsumer2 { static String userName=ActiveMQConnection.DEFAULT_USER; static String password=ActiveMQConnection.DEFAULT_PASSWORD; static String brokerURL=ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(userName, password, brokerURL); Connection connection = null ; Session session;// 会话,接受或发送消息的线程 Destination destination; MessageConsumer messageConsumer; try { connection=connectionFactory.createConnection(); connection.start(); session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination=session.createTopic("FirstTopic1");//创建连接的消息队列 messageConsumer=session.createConsumer(destination); messageConsumer.setMessageListener(new Listener2());//注册消息监听 } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } // while(true){ // // } } }
package activeMQ2; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /* * 消息监听-订阅者2 */ public class Listener2 implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("订阅者2收到消息"+((TextMessage) message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
先订阅
再发布消息(非持久订阅者只能收到订阅之后发布的消息,所以要先启动订阅者)