zoukankan      html  css  js  c++  java
  • rabbitmq ACK消息确认机制

    代码示例

    生产者 p.php

    <?php
    // 生产者 p.php
    //配置信息
    $config = [
        'host'     => 'localhost',
        'port'     => '5672',
        'login'    => 'guest',
        'password' => 'guest',
        'vhost'    => '/'
    ];
    
    $exchangeName = 'e_1';
    $queueName    = 'q_1';
    $routeKey     = 'order';
     
    //创建连接和channel
    $connect = new AMQPConnection($config);
    
    if (!$connect->connect()) {
        die("Cannot connect to the broker!
    ");
    }
    
    $channel  = new AMQPChannel($connect);
    $exchange = new AMQPExchange($channel);
    
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    
    // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
    $exchange->setFlags(AMQP_DURABLE);
    
    // 声明交换机
    $exchange->declareExchange();
    
    // 创建消息队列
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    
    // 设置持久性
    $queue->setFlags(AMQP_DURABLE);
    
    // 声明消息队列
    $queue->declareQueue();
    
    $queue->bind($exchange->getName(), $routeKey);
    
    // 向服务器队列推送10条消息
    for ($i = 0; $i < 10; $i++) {
        $msg = 'hello world ' . $i;
        $exchange->publish($msg, $routeKey, AMQP_NOPARAM, ['delivery_mode' => 2]);
    }
    

      

    消费者 c.php

    <?php
    // 消费者 c.php
    //配置信息
    $config = [
        'host'     => 'localhost',
        'port'     => '5672',
        'login'    => 'guest',
        'password' => 'guest',
        'vhost'    => '/'
    ];
    
    $exchangeName = 'e_1';
    $queueName    = 'q_1';
    $routeKey     = 'order';
     
    //创建连接和channel
    $connect = new AMQPConnection($config);
    
    if (!$connect->connect()) {
        die("Cannot connect to the broker!
    ");
    }
    
    $channel  = new AMQPChannel($connect);
    $exchange = new AMQPExchange($channel);
    
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    
    // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
    $exchange->setFlags(AMQP_DURABLE);
    
    // 声明交换机
    $exchange->declareExchange();
    
    // 创建消息队列
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    
    // 设置持久性
    $queue->setFlags(AMQP_DURABLE);
    
    // 声明消息队列
    $queue->declareQueue();
    
    $queue->bind($exchange->getName(), $routeKey);
    
    // 接收消息并处理回调
    $queue->consume('receive');
    
    // 处理回调的方法
    function receive($envelop, $queue){
        echo $envelop->getBody() . "
    ";
        $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
    }
    

    学习ack机制前,先了解ack机制使用和不使用两种情况下区别,这里我们在生产者中向服务器循环推送10条消息

    消费者使用ack机制

    // 接收消息并处理回调
    $queue->consume('receive');
    
    // 处理回调的方法
    function receive($envelop, $queue){
        echo $envelop->getBody() . "
    ";
    
        // 通知生产者任务完成
        $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
    }
    

    不使用ack机制

    通过对比两种执行结果,我们可以得知,服务器会连续推送3个消息,没有收到消费者返回时,停止向该消费者推送消息

    在RabbitMQ中有一个prefetch_count的概念,这个参数的意思是允许Consumer最多同时处理几个任务。我的版本的RabbitMQ默认这个参数是3,也就是说如果某一个Consumer在收到消息后没有发送ACK确认包,RabbitMQ就会任务Consumer还在处理任务,当有3个消息都没有发送ACK确认包时,RabbitMQ就不会再发送消息给该Consumer。

     在控制台确实3条消息未确认,7条准备发送

    如果Consumer数量很多或者希望每个Consumer同时只处理一个任务可以通过在Consumer中设置PrefetchCount来实现更加均匀的任务分发。

    $channel = new AMQPChannel($connection);
    $channel->setPrefetchCount(1);

    以上情况是只一个消费者,如果我们有两个相同的消费者订阅相同的队列,那么服务器又是如何将消息推送给两个消费者的呢?

    复制出 c1.php、 c2.php两个消费者

    c1.php 和 c2.php 都开启ACK消息确认机制,

    控制台 显示有两个连接

    执行结果

     可以看出服务器是平均将任务发给两个生产者

    第二种情况,假如c2.php出故障或者代码错误导致ACk机制失效。

     

    下面关掉c2.php脚本,这样c2.php和服务器断开连接

     

    可以得知 c2.php 没有返回消息确认时,这三个消息是Unacked状态,其他7个消息由c1.php成功执行时,当c2.php断开连接时,服务器将Unacked消息交由c1.php完成。

  • 相关阅读:
    caffe BUG
    Ubuntu安装低版本gcc
    OpenCV的CV :: findHomography运行时错误
    opencv中使用 SURF算法匹配的遇到的问题
    C++ 类中特殊的成员变量(常变量、引用、静态)的初始化方法 --转
    SIFI与surf
    opencv复习
    opencv老版本的IplImage使用
    python学习笔记-对象持久化保存与恢复
    Styles and Themes
  • 原文地址:https://www.cnblogs.com/xiangdongsheng/p/14258421.html
Copyright © 2011-2022 走看看