zoukankan      html  css  js  c++  java
  • 【RabbitMQ 实战指南】一 延迟队列

    1、什么是延迟队列

    延迟队列中存储延迟消息,延迟消息是指当消息被发送到队列中不会立即消费,而是等待一段时间后再消费该消息。

    延迟队列很多应用场景,一个典型的应用场景是订单未支付超时取消,用户下单之后30分钟内未支付成功,则把订单取消。

    2、使用要求

    RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过过期时间TTL和死信队列来模拟延迟队列。

    过期时间TTL 可以参考文章: 【RabbitMQ 实战指南】一 过期时间TTL

    死信队列可以参考文章:【RabbitMQ 实战指南】一 死信队列

    3、延迟队列测试

    采用订单未支付超时取消的应用场景来做测试,其具体步骤如下:

    • 1、创建两个交换器 exchange.order 和 exchange.delay, 分别绑定两个队列 queue.order 和 queue.delay

    • 2、把 queue.delay 队列里面的消息配置过期时间,一般订单是30分钟,这里设置成10秒,然后通过 x-dead-letter-exchange 指定死信交换器为 exchange.delay

    • 3、发送消息到 queue.order 中,消息过期之后流入 exchange.delay,然后路由到 queue.delay 队列中,然后检查订单状态,如果未支付,则进行取消操作

    3.1、生产者代码

    <?php 
    require __DIR__ . '/../../../../vendor/autoload.php';
    
    use PhpAmqpLibWireAMQPTable;
    use PhpAmqpLibMessageAMQPMessage;
    use PhpAmqpLibExchangeAMQPExchangeType;
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    // todo 更改配置
    $connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/');
    
    $channel = $connection->channel();
    
    $channel->exchange_declare('exchange.order', AMQPExchangeType::DIRECT, false, true);
    $channel->exchange_declare('exchange.delay', AMQPExchangeType::DIRECT, false, true);
    $args = new AMQPTable();
    // 消息过期方式:设置 queue.order 队列中的消息10s之后过期
    $args->set('x-message-ttl', 10000);
    $args->set('x-dead-letter-exchange', 'exchange.delay');
    $args->set('x-dead-letter-routing-key', 'routingkey.delay');
    $channel->queue_declare('queue.order', false, true, false, false, false, $args);
    $channel->queue_declare('queue.delay', false, true, false, false);
    
    $channel->queue_bind('queue.order', 'exchange.order', 'routingkey.cancel.order');
    $channel->queue_bind('queue.delay', 'exchange.delay', 'routingkey.delay');
    $message = new AMQPMessage('F20190413180108970');
    $channel->basic_publish($message, 'exchange.order', 'routingkey.cancel.order');
    
    $channel->close();
    $connection->close();

    运行生产者代码之后,queue.order 队列会有一条消息,如下图:

    10秒之后,消息会过期,然后被进入 exchange.delay, 进而路由到 queue.delay 队列中:

    3.2、消费者代码

    <?php 
    require __DIR__ . '/../../../../vendor/autoload.php';
    
    use PhpAmqpLibMessageAMQPMessage;
    use PhpAmqpLibExchangeAMQPExchangeType;
    use PhpAmqpLibConnectionAMQPStreamConnection;
    
    // todo 更改配置
    $connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/');
    $channel = $connection->channel();
    
    $channel->exchange_declare('exchange.delay', AMQPExchangeType::DIRECT, false, true);
    $channel->queue_declare('queue.delay', false, true, false, false);
    
    $channel->queue_bind('queue.delay', 'exchange.delay', 'routingkey.delay');
    
    function process_message($message)
    {
        echo "开始处理订单,订单号:" . $message->body . PHP_EOL;
        echo "获取订单的状态,如果未支付,则进行取消订单操作" . PHP_EOL;
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    }
    
    $channel->basic_consume('queue.delay', 'cancelOrder', false, false, false, false, 'process_message');
    
    function shutdown($channel, $connection)
    {
        $channel->close();
        $connection->close();
    }
    register_shutdown_function('shutdown', $channel, $connection);
    
    while ($channel ->is_consuming()) {
        $channel->wait();
    }

    运行消费者代码之后,会获取到订单号,之后可以检查该订单的状态,如果未支付则进行取消操作,如下图:

  • 相关阅读:
    BZOJ 1027: [JSOI2007]合金 (计算几何+Floyd求最小环)
    BZOJ 4522: [Cqoi2016]密钥破解 (Pollard-Rho板题)
    BZOJ 4802: 欧拉函数 (Pollard-Rho)
    BZOJ 3944: Sum (杜教筛)
    BZOJ 3309: DZY Loves Math (莫比乌斯反演)
    BZOJ 2599: [IOI2011]Race(点分治板题)
    BZOJ 3680: 吊打XXX // Luogu [JSOI2004]平衡点 / 吊打XXX (模拟退火)
    Luogu P3690【模板】Link Cut Tree (LCT板题)
    [HNOI2007]最小矩形覆盖
    [SCOI2007]最大土地面积
  • 原文地址:https://www.cnblogs.com/Zhangcsc/p/11761229.html
Copyright © 2011-2022 走看看