zoukankan      html  css  js  c++  java
  • RabbitMQ及其.NET客户端——几个小例子

    一、简单生产者-消费者(使用direct交换器)

    1、生产者

    var factory = new ConnectionFactory();//实例化一个工厂
    factory.HostName = "localhost";
    factory.UserName = "honnnnl";
    factory.Password = "honnnnl";
    
    using (var connection = factory.CreateConnection())//用工厂创建连接器
    using (var channel = connection.CreateModel()) //创建信道
    {
    //在Rabbit服务上声明消息队列。如果不存在,自动创建。
    channel.QueueDeclare(
    queue: "test", //消息队列名称
    durable: false,//消息队列是否持久化
    exclusive: false,//消息队列是否被本次连接connection独享。(本次连接connection创建的信道可以共用).排外的queue在当前连接被断开的时候会自动消失(清除)无论是否设置了持久化.
    autoDelete: false,//消息队列是否自动删除。也就是说queue会清理自己,但是是在最后一个connection断开的时候。
    arguments: null);//参数对
    
    //创建一条消息,并转为字节数组。 
    string message = "Hello World";
    var body = Encoding.UTF8.GetBytes(message);
    
    //发布消息。。 交换器,路由键,
    channel.BasicPublish("", "test", null, body);//注意路由键在用direct交换器时,要指定为队列名
    
    Console.WriteLine($"发送: {message}");
    
    }

    2、消费者

    var factory = new ConnectionFactory();//实例化连接器创建工厂
    factory.HostName = "localhost";
    factory.UserName = "honnnnl";
    factory.Password = "honnnnl";
    
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare("test", false, false, false, null);
    
        var consumer = new EventingBasicConsumer(channel);//实例化一个事件型消费者
    
        //订阅消费者接收消息的事件
        consumer.Received += (model, ea) =>
        {
            //获取并解析数据
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            
            Console.WriteLine($"收到: {message}");
        };
        
    
        //信道开始消费。。   消息队列名称,     是否自动回复响应,   消费者
        channel.BasicConsume(queue: "test", autoAck: true, consumer: consumer);
    
    }

    二、简单任务队列

    1、任务发布者

    主要代码与第一节的生产者代码一样。。只不过需要将发给工作者执行的任务放到消息里。

    2、工作者

    主要代码与第一节的消费者代码一样。。只不过工作者要解析任务,执行任务。

    默认RabbitMQ会将每个消息按照顺序依次分发给下一个消费者(工作者)。所以每个消费者接收到的消息个数大致是平均的。 这种消息分发的方式称之为轮询(round-robin)。

    使用工作队列的一个好处就是它能够并行处理队列。如果任务发布的快工作者处理的慢,堆积了很多任务,我们只需要添加更多的工作者(workers)——再打开几个工作者进程就可以了,扩展很简单。

    四、消息响应

    1、为什么、如何进行消息确认

    当处理一个比较耗时得任务的时候,也许想知道消费者(consumers)是否运行到一半就挂掉。在上面的例子中,当RabbitMQ将消息发送给消费者(consumers)之后,马上就会将该消息从队列中移除。此时,如果把处理这个消息的工作者(worker)停掉,正在处理的这条消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。

    我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望该消息会重新发送给其他的工作者(worker)。

    为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)机制。也就是说消费者接收到的每一条消息都必须进行确认。要么消费者调用BasicAck()方法显式地向RabbitMQ发送一个确认,要么当初在调用BasicConsume()开始消费消息队列时,将autoAct参数设置为true。总之消费者通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ才会释放并删除这条消息。

      方式(1)写代码调用BasicAck()方法

    //在工作者代码,处理消息后
    channel.BasicAck(ea.DeliveryTag, false);//响应给RabbitMQ服务:收到并处理了消息。

      方式(2)调用BasicConsume()时,将autoAct参数设置为true

      当autoAct设置为true时,一旦消费者接收到消息,RabbitMQ自动视为其完成消息确认。

    如果消费者(consumer)挂掉前没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,即使工作者(workers)偶尔的挂掉,也不会丢失消息。

    消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。

    消息响应默认是开启的。在之前的例子中使用了autoAck=True标识把它关闭。现在是时候移除这个标识了,当工作者(worker)完成了任务,就发送一个响应。

    1 //以下在工作者代码中
    2 //信道开始消费。。     消息队列名称,    是否自动回复响应,      消费者
    3 channel.BasicConsume(queue: "test", autoAck: false, consumer: consumer);

    2、注意

    (1)需要记住,消费者对消息的确认(消费者对RabbitMQ)和告诉生产者消息已经被接收(RabbitMQ对生产者),这两件事毫不相干。

    (2)如果消费者收到一条消息,然后在确认之前从RabbitMQ断开连接(或者从队列上取消订阅),RabbitMQ会认为这条消息没有分发成功,然后重新分发给下一个订阅的消费者。如果你的应用程序(消费者)崩溃了,这种机制可以确保消息会被发送给另一个消费者进行处理。

      另一方面假如你的应用程序(消费者)存在bug,比如忘记确认消息,RabbitMQ将不会再向该消费者发送更多消息(但不影响发给其他消费者)。这是因为在上一条消息被确认前,RabbitMQ会认为这个消费者并没有准备好接收下一条消息。你可以好好利用这一点,如果处理消息内容非常耗时,则你的应用程序应该处理完成再确认。这样可以防止RabbitMQ持续不断的消息涌向你的应用程序(消费者)而导致过载。。。

    3、拒绝消费消息

    你的消费者在收到消息后,如果遇到某种问题无法处理消息,但仍希望其他消费者处理它。你可以调用BasicReject()方法拒收该消息,方法参数requeue设置为true,RabbitMQ会将消息重新发送给下一个订阅的消费者。如果设置为false,RabbitMQ立即从消息队列中移除它(成为死信),而不会把它发给新的消费者。

    如果你的消费者收到一条消息,检测到消息格式错误(意味着其他消费者也不可能解析处理),实际上你应该直接BasicAck()确认,而不做处理(应该记录到错误日志中或者报错)。这么干(直接BasicAck()确认)确认比较节省大家时间、资源。(就不要再你拒收,再让MQ发给其他消费者,其他消费者也得拒收... 这不是浪费时间么。)

    4、死信队列dead letter

    当你的消费者调用BasicReject()方法拒收消息,且方法参数requeue设置为false时,RabbitMQ将消息从消息队列中移除,并存入死信队列(拒收或未送达的消息)。死信队列提供给消费者发现问题的途径。

    五、消息持久化

    前面已经搞定了即使消费者down掉,任务也不会丢失,但是,如果RabbitMQ Server停掉了,那么这些消息还是会丢失。当RabbitMQ Server 关闭或者崩溃,那么里面存储的队列和消息默认是不会保存下来的。

    如果要让RabbitMQ保存住消息,需要在两个地方同时设置:需要保证队列和消息都是持久化的。

    首先,要保证RabbitMQ不会丢失队列,所以要做如下设置:

    bool durable = true;
    channel.QueueDeclare("hello", durable, false, false, null);
    虽然在语法上是正确的,但是在目前阶段是不正确的,因为我们之前已经定义了一个非持久化的hello队列。RabbitMQ不允许我们使用不同的参数重新定义一个已经存在的同名队列,如果这样做就会报错。现在,定义另外一个不同名称的队列:

    bool durable = true;
    channel.queueDeclare("task_queue", durable, false, false, null);
    queueDeclare 这个改动需要在发送端和接收端同时设置。

    现在保证了task_queue这个消息队列即使在RabbitMQ Server重启之后,队列不会丢失。 下面需要保证消息也是持久化的, 这可以通过设置IBasicProperties.SetPersistent 为true来实现:

    var properties = channel.CreateBasicProperties();
    properties.SetPersistent(true);
    需要注意的是,将消息设置为持久化并不能完全保证消息不丢失。虽然他告诉RabbitMQ将消息保存到磁盘上,但是在RabbitMQ接收到消息和将其保存到磁盘上这之间仍然有一个小的时间窗口。 RabbitMQ 可能只是将消息保存到了缓存中,并没有将其写入到磁盘上。持久化是不能够一定保证的,但是对于一个简单任务队列来说已经足够。如果需要消息队列持久化的强保证,可以使用publisher confirms功能。

    六、 公平分发

    你可能也注意到了,分发机制不是那么优雅。默认状态下,RabbitMQ将第n个Message分发给第n个消费者。当然n是取余后的。它不管消费者是否还有unacked Message,只是按照这个默认机制进行分发。

    那么如果有个消费者工作比较重,那么就会导致有的消费者基本没事可做,有的消费者却是毫无休息的机会。那么,RabbitMQ是如何处理这种问题呢?

    通过 BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。Qos即服务质量 。设置方法如下:

    channel.BasicQos(0, 1, false);

    注意,这种方法可能会导致queue满。当然,这种情况下你可能需要添加更多的消费者,或者创建更多的virtualHost来细化你的设计。

  • 相关阅读:
    Centos6.8下设置gitlab服务开机自启动,关闭防火墙开机自启动
    gitlab设置SSH key
    在centos6.8下安装gitlab遇到的坑
    recyclerView中的方法
    ListView中的方法
    tcp断开时分几步
    get,post区别
    cookie是什么,在什么地方会用到
    http和https的区别
    keystore是个嘛东西
  • 原文地址:https://www.cnblogs.com/DreamRecorder/p/10025850.html
Copyright © 2011-2022 走看看