zoukankan      html  css  js  c++  java
  • 20200202 ActiveMQ 3. Java编码实现ActiveMQ通讯

    ActiveMQ 3. Java编码实现ActiveMQ通讯

    3.1. 队列(Queue)

    目的地(Destination)分为:

    • 点对点的队列(Queue)
    • 一对多的主题(Topic)

    3.1.1. 上手代码

    1. pom.xml
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.9</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>3.16</version>
    </dependency>
    
    1. 生产者代码
    public class JmsProducer {
    
        public static final String ACTIVEMQ_URL = "tcp://192.168.181.128:61616/";
        public static final String QUEUE_NAME = "queue01";
    
        public static void main(String[] args) throws JMSException {
            // 1. 创建连接工厂,按照给定的URL地址,采用默认用户名密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            // 2. 通过连接工厂,获得连接Connection并启动访问
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
    
            // 3. 创建会话Session
            // 两个参数,第一个是事务控制,第二个是签收控制
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 4. 创建目的地(具体是队列queue或主题topic)
            Queue queue = session.createQueue(QUEUE_NAME);
            // 5. 创建消息的生产者
            MessageProducer messageProducer = session.createProducer(queue);
            // 6. 通过消息生产者发送消息
            for (int i = 0; i < 3; i++) {
                // 7. 创建消息
                TextMessage textMessage = session.createTextMessage("msg---" + i);
                // 8. 发送给MQ
                messageProducer.send(textMessage);
            }
            // 9. 关闭资源
            messageProducer.close();
            session.close();
            connection.close();
    
            System.out.println("*****消息发布到MQ完成*****");
        }
    }
    
    1. 消费者代码
    public class JmsConsumer {
    
        public static final String ACTIVEMQ_URL = "tcp://192.168.181.128:61616/";
        public static final String QUEUE_NAME = "queue01";
    
        public static void main(String[] args) throws JMSException {
            // 1. 创建连接工厂,按照给定的URL地址,采用默认用户名密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            // 2. 通过连接工厂,获得连接Connection并启动访问
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
    
            // 3. 创建会话Session
            // 两个参数,第一个是事务控制,第二个是签收控制
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 4. 创建目的地(具体是队列queue或主题topic)
            Queue queue = session.createQueue(QUEUE_NAME);
    
            // 5. 创建消费者
            MessageConsumer messageConsumer = session.createConsumer(queue);
            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive();
                if (textMessage != null) {
                    System.out.println("*****消费者收到消息:" + textMessage.getText());
                } else {
                    break;
                }
            }
    
            // 6. 关闭资源
            messageConsumer.close();
            session.close();
            connection.close();
        }
    }
    

    3.1.2. receive()方法说明

    // 收到消息前一直阻塞进程
    javax.jms.MessageConsumer#receive()
    // 超时后不再阻塞进程
    javax.jms.MessageConsumer#receive(long timeout)
    

    3.1.3. 消费者监听器方式接收消息

    监听器方式属于异步非阻塞方式,所以需要手动阻塞进程

    messageConsumer.setMessageListener(new MessageListener() {
        @SneakyThrows
        @Override
        public void onMessage(Message message) {
            if (null != message && message instanceof TextMessage) {
                System.out.println("消费者监听器监听到消息***********" + ((TextMessage) message).getText());
            }
        }
    });
    // 手动阻塞进程
    System.in.read();
    

    3.1.4. 消费者三大消费情况

    1. 先生产,只启动1号消费者。问题:1号消费者可以消费消息吗?

      可以

    2. 先生产,先启动1号消费者,再启动2号消费者。问题:2号消费者可以消费消息吗?

      1号消费者可以消费消息;2号消费者不可以消费消息;

    3. 先启动2个消费者,再生产6条消息。问题:消费情况如何?

      2个消费者各消费一半消息;

    3.1.5. 两种消费方式

    1. 同步阻塞方式(receive()

    2. 异步非阻塞方式(消费者监听器onMessage()

    3.1.6. 点对点消息传递域的特点

    1. 每个消息只能有一个消费者,类似1对1的关系,类似于快递

    2. 消息的消费者和生产者没有时间上的相关性,类似于短信

    3. 消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息

    3.2. 主题(Topic)

    3.2.1. 发布订阅消息传递域的特点

    1. 每个消息可以有多个消费者,属于一对多的关系
    2. 生产者和消费者有时间上的相关性,订阅一个主题的消费者只能消费自它订阅之后发布的消息
    3. 生产者生产时,topic不保存消息,它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者

    JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时的消息。一句话,类似微信公众号订阅

    3.2.2. 上手代码

    测试时要先启动消费者,后启动生产者。

    1. 生产者代码
    public class JmsProducer_Topic {
    
        public static final String ACTIVEMQ_URL = "tcp://192.168.181.128:61616/";
        public static final String TOPIC_NAME = "topic01";
    
        public static void main(String[] args) throws JMSException {
            // 1. 创建连接工厂,按照给定的URL地址,采用默认用户名密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            // 2. 通过连接工厂,获得连接Connection并启动访问
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
    
            // 3. 创建会话Session
            // 两个参数,第一个是事务控制,第二个是签收控制
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 4. 创建目的地(具体是队列queue或主题topic)
            Topic topic = session.createTopic(TOPIC_NAME);
            // 5. 创建消息的生产者
            MessageProducer messageProducer = session.createProducer(topic);
            // 6. 通过消息生产者发送消息
            for (int i = 0; i < 3; i++) {
                // 7. 创建消息
                TextMessage textMessage = session.createTextMessage("topic---" + i);
                // 8. 发送给MQ
                messageProducer.send(textMessage);
            }
            // 9. 关闭资源
            messageProducer.close();
            session.close();
            connection.close();
    
            System.out.println("*****topic消息发布到MQ完成*****");
        }
    }
    
    1. 消费者代码
    public class JmsConsumer_Topic {
    
        public static final String ACTIVEMQ_URL = "tcp://192.168.181.128:61616/";
        public static final String TOPIC_NAME = "topic01";
    
        public static void main(String[] args) throws JMSException, IOException {
            System.out.println("我是1号消费者");
            // System.out.println("我是2号消费者");
            // System.out.println("我是3号消费者");
    
            // 1. 创建连接工厂,按照给定的URL地址,采用默认用户名密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            // 2. 通过连接工厂,获得连接Connection并启动访问
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
    
            // 3. 创建会话Session
            // 两个参数,第一个是事务控制,第二个是签收控制
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 4. 创建目的地(具体是队列queue或主题topic)
            Topic topic = session.createTopic(TOPIC_NAME);
    
            // 5. 创建消费者
            MessageConsumer messageConsumer = session.createConsumer(topic);
    
    
            messageConsumer.setMessageListener(new MessageListener() {
                @SneakyThrows
                @Override
                public void onMessage(Message message) {
                    if (null != message && message instanceof TextMessage) {
                        System.out.println("消费者监听器监听到 TOPIC 消息***********" + ((TextMessage) message).getText());
                    }
                }
            });
            // 手动阻塞进程
            System.in.read();
    
            // 6. 关闭资源
            messageConsumer.close();
            session.close();
            connection.close();
        }
    }
    

    3.3. 两种模式比较

    比较项目 Topic 模式 Queue模式
    工作模式 订阅-发布”模式,如果当前没有订阅者,消息将会被丢弃;如果有多个订阅者,那么这些订阅者都会收到消息 "负载均衡"模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息只会发送给其中一个消费者,并且要求消费者ack消息
    有无状态 无状态 Queue数据默认会在MQ服务器上以文件形式保存。也可以配置成DB存储
    传递完整性 如果没有订阅者,消息会被丢弃 消息不会被丢弃
    处理效率 由于消息要按照订阅者数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异 由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会明显降低。当然不同消息协议的具体性能也是有差异的
  • 相关阅读:
    sql 存储过程
    Chrome系列 Failed to load resource: net::ERR_CACHE_MISS
    oledb 操作 excel
    [转]基于SQL脚本将数据库表及字段提取为C#中的类
    Ul li 竖排 菜单
    JS判断checkbox至少选择一项
    JS 字符串转日期格式 日期格式化字符串
    setInterval 实时驱动界面改变
    Let's Format Css Documents
    Web颜色搭配
  • 原文地址:https://www.cnblogs.com/huangwenjie/p/12251028.html
Copyright © 2011-2022 走看看