zoukankan      html  css  js  c++  java
  • RabbitMQ (三) 工作队列之轮询分发

    上一篇讲了简单队列,实际工作中,这种队列应该很少用到,因为生产者发送消息的耗时一般都很短,但是消费者收到消息后,往往伴随着对高消息的业务逻辑处理,是个耗时的过程,这势必会导致大量的消息积压在一个消费者手中,从而导致业务的积压.

    所以我们需要多个消费者一起消费队列中的消息,模型如下:(为了方便讲解,暂时隐藏掉"交换机")

    生产者

    复制代码
        public class Producer
        {
            private const string QueueName = "test_work_queue";
            public static void Send()
            {
                //获取一个连接
                using (IConnection connection = ConnectionHelper.GetConnection())
                {
                    //从连接中获取一个信道
                    using (IModel channel = connection.CreateModel())
                    {
                        //声明队列
                        channel.QueueDeclare(QueueName, false, false, false, null);
    
                        for (int i = 0; i < 50; i++)
                        {
                            //创建消息
                            string msg = "hello world " + i;
                            //发送消息
                            channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg));
                            Console.WriteLine($"{DateTime.Now} : send {msg}");
                        }
                    }
                }
            }
        }
    复制代码

    消费者1

    复制代码
        public class Consumer1
        {
            private const string QueueName = "test_work_queue";
            public static void Receive()
            {
                //获取一个连接
                IConnection connection = ConnectionHelper.GetConnection();
    
                //从连接中获取一个信道
                IModel channel = connection.CreateModel();
    
                //声明队列
                channel.QueueDeclare(QueueName, false, false, false, null);
    
                //添加消费者
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    
                //注册消费者收消息事件
                consumer.Received += (s, e) =>
                {
                    byte[] bytes = e.Body;
                    string str = Encoding.Default.GetString(bytes);
                    Console.WriteLine("consumer1 receive : " + str);
                    Thread.Sleep(500);//休息0.5秒
                };
    
                //开启消费者监听
                channel.BasicConsume(QueueName, true, "", false, false, null, consumer);
            }
        }
    复制代码

    消费者2

    只有一点点区别:

                    Console.WriteLine("consumer2 receive : " + str);
                    Thread.Sleep(1000);//休息1秒

    我们这里故意让两个消费者处理消息的耗时不一样,一个0.5秒,一个1秒.

    我们来看看结果:

    可以非常清楚的看到,尽管两个消费者处理消息的"耗时"不一样,但是处理的"数量"是一样的.

    这里有几个细节要说明一下:

    1.在生产者和两个消费者中都声明了同一个队列.其实,如果这个队列之前已经存在了,那么生产者和消费者都可以不用再声明了;

    2.一定要先启动两个消费者,再启动生产者.原因是,我们上面的代码中,消费者的 BasicConsume 方法的第2个参数传入的是 true,

    这个参数就是 autoAck :是否自动确认(上面文章有讲过).

    所以如果先开启生产者,那么会瞬间发送完50条消息,这时候启动消费者1,那么会立刻"消费"掉这50条消息.有朋友肯定要问,不是"睡"了0.5秒么?

    这里"睡"0.5秒,是对消息的业务逻辑处理耗时,而不是"消费"消息,消息已经在消费者启动的那一刻从队列中"拿"过来了;

    同时,由于采用的是"自动确认",所以队列看到50条都被"确认"了,就会将这些消息从队列中移除.

    这时候再启动消费者2,则不会收到任何消息.

  • 相关阅读:
    testlink安装全攻略
    软件测试过程管理脑图
    VBS: FSO对象及文件读写
    最简单的NT驱动
    过DNF TP驱动保护(二)(转载)
    DebugPrint格式输出
    ObReferenceObjectByName
    最简单的WDM驱动
    设备对象(DEVICE_OBJECT)设备名称
    ObReferenceObjectByHandle内核函数
  • 原文地址:https://www.cnblogs.com/liujunjun/p/14140879.html
Copyright © 2011-2022 走看看