1. Queue
1.1 Producer
生产者:生产消息,发送端。
把jar包添加到工程中。
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
1 /** 2 * 点到点形式发送消息 3 */ 4 @Test 5 public void testQueueProducer() throws Exception{ 6 //1、创建一个连接工厂对象,需要指定服务的ip及端口。 7 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.83.102:61616"); 8 //2、使用工厂对象创建一个Connection对象。 9 Connection connection = connectionFactory.createConnection(); 10 //3、开启连接,调用Connection对象的start方法。 11 connection.start(); 12 //4、创建一个Session对象。 13 //第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。 14 //第二个参数:应答模式。自动应答或者手动应答。一般自动应答。 15 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 16 //5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用queue 17 Queue queue = session.createQueue("test-queue"); 18 //6、使用Session对象创建一个Producer对象。 19 MessageProducer producer = session.createProducer(queue); 20 //7、创建一个Message对象,可以使用TextMessage。 21 /*TextMessage textMessage = new ActiveMQTextMessage(); 22 textMessage.setText("hello Activemq");*/ 23 TextMessage textMessage = session.createTextMessage("hello activemq"); 24 //8、发送消息 25 producer.send(textMessage); 26 //9、关闭资源 27 producer.close(); 28 session.close(); 29 connection.close(); 30 }
1.2. Consumer
消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
1 @Test 2 public void testQueueConsumer() throws Exception { 3 //创建一个ConnectionFactory对象连接MQ服务器 4 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.83.102:61616"); 5 //创建一个连接对象 6 Connection connection = connectionFactory.createConnection(); 7 //开启连接 8 connection.start(); 9 //使用Connection对象创建一个Session对象 10 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 11 //创建一个Destination对象。queue对象 12 Queue queue = session.createQueue("test-queue"); 13 //使用Session对象创建一个消费者对象。 14 MessageConsumer consumer = session.createConsumer(queue); 15 //接收消息 16 consumer.setMessageListener(new MessageListener() { 17 18 @Override 19 public void onMessage(Message message) { 20 //打印结果 21 TextMessage textMessage = (TextMessage) message; 22 String text; 23 try { 24 text = textMessage.getText(); 25 System.out.println(text); 26 } catch (JMSException e) { 27 e.printStackTrace(); 28 } 29 30 } 31 }); 32 //等待接收消息 33 System.in.read(); 34 //关闭资源 35 consumer.close(); 36 session.close(); 37 connection.close(); 38 }
2. Topic
2.1. Producer
使用步骤:
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
1 @Test 2 public void testTopicProducer() throws Exception { 3 //1、创建一个连接工厂对象,需要指定服务的ip及端口。 4 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.83.102:61616"); 5 //2、使用工厂对象创建一个Connection对象。 6 Connection connection = connectionFactory.createConnection(); 7 //3、开启连接,调用Connection对象的start方法。 8 connection.start(); 9 //4、创建一个Session对象。 10 //第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。 11 //第二个参数:应答模式。自动应答或者手动应答。一般自动应答。 12 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 13 //5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用topic 14 Topic topic = session.createTopic("test-topic"); 15 //6、使用Session对象创建一个Producer对象。 16 MessageProducer producer = session.createProducer(topic); 17 //7、创建一个Message对象,可以使用TextMessage。 18 /*TextMessage textMessage = new ActiveMQTextMessage(); 19 textMessage.setText("hello Activemq");*/ 20 TextMessage textMessage = session.createTextMessage("topic message"); 21 //8、发送消息 22 producer.send(textMessage); 23 //9、关闭资源 24 producer.close(); 25 session.close(); 26 connection.close(); 27 }
2.2. Consumer
消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源。
Topic模式接收消息消费者必须处于启动状态。
1 @Test 2 public void testTopicConsumer() throws Exception { 3 //创建一个ConnectionFactory对象连接MQ服务器 4 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.83.102:61616"); 5 //创建一个连接对象 6 Connection connection = connectionFactory.createConnection(); 7 //开启连接 8 connection.start(); 9 //使用Connection对象创建一个Session对象 10 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 11 //创建一个Destination对象。topic对象 12 Topic topic = session.createTopic("test-topic"); 13 //使用Session对象创建一个消费者对象。 14 MessageConsumer consumer = session.createConsumer(topic); 15 //接收消息 16 consumer.setMessageListener(new MessageListener() { 17 18 @Override 19 public void onMessage(Message message) { 20 //打印结果 21 TextMessage textMessage = (TextMessage) message; 22 String text; 23 try { 24 text = textMessage.getText(); 25 System.out.println(text); 26 } catch (JMSException e) { 27 e.printStackTrace(); 28 } 29 30 } 31 }); 32 System.out.println("topic消费者1启动。。。。"); 33 //等待接收消息 34 System.in.read(); 35 //关闭资源 36 consumer.close(); 37 session.close(); 38 connection.close(); 39 }
总结:
Queue(点对点):若消息没有被接收,服务器会一直保存
Topic(广播):不会持久化,只负责发送
若想要持久化保存消息,需要进行订阅。