zoukankan      html  css  js  c++  java
  • PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

    延时队列

    • Delayproducer.Php
    • Amqpbuilder.Php

    AmqpBuilder.php

    <?php
    
    declare(strict_types = 1);
    
    namespace AppComponentsAmqp;
    
    use HyperfAmqpBuilderBuilder;
    
    use HyperfAmqpBuilderQueueBuilder;
    
    class AmqpBuilder extends QueueBuilder
    
    {
    
        /**
    
         * @param array|PhpAmqpLibWireAMQPTable $arguments
    
         *
    
         * @return HyperfAmqpBuilderBuilder
    
         */
    
        public function setArguments($arguments) : Builder
    
        {
    
            $this->arguments = array_merge($this->arguments, $arguments);
    
            return $this;
    
        }
    
        /**
    
         * 设置延时队列相关参数
    
         *
    
         * @param string $queueName
    
         * @param int    $xMessageTtl
    
         * @param string $xDeadLetterExchange
    
         * @param string $xDeadLetterRoutingKey
    
         *
    
         * @return $this
    
         */
    
        public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
    
        {
    
            $this->setArguments([
    
                'x-message-ttl'             => ['I', $xMessageTtl * 1000], // 毫秒
    
                'x-dead-letter-exchange'    => ['S', $xDeadLetterExchange],
    
                'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
    
            ]);
    
            $this->setQueue($queueName);
    
            return $this;
    
        }
    
    }
    

      

    DelayProducer.php

    <?php
    declare(strict_types = 1);
    namespace AppComponentsAmqp;
    use HyperfAmqpAnnotationProducer;
    use HyperfAmqpBuilder;
    use HyperfAmqpMessageProducerMessageInterface;
    use HyperfDiAnnotationAnnotationCollector;
    use PhpAmqpLibMessageAMQPMessage;
    use Throwable;
    class DelayProducer extends Builder
    {
     /**
     * @param ProducerMessageInterface $producerMessage
     * @param AmqpBuilder              $queueBuilder
     * @param bool                     $confirm
     * @param int                      $timeout
     *
     * @return bool
     * @throws Throwable
     */
     public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
     {
     return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout)
     {
     return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
     });
     }
     /**
     * @param ProducerMessageInterface $producerMessage
     * @param AmqpBuilder              $queueBuilder
     * @param bool                     $confirm
     * @param int                      $timeout
     *
     * @return bool
     * @throws Throwable
     */
     private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
     {
     $result = false;
     $this->injectMessageProperty($producerMessage);
     $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
     $pool = $this->getConnectionPool($producerMessage->getPoolName());
     /** @var HyperfAmqpConnection $connection */
     $connection = $pool->get();
     if ($confirm) {
     $channel = $connection->getConfirmChannel();
     } else {
     $channel = $connection->getChannel();
     }
     $channel->set_ack_handler(function () use (&$result)
     {
     $result = true;
     });
     try {
     // 处理延时队列
     $exchangeBuilder = $producerMessage->getExchangeBuilder();
     // 队列定义
     $channel->queue_declare($queueBuilder->getQueue(), $queueBuilder->isPassive(), $queueBuilder->isDurable(), $queueBuilder->isExclusive(), $queueBuilder->isAutoDelete(), $queueBuilder->isNowait(), $queueBuilder->getArguments(), $queueBuilder->getTicket());
     // 路由定义
     $channel->exchange_declare($exchangeBuilder->getExchange(), $exchangeBuilder->getType(), $exchangeBuilder->isPassive(), $exchangeBuilder->isDurable(), $exchangeBuilder->isAutoDelete(), $exchangeBuilder->isInternal(), $exchangeBuilder->isNowait(), $exchangeBuilder->getArguments(), $exchangeBuilder->getTicket());
     // 队列绑定
     $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey());
     // 消息发送
     $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
     $channel->wait_for_pending_acks_returns($timeout);
     } catch (Throwable $exception) {
     // Reconnect the connection before release.
     $connection->reconnect();
     throw $exception;
     }
     finally {
     $connection->release();
     }
     return $confirm ? $result : true;
     }
     /**
     * @param ProducerMessageInterface $producerMessage
     */
     private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
     {
     if (class_exists(AnnotationCollector::class)) {
     /** @var HyperfAmqpAnnotationProducer $annotation */
     $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
     if ($annotation) {
     $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey);
     $annotation->exchange && $producerMessage->setExchange($annotation->exchange);
     }
     }
     }
    }
    

      

    处理超时订单

    • Orderqueueconsumer.Php
    • Orderqueueproducer.Php

    Orderqueueproducer.php

    <?php
    declare(strict_types = 1);
    namespace AppAmqpProducer;
    use HyperfAmqpAnnotationProducer;
    use HyperfAmqpBuilderExchangeBuilder;
    use HyperfAmqpMessageProducerMessage;
    /**
     * @Producer(exchange="order_exchange", routingKey="order_exchange")
     */
    class OrderQueueProducer extends ProducerMessage
    {
     public function __construct($data)
     {
     $this->payload = $data;
     }
     public function getExchangeBuilder() : ExchangeBuilder
     {
     return parent::getExchangeBuilder(); // TODO: Change the autogenerated stub
     }
    }
    

      

    Orderqueueconsumer.php
    <?php
    declare(strict_types = 1);
    namespace AppAmqpConsumer;
    use AppServiceCityTransportOrderService;
    use HyperfAmqpResult;
    use HyperfAmqpAnnotationConsumer;
    use HyperfAmqpMessageConsumerMessage;
    /**
     * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)
     */
    class OrderQueueConsumer extends ConsumerMessage
    {
     public function consume($data) : string
     {
     ##业务处理
     }
     public function isEnable() : bool
     {
     return true;
     }
    } 
    

      

    Demo

    $builder = new AmqpBuilder();
     $builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');
     $que = ApplicationContext::getContainer()->get(DelayProducer::class);
     var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder)) 
    

      

    更多学习内容请访问:

    腾讯T3-T4标准精品PHP架构师教程目录大全,只要你看完保证薪资上升一个台阶(持续更新)

  • 相关阅读:
    c语言网络编程过程及函数说明
    c代码编译完整过程详解
    const关键字的常见作用
    c语言中static关键字的几个作用
    c语言结构体中字节对齐方式
    modbus协议数据格式
    CodeForces
    如何在Dev-Cpp中使用C++11中的函数:stoi、to_string、unordered_map、unordered_set、auto
    关于lower_bound( )和upper_bound( )的常见用法
    CodeForces 600C——思维
  • 原文地址:https://www.cnblogs.com/a609251438/p/12894970.html
Copyright © 2011-2022 走看看