zoukankan      html  css  js  c++  java
  • RabbitMQ六种队列模式-工作队列模式

    前言

    RabbitMQ六种队列模式-简单队列
    RabbitMQ六种队列模式-工作队列 [本文]
    RabbitMQ六种队列模式-发布订阅
    RabbitMQ六种队列模式-路由模式
    RabbitMQ六种队列模式-主题模式

    上文我们了解了 RabbitMQ 六种队列模式中的简单队列,代码也是非常的简单,比较容易理解。

    但是简单队列有个缺点,简单队列是一一对应的关系,即点对点,一个生产者对应一个消费者,按照这个逻辑,如果我们有一些比较耗时的任务,也就意味着需要大量的时间才能处理完毕,显然简单队列模式并不能满足我们的工作需求,我们今天再来看看工作队列。

    文章目录

    1. 什么是工作队列2. 代码部分2.1 生产者2.2 消费者3. 循环分发3.1 启动生产者3.2 启动两个消费者3.3 公平分发4. 消息持久化4.1 问题背景4.2 参数配置5. 工作队列总结

    1. 什么是工作队列

    工作队列:用来将耗时的任务分发给多个消费者(工作者)

    主要解决问题:处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。

    工作队列也称为公平性队列模式,怎么个说法呢?

    循环分发,假如我们拥有两个消费者,默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者,平均而言,每个消费者将获得相同数量的消息,这种分发消息的方式称为轮询。

    看代码吧。

    2. 代码部分

    2.1 生产者

    创建50个消息

    public class Producer2 {

        /** 队列名称 */
        private static final String QUEUE_NAME = "test_queue";

        public static void main(String[] args) throws IOException, TimeoutException {
            /** 1.获取连接 */
            Connection newConnection = MQConnectionUtils.newConnection();
             /** 2.创建通道 */
            Channel channel = newConnection.createChannel();
             /**3.创建队列声明 */
            channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
             /**保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
            channel.basicQos(1);
            for (int i = 1; i <= 50; i++) {
                String msg = "生产者消息_" + i;
                System.out.println("生产者发送消息:" + msg);
             /**4.发送消息 */
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            }
            channel.close();
            newConnection.close();
        }

    }

    2.2 消费者

    public class Customer2_1 {

        /**
         * 队列名称
         */

        private static final String QUEUE_NAME = "test_queue";

        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("001");
            /** 1.获取连接 */
            Connection newConnection = MQConnectionUtils.newConnection();
            /** 2.获取通道 */
            final Channel channel = newConnection.createChannel();
            channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
            /** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
            channel.basicQos(1);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException 
    {
                    String msgString = new String(body, "UTF-8");
                    System.out.println("消费者获取消息:" + msgString);
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {

                    } finally {
                        /** 手动回执消息 */
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            /** 3.监听队列 */
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
        }

    }

    3. 循环分发

    3.1 启动生产者

    3.2 启动两个消费者

    在生产者中我们发送了50条消息进入队列,而上方消费者启动图里很明显的看到轮询的效果,就是每个消费者会分到相同的队列任务。

    3.3 公平分发

    由于上方模拟的是非常简单的消息队列的消费,假如有一些非常耗时的任务,某个消费者在缓慢地进行处理,而另一个消费者则空闲,显然是非常消耗资源的。

    再举一个例子,一个1年的程序员,跟一个3年的程序员,分配相同的任务量,明显3年的程序员处理起来更加得心应手,很快就无所事事了,但是3年的程序员拿着非常高的薪资!显然3年的程序员应该承担更多的责任,那怎么办呢?

    公平分发。

    其实发生上述问题的原因是 RabbitMQ 收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量,类似于TCP/UDP中的UDP,面向无连接。

    因此我们可以使用 basicQos 方法,并将参数 prefetchCount 设为1,告诉 RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样 RabbitMQ 就不会轮流分发了,而是寻找空闲的工作者进行分发。

    关键性代码:

    /** 2.获取通道 */
    final Channel channel = newConnection.createChannel();
    channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
    /** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
    channel.basicQos(1);

    4. 消息持久化

    4.1 问题背景

    上边我们提到的公平分发是由消费者收取消息时确认解决的,但是这里面又会出现被 kill 的情况。

    当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。

    这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。

    但是在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。

    怎么办呢?

    4.2 参数配置

    参数配置一:生产者创建队列声明时,修改第二个参数为 true

    /**3.创建队列声明 */
    channel.queueDeclare(QUEUE_NAME, truefalsefalsenull);

    参数配置二:生产者发送消息时,修改第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN

    for (int i = 1; i <= 50; i++) {
        String msg = "生产者消息_" + i;
        System.out.println("生产者发送消息:" + msg);
        /**4.发送消息 */
        channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    }

    5. 工作队列总结

    1、循环分发:消费者端在信道上打开消息应答机制,并确保能返回接收消息的确认信息,这样可以保证消费者发生故障也不会丢失消息。

    2、消息持久化:服务器端和客户端都要指定队列的持久化和消息的持久化,这样可以保证RabbitMQ重启,队列和消息也不会丢失。

    3、公平分发:指定消费者接收的消息个数,避免出现消息均匀推送出现的资源不合理利用的问题。

    案例代码:https://www.lanzous.com/i5ydu6d

    18年专科毕业后,期间一度迷茫,最近我创建了一个公众号用来记录自己的成长。 
  • 相关阅读:
    网页a标签:导航制作 怎么让鼠标经过A标签的时候显示块状背景?
    从头开始,慢慢来,今天工作日志
    想看所有的美国系列电影
    百分比宽度并排元素浮动之后,设置margin,padding换行的问题
    TP5.1 首页路由
    关于Layui 响应式移动端轮播图的问题
    BootStrap 栅格化换行问题
    VS code 格式化插件, 仅需一步, 无须配置
    PHPStorm 批量选择,多光标同时编辑相同的内容
    使用Cmder 安装 Composer 出现 "attempt to call a nil value"
  • 原文地址:https://www.cnblogs.com/niceyoo/p/11448106.html
Copyright © 2011-2022 走看看