之前介绍了ActiveMQ下载与安装,并且启动了。下面进行ActiveMQ的Demo
1. JMS入门Demo
1.1 点对点模式
点对点模式主要建立在一个队列上面,当连接一个队列的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发送接收端,如果没有接收端接收,则会保存在activeMQ服务器,直到接收端接收消息。点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端就接收不到那条消息。
先引入ActiveMQ的依赖
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency>
1.1.1 消息生产者
public class QueueProducer {
public static void main(String[] args) throws Exception {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
//2.创建连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session(会话对象) 参数transacted:是否启动事务;参数acknowledgeMode:消息的确认模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息生产者对象
MessageProducer producer = session.createProducer(queue);
//7.创建消息对象(文本消息)
TextMessage textMessage = session.createTextMessage("欢迎来到ActiveMQ");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
}
上述代码中第4步创建session 的两个参数:
第1个参数 是否使用事务
第2个参数 消息的确认模式
- AUTO_ACKNOWLEDGE = 1 自动确认
- CLIENT_ACKNOWLEDGE = 2 客户端手动确认
- DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
- SESSION_TRANSACTED = 0 事务提交并确认
运行后通过ActiveMQ管理界面查询
1.1.2 消息消费者
public class QueueConsumer { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616"); //2.创建连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.创建会话对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建队列对象 Queue queue = session.createQueue("test-queue"); //6.创建消息消费者对象 MessageConsumer consumer = session.createConsumer(queue); //7.设置监听 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("提取的消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); } }
执行后看到控制台输出
1.1.3运行测试
同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现只有一个消费者会接收到消息。
1.2 发布/订阅模式
1.2.1 消息生产者
/** * 发布订阅模式,消息生产者 * @author Administrator * */ public class TopicProducer { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616"); //2.创建连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.创建会话对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 Topic topic = session.createTopic("test-topic"); //6.创建生产者对象 MessageProducer producer = session.createProducer(topic); //7.创建消息对象(文本消息) TextMessage textMessage = session.createTextMessage("欢迎来到ActiveMQ"); //8.发送消息 producer.send(textMessage); //9.关闭资源 producer.close(); session.close(); connection.close(); } }
运行效果如下:
1.2.2 消息消费者
public class TopicConsumer { public static void main(String[] args) throws Exception { //1.创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616"); //2.创建连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.创建会话对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 Topic topic = session.createTopic("test-topic"); //6.创建消费者对象 MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收消息为:" + textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); } }
1.2.3运行测试
同时开启2个以上的消费者,再次运行生产者,观察每个消费者控制台的输出,会发现每个消费者会接收到消息。