一、非持久的Topic消息示例
注意 此种方式消费者只能接收到 消费者启动之后,发送者发送的消息。
发送者
package com.lhy.mq.helloworld; import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class NoPersistenceTopicSender { public static void main(String[] args) throws Exception { //第一步:建立ConnectionFactory工厂对象。需要填入用户名、密码、连接地址,均使用默认即可,默认端口为"tcp://localhost:61616" ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "lhy","123456", //ActiveMQConnectionFactory.DEFAULT_USER, //ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("NB-NB"); //队列名称 MessageProducer producer = session.createProducer(null);// // 第六步:可以使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode) //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("我是消息内容 -333- "+i); producer.send(destination, message); System.err.println("生产者发送消息:"+message.getText()); } session.commit(); if(connection != null){ connection.close(); } } }
接收者
package com.lhy.mq.helloworld; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class NoPersitenceTopicReceiver { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "lhy","123456", "tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("NB-NB"); MessageConsumer consumer = session.createConsumer(destination); Message message = consumer.receive(); while(message != null){ TextMessage textMsg = (TextMessage)message; System.err.println("消费消息:"+textMsg.getText()); //接收下一个消息 message = consumer.receive(1000L); } //提交一下事务,否则不确认消息,消息不会出队列 session.commit(); session.close(); connection.close(); } }
二、持久订阅例子程序
发送者
package com.lhy.mq.helloworld; import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class PersistenceTopicSender { public static void main(String[] args) throws Exception { //第一步:建立ConnectionFactory工厂对象。需要填入用户名、密码、连接地址,均使用默认即可,默认端口为"tcp://localhost:61616" ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "lhy","123456", "tcp://127.0.0.1:61616"); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic destination = session.createTopic("Persistence-Topic"); //队列名称 MessageProducer producer = session.createProducer(null);// //默认为持久订阅,注意这个一定在start之前设置 producer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("我是消息内容 -666- "+i); producer.send(destination, message); System.err.println("生产者发送-topic-消息:"+message.getText()); } session.commit(); if(connection != null){ connection.close(); } } }
消费者,可以有多个消费者
1, 消费者需要在Connection上设置消费者id,来识别消费者
2,需要创建TopicSubscriber 来订阅
3,设置好之后再start 这个Connection
4,一定要先运行一次消费者,来向ActiveMQ注册这个消费者,然后再运行发送消息,这样无论消费者是否在线,都会接收到消息。否则只能接收到注册之后的消息。
package com.lhy.mq.helloworld; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消费者需要先运行一次,向producer注册一下 * @author dell * */ public class PersitenceTopicReceiver { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "lhy","123456", "tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); //设置消费者的id,向发送者先注册一下,producer就知道谁在订阅 connection.setClientID("client2"); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic destination = session.createTopic("Persistence-Topic"); TopicSubscriber consumer = session.createDurableSubscriber(destination, "T1");//创建一个持久订阅 //最后start connection.start(); Message message = consumer.receive(); while(message != null){ TextMessage textMsg = (TextMessage)message; System.err.println("消费消息:"+textMsg.getText()); //接收下一个消息 message = consumer.receive(1000L); } //提交一下事务,否则不确认消息,消息不会出队列 session.commit(); session.close(); connection.close(); } }
分别修改消费者的clientID为 client1、client2运行,相当于2个消费者。
管控台:2个消费者,