zoukankan      html  css  js  c++  java
  • RabbitMQ 延迟队列(PHP)

    <?php

    /**
     * Delayed-message producer for RabbitMQ (per-message TTL + dead-lettering).
     *
     * Usage: php <this-script> <message> <delay-in-milliseconds>
     *
     * The message is published to "cache_queue", which has no consumer. When the
     * per-message expiration elapses, the broker dead-letters the message to
     * "delay_exchange", which routes it into "delay_queue" where the real
     * consumer is listening.
     *
     * NOTE(review): RabbitMQ documents that expired messages are only
     * dead-lettered once they reach the head of the queue, so a message with a
     * long delay can hold back later messages that have shorter delays.
     */

    require_once '../vendor/autoload.php';

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Exchange\AMQPExchangeType;
    use PhpAmqpLib\Message\AMQPMessage;
    use PhpAmqpLib\Wire\AMQPTable;

    // Refuse to publish without a body and a non-negative integer delay: a
    // missing or non-numeric delay would otherwise become a TTL of 0 and the
    // message would be dead-lettered immediately.
    if (!isset($argv[1], $argv[2]) || preg_match('/^\d+$/D', $argv[2]) !== 1) {
        fwrite(STDERR, "Usage: php {$argv[0]} <message> <delay-in-milliseconds>" . PHP_EOL);
        exit(1);
    }

    $connection = new AMQPStreamConnection('192.168.0.5', 5672, 'test', 'test', '/');

    try {
        $channel = $connection->channel();

        // Declare both exchanges (passive=false, durable=false, auto_delete=false).
        $channel->exchange_declare('delay_exchange', AMQPExchangeType::DIRECT, false, false, false);
        $channel->exchange_declare('cache_exchange', AMQPExchangeType::DIRECT, false, false, false);

        // Dead-letter settings for the holding queue: which exchange receives
        // an expired message, and which routing key it is re-published with.
        // A queue-wide 'x-message-ttl' could be set here as well; RabbitMQ then
        // applies the lower of the queue TTL and the per-message expiration.
        $deadLetterArgs = new AMQPTable();
        $deadLetterArgs->set('x-dead-letter-exchange', 'delay_exchange');
        $deadLetterArgs->set('x-dead-letter-routing-key', 'delay_exchange');

        // Holding queue: durable, never consumed, messages wait here until they expire.
        $channel->queue_declare('cache_queue', false, true, false, false, false, $deadLetterArgs);
        $channel->queue_bind('cache_queue', 'cache_exchange', 'cache_exchange');

        // Target queue: durable, receives the dead-lettered (i.e. delayed) messages.
        $channel->queue_declare('delay_queue', false, true, false, false, false);
        $channel->queue_bind('delay_queue', 'delay_exchange', 'delay_exchange');

        $msg = new AMQPMessage($argv[1], [
            // The AMQP expiration property is a string holding milliseconds;
            // the int round-trip normalises input such as "0500".
            'expiration' => (string) (int) $argv[2],
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ]);

        $channel->basic_publish($msg, 'cache_exchange', 'cache_exchange');
        echo date('Y-m-d H:i:s')." [x] Sent {$argv[1]} ".PHP_EOL;

        $channel->close();
    } finally {
        // Close the connection even when declaring or publishing throws.
        $connection->close();
    }
    

      

    <?php

    /**
     * Consumer for delayed messages.
     *
     * Listens on "delay_queue", which receives messages dead-lettered from the
     * producer's holding queue once their expiration has elapsed, and prints
     * each message body with the time it was received.
     */

    require_once '../vendor/autoload.php';

    use PhpAmqpLib\Connection\AMQPStreamConnection;

    $connection = new AMQPStreamConnection('192.168.0.5', 5672, 'test', 'test', '/');

    try {
        $channel = $connection->channel();

        // Re-declare the exchange, queue and binding with the same arguments as
        // the producer so the consumer can be started first.
        $channel->exchange_declare('delay_exchange', 'direct', false, false, false);
        $channel->queue_declare('delay_queue', false, true, false, false, false);
        $channel->queue_bind('delay_queue', 'delay_exchange', 'delay_exchange');

        echo ' [*] Waiting for message. To exit press CTRL+C '.PHP_EOL;

        // Prints the receive time followed by the message body.
        $callback = function ($msg) {
            echo date('Y-m-d H:i:s')." [x] Received",$msg->body,PHP_EOL;
        };

        // The 4th argument (no_ack) is true, so messages count as acknowledged
        // on delivery and the callback must not call basic_ack. To switch to
        // manual acknowledgements, pass no_ack=false, ack inside the callback,
        // and optionally call basic_qos(null, 1, null) so the broker sends the
        // next message only after the previous one has been acknowledged.
        $channel->basic_consume('delay_queue', '', false, true, false, false, $callback);

        // Block on the channel for as long as a consumer callback is registered.
        while (count($channel->callbacks)) {
            $channel->wait();
        }

        $channel->close();
    } finally {
        // Close the connection even when the consume loop throws.
        $connection->close();
    }
    

      

  • 相关阅读:
    训练网络考虑内容
    阿斯顿
    wifi 模块Yeelink联网
    fpga串口通信的verilog驱动
    vga显示彩条
    状态机之二段式
    矩阵键盘
    error
    时钟
    Error(10028)
  • 原文地址:https://www.cnblogs.com/setevn/p/14645605.html
Copyright © 2011-2022 走看看