zoukankan      html  css  js  c++  java
  • RabbitMQ(四)——工作队列模式

    RabbitMQ系列

    RabbitMQ(一)——简介

    RabbitMQ(二)——模式类型

    RabbitMQ(三)——简单模式

    RabbitMQ(四)——工作队列模式

    RabbitMQ(五)——发布订阅模式

    RabbitMQ(六)——路由模式

    RabbitMQ(七)——主题模式

    RabbitMQ(八)——消息确认

    RabbitMQ(九)——消息持久化

    RabbitMQ(十)——消息优先级

     

    前言

      工作队列模式:

        一个生产者,多个消费者,每个消费者获取到的消息唯一,当您运行多个工作线程,这些消息将在工作线程之间共享,默认轮询获取。简单的说,工作队列模式和简单模式一样,只是简单模式一个生产者一个消费者一对一,而工作队列模式一个生产者多个消费者。

                      

       这里启动三个线程,分别为生产者,消费者1,消费者2其中生产者和消费者代码与上一篇简单模式基本一致,稍作修改

    实现

      生产者:

    static void Main(string[] args)
            {
                //1.创建连接工厂
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //2.创建连接
                var connection = factory.CreateConnection();
                //3.创建管道
                var channel = connection.CreateModel();
                //4.声明队列
                channel.QueueDeclare("simple", false, false, false, null);
    
                for (int i = 0; i < 20; i++)
                {
                    string msg = $"第{i + 1}条消息";
                    //5.发布消息
                    channel.BasicPublish("", "simple", null, Encoding.UTF8.GetBytes(msg));
                    Console.WriteLine($"已发送消息:{msg}");
                    Thread.Sleep(1000);
                }
                channel.Close();
                connection.Close();
    
                Console.ReadKey();
            }
    View Code

      消费者1、2:

    static void Main(string[] args)
            {
                //初始化工厂
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //创建连接
                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        //声明队列
                        channel.QueueDeclare("simple", false, false, false, null);
                        //创建消费者对象
                        var consumer = new EventingBasicConsumer(channel);
    
                        consumer.Received += (model, e) =>
                        {
                            byte[] message = e.Body.ToArray();
                            Console.WriteLine("接收消息:" + Encoding.UTF8.GetString(message));
                            //返回消息确认
                            channel.BasicAck(e.DeliveryTag, false);
                        };
                        //消费者开启监听
                        channel.BasicConsume("simple", false, consumer);
    
                        Console.ReadLine();
    
                    }
                }
            }
    View Code

    结果

      这时消费者P向队列写入消息,消费者1和消费者2会公平调度,如下:

      

     

     可以看到一个消费者接收到单数消息,另一个消费者接收到双数消息。这种情况是因为RabbitMQ在进入队列后就开始分发消息,它不会去检查每个消费者是否拥有未确认的消息数量,只是盲目的给每个消费者平均分发。

    消息调度

     现在我们改变这种行为,设置BasicQos

    IModel.BasicQos(0,1,false);
    

      

    这表示让RabbitMQ不给这个消费者发送新的消息,直到消费者处理并确认了前一个消息。当消费者1的消息没确认,那么将跳过消费者1,向消费者2发送。

    消费者代码现在是这样:

    static void Main(string[] args)
            {
                //初始化工厂
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //创建连接
                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        //声明队列
                        channel.QueueDeclare("simple", false, false, false, null);
    
                        // 告知 RabbitMQ,在未收到当前 Worker 的消息确认信号时,不再分发给消息,确保公平调度。
                        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    
                        //创建消费者对象
                        var consumer = new EventingBasicConsumer(channel);
    
                        consumer.Received += (model, e) =>
                        {
                            byte[] message = e.Body.ToArray();
                            Console.WriteLine("接收消息:" + Encoding.UTF8.GetString(message));
                            //返回消息确认
                            channel.BasicAck(e.DeliveryTag, false);
                        };
                        //消费者开启监听
                        channel.BasicConsume("simple", false, consumer);
    
                        Console.ReadLine();
                    }
                }
            }
    View Code

      

    可以看到,现在的消息接收是哪个消费者空闲(处理完成并确认消息)就给哪个发送消息,所以消息顺序不再是一个消费者接收单数消息,一个消费者接收双数消息。

     附上Demo地址:https://github.com/1164887865/RabbitMQDemo

  • 相关阅读:
    RootMotionComputer 根运动计算机
    tar压缩解压缩命令详解
    解决有关flask-socketio中服务端和客户端回调函数callback参数的问题
    flask-sqlalchemy中Datetime的创建时间、修改时间,default,server_default,onupdate
    sqlalchemy和flask-sqlalchemy的几种分页方法
    Flask路由报错:raise FormDataRoutingRedirect(request)
    解决Python自带的json不能序列化data,datetime类型数据问题
    Python中将字典转换为有序列表、无序列表的方法
    flask-sqlalchemy 一对一,一对多,多对多操作
    python2 UnicodeDecodeError: 'ascii' codec can't decode byte 0xce in position 7: ordinal not in range(128)
  • 原文地址:https://www.cnblogs.com/zousc/p/12725636.html
Copyright © 2011-2022 走看看