zoukankan      html  css  js  c++  java
  • RabbitMQ(四)——工作队列模式

    RabbitMQ系列

    RabbitMQ(一)——简介

    RabbitMQ(二)——模式类型

    RabbitMQ(三)——简单模式

    RabbitMQ(四)——工作队列模式

    RabbitMQ(五)——发布订阅模式

    RabbitMQ(六)——路由模式

    RabbitMQ(七)——主题模式

    RabbitMQ(八)——消息确认

    RabbitMQ(九)——消息持久化

    RabbitMQ(十)——消息优先级

     

    前言

      工作队列模式:

        一个生产者,多个消费者,每个消费者获取到的消息唯一,当您运行多个工作线程,这些消息将在工作线程之间共享,默认轮询获取。简单的说,工作队列模式和简单模式一样,只是简单模式一个生产者一个消费者一对一,而工作队列模式一个生产者多个消费者。

                      

       这里启动三个线程,分别为生产者,消费者1,消费者2其中生产者和消费者代码与上一篇简单模式基本一致,稍作修改

    实现

      生产者:

    static void Main(string[] args)
            {
                //1.创建连接工厂
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //2.创建连接
                var connection = factory.CreateConnection();
                //3.创建管道
                var channel = connection.CreateModel();
                //4.声明队列
                channel.QueueDeclare("simple", false, false, false, null);
    
                for (int i = 0; i < 20; i++)
                {
                    string msg = $"第{i + 1}条消息";
                    //5.发布消息
                    channel.BasicPublish("", "simple", null, Encoding.UTF8.GetBytes(msg));
                    Console.WriteLine($"已发送消息:{msg}");
                    Thread.Sleep(1000);
                }
                channel.Close();
                connection.Close();
    
                Console.ReadKey();
            }
    View Code

      消费者1、2:

    static void Main(string[] args)
            {
                //初始化工厂
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //创建连接
                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        //声明队列
                        channel.QueueDeclare("simple", false, false, false, null);
                        //创建消费者对象
                        var consumer = new EventingBasicConsumer(channel);
    
                        consumer.Received += (model, e) =>
                        {
                            byte[] message = e.Body.ToArray();
                            Console.WriteLine("接收消息:" + Encoding.UTF8.GetString(message));
                            //返回消息确认
                            channel.BasicAck(e.DeliveryTag, false);
                        };
                        //消费者开启监听
                        channel.BasicConsume("simple", false, consumer);
    
                        Console.ReadLine();
    
                    }
                }
            }
    View Code

    结果

      这时消费者P向队列写入消息,消费者1和消费者2会公平调度,如下:

      

     

     可以看到一个消费者接收到单数消息,另一个消费者接收到双数消息。这种情况是因为RabbitMQ在进入队列后就开始分发消息,它不会去检查每个消费者是否拥有未确认的消息数量,只是盲目的给每个消费者平均分发。

    消息调度

     现在我们改变这种行为,设置BasicQos

    IModel.BasicQos(0,1,false);
    

      

    这表示让RabbitMQ不给这个消费者发送新的消息,直到消费者处理并确认了前一个消息。当消费者1的消息没确认,那么将跳过消费者1,向消费者2发送。

    消费者代码现在是这样:

    static void Main(string[] args)
            {
                //初始化工厂
                ConnectionFactory factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //创建连接
                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        //声明队列
                        channel.QueueDeclare("simple", false, false, false, null);
    
                        // 告知 RabbitMQ,在未收到当前 Worker 的消息确认信号时,不再分发给消息,确保公平调度。
                        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    
                        //创建消费者对象
                        var consumer = new EventingBasicConsumer(channel);
    
                        consumer.Received += (model, e) =>
                        {
                            byte[] message = e.Body.ToArray();
                            Console.WriteLine("接收消息:" + Encoding.UTF8.GetString(message));
                            //返回消息确认
                            channel.BasicAck(e.DeliveryTag, false);
                        };
                        //消费者开启监听
                        channel.BasicConsume("simple", false, consumer);
    
                        Console.ReadLine();
                    }
                }
            }
    View Code

      

    可以看到,现在的消息接收是哪个消费者空闲(处理完成并确认消息)就给哪个发送消息,所以消息顺序不再是一个消费者接收单数消息,一个消费者接收双数消息。

     附上Demo地址:https://github.com/1164887865/RabbitMQDemo

  • 相关阅读:
    SharePoint 2013 安装.NET Framework 3.5 报错
    SharePoint 2016 配置工作流环境
    SharePoint 2016 站点注册工作流服务报错
    Work Management Service application in SharePoint 2016
    SharePoint 2016 安装 Cumulative Update for Service Bus 1.0 (KB2799752)报错
    SharePoint 2016 工作流报错“没有适用于此应用程序的地址”
    SharePoint 2016 工作流报错“未安装应用程序管理共享服务代理”
    SharePoint JavaScript API in application pages
    SharePoint 2016 每天预热脚本介绍
    SharePoint 无法删除搜索服务应用程序
  • 原文地址:https://www.cnblogs.com/zousc/p/12725636.html
Copyright © 2011-2022 走看看