ActiveMQ入门代码案例:
pom依赖:
<!-- activemq 所需要的jar 包--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </dependency> <!-- activemq 和 spring 整合的基础包 --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency>
JMS编码总体规范:
Destination(目的地)简介:
Destination是目的地。下面拿jvm和mq,做个对比。目的地,我们可以理解为是数据存储的地方。
Destination分为两种:队列和主题。下图介绍:
Destination之队列(Queue)
消息队列生产者
案例代码:
public static final String ACTIVEMQ_URL = "tcp://www.zhangzhixi.top:61616"; public static final String ACTIVE_NAME = "active1"; public static void main(String[] args) throws Exception { /*1、创建连接工厂*/ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); /*2、打开连接*/ final Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); /*3、创建Session会话:参数1:事务,参数2:签收*/ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); /*4、创建目的地:具体是队列还是主题topic*/ Queue queue = session.createQueue(ACTIVE_NAME); /*5、创建消息的消费者*/ final MessageProducer producer = session.createProducer(queue); for (int i = 1; i <= 60; i++) { final TextMessage textMessage = session.createTextMessage("生产者生产消息:Message=>" + i); producer.send(textMessage); } /*6、关闭资源*/ producer.close(); session.close(); connection.close(); System.out.println("*****生产者生产完成*****"); }
控制台:
Number Of Pending Messages:
等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
Number Of Consumers:
消费者数量,消费者端的消费者数量。
Messages Enqueued:
进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
Messages Dequeued:
出队消息数,可以理解为是消费者消费掉的数量。
总结:
当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。
当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。
消息队列消费者
案例代码:
public static final String ACTIVEMQ_URL = "tcp://www.zhangzhixi.top:61616"; public static final String ACTIVE_NAME = "active1"; public static void main(String[] args) throws Exception { /*1、创建连接工厂*/ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); /*2、打开连接*/ final Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); /*3、创建Session会话:参数1:事务,参数2:签收*/ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); /*4、创建目的地:具体是队列还是主题topic*/ Queue queue = session.createQueue(ACTIVE_NAME); final MessageConsumer consumer = session.createConsumer(queue); /*5、消费者消费方式一:订阅(同步阻塞)*/ while (true) { final TextMessage receive = (TextMessage) consumer.receive(30000); if (receive != null) { System.out.println("*******消费者收到消息*******" + receive.getText()); }else{ break; } } consumer.close(); session.close(); connection.close(); System.out.println("****消费者结束消费*****"); }
控制台:
异步监听式消费者(MessageListener)
1 public static final String ACTIVEMQ_URL = "tcp://www.zhangzhixi.top:61616"; 2 public static final String ACTIVE_NAME = "active1"; 3 public static void main(String[] args) throws Exception { 4 /*1、创建连接工厂*/ 5 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); 6 /*2、打开连接*/ 7 final Connection connection = activeMQConnectionFactory.createConnection(); 8 connection.start(); 9 /*3、创建Session会话:参数1:事务,参数2:签收*/ 10 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 11 /*4、创建目的地:具体是队列还是主题topic*/ 12 Queue queue = session.createQueue(ACTIVE_NAME); 13 final MessageConsumer consumer = session.createConsumer(queue); 14 /*6、消费者消费方式二:监听器(异步非阻塞)*/ 15 consumer.setMessageListener(new MessageListener() { 16 @Override 17 public void onMessage(Message message) { 18 if (message instanceof TextMessage) { 19 final TextMessage textMessage = (TextMessage) message; 20 try { 21 System.out.println("*******消费者收到消息:" + textMessage.getText()); 22 } catch (JMSException e) { 23 e.printStackTrace(); 24 } 25 } 26 } 27 }); 28 System.in.read(); 29 consumer.close(); 30 session.close(); 31 connection.close(); 32 System.out.println("****消费者结束消费*****"); 33 }
队列消息(Queue)总结
两种消费方式:
同步阻塞方式(receive())
订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。
异步非阻塞方式(监听器onMessage())
订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。
队列的特点:
消息消费情况
情况1:只启动消费者1。
结果:消费者1会消费所有的数据。
情况2:先启动消费者1,再启动消费者2。
结果:消费者1消费所有的数据。消费者2不会消费到消息。
情况3:生产者发布6条消息,在此之前已经启动了消费者1和消费者2。
结果:消费者1和消费者2平摊了消息。各自消费3条消息。
疑问:怎么去将消费者1和消费者2不平均分摊呢?而是按照各自的消费能力去消费。我觉得,现在activemq就是这样的机制。