zoukankan      html  css  js  c++  java
  • RabbitMQ 工作队列

    1.工作队列

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Text;
    using System.Threading;
    
    namespace Worker
    {
        class Program
        {
            public static void Main(string[] args)
            {
                //1.prefetchCount  每个接收者任务上限设置
                //2.autoAck 是否自动确认任务
                Console.WriteLine("接收消息服务启动:");
                var factory = new ConnectionFactory() { HostName = "localhost" };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    //durable 持久性
                    channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//告诉RabbitMQ一次不要给工人一个以上的消息
                    Console.WriteLine(" [*] Waiting for messages.");
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [x] Received {0}", message);
    
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
    
                        Console.WriteLine(" [x] Done");
                        // Note: it is possible to access the channel via
                        //       ((EventingBasicConsumer)sender).Model here
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//确认当前任务完成 即队列中会删除该条任务
                    };
                    channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);//autoAck 是否自动确认任务 并删除
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
    
                Console.WriteLine("结束");
            }
    
        }
    }
    using RabbitMQ.Client;
    using System;
    using System.Text;
    
    namespace NewTask
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    //durable 持久性  设置为true rabbitmq重启后该条消息还会存在,如果为false,重启后将不会保存这条消息
                    channel.QueueDeclare(queue: "task_queue1", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    var message = GetMessage(args);
                    var body = Encoding.UTF8.GetBytes(message);
    
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
    
                    channel.BasicPublish(exchange: "",
                                         routingKey: "task_queue1",
                                         basicProperties: properties,
                                         body: body);
                }
            }
            private static string GetMessage(string[] args)
            {
                return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
            }
        }
    }

     1.循环调度

         默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环
    2.消息确认

        autoAck: true 时RabbitMQ一旦向消费者传递了一条消息,便立即将其标记为删除。在这种情况下,如果您杀死一个程序,我们将丢失正在处理的消息。我们还将丢失发送给该特定工作人员但尚未处理的所有消息。
        autoAck:false时用channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//确认当前任务完成

    3.讯息持久性

         我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是,如果RabbitMQ服务器停止,我们的任务仍然会丢失。

    RabbitMQ退出或崩溃时,它将忘记队列和消息,除非您告知不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久性。

    //durable 持久性  设置为true rabbitmq重启后该条消息还会存在,如果为false,重启后将不会保存这条消息
                    channel.QueueDeclare(queue: "task_queue1", durable: true, exclusive: false, autoDelete: false, arguments: null);

    4.公平派遣

        在有两名工人的情况下,当所有奇怪的消息都很重,甚至消息很轻时,一位工人将一直忙碌而另一位工人将几乎不做任何工作。好吧,RabbitMQ对此一无所知,并且仍将平均分派消息。

    发生这种情况是因为RabbitMQ在消息进入队列时才调度消息。它不会查看消费者的未确认消息数。它只是盲目地将每第n条消息发送给第n个使用者。

    为了更改此行为,我们可以将BasicQos方法与 prefetchCount = 1设置一起使用。这告诉RabbitMQ一次不要给工人一个以上的消息。换句话说,在处理并确认上一条消息之前,不要将新消息发送给工作人员。而是将其分派给不忙的下一个工作程序。

    channel.BasicQos(0, 1, false);
  • 相关阅读:
    leetcode & lintcode 题解
    部署 Flask 应用时,为什么会需要 gunicorn 或 uWSGI?
    ubuntu vim python配置
    深度学习Momentum(动量方法)
    spark shuffle原理
    c++多态特性总结
    FM/FFM原理
    hadoop streaming怎么设置key
    归一化的优点和方法
    九章算法强化
  • 原文地址:https://www.cnblogs.com/lbonet/p/14462045.html
Copyright © 2011-2022 走看看