zoukankan      html  css  js  c++  java
  • ActiveMQ的消息模式——队列模式(Queue)

    前言

    此处的代码只是简化理解,实际项目会结合Spring使用。

    一、队列模式特点

    1. 客户端包括生产者和消费者
    2. 队列中的消息只能被一个消费者消费
    3. 消费者可以随时消费队列中的消息

    二、创建过程

    1.创建连接Connection
    2.创建会话Session
    3.通过Session来创建其它的(MessageProducer、MessageConsumer、Destination、TextMessage)
    4.将生产者 MessageProducer 和消费者 MessageConsumer 都会指向目标 Destination
    5.生产者向目标发送TextMessage消息send()
    6.消费者设置监听器,监听消息。

    三、代码实现

    1. 创建Maven项目

    <?xml version="1.0" encoding="UTF-8"?>
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.jms</groupId>
        <artifactId>jms-test</artifactId>
        <version>1.0-SNAPSHOT</version>
        
        <!-- activemq依赖 -->
        <dependencies>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.9.0</version>
            </dependency>
        </dependencies>
    
    </project>

    2. 生产者 AppProducer.java

    public class AppProducer {
        private static final String url = "tcp://127.0.0.1:61616";
        private static final String queueName = "queue-test";
    
        public static void main(String[] args) throws JMSException {
            //1.创建ConnectionFactory
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            //2.创建Connection
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建一个目标
            Destination destination = session.createQueue(queueName);
            //6.创建一个生产者
            MessageProducer producer = session.createProducer(destination);
            for (int i = 0; i < 10; i++) {
                //7.创建消息
                TextMessage textMessage = session.createTextMessage("test" + i);
                //8.发布消息
                producer.send(textMessage);
    
                System.out.println("发送消息"+textMessage.getText());
            }
    
            //9.关闭连接
            connection.close();
    
        }
    }
    

    3. 消费者 AppConsumer.java

    消费者的连接Connection是不能关闭的,因为消息的接收是异步的,会导致消息不能被消费。

    
    public class AppConsumer {
        private static final String url = "tcp://127.0.0.1:61616";
        private static final String queueName = "queue-test";
    
        public static void main(String[] args) throws JMSException {
            //1. 创建ConnectionFactory
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            //2. 创建Connection
            Connection connection = connectionFactory.createConnection();
            //3. 启动连接
            connection.start();
            //4. 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5. 创建一个目标
            Destination destination = session.createQueue(queueName);
            //6. 创建一个消费者
            MessageConsumer consumer = session.createConsumer(destination);
            //7. 创建一个监听器
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        System.out.println("接收消息  = [" + ((TextMessage) message).getText() + "]");
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            //8.关闭连接(消费者的连接不允许关闭的,因为消息的接收是异步的,会导致消息不能被消费)
            //connection.close();
        }
    }
    

    四、运行查看

    1. 运行生产者 AppProducer

    运行AppProducer.java后会发现队列中添加了10条消息,如下图:

    2. 开启消费者 AppConsumer

    运行AppConsumer.java后会发现队列中的10条消息被消费了,如下图:

    3.开启两个消费者后,运行生产者

    会发现生产者发送的10个消息,被两个消费者平分了。
    AppConsumer1

    接收消息  = [test1]
    接收消息  = [test3]
    接收消息  = [test5]
    接收消息  = [test7]
    接收消息  = [test9]

    AppConsumer2

    接收消息  = [test0]
    接收消息  = [test2]
    接收消息  = [test4]
    接收消息  = [test6]
    接收消息  = [test8]

    五、队列模式和主题模式的区别

    1. 是否需要提前订阅
      队列模式:消费者不需要提前订阅也可以消费消息
      主题模式:只有提前进行订阅的消费者才能成功消费消息
    2. 多个消费者如何分配消息
      队列模式:只能平均消费消息,被别的消费者消费的消息不能重复被其他的消费者消费
      主题模式:每个订阅者都可以消费主题模式中的每一条消息
    阅读 7.3k更新于 2018-04-24
  • 相关阅读:
    matplotlib 进阶之origin and extent in imshow
    Momentum and NAG
    matplotlib 进阶之Tight Layout guide
    matplotlib 进阶之Constrained Layout Guide
    matplotlib 进阶之Customizing Figure Layouts Using GridSpec and Other Functions
    matplotlb 进阶之Styling with cycler
    matplotlib 进阶之Legend guide
    Django Admin Cookbook-10如何启用对计算字段的过滤
    Django Admin Cookbook-9如何启用对计算字段的排序
    Django Admin Cookbook-8如何在Django admin中优化查询
  • 原文地址:https://www.cnblogs.com/telwanggs/p/13183529.html
Copyright © 2011-2022 走看看