zoukankan      html  css  js  c++  java
  • C# 消息队列之 RabbitMQ 进阶篇

    Ø  简介

    在之前的 C# 消息队列之 RabbitMQ 基础入门 中介绍了 RabbitMQ 的基本用法,其实要更全面的掌握 RabbitMQ 这个消息队列服务,我们还需要掌握以下内容:

    1.   轮询分发

    2.   消息响应

    3.   公平分发

    4.   消息持久化

     

    1)   轮询分发

    默认情况下,RabbitMQ 会按照消息的顺序依次分发给每个消费者,也就是每个消费者接收到的消息基本是平均的,这种分发方式称之为轮询分发。话不多说看示例:

    1)   生产者代码(其他代码省略)

    //随机一个“生产者”名称

    string pname = $"[P{(new Random()).Next(1, 1000)}]";

    Console.WriteLine($"生产者{pname}已启动:");

    for (int i = 0; i < 6; i++)

    {

        string message;

        if (i == 1) //第二条消息,需要耗时10

            message = $"{pname}, task{i + 1}, time of 10 seconds";

        else

            message = $"{pname}, task{i + 1}, time of 1 seconds";

        byte[] body = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish("", "myQueue1", properties, body);

        Console.WriteLine($"生产者{message} {DateTime.Now.ToString("HH:mm:ss fff")}");

    }

     

    2)   消费者代码(其他代码省略)

    //随机一个“消费者”名称

    string cname = $"[C{(new Random()).Next(1, 1000)}]";

    Console.WriteLine($"消费者{cname}已开启");

    consumer.Received += (sender, e) =>

    {

        byte[] body = e.Body;   //消息字节数组

        string message = Encoding.UTF8.GetString(body); //消息内容

        Console.WriteLine($"消费者{cname}接收到消息:{message} {DateTime.Now.ToString("HH:mm:ss fff")},开始处理...");

     

        //模拟处理耗时操作

        string second = Regex.Replace(message, ".+time of ", "");

        second = Regex.Replace(second, " seconds", "");

        System.Threading.Thread.Sleep(1000 * int.Parse(second));

    };

     

    3)   运行代码

    首先,开启两个消费者,再打开一个生产者发送6条消息,运行结果如下:

    clip_image002[4]

    从以上结果中可以得出以下结论:

    1.   一共6条消息,2个消费者接收的消息数量是一致的(各3条);

    2.   尽管 Task2 消息处理时间较长,也会等待该消息处理完成之后,再处理被依次分发的消息,所以导致了 Task4 的处理时间在 Task5 之后;

    3.   同一时间段一个消费者只会处理一条消息,只有当该消息处理完成之后,才会处理下一条消息(或者说接收下一条消息),并不会同时处理多条消息。

     

    2)   消息响应

    什么叫消息响应?这问题问的clip_image003[3]

    问题:如果一个消费者在接收到消息后,处理到一半出现了异常,没有正常完成消息的处理,比如:给用户发送短信。这时 RabbitMQ 认为消息已经被接收,就将该消息删除了。这样一来是不是导致数据丢失了呢?因为没有正常的完成业务流程呐!

     

    好,这时消息响应就可以大展拳脚了,它就可以解决以上这种丢失数据的问题。就是当一条消息发送给消费者后,该消息必须得到消费者的“确认”后,RabbitMQ 才会将该消息删除。

     

    实现该功能比较简单,首先将“消费者”中的代码:

    channel.BasicConsume("myQueue1", true, consumer);

    改为

    channel.BasicConsume("myQueue1", false, consumer);  //表示开启消息响应的功能

     

    然后,在消息处理完成后回传该消息标记,添加以下代码:

    channel.BasicAck(e.DeliveryTag, false); //只有当响应此消息标记后,该消息才会在消息队列中删除

     

    3)   公平分发

    在之前的“轮询分发”模式下,似乎发现不是很合理吧?因为如果一个消费者当前正在处理比较耗时的“消息”,再次将消息发送给它,该消息就进入了等待被处理的状态。此时,另一个消费者正处于闲置状态。这样就照成了分发不合理(好比工作中:小张开发一个功能需要一周,而小王现在没事儿干,领导还会把任务分配给小张吗?肯定不会吧!)。

     

    理论就是这样的,那在 RabbitMQ 中如何去实现这样的分发机制呢,其实要借助于之前讲的“消息响应”机制。只有当消费者回传消息标记后,才会将下一个消息发送给它,否则将消息分发给其它空闲的消费者。讲了这么多,其实只需一行代码就可以完成,设置消息通道的基础 Qos 参数:

    channel.BasicQos(0, prefetchCount: 1, false);   //prefetchCount:1 表示告诉 RabbitMQ, 在未接收到消费者确认消息之前,不在分发消息

    clip_image005[4]

    从图中可以看到,Task3456没有轮询分发了,而是一直发给了比较空闲的消费者(P549),这样就达到了合理分发的目的。

     

    4)   消息持久化

    在之前的“消息响应”机制中其实隐藏了另一个功能,就是当消息发送给消费者后,未回传该消息标记情况下,该消息就不会被删除。那么这些消息就一直会保存在 RabbitMQ Server 中吗,当然不是。当 RabbitMQ Server 奔溃或者重启后,这些消息任然会丢失。要将这些消息持久化保存在磁盘中,只需修改两个地方:

    1)   设置 QueueDeclare() 方法的 durable 参数为 true

    channel.QueueDeclare("myQueue1", durable: true, false, false, null);

    注意:一个消息队列不允许有不同的参数进行设置,所以可以创建另一个消息队列,或者先删除当前消息队列在进行设置:

    channel.QueueDelete("myQueue1");    //注意:当前消息队列被删除后,正在接收该队列的消费者将再也不会接收到消息,就算再次创建同名的队列

     

    2)   设置 BasicProperties 类的 Persistent 属性为 true

    var properties = channel.CreateBasicProperties() as BasicProperties;

    properties.Persistent = true;   //默认为false

     

    l  需要注意的是,消息持久化并不代表一定不会丢失任何消息,在消息持久化的过程中也会存在一小段的时间间隔,在此之间发生 RabbitMQ 服务奔溃、服务器断电等情况,任然可能丢失少量的消息。但是在消息存储实时性没那么高的情况下,这已经足够了。如果对消息持久化有更高要求,可以使用:publisher confirms

     

    Ø  更多参考

    https://www.cnblogs.com/wudequn/p/10886733.html

    https://www.xin3721.com/ArticlecSharp/c13325.html

  • 相关阅读:
    Window如何查看cpu核数,更改CPU开启的核数?
    Mysql5.6.47开放远程访问(修改远程访问密码)
    CentOS7.6新增或修改SSH端口号的步骤
    虚拟机下安装Centos设置静态ip,并通过桥接连接
    windows下安装mysql5.6.47版本
    微软官方安装介质Windows10系统安装教程
    【测试编码URI的函数】
    【JavaScript函数】
    【JavaScript运算符与表达式】
    【JavaScript声明变量的规则】
  • 原文地址:https://www.cnblogs.com/abeam/p/11886185.html
Copyright © 2011-2022 走看看