------------------------------------------------
开发一个JMS的基本步骤如下:
1.创建一个JMS connection factory
2.通过connection factory来创建JMS connection
3.启动JMS connection
4.通过connection创建JMS session
5.创建JMS destination
6.创建JMS producer 或者创建JMS message,并设置destination
7.创建JMS consumer 或者注册一个JMS message listener
8.发送或者接受JMS message
9.关闭所有的JMS资源(connection、session、producer、consumer等)
可以参考下图:
非持久的Topic消息示例
对于非持久化的消息,当发送方发送消息的时候:
如果接收方不在线,则接收方永远也收不到这些消息了
如果接收方在线,则接收方会收到这些消息
1、消息发送程序
1 import javax.jms.Connection; 2 import javax.jms.ConnectionFactory; 3 import javax.jms.Destination; 4 import javax.jms.MessageProducer; 5 import javax.jms.Session; 6 import javax.jms.TextMessage; 7 8 import org.apache.activemq.ActiveMQConnectionFactory; 9 10 /** 11 * 非持久化Topic消息发送者 12 * @author Administrator 13 * 14 */ 15 public class NoPersistenceSender { 16 public static void main(String[] args) throws Exception { 17 //创建一个JMS connection factory 18 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616"); 19 //通过connection factory来创建JMS connection 20 Connection connection = connectionFactory.createConnection(); 21 //启动JMS connection 22 connection.start(); 23 //通过connection创建JMS session 24 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 25 //创建JMS destination 26 Destination destination = session.createTopic("noPersistenceTopic"); 27 //创建JMS producer 28 MessageProducer producer = session.createProducer(destination); 29 30 for(int i = 0;i < 10;i++){ 31 TextMessage message = session.createTextMessage("message-"+i); 32 //发送message 33 producer.send(message); 34 } 35 //关闭所有的JMS资源 36 session.commit(); 37 session.close(); 38 connection.close(); 39 } 40 }
运行完消息发送程序后,可以访问192.168.1.81:8161
2、消息接收程序
对于非持久的Topic消息的接收需要注意以下几点:
a.接收程序必须在线,然后消息发送方再发送消息,接收程序才能接收到消息
b.由于不知道消息发送方要发送多少条消息,所以利用while循环的方式来接收消息
c.如果接收程序不在线,此时发送程序发送了消息的话,则该消息将永远不会被接收方收到。
1 import javax.jms.Connection; 2 import javax.jms.ConnectionFactory; 3 import javax.jms.Destination; 4 import javax.jms.Message; 5 import javax.jms.MessageConsumer; 6 import javax.jms.Session; 7 import javax.jms.TextMessage; 8 9 import org.apache.activemq.ActiveMQConnectionFactory; 10 11 /** 12 * 非持久化Topic消息接收者 13 * @author Administrator 14 * 15 */ 16 public class NoPersistenceReceiver { 17 public static void main(String[] args) throws Exception { 18 //创建一个JMS connection factory 19 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616"); 20 //通过connection factory来创建JMS connection 21 Connection connection = connectionFactory.createConnection(); 22 //启动JMS connection 23 connection.start(); 24 //通过connection创建JMS session 25 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 26 //创建JMS destination 27 Destination destination = session.createTopic("noPersistenceTopic"); 28 //创建JMS consumer 29 MessageConsumer consumer = session.createConsumer(destination); 30 31 Message message = consumer.receive(); 32 while(message != null){ 33 TextMessage txtMsg = (TextMessage)message; 34 System.out.println("收到消息:"+txtMsg.getText()); 35 message = consumer.receive(); 36 } 37 //关闭所有的JMS资源 38 session.commit(); 39 session.close(); 40 connection.close(); 41 } 42 }
运行结果:
持久的Topic消息示例
1.消息发送程序
对于持久的Topic消息的发送方需要注意以下几点:
a.要用持久化订阅,发送消息者要用DeliveryMode.PERSISTENT模式来发送
1 import javax.jms.Connection; 2 import javax.jms.ConnectionFactory; 3 import javax.jms.DeliveryMode; 4 import javax.jms.Destination; 5 import javax.jms.MessageProducer; 6 import javax.jms.Session; 7 import javax.jms.TextMessage; 8 9 import org.apache.activemq.ActiveMQConnectionFactory; 10 11 /** 12 * 持久化Topic消息发送者 13 * @author Administrator 14 */ 15 public class PersistenceSender { 16 public static void main(String[] args) throws Exception { 17 //创建一个JMS connection factory 18 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616"); 19 //通过connection factory来创建JMS connection 20 Connection connection = connectionFactory.createConnection(); 21 //通过connection创建JMS session 22 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 23 //创建JMS destination 24 Destination destination = session.createTopic("PersistenceTopic"); 25 //创建JMS producer 26 MessageProducer producer = session.createProducer(destination); 27 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 28 //启动JMS connection 29 connection.start(); 30 for(int i = 0;i < 10;i++){ 31 TextMessage message = session.createTextMessage("message-"+i); 32 //发送message 33 producer.send(message); 34 } 35 //关闭所有的JMS资源 36 session.commit(); 37 session.close(); 38 connection.close(); 39 } 40 }
2.消息接收程序
对于持久的Topic消息的接收方需要注意以下几点:
a.需要在连接上设置消费者id,用来识别消费者
b.需要创建TopicSubscriber来订阅
c.一定要先运行一次该消费者程序,等于向消费服务中间件注册这个消费者,然后再运行消息发送者来发送消息,这样的话,无论消费者是否在线都会收到消息,如果不在线的话,则下次连接的时候会把没有收过的消息都接收下来。
1 import javax.jms.Connection; 2 import javax.jms.ConnectionFactory; 3 import javax.jms.Message; 4 import javax.jms.Session; 5 import javax.jms.TextMessage; 6 import javax.jms.Topic; 7 import javax.jms.TopicSubscriber; 8 9 import org.apache.activemq.ActiveMQConnectionFactory; 10 11 /** 12 * 持久化Topic消息接收者 13 * @author Administrator 14 * 15 */ 16 public class PersistenceReceiver { 17 public static void main(String[] args) throws Exception { 18 //创建一个JMS connection factory 19 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://120.76.123.81:61616"); 20 //通过connection factory来创建JMS connection 21 Connection connection = connectionFactory.createConnection(); 22 connection.setClientID("con1"); 23 //通过connection创建JMS session 24 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 25 //创建JMS destination 26 Topic destination = session.createTopic("PersistenceTopic"); 27 //创建JMS consumer 28 TopicSubscriber ts = session.createDurableSubscriber(destination, "TT"); 29 //启动JMS connection 30 connection.start(); 31 Message message = ts.receive(); 32 while(message != null){ 33 TextMessage txtMsg = (TextMessage)message; 34 session.commit(); 35 System.out.println("收到消息:"+txtMsg.getText()); 36 message = ts.receive(1000L); 37 } 38 //关闭所有的JMS资源 39 session.close(); 40 connection.close(); 41 } 42 }
关于持久化和非持久化消息
有两种方式指定传送模式:
1.使用setDeliveryMode方法,这样所有的消息都采用此传送模式;如producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
2.使用send方法为每条消息设置传送模式
持久化消息
这是ActiveMQ的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但是却增加了可靠性。
非持久化消息
保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。