一、JMS编码总体规范
二、创建Maven工程和引入Maven依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.5</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.10</version>
</dependency>
三、队列(Queue)
1、生产者
public class JmsQueueProducer {
// tcp:一种通信协议
// 192.168.229.129:Linux主机的ip地址
// 61616:ActiveMQ默认的端口
public static final String BROKER_URL = "tcp://192.168.229.129:61616";
public static final String QUEUE_NAME = "activemq-queue";
public static final String TEXT_MESSAGE_NAME = "textMessage";
public static void main(String[] args) throws JMSException {
// 1、创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
// 2、通过连接工厂获取连接对象
Connection connection = connectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、通过连接对象创建一个JMS Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建Destination对象(Destination接口有两个子接口,分别是Queue、Topic)
Queue queue = session.createQueue(QUEUE_NAME);
// 6、获取生产者对象
MessageProducer producer = session.createProducer(queue);
// 发送3个TextMessage类型的消息到队列中
for (int i = 1; i < 4; i++) {
TextMessage textMessage = session.createTextMessage(TEXT_MESSAGE_NAME + i);
producer.send(queue, textMessage);
}
System.out.println("send message to queue success!!!");
// 释放资源
producer.close();
session.close();
connection.close();
}
}
运行生产者代码之后,登录ActiveMQ客户端,可以看到消息已经推送至名称为activemq-queue的队列中了,由于我们发送了3个消息,所以这里的待处理消息就是3,我们还没有创建消费者,所以消费者数目为0,发送了3个消息,并且都进入了队列,所以入队的消息数目为3,没有消费者消费消息,那么出队列的消息数目为0.
队列相关名词解释:
名称 | 简介 |
Number Of Pending Messages |
待处理的消息数目:入队的总数-出队的总数 |
Number Of Consumers |
消费者数目:消费端的消费者数目 |
Messages Enqueued |
已经入队的消息数目,进入队列的消息的数目,这个数目只增不减,即使出队了也不会减少 |
Messages Dequeued |
已经出队的消息数目,也就是消费者消费掉的消息数目 |
2、消费者
一、消费者接收消息方式一
public class JmsQueueConsumer {
public static final String BROKER_URL = "tcp://192.168.229.129:61616";
public static final String QUEUE_NAME = "activemq-queue";
public static void main(String[] args) throws IOException, JMSException {
// 1、创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
// 2、通过连接工厂获取连接对象
Connection connection = connectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、通过连接对象创建一个JMS Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建Destination对象(Destination接口有两个子接口,分别是Queue、Topic)
Queue queue = session.createQueue(QUEUE_NAME);
// 6、获取消费者对象
MessageConsumer consumer = session.createConsumer(queue);
// 7、如果有消息就一直获取
while (true) {
// 一直等待接收消息,在接收到消息之前一直处于阻塞状态,是同步阻塞方式
//Message message = consumer.receive();
// 在等待 4*1000毫秒之后,如果还没有接收到消息,那么就结束阻塞状态
Message message = consumer.receive(4 * 1000);
// 生产者发送到队列的消息是TextMessage类型的,那么这里接收消息的类型也要保持一致
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("消费者接收到的消息是:" + textMessage.getText());
} else {
break;
}
}
// 8、释放资源
consumer.close();
session.close();
connection.close();
}
}
运行消费者代码消费完了消息之后,可以看出待处理的消息是0,入队的消息保持不变,还是3,可以出队的消息也变成了3,也就是代表着3条消息已经消费完了
二、消费者接收消息方式二
public class JmsQueueConsumer02 {
public static final String BROKER_URL = "tcp://192.168.229.129:61616";
public static final String QUEUE_NAME = "activemq-queue";
public static void main(String[] args) throws IOException, JMSException {
// 1、创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
// 2、通过连接工厂获取连接对象
Connection connection = connectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、通过连接对象创建一个JMS Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建Destination对象(Destination接口有两个子接口,分别是Queue、Topic)
Queue queue = session.createQueue(QUEUE_NAME);
// 6、获取消费者对象
MessageConsumer consumer = session.createConsumer(queue);
// 7、消费者使用消息监听的方式来消费消息,是异步非阻塞的方式
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// 生产者发送的消息是什么类型的,那么消费者接收消息的类型也需要保持一致
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费者接收到的消息是:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 使主线程不要结束,如果主线程结束了,那么消息监听的线程也会被迫结束,实际应用中程序会一直启动,就不需要这一句代码了
System.in.read();
// 8、释放资源
consumer.close();
session.close();
connection.close();
}
}
消费者获取消息的方式二与方式一有一点点不同,那就是消息监听的方式,消费者会一直处于启动状态,所以一直显示有消费者在线.
四、主题(Topic)
1、生产者
public class JmsTopicProducer {
public static final String BROKER_URL = "tcp://192.168.229.129:61616";
public static final String TOPIC_NAME = "activemq-queue";
public static final String TEXT_MESSAGE_NAME = "textMessage";
public static void main(String[] args) throws JMSException {
// 1、创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
// 2、通过连接工厂获取连接对象
Connection connection = connectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、通过连接对象创建一个JMS Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建Destination对象(Destination接口有两个子接口,分别是Queue、Topic)
Topic topic = session.createTopic(TOPIC_NAME);
// 6、获取生产者对象
MessageProducer producer = session.createProducer(topic);
// 7、创建消息
for (int i = 1; i < 4; i++) {
TextMessage textMessage = session.createTextMessage(TEXT_MESSAGE_NAME + i);
producer.send(textMessage);
}
// 8、 释放资源
producer.close();
session.close();
connection.close();
}
}
执行完主题的生产者之后,可以看到三条消息已经发送至名称为activemq-topic的主题中了
2、消费者
一、消费者接收消息方式一
public class JmsTopicCousumer {
public static final String BROKER_URL = "tcp://192.168.229.129:61616";
public static final String TOPIC_NAME = "activemq-topic";
public static void main(String[] args) throws JMSException, IOException {
System.out.println("我是一号主题消费者");
// 1、创建连接工厂对象
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
// 2、通过连接工厂获取连接对象
Connection connection = connectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、通过连接对象获取JSM Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建目的地
Topic topic = session.createTopic(TOPIC_NAME);
// 6、创建主题消费者
MessageConsumer consumer = session.createConsumer(topic);
// 7、接收消息方式一
while (true) {
Message message = consumer.receive();
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("textMessage消息体:" + textMessage.getText());
} else {
break;
}
}
// 8、释放资源
consumer.close();
session.close();
connection.close();
}
}
二、消费者接收消息方式二
public class JmsTopicCousumer {
public static final String BROKER_URL = "tcp://192.168.229.129:61616";
public static final String TOPIC_NAME = "activemq-topic";
public static void main(String[] args) throws JMSException, IOException {
// 1、创建连接工厂对象
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
// 2、通过连接工厂获取连接对象
Connection connection = connectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、通过连接对象获取JSM Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建目的地
Topic topic = session.createTopic(TOPIC_NAME);
// 6、创建主题消费者
MessageConsumer consumer = session.createConsumer(topic);
// 7、接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("textMessage消息体:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 让主线程不要停止
System.in.read();
// 8、释放资源
consumer.close();
session.close();
connection.close();
}
}
五、注意事项
1、注意:consumer.receive()方法有两个,它们的区别如下
// 一直接收消息,消费者和队列不会断开连接,处于同步阻塞状态
TextMessage textMessage = (TextMessage) consumer.receive();
// 超过了时间(毫秒值),消费者将和队列自动断开连接,结束阻塞状态
TextMessage textMessage = (TextMessage) consumer.receive(10*1000);
2、消费者通过 consumer.setMessageListener(MessageListerer messageListener)的方式消费消息
// 消费者通过消息监听的方式消费消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到的消息是:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 保证在连接到MQ之前,控制台不灭,也就是消费到了消息之后才去释放资源,Dubbo中也有
System.in.read();
3、对于主题,如果我们先启动生产者,由于主题没有消费者,那么生产者发布的消息就是一条废消息,消费者是不能接收到任何消息的,所以我们应该先启动消费者,然后再启动生产者(类似于公众号订阅,你只有先订阅了之后才可以获得消息,如果没有订阅者的话,发布的消息就是一条废消息,没有任何意义)
4、如果消费者采用的是监听接收消息,一定要记得加上 System.in.read()方法