Topic消息
非持久的 Topic消息示例
对于非持久的 Topic消息的发送
基本跟前而发送队列信息是一样的,以是把创建 Destination的地方,由创
建队列替换成创建 Topic,例如:
Destination destination =session createTopic( "mytopic");
对于非持久的 Topic消息的接收
1:必须要接收方在线,然后客户端再发送信息,接收方才能接收到消息
2:同样把创建 Destination的地方,由创建队列替换成创建 Topic,例如:
Destination destination session. createTopic("mytopic");
3:由于不知道客户端发送多少信息,因此改成 while循环的方式了,例如:
Message message consumer. receive():
while (message!=null){
TextMessage txtMsg =(TextMessage)message;
System.out.printIn("收到消息:”+ txtMsg. getText());
message= consumer. receive(1000L);
}
生产者
package test.mq.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { public static void main(String[] args) throws JMSException { ConnectionFactory ConnectionFactory=new ActiveMQConnectionFactory( "tcp://localhost:61616" ); Connection connection=ConnectionFactory.createConnection(); connection.start(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination=session.createTopic("mytopic"); MessageProducer messageProducer=session.createProducer(destination); for(int i=1;i<=5;i++){ TextMessage textMessage=session.createTextMessage(); textMessage.setText("我是TOM ID为"+i); messageProducer.send(textMessage); System.out.println("生产者:"+textMessage.getText()); } session.commit(); session.close(); connection.close(); } }
控制台信息
消费者
package test.mq.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.ConsumerBrokerExchange; public class Receiver { public static void main(String[] args) throws Exception { ConnectionFactory ConnectionFactory=new ActiveMQConnectionFactory( "tcp://localhost:61616" ); Connection connection=ConnectionFactory.createConnection(); connection.start(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination=session.createTopic("mytopic"); MessageConsumer messageConsumer=session.createConsumer(destination); Message msg= messageConsumer.receive(); while(msg!=null){ TextMessage msgs=(TextMessage) msg; System.out.println("接受信息----》"+msgs.getText()); msg= messageConsumer.receive(1000L); } session.commit(); session.close(); connection.close(); } }
控制台信息
对于持久的 Topic消息的发送
ConnectionFactory ConnectionFactory=new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection=ConnectionFactory.createConnection();
Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination=session.createTopic("mytopic2");
MessageProducer Producer=session.createProducer(destination);
Producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for(int i=1;i<=5;i++){
TextMessage textMessage=session.createTextMessage();
textMessage.setText("我是tom ID为"+i);
messageProducer.send(textMessage);
System.out.println("生产者:"+textMessage.getText());
}
session.commit();
session.close();
connection.close();
1:要用持久化订阅,发送消息者要用 DeliveryMode, PERSISTENT模式发现,在连接之前设定
2:一定要设置完成后,再 start这个 connection
对于持久的 Topic消息的接收
ConnectionFactory ConnectionFactory=new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection=ConnectionFactory.createConnection();
connection.setClientID("cc1");
Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic destination=session.createTopic("mytopic2");
TopicSubscriber ts=session.createDurableSubscriber(destination, "T1");
connection.start();
Message msg= ts.receive();
while(msg!=null){
TextMessage msgs=(TextMessage) msg;
System.out.println("接受信息----》"+msgs.getText());
msg= ts.receive(1000L);
}
session.commit();
session.close();
connection.close();
1:需要在连接上设置消费者id,用来识别消费者
2:需要创建 TopicSubscriber来订阅
3:要设置好了过后再 start这个 connection
4:一定要先运行一次,等于向消息服务中间件注册这个消费者,然后再运行客户端发送信息,这个时候,
无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收过的消息都接收下来。
生产者
package test.mq.topic1; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { public static void main(String[] args) throws JMSException { ConnectionFactory ConnectionFactory=new ActiveMQConnectionFactory( "tcp://localhost:61616" ); Connection connection=ConnectionFactory.createConnection(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination=session.createTopic("mytopic2"); MessageProducer messageProducer=session.createProducer(destination); messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); for(int i=1;i<=5;i++){ TextMessage textMessage=session.createTextMessage(); textMessage.setText("我是tom ID为"+i); messageProducer.send(textMessage); System.out.println("生产者:"+textMessage.getText()); } session.commit(); session.close(); connection.close(); } }
消费者
package test.mq.topic1; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.ConsumerBrokerExchange; public class Receiver { public static void main(String[] args) throws Exception { ConnectionFactory ConnectionFactory=new ActiveMQConnectionFactory( "tcp://localhost:61616" ); Connection connection=ConnectionFactory.createConnection(); connection.setClientID("cc1"); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic destination=session.createTopic("mytopic2"); TopicSubscriber ts=session.createDurableSubscriber(destination, "T1"); connection.start(); Message msg= ts.receive(); while(msg!=null){ TextMessage msgs=(TextMessage) msg; System.out.println("接受信息----》"+msgs.getText()); msg= ts.receive(1000L); } session.commit(); session.close(); connection.close(); } }
控制台
总结:
关于持久化和非持久化消息
持久化消息
这是 Active的默认传送模式,此模式保证这些消息只被传送一次和成功使用一
次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消
息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息
这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消
息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然
这样增加了消息传送的开销,但却增加了可靠性。
非持久化消息
保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。此模
式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢
失。有两种方法指定传送模式:
1.使用 set DeliveryMode方法,这样所有的消息都采用此传送模式:如:
roducer set DeliveryMode( DeliveryMode NON PERSISTENT)
2.使用send方法为每一条消息设置传送模式