zoukankan      html  css  js  c++  java
  • RabbitMQ详解

    消息队列:RabbitMQ

    全名为:Message Queue

    消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。就是一个先进先出的队列,只是队列中存放的是message而已,因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

    常见的MQ产品

    ActiveMQ:基于JMS

    RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好

    RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会

    Kafka:分布式消息系统,高吞吐量

    • 如上所说的JMS和AMQP:

      • MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS,具体百度

      • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式

      • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。

      • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

    RabbitMQ环境搭建及相关设置

    安装Erlang

    yum install esl-erlang_17.3-1~centos~6_amd64.rpm
    yum install esl-erlang-compat-R14B-1.el6.noarch.rpm

    安装RabbitMQ

    首先安装包下载并上传:链接:https://pan.baidu.com/s/1XM24RprcaXMAFHPdctkEIw 提取码:1490

    我是上传到 /usr/local/rabbitmq/ ,你们随意;

    进入到安装包上传目录:

    rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm

    修改配置文件

    #将默认的配置文件模版 复制到 etc目录下
    cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
    #编辑配置问价
    vim /etc/rabbitmq/rabbitmq.config

      

    注意:打开注解,删掉末尾的逗号,保存退出即可;

    chkconfig rabbitmq-server on   #设置为开机启动
    service rabbitmq-server start   #启动服务
    service rabbitmq-server stop    #关闭服务
    service rabbitmq-server restart #服务重启

    开启Web管理页面

    rabbitmq-plugins enable rabbitmq_management   #通过命令开启
    service rabbitmq-server restart              # 服务重启,配置生效

    端口是15672,自行开放,我是直接关闭了防火墙的;

    下面我们我们既可以王文Web管理页面:账号密码默认为:guest

    浏览器没有弹出翻译页面,我们自翻译

      

    • connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况

    • channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。

    • Exchanges:交换机,用来实现消息的路由

    • Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。

      

    用户的添加

      

    用户的角色指定,对应不同权限:

    • 超级管理员(administrator)

      可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

    • 监控者(monitoring)

      可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

    • 策略制定者(policymaker)

      可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

    • 普通管理者(management)

      仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

    • 其他

      无法登陆管理控制台,通常就是普通的生产者和消费者。

    创建虚拟主机

    RabbitMQ为了实现每个用户互不干扰,通过虚拟主机的方式,不同用户使用不同的路径,各自有各自的队列、交换机

      

    虚拟机就创建好了,然后我们可以给用户分配权限:

    消息模型—基本模型

    RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,我们就说说前面五种消息模型

    基本的消息模型:

    P:消息生产者

    C:消息消费者

    queue:消息队列,消费者投递消息,消费者取出消息并消费

    •  <!--RabbitMQ-->
       <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-amqp</artifactId>
             <version>2.1.4.RELEASE</version>
       </dependency>
    • java的连接MQ工具类:

    • public class ConnectionUtil {
       //建立与RabbitMQ的连接
       public static Connection getConnection() throws Exception {
           //定义连接工厂
           ConnectionFactory factory = new ConnectionFactory();
           //设置服务地址
           factory.setHost("192.168.159.159");
           //端口
           factory.setPort(5672);
           //设置账号信息,用户名、密码、vhost
           factory.setVirtualHost("/new1");
           factory.setUsername("/admin");
           factory.setPassword("admin");
           // 通过工程获取连接
           Connection connection = factory.newConnection();
           return connection;
       }
      }

    生产者发送消息

    package com.mq.start;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    /**生产者**/
    public class send {
        //确定队列的标识
        private final static String QUEUE_NAME = "simple_queue";
    ​
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = connectionUtils.getConnection();
            // 从连接中创建通道,使用通道才能完成消息相关的操作
            Channel channel = connection.createChannel();
            // 声明(创建)队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 消息内容
            String message = "Hello World!";
            // 向指定的队列中发送消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    ​
            System.out.println(" [服务提供者] Send '" + message + "'");
    ​
            //关闭通道和连接
            channel.close();
            connection.close();
        }
    }

    这个时候我们切换到刚刚创建的用户上 /admin 上查看信息:

      

    消费者获取消息:

    package com.mq.start;
    ​
    import com.rabbitmq.client.*;
    import java.io.IOException;
    ​
    public class get {
        //队列name  要达成通信  必须和发送的队列name 一致
        private final static String QUEUE_NAME = "simple_queue";
    ​
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = connectionUtils.getConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [服务消费者] get : " + msg + "!");
                }
            };
            // 监听队列,第二个参数:是否自动进行消息确认。
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    控制台打印:因为我发送了两次

     

    再次查看Web管理页面,没有消息了:

    消费者的消息确认机制

    经过刚刚的小Demo,我能发现一旦消息从队列中被消费者拉取消费后,队列中的消息就会删除,

    这里就涉及到一个MQ是通过消息确认机制知道消息何时被消费,当消费者获取到信息后,回想MQ返回一个ACK回执告知已被接受,可以删除。不过ACK回执分问两种情况:

    • 手动ACK:消息接收后,一般在消费者消费掉该消息后手动发送ACK

    • 自动ACK:消息接受后立即就会自动发送ACK

    至于如何选择:根据信息的重要程度区分

    • 消息不太重要,即使丢失影响也不大,自动ACK比较巴适

    • 消息很重要,不允许丢失,那就等我们消费者消费完这个信息后手动发送回执

    java实现:部分实现

    DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                    String msg = new String(body);
                    System.out.println(" [服务消费者] get1 : " + msg + "!");
                    //在消息消费完后,手动发送ACK回执给MQ
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 监听队列,第二个参数:是否自动进行消息确认。
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }

    消息模型—work消息模型 [ 任务模型 ]

    当消息处理比较耗时的时候,可呢个生产消息的速度回远远大于消息的消费速度,随着时间的推移,队列中的消息就会堆积如山无法及时的处理,此时work模型横空出世,让多个消费者绑定到一个队列上,共同消费同一个队列中的消息

    • 一个生产者,一个队列,两个或者更多的消费者

    消息的生产者:连续发送50个消息去队列

    package com.mq.start.work模型;
    ​
    import com.mq.start.utils.connectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    ​
    public class send {
    private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = connectionUtils.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 循环发布任务 for (int i = 0; i < 50; i++) { // 消息内容 String message = "task .. " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); ​ Thread.sleep(i * 2); } // 关闭通道和连接 channel.close(); connection.close(); } }

    两个消费者:一次只能处理接收一个消息处理:

    package com.mq.start.work模型;
    import com.mq.start.utils.connectionUtils;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    /**
     * get1消费者有Thread.sleep(1000),模拟更耗时
     */
    public class get1 {
        private final static String QUEUE_NAME = "test_work_queue";
    ​
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = connectionUtils.getConnection();
            // 获取通道
            final Channel channe1 = connection.createChannel();
            // 声明队列
            channe1.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channe1) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者1] get : " + msg + "!");
                    // 手动ACK
                    channe1.basicAck(envelope.getDeliveryTag(), false);
                     try {
                        //模拟这个消费者消费一个消息很耗时
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            // 监听队列。
            channe1.basicConsume(QUEUE_NAME, false, consumer);    
    }
    package com.mq.start.work模型;
    import com.mq.start.utils.connectionUtils;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    /**
     * get2处理比价快
     */
    public class get2 {
        private final static String QUEUE_NAME = "test_work_queue";
    ​
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = connectionUtils.getConnection();
            // 获取通道
            final Channel channe2 = connection.createChannel();
            // 声明队列
            channe2.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channe2) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者2] get : " + msg + "!");
                    // 手动ACK
                    channe2.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 监听队列。
            channe2.basicConsume(QUEUE_NAME, false, consumer);
        }
    }

    温馨提示:优先启动两个消费者,随后再启动消息发布者

    然后我们看下面的控制台:get1慢吞吞的在消费,get2快速的消费完便在休息了,一人消费一半

    • 在上面这种情况下,消费者get1的消费效率是要比消费者get2的效率要低的

    • 可是两个消费者最终的消费消费的信息数量确实一样的,是任务均分的;

    • 消费者get1一直在忙碌于消费,消费者get2处理完分配的一半后便处于空闲状态

    能者多劳

    消费者同一时间只会接受一条消息,在处理完之前不会接新的消息,让处理快的人接受更多的消息:

    两个消费者都修改设置如下:

    // 设置每个消费者同时只能处理一条消息
      channel.basicQos(1);

    让我们看看效果如何:

    消息模型—订阅模型

    示意图:

      

    • P:生产者,发送消息给X(交换机)

    • Exchange:交换机,图中的X。接收生产者发送的消息。知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:

      • Fanout:广播,将消息交给所有绑定到交换机的队列

      • Direct:定向,把消息交给符合指定routing key 的队列

      • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

    • Queue:消息队列,接收消息、缓存消息。

    • C:消费者,消息的消费者,会一直等待消息到来。

    注意:交换机只负责转发消息,不具备消息储存的能力,如果没有队列与其进行对接,消息会丢失

    消息模型—订阅模型—广播 [ Fanout ]

    流程图:

      

    在广播模式下,消息发送流程是这样的:

    • 1) 可以有多个消费者

    • 2) 每个消费者有自己对接的queue(队列)

    • 3) 每个队列都要对接到Exchange(交换机)

    • 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

    • 5) 交换机把消息发送给绑定过的所有队列

    • 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

    生产者:

    package com.mq.start.订阅模型_广播;
    import com.mq.start.utils.connectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    ​
    public class send {
        private final static String EXCHANGE_NAME = "fanout_exchange_test";
    ​
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = connectionUtils.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
    ​
            // 声明exchange,指定类型为fanout
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    ​
            // 消息内容
            String message = "四川新闻广播电视台为你播报:今天...";
            // 发布消息到Exchange
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [生产者] Send '" + message + "'");
    ​
            channel.close();
            connection.close();
        }
    }

    消费者1:

    package com.mq.start.订阅模型_广播;
    import com.mq.start.utils.connectionUtils;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    ​
    public class get1 {
        
        private final static String QUEUE_NAME = "fanout_queue_1";
        private final static String EXCHANGE_NAME = "fanout_exchange_test";
    ​
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = connectionUtils.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    ​
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    ​
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者1] received : " + msg + "!");
                }
            };
            // 监听队列,自动返回完成
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    消费者2:

    package com.mq.start.订阅模型_广播;
    import com.mq.start.utils.connectionUtils;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    ​
    public class get2 {
       
        private final static String QUEUE_NAME = "fanout_queue_2";
        private final static String EXCHANGE_NAME = "fanout_exchange_test";
    ​
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = connectionUtils.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    ​
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    ​
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者2] received : " + msg + "!");
                }
            };
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    然后查看控制台:一条消息被所有订阅的队列都消费

    消息模型—订阅模型— [ Direct ]

    广播是一条消息被所有与交换机对接的队列都消费,但有时候,我们想不同的信息被不同的队列说消费,这是就要用到Direct类型的交换机

      

    • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

    • C1:消费者,其所在队列指定了需要routing key 为 error 的消息

    • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

    在Direct模式下:

    • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

    • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

    • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

    消息生产者:

    package com.mq.start.订阅模型_Direct;
    import com.mq.start.utils.connectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    /**
     * 我们模拟商品的增删改,发送消息的RoutingKey分别是:insert、update、delete
     */
    public class send {
        
        private final static String EXCHANGE_NAME = "direct_exchange_test";
    ​
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = connectionUtils.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明exchange,指定类型为direct
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            // 消息内容
            String message = "商品新增, id = 1001";
            
            // 发送消息,并且指定routing key 为:insert ,代表新增商品
            channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
           
            System.out.println(" [商品服务:] Send '" + message + "'");
            channel.close();
            connection.close();
        }
    }

    记住我们的roting key 是insert噢!

    消息消费者1:get1 ,他能接受routing key 为 "update"、"delete"的消息

    package com.mq.start.订阅模型_Direct;
    ​import com.mq.start.utils.connectionUtils;
    import com.rabbitmq.client.*;
    ​import java.io.IOException;
    ​
    public class get1 {
    private final static String QUEUE_NAME = "direct_exchange_queue_1"; private final static String EXCHANGE_NAME = "direct_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = connectionUtils.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); ​ // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [消费者1] git : " + msg + "!"); } }; // 监听队列,自动ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }

    消息消费者2:get2,他能接受routing key 为 "insert"、"update"、"delete" 的消息

    package com.mq.start.订阅模型_Direct;
    import com.mq.start.utils.connectionUtils;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    ​
    public class get2 {
       
        private final static String QUEUE_NAME = "direct_exchange_queue_2";
        private final static String EXCHANGE_NAME = "direct_exchange_test";
    ​
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = connectionUtils.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    ​
            // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
    ​
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者2] get : " + msg + "!");
                }
            };
            // 监听队列,自动ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    我们分别设置routing key 为 insert、update、delete,逐一测试:

    其他我们就执行测试吧,我就测了routing key为insert,最终被get2所消费

    消息模型—订阅模型— [ Topic ]

    Topic类型yuDirect相比,其实差不多的,都是根据rounting key 把消息路由到不同的队列,就是Topic类型的交换机支在匹配的时候支持rounting key的通配符

    通配符规则:

    #:匹配一个或多个词

    *:匹配不多不少恰好1个词

    举例:

    audit.#:能够匹配audit.irs.corporate或者 audit.irs

    audit.*:只能匹配audit.irs

    消息生产者:Rounting key 为: item.insert / update / delete

    package com.mq.start.订阅模型_Topic;
    import com.mq.start.utils.connectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    ​
    public class send {
    private final static String EXCHANGE_NAME = "topic_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = connectionUtils.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明exchange,指定类型为topic channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息内容 String message = "新增商品 : id = 1001"; // 发送消息,并且指定routing key 为:insert ,代表新增商品 channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes()); System.out.println(" [商品服务:] Send '" + message + "'"); ​ channel.close(); connection.close(); } }

    消息消费者1:get1,匹配的Rounting Key为 item.update / delete

    package com.mq.start.订阅模型_Topic;
    import com.mq.start.utils.connectionUtils;
    import com.rabbitmq.client.*;
    import java.io.IOException;
    ​
    public class get1 {
        
        private final static String QUEUE_NAME = "topic_exchange_queue_1";
        private final static String EXCHANGE_NAME = "topic_exchange_test";
    ​
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = connectionUtils.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    ​
            // 绑定队列到交换机,同时指定需要订阅的routing key。需要 update、delete
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
    ​
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [Get1] get : " + msg + "!");
                }
            };
            // 监听队列,自动ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

    消息消费者2:get2 ,通过通配符的方式,消费所有item打头,后拼一个单词的所有消息

    package com.mq.start.订阅模型_Topic;
    import com.mq.start.utils.connectionUtils;
    import com.rabbitmq.client.*;
    ​import java.io.IOException;
    ​
    public class get2 {
    private final static String QUEUE_NAME = "topic_exchange_queue_2"; private final static String EXCHANGE_NAME = "topic_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = connectionUtils.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*"); ​ // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用 ​ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [get2] get : " + msg + "!"); } }; // 监听队列,自动ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }

    自测,效果如出一辙

    如何避免消息丢失

    手动ACK

    消费者的手动ACK机制,可有效的避免消息的丢失

    消息持久化

    若想支持消息持久化,队列和交换机都得持久化

    交换机的持久化:

    // 声明exchange,指定类型为topic,其后跟一个true参数目标是开启交换机的持久化
    channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);

    队列的持久化:

    // 声明队列,第二个参数表示是否开启队列持久化
    channe1.queueDeclare(QUEUE_NAME, false, false, false, null);

    消息持久化:

    // 发送消息,并且指定routing key 为:insert ,第三个参数表示开启信息持久化
    channel.basicPublish(EXCHANGE_NAME, "item.update",MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

    Spring AMQP

    Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。

    依赖和配置:

    pom.xml

     <properties>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.3.2</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
            </dependency>
        </dependencies>

    application.yml

    spring:
      rabbitmq:
        host: 192.168.159.159
        username: /admin
        password: admin
        virtual-host: /new1
        template:                           #有关Template的配置
          retry:                            #失败重试
            enabled: true                   #失败重试_开启失败重试
            initial-interval: 10000ms       #失败重试_第一次重试的间隔时长
            max-interval: 300000ms          #失败重试_最长重试间隔
            multiplier: 2                   #失败重试_下次重试间隔的倍速
          exchange: spring.test.exchange    #指定交换机,发送消息若不指定交换机就使用配置的交换机
        publisher-confirms: true            #生产者确认机制,确保消息正确发送,发送失败会有错误回执

    服务的监听者:在SpringAMQP中,普通方法 + 注解,就可以成为一个消费者。

    package com.mq.start.SpringAMQP;
    ​import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    ​
    /**
     * 这是一个消费者 / 监听者
     */
    @Component
    @RabbitListener(queues = "spring.test.queue" )
    public class Listener {
    ​
        /**
         * - @Componet`:类上的注解,注册到Spring容器
         * - `@RabbitListener`:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
         *   - `bindings`:指定绑定关系,可以有多个。值是`@QueueBinding`的数组。`@QueueBinding`包含下面属性:
         *     - `value`:这个消费者关联的队列。值是`@Queue`,代表一个队列
         *     - `exchange`:队列所绑定的交换机,值是`@Exchange`类型
         *     - `key`:队列和交换机绑定的`RoutingKey`
         */
    ​
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "spring.test.queue", durable = "true"),
                exchange = @Exchange(
                        value = "spring.test.exchange",
                        ignoreDeclarationExceptions = "true",
                        type = ExchangeTypes.TOPIC
                ),
                key = {"#.#"}))
        public void listen(String msg){
            System.out.println("接收到消息:" + msg);
        }
    }

    消息的发送者:AmqpTemplate

    Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:

    package com.mq.start.SpringAMQP;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    ​
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class mqSend {
    ​
        @Autowired
        private AmqpTemplate amqpTemplate;
    ​
        @Test
        public void testSend() throws InterruptedException {
            String msg = "hello, Spring boot amqp";
            this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);
            // 等待10秒后再结束
            Thread.sleep(10000);
        }
    }

    外加一个SpringBoot项目的启动类:

    package com.mq.start;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    ​
    @SpringBootApplication
    public class Run {
        public static void main(String[] args) {
            SpringApplication.run(Run.class, args);
        }
    }

    内容就这么多,整合就算完成,我们首先启动SpringBoot项目,然后启动测试类生产消息,消息的监听者自会监听到消息后处理:

  • 相关阅读:
    override new virtual 的比较
    c#页面无内容解决方案
    插入排序
    排序算法(转)
    treenode遍历文件夹
    案例篇(1)
    索引器(转)
    迭代器的实现
    抽象类和接口的区别
    索引器与迭代器,属性的区别
  • 原文地址:https://www.cnblogs.com/msi-chen/p/10502097.html
Copyright © 2011-2022 走看看