点对点发送模式queue模式
发送方
TestQueueProducer
package cn.itcast.test.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 测试点对点发送模式(queue模式) -- 发送方
*/
public class TestQueueProducer {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session (参数1:是否启动事务,参数2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);
//7.创建消息
TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
}
接收方
TestQueueConsumer
package cn.itcast.test.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 测试点对点发送模式(queue模式) -- 接收方 */ public class TestQueueConsumer { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建队列对象 Queue queue = session.createQueue("test-queue"); //6.创建消息消费 MessageConsumer consumer = session.createConsumer(queue); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); } }
订阅发布模式
/**
* 测试topic模式(订阅发布模式) -- 发送方
*/
package cn.itcast.test.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 测试topic模式(订阅发布模式) -- 发送方 */ public class TestTopicProducer { public static void main(String[] args) throws Exception{ //1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 Topic topic = session.createTopic("test-topic"); //6.创建消息生产者 MessageProducer producer = session.createProducer(topic); //7.创建消息 TextMessage textMessage = session.createTextMessage("欢迎来到神奇的品优购世界"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close(); } }
/**
* 测试topic模式(订阅发布模式) -- 接收方
*/
package cn.itcast.test.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 测试topic模式(订阅发布模式) -- 接收方 */ public class TestTopicConsumer1 { public static void main(String[] args) throws Exception{ //1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.创建消息消费 MessageConsumer consumer = session.createConsumer(topic); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); } }
/**
* 测试topic模式(订阅发布模式) -- 接收方
*/
package cn.itcast.test.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 测试topic模式(订阅发布模式) -- 接收方 */ public class TestTopicConsumer2 { public static void main(String[] args) throws Exception{ //1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 //Queue queue = session.createQueue("test-queue"); Topic topic = session.createTopic("test-topic"); //6.创建消息消费 MessageConsumer consumer = session.createConsumer(topic); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); } }