zoukankan      html  css  js  c++  java
  • RabbitMQ五:生产者--队列--多消费者

    一、生成者-队列-多消费者(前言)

     上篇文章,我们做了一个简单的Demo,一个生产者对应一个消费者,本篇文章就介绍 生产者-队列-多个消费者,下面简单示意图

     P 生产者    C 消费者  中间队列

     需求背景:工厂某部门需要生产n个零件,部门下面有2个小组,每个小组需要生产n/2个

    公平派遣

    每个小组的情况下,当所有奇怪的信息都很重,甚至信息很轻的时候,一个工作人员将不断忙碌,另一个工作人员几乎不会做任何工作。那么,RabbitMQ不知道什么,还会平均分配消息。

    这是因为当消息进入队列时,RabbitMQ只会分派消息。它不看消费者的未确认消息的数量。它只是盲目地向第n个消费者发送每个第n个消息。

    下面就由我们撸代码实现,这一需求::::

    二、代码

    P 生产者代码::: 

     static void Main(string[] args)
            {
                using (var channel = HelpConnection.GetConnection().CreateModel())
                {
                    //声明队列  
                    channel.QueueDeclare("firstQueue", true, false, false, null);
                    //声明路由
                    channel.ExchangeDeclare("firstExchange", "direct", true, false, null);
                    //绑定 建立关系
                    channel.QueueBind("firstQueue", "firstExchange", "firstQueue_Exchange");
                    //内容的基本属性
                   var properties=channel.CreateBasicProperties();
                    //设置消息内容持久化
                   properties.Persistent = true;
                    int j = 0;
                    for (int i = 0; i < 100; i++)
                    {
                        var msg = Encoding.UTF8.GetBytes("生产者-队列-多个消费者" + i);
                        channel.BasicPublish(exchange: "firstExchange",
                                             routingKey: "firstQueue_Exchange",
                                             basicProperties: properties,
                                             body: msg);
                        j = i;
                        Console.WriteLine( i);
                    }
                    Console.WriteLine("添加成功" + j + "");
                    Console.ReadKey();
                }
            }

    成功添加100条

     C 消费者代码::: 

     /// <summary>
            /// 
            /// </summary>
            /// <param name="args"></param>
            static void Main(string[] args)
            {
                using (var channel = HelpConnection.GetConnection().CreateModel())
                {
                    //声明队列
                    channel.QueueDeclare("firstQueue", true, false, false, null);
                    //声明路由
                    channel.ExchangeDeclare("firstExchange", "direct", true, false, null);
                    //绑定 建立关系
                    channel.QueueBind("firstQueue", "firstExchange", "firstQueue_Exchange");
    
                    //公平分发 同一时间只处理一个消息
                    channel.BasicQos(0, 1, true);
                    var conSumer = new EventingBasicConsumer(channel);
                    conSumer.Received += (moede, e) =>
                    {
                        var body = e.Body;
                        var msg = Encoding.UTF8.GetString(body);
                        Console.WriteLine("显示结果:"+msg);
                        //进行交付,确定此消息已经处理完成
                       // channel.BasicAck( e.DeliveryTag,  false);
                    };
                    //确认收到消息    进行消费
                    channel.BasicConsume("firstQueue", true, conSumer);//false 手动应答;true:自动应答
                  
                    Console.ReadKey();
                }
            }

    效果图(特意建立好几个项目,同事启动进行测试)

    三、总结

     本章总结注意几点:::

    1、即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将我们的消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true。

       var properties = channel.CreateBasicProperties();

      properties.Persistent = true;

    2、公平分发同一时间只处理一个消息

    channel.BasicQos(0,1,false)
    • 博主是利用读书、参考、引用、抄袭、复制和粘贴等多种方式打造成自己的纯镀 24k 文章,请原谅博主成为一个无耻的文档搬运工!
    • 小弟刚迈入博客编写,文中如有不对,欢迎用板砖扶正,希望给你有所帮助。
  • 相关阅读:
    自考过后的总结——如何快乐学习?
    自考总结——数据库原理第三章
    机房收费系统——用户权限和功能分析
    SQL视频总结
    学生信息管理系统总结——数据库的访问方式
    学习信息管理系统总结——数据库的连接和访问(一)
    学生信息管理系统总结——student数据库中表关系分析
    Kafka-文件管理
    Kafka-分区分配规则
    Kafka-处理请求(生产请求、获取请求)
  • 原文地址:https://www.cnblogs.com/lrzr/p/7290253.html
Copyright © 2011-2022 走看看