zoukankan      html  css  js  c++  java
  • rabbitmq 延迟队列 php

    <?php
    
    require_once '../vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    use PhpAmqpLibMessageAMQPMessage;
    use PhpAmqpLibWireAMQPTable;
    use PhpAmqpLibExchangeAMQPExchangeType;
    
    $connection = new AMQPStreamConnection('192.168.0.5', 5672, 'test', 'test', '/');
    $channel = $connection->channel();
    //给cache发送  使其过期然后定向到另一个
    //声明两个队列
    $channel->exchange_declare('delay_exchange', 'direct',false,false,false);
    $channel->exchange_declare('cache_exchange', 'direct',false,false,false);
     
    $tale = new AMQPTable();
    $tale->set('x-dead-letter-exchange', 'delay_exchange');//****很关键  表示过期后由哪个exchange处理
    $tale->set('x-dead-letter-routing-key','delay_exchange');//****很关键  表示过期后由哪个exchange处理
    //$tale->set('x-message-ttl',15000);  //存活时长   下面的过期时间不能超过
     
    $channel->queue_declare('cache_queue',false,true,false,false,false,$tale);
    $channel->queue_bind('cache_queue', 'cache_exchange','cache_exchange');
     
    $channel->queue_declare('delay_queue',false,true,false,false,false);
    $channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');
     
     
    $msg = new AMQPMessage($argv[1], array(
        'expiration' => intval($argv[2]),
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
     
    ));
     
    $channel->basic_publish($msg,'cache_exchange','cache_exchange');
    echo date('Y-m-d H:i:s')." [x] Sent {$argv[1]} ".PHP_EOL;
     
    $channel->close();
    $connection->close();
    

      

    <?php
    require_once '../vendor/autoload.php';
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    $connection = new AMQPStreamConnection('192.168.0.5', 5672, 'test', 'test', '/');
    $channel = $connection->channel();
    $channel->exchange_declare('delay_exchange', 'direct',false,false,false);
     
    $channel->queue_declare('delay_queue',false,true,false,false,false);
    $channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');
     
    echo ' [*] Waiting for message. To exit press CTRL+C '.PHP_EOL;
     
    $callback = function ($msg){
        echo date('Y-m-d H:i:s')." [x] Received",$msg->body,PHP_EOL;
     
        // $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
     
    };
     
    //只有consumer已经处理并确认了上一条message时queue才分派新的message给它
    // $channel->basic_qos(null, 1, null);
    $channel->basic_consume('delay_queue','',false,true,false,false,$callback);
     
     
    while (count($channel->callbacks)) {
        $channel->wait();
    }
    $channel->close();
    $connection->close()
    

      

  • 相关阅读:
    python中可变类型和不可变类型
    python PEP8开发规范
    pandas之——Series常用总结
    python os 模块的使用
    Markdown语法
    HttpClient连接池抛出大量ConnectionPoolTimeoutException: Timeout waiting for connection异常排查
    MySQL union all排序问题
    mysql解决datetime与timestamp精确到毫秒的问题
    keepalived + nginx实现高可用
    配置文件keepalived.conf详解
  • 原文地址:https://www.cnblogs.com/setevn/p/14645605.html
Copyright © 2011-2022 走看看