一、非持久的Topic
Topic 发送
public class NoPersistenceSender { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination topic=session.createTopic("myTopic"); MessageProducer producer=session.createProducer(topic); for(int i=0 ; i<3 ; i++){ TextMessage message=session.createTextMessage("message"+i); //message.setStringProperty("queue", "queue"+i); //message.setJMSType("1"); producer.send(message); } session.commit(); session.close(); connection.close(); } }
Topic 接收
public class NoPersistenceRecever { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination topic=session.createTopic("myTopic"); MessageConsumer consumer = session.createConsumer(topic); Message message=consumer.receive(); while (message !=null){ TextMessage textMessage=(TextMessage) message; //System.out.println(message.getStringProperty("queue")); System.out.println(textMessage.getText()); session.commit(); message = consumer.receive(1000L); } session.close(); connection.close(); } }
二、持久化得Topic
Topic 发送
public class PersistenceSender { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616"); Connection connection = connectionFactory.createConnection(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination topic=session.createTopic("myTopic1"); MessageProducer producer=session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); for(int i=0 ; i<3 ; i++){ TextMessage message=session.createTextMessage("message"+i); //message.setStringProperty("queue", "queue"+i); //message.setJMSType("1"); producer.send(message); } session.commit(); session.close(); connection.close(); } }
- 要用持久化订阅,发送消息者要用 DeliveryMode.PERSISTENT 模式发现,在连接之前设定
- 一定要设置完成后,再start 这个 connection
Topic 接收
public class PersistenceRecever { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616"); Connection connection = connectionFactory.createConnection(); connection.setClientID("cc1"); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic topic=session.createTopic("myTopic1"); TopicSubscriber ts = session.createDurableSubscriber(topic, "t1"); connection.start(); Message message=ts.receive(); while (message !=null){ TextMessage textMessage=(TextMessage) message; //System.out.println(message.getStringProperty("queue")); System.out.println(textMessage.getText()); session.commit(); message = ts.receive(1000L); } session.close(); connection.close(); } }
- 需要在连接上设置消费者id,用来识别消费者
- 需要创建TopicSubscriber来订阅
- 要设置好了过后再start 这个 connection
- 一定要先运行一次,等于向消息服务中间件注册这个消费者,然后再运行客户端发送信息,这个时候,无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收过的消息都接收下来。