zoukankan      html  css  js  c++  java
  • PHP RabbitMQ实现简单的延迟队列

    1.TTL+死信队列(DLX)实现

    TTL(x-message-ttl)是指队列中的消息在丢弃之前的可存活时间。死信队列是放置没有被成功消费且超过了TTL生存时间消息的队列,如果消息没有在指定的TTL时间内被成功消费,并且给需要延迟执行的队列绑定了死信交换机和死信队列,将信息publish到死信交换机中后可被绑定交换机的死信队列消费,利用这一特性可以实现延迟队列。

    消息队列中的消息会在一下几种情况下变成死信

    消息被拒绝(basic.reject / basic.nack),并且requeue = false;
    消息TTL过期;
    队列达到最大长度;
    

    在声明被延迟的任务队列前,需要配置如下参数。x-message-ttl设置队列中消息的生存期,超过这个时间消息将变成死信,也可以在单条消息publish的时候设置ttl,rabbitmq会取两者中较小者。

    $arguments = [
        'x-message-ttl' => 6000, //消息在丢弃之前的可存活时间
        'x-dead-letter-exchange' => $deadExchangeName, //死信发送的交换机名字
        'x-dead-letter-routing-key' => $deadRouteKey, //死信的路由键
    ];
    $queue->setArguments($arguments);
     
    

     

    消费者代码

    //创建连接和channel
    $connect = new AMQPConnection($config);
    
    if (!$connect->connect()) {
        die("Cannot connect to the broker!
    ");
    }
    
    $channel  = new AMQPChannel($connect);
    
    //**********************创建一个用于存放死信的交换机和队列*************
    $deadExchangeName = 'dead_exchange';
    $deadQueueName    = 'delayed_order';
    $deadRouteKey     = 'delayed_order';
    
    $deadExchange = new AMQPExchange($channel);
    $deadExchange->setName($deadExchangeName);
    $deadExchange->setType(AMQP_EX_TYPE_DIRECT);
    $deadExchange->declareExchange();
    
    $deadQueue = new AMQPQueue($channel);
    $deadQueue->setName($deadQueueName);
    $deadQueue->declareQueue();
    $deadQueue->bind($deadExchange->getName(), $deadRouteKey);
    
    
    //***********************创建被延迟的交换机和消息队列********************
    $exchangeName = 'exchange1';
    $queueName    = 'order';
    $routeKey     = 'order';
    $exchange = new AMQPExchange($channel);
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    
    // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
    // $exchange->setFlags(AMQP_DURABLE);
    
    // 声明交换机
    $exchange->declareExchange();
    
    // 创建消息队列
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    $arguments = [
        'x-message-ttl' => 6000,
        'x-dead-letter-exchange' => $deadExchangeName, //死信发送的交换机
        'x-dead-letter-routing-key' => $deadRouteKey, //死信routeKey
    ];
    
    // 设置持久性
    // $queue->setFlags(AMQP_DURABLE);
    
    $queue->setArguments($arguments);
    // 声明消息队列
    $queue->declareQueue();
    
    $queue->bind($exchange->getName(), $routeKey);
    
    // 向服务器队列推送10条消息
    $msg = 'hello world 1';
    $exchange->publish($msg, $routeKey, AMQP_NOPARAM, ['delivery_mode' => 2]);
    

    生产者代码

    $exchangeName = 'dead_exchange';
    $queueName    = 'delayed_order';
    $routeKey     = 'delayed_order';
     
    //创建连接和channel
    $connect = new AMQPConnection($config);
    
    if (!$connect->connect()) {
        die("Cannot connect to the broker!
    ");
    }
    
    $channel  = new AMQPChannel($connect);
    $exchange = new AMQPExchange($channel);
    
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    
    // 1:不持久化到磁盘,宕机数据消失 2:持久化到磁盘
    // $exchange->setFlags(AMQP_DURABLE);
    
    // 声明交换机
    $exchange->declareExchange();
    
    // 创建消息队列
    $queue = new AMQPQueue($channel);
    $queue->setName($queueName);
    // $queue->setArgument('x-message-ttl', 5000);
    // 设置持久性
    // $queue->setFlags(AMQP_DURABLE);
    
    // 声明消息队列
    $queue->declareQueue();
    
    $queue->bind($exchange->getName(), $routeKey);
    
    // 接收消息并处理回调
    $queue->consume('receive');
    
    // 处理回调的方法
    function receive($envelop, $queue){
        echo $envelop->getBody() . "
    ";
    
        // ACK 通知生产者任务完成
        $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
    }
    

      

      

  • 相关阅读:
    C++常用库
    如何学好VC和MFC(各前辈学*方法及感受整理)(五)
    如何学好VC和MFC(各前辈学习方法及感受整理)(一)
    基于Winsock API的VC网络编程实战
    const int *p和int * const p的区别(常量指针与指向常量的指针) .
    java中的“包”与C#中的“命名空间
    vc加载lib文件的方法
    C|C++中的静态全局变量,静态局部变量,全局变量,局部变量的区别
    如何学好VC和MFC(各前辈学习方法及感受整理)(三)
    如何学好VC和MFC(各前辈学习方法及感受整理)(二)
  • 原文地址:https://www.cnblogs.com/xiangdongsheng/p/14264493.html
Copyright © 2011-2022 走看看