zoukankan      html  css  js  c++  java
  • ActiveMQ 笔记(二)部署和DEMO(队列、主题)

    个人博客网:https://wushaopei.github.io/    (你想要这里多有)

    一、部署操作

    1. 部署在linux 上的acvtiveMQ 要可以通过前台windows 的页面访问,必须把linux 的IP和 windows的 IP 地址配置到同一个网关下 。这种情况一般都是修改 linux 的IP 地址,修改网卡文件对应的IP 地址

    修改linux 的ip 地址:

    cd   /etc/sysconfig/network-scripts
    
    vi  ifcfg-eth0 

             

    这是修改之后的网卡文件配置,IP 地址为:192.168.17.3

    配置成功后 ,可以用 windows ping linux , linux ping windows ,当全部ping 通后,可以使用图形化界面访问activeMQ

    // ActiveMQ 的前台端口为 8161 , 提供控制台服务 后台端口为61616 ,提供 JMS 服务

               

    // 192.168.17.3 为 linux 的IP 地址, 使用 IP+端口 访问了ActiveMQ , 登陆之后的样子如上。(能访问成功首先得在linux 上启动activeMQ 的服务),首次登录的默认账户密码为 账号:admin 密码:admin  ,默认端口号:8161

    2、JMS

    Java 消息中间件的服务接口规范,activemq 之上是 mq , 而 mq 之上是JMS 定义的消息规范 。 activemq 是mq 技术的一种理论实现(与之相类似的实现还有 Kafka RabbitMQ RockitMQ ),而 JMS 是更上一级的规范。

         

    JMS 的两种模式:

    在点对点的消息传递时,目的地称为 队列 queue
    
    在发布订阅消息传递中,目的地称为 主题 topic

            

    类比JDBC编码套路:

    第一步:注册驱动(仅仅只做一次)
    Class.forName("com.mysql.jdbc.com");
    第二步:建立连接(Connection)
    DriverManager.getConnection(url,user,password);
    第三步:创建运行SQL语句(Statement)
    connection.createStatement();
    第四步:运行语句
    rs.executeQuery(sql);
    第五步:处理结果集(ResultSet)
    第六步:释放资源
    

    3、工程创建与配置

    • IDEA创建Maven工程
    • 配置POM.xml文件

    pom.xml 依赖:

    <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.15.11</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
            <dependency>
                <groupId>org.apache.xbean</groupId>
                <artifactId>xbean-spring</artifactId>
                <version>4.15</version>
            </dependency>

    4、队列模式与案例讲解

    在点对点的消息传递域中,目的地被称为队列(queue)

    点对点消息传递域的特点如下:

    • 每个消息只能有一个消费者,类似于1对1的关系。好比个人快递自己领自己的。
    • 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看。
    • 消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息。

            

    (1)demo 队列的消费生产者

    package com.demo.activemq.queue;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    public class JmsProduce {
        private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
        private static final String QUEUE_NAME = "queue01";
        public static void main(String[] args) throws JMSException {
            //1.创建连接工厂,按照给定的URL,采用默认的用户名密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //2.通过连接工厂,获得connection并启动访问
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3.创建会话session
            //两个参数transacted=事务,acknowledgeMode=确认模式(签收)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4.创建目的地(具体是队列queue还是主题topic)
            Queue queue = session.createQueue(QUEUE_NAME);
            //5.创建消息的生产者
            MessageProducer messageProducer = session.createProducer(queue);
            //6.通过使用消息生产者,生产三条消息,发送到MQ的队列里面
            for (int i = 0; i < 3; i++) {
                //7.创建消息
                TextMessage textMessage = session.createTextMessage("msg---hello" + i);//理解为一个字符串
                //8.通过messageProducer发送给MQ队列
                messageProducer.send(textMessage);
            }
            //9.关闭资源
            messageProducer.close();
            session.close();

    以及在页面上的显示:

        

    控制说明:

    Number Of Pending Messages=等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
    Number Of Consumers=消费者数量,消费者端的消费者数量。
    Messages Enqueued=进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
    Messages Dequeued=出队消息数,可以理解为是消费者消费掉的数量。
    总结:
    当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
    当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。

    (2)与之相对应的消息消费者(处理消息的系统)代码及运行

    ① 阻塞式消费者

    package com.demo.activemq.queue;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    /**
     * 简单消息消费者
     */
    public class JmsConsumer {
        private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
        private static final String QUEUE_NAME = "queue01";
        public static void main(String[] args) throws JMSException {
            //1.创建连接工厂,按照给定的URL,采用默认的用户名密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //2.通过连接工厂,获得connection并启动访问
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3.创建会话session
            //两个参数transacted=事务,acknowledgeMode=确认模式(签收)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4.创建目的地(具体是队列queue还是主题topic)
            Queue queue = session.createQueue(QUEUE_NAME);
            //5.创建消息的消费者,指定消费哪一个队列里面的消息
            MessageConsumer messageConsumer = session.createConsumer(queue);
            //循环获取
            while (true) {
                //6.通过消费者调用方法获取队列里面的消息(发送的消息是什么类型,接收的时候就强转成什么类型)
                TextMessage textMessage = (TextMessage) messageConsumer.receive();
                if (textMessage != null) {
                    System.out.println("****消费者接收到的消息:  " + textMessage.getText());
                }else {
                    break;
                }
            }
            //7.关闭资源
            messageConsumer.close();
            session.close();
            connection.close();
        }
    }
    

    ②异步监听式消费者

    package com.demo.activemq.queue;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    import java.io.IOException;
    /**
     * 监听模式下的消费者
     */
    public class JmsConsumer2 {
        private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
        private static final String QUEUE_NAME = "queue01";
        public static void main(String[] args) throws JMSException, IOException {
            //1.创建连接工厂,按照给定的URL,采用默认的用户名密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //2.通过连接工厂,获得connection并启动访问
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3.创建会话session
            //两个参数transacted=事务,acknowledgeMode=确认模式(签收)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4.创建目的地(具体是队列queue还是主题topic)
            Queue queue = session.createQueue(QUEUE_NAME);
            //5.创建消息的消费者,指定消费哪一个队列里面的消息
            MessageConsumer messageConsumer = session.createConsumer(queue);
            //6.通过监听的方式消费消息
            /*
            异步非阻塞式方式监听器(onMessage)
            订阅者或消费者通过创建的消费者对象,给消费者注册消息监听器setMessageListener,
            当消息有消息的时候,系统会自动调用MessageListener类的onMessage方法
            我们只需要在onMessage方法内判断消息类型即可获取消息
             */
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message != null && message instanceof TextMessage) {
                        //7.把message转换成消息发送前的类型并获取消息内容
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("****消费者接收到的消息:  " + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            System.out.println("执行了39行");
            //保证控制台不关闭,阻止程序关闭
            System.in.read();
            //关闭资源
            messageConsumer.close();
            session.close();
            connection.close();
        }
    }
    

    控制台显示结果:

     

    (3)JMS开发的基本步骤:

                  

    (4)两种消费方式的比较:

    • 同步阻塞方式(receive):订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。
    •  异步非阻塞方式(监听器onMessage()):订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。

    5、主题模式与案例讲解

    在发布订阅消息传递域中,目的地被称为主题(topic)

    发布/订阅消息传递域的特点如下:

    • 生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;
    • 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
    • 生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。

    JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅

    (1)发布主题生产者

     
    package com.demo.activemq.topic; 
    import org.apache.activemq.ActiveMQConnectionFactory; 
    import javax.jms.*; 
    public class JmsProducer_Topic {
        public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
        public static final String TOPIC_NAME = "topic01";
     
        public static void main(String[] args) throws JMSException {
     
            //1.创建连接工厂,按照给定的URL,采用默认的用户名密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //2.通过连接工厂,获得connection并启动访问
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3.创建会话session
            //两个参数transacted=事务,acknowledgeMode=确认模式(签收)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4.创建目的地(具体是队列queue还是主题topic)
            Topic topic = session.createTopic(TOPIC_NAME);
            //5.创建消息的生产者
            MessageProducer messageProducer = session.createProducer(topic);
            //6.通过使用消息生产者,生产三条消息,发送到MQ的队列里面
            for (int i = 0; i < 3; i++) {
                //7.通过session创建消息
                TextMessage textMessage = session.createTextMessage("TOPIC_NAME---" + i);
                //8.使用指定好目的地的消息生产者发送消息
                messageProducer.send(textMessage);
            }
            //9.关闭资源
            messageProducer.close();
            session.close();
            connection.close();
            System.out.println("****TOPIC_NAME消息发布到MQ完成");
        }
    }

    控制台展示结果:

        

    (2)订阅主题消费者

    package com.demo.activemq.topic;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    import java.io.IOException;
    public class JmsConsumer_Topic {
        public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
        public static final String TOPIC_NAME = "topic01";
     
        public static void main(String[] args) throws JMSException, IOException {
            System.out.println("我是1号消费者");
            //1.创建连接工厂,按照给定的URL,采用默认的用户名密码
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //2.通过连接工厂,获得connection并启动访问
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3.创建会话session
            //两个参数transacted=事务,acknowledgeMode=确认模式(签收)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4.创建目的地(具体是队列queue还是主题topic)
            Topic topic = session.createTopic(TOPIC_NAME);
            //5.创建消息的消费者
            MessageConsumer messageConsumer = session.createConsumer(topic);
            //5.创建消息的消费者,指定消费哪一个队列里面的消息
            messageConsumer.setMessageListener(message -> {
                if (message instanceof TextMessage){
                    try {
                        String text = ((TextMessage) message).getText();
                        System.out.println(text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.in.read();
        }
    }

    控制台展示结果:

    注意:先启动订阅者再启动生产者,不然发送的消息是废消息

    控制台消费结果:

    6、小总结

    重点注意:activemq 好像自带负载均衡,当先启动两个队列(Queue)的消费者时,在启动生产者发出消息,此时的消息平均的被两个消费者消费。 并且消费者不会消费已经被消费的消息(即为已经出队的消息)

    但是当有多个主题(Topic)订阅者时,发布者发布的消息,每个订阅者都会接收所有的消息。topic 更像是被广播的消息,但是缺点是不能接受已经发送过的消息。

    先要有订阅者,生产者才有意义。

  • 相关阅读:
    windows下安装python模块
    红包demo
    如何查看python 的api
    vscode 与 python 的约会
    默认构造函数
    关于重载
    转类型转换
    asm-offset.h 生成
    debian 7 安装
    emacs 定制进缩风格
  • 原文地址:https://www.cnblogs.com/wushaopei/p/12288660.html
Copyright © 2011-2022 走看看