zoukankan      html  css  js  c++  java
  • PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

    延时队列

    • DelayProducer.php
    • AmqpBuilder.php

    AmqpBuilder.php

    <?php
    
    declare(strict_types = 1);
    
    namespace AppComponentsAmqp;
    
    use HyperfAmqpBuilderBuilder;
    
    use HyperfAmqpBuilderQueueBuilder;
    
    class AmqpBuilder extends QueueBuilder
    {
        /**
         * Merge additional queue arguments into the ones this builder already holds.
         *
         * Uses array_merge(), so an entry in $arguments replaces an existing entry
         * that has the same string key; entries not mentioned are kept.
         *
         * NOTE(review): the parameter is documented as also accepting an AMQPTable,
         * but array_merge() only works on arrays — confirm callers always pass arrays.
         *
         * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments Arguments to merge in.
         *
         * @return \Hyperf\Amqp\Builder\Builder This builder, for chaining.
         */
        public function setArguments($arguments) : Builder
        {
            $merged = array_merge($this->arguments, $arguments);
            $this->arguments = $merged;

            return $this;
        }

        /**
         * Configure this builder as a delay queue.
         *
         * Adds the message TTL and dead-letter arguments to the queue arguments
         * and sets the queue name.
         *
         * @param string $queueName             Name passed to setQueue().
         * @param int    $xMessageTtl           Time to live; multiplied by 1000 before being
         *                                      stored, so the stored value is in milliseconds.
         * @param string $xDeadLetterExchange   Value stored under 'x-dead-letter-exchange'.
         * @param string $xDeadLetterRoutingKey Value stored under 'x-dead-letter-routing-key'.
         *
         * @return $this
         */
        public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
        {
            // Convert the caller's TTL to milliseconds.
            $ttlMilliseconds = $xMessageTtl * 1000;

            // Each entry is a [type tag, value] pair.
            $delayArguments = [
                'x-message-ttl'             => ['I', $ttlMilliseconds],
                'x-dead-letter-exchange'    => ['S', $xDeadLetterExchange],
                'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
            ];

            $this->setArguments($delayArguments);
            $this->setQueue($queueName);

            return $this;
        }
    }
    

      

    DelayProducer.php

    <?php
    declare(strict_types = 1);
    namespace AppComponentsAmqp;
    use HyperfAmqpAnnotationProducer;
    use HyperfAmqpBuilder;
    use HyperfAmqpMessageProducerMessageInterface;
    use HyperfDiAnnotationAnnotationCollector;
    use PhpAmqpLibMessageAMQPMessage;
    use Throwable;
    class DelayProducer extends Builder
    {
        /**
         * Publish a message after declaring the queue described by $queueBuilder.
         *
         * Wraps produceMessage() in a closure and hands it to the retry() helper
         * with a count of 1.
         *
         * @param ProducerMessageInterface $producerMessage Message to publish.
         * @param AmqpBuilder              $queueBuilder    Definition of the queue to declare and bind.
         * @param bool                     $confirm         Whether to publish on a confirm channel.
         * @param int                      $timeout         Timeout passed to wait_for_pending_acks_returns().
         *
         * @return bool
         * @throws Throwable
         */
        public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
        {
            $attempt = function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
                return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
            };

            return retry(1, $attempt);
        }

        /**
         * Declare the queue and exchange, bind them, and publish the message.
         *
         * @param ProducerMessageInterface $producerMessage Message to publish.
         * @param AmqpBuilder              $queueBuilder    Definition of the queue to declare and bind.
         * @param bool                     $confirm         Whether to publish on a confirm channel.
         * @param int                      $timeout         Timeout passed to wait_for_pending_acks_returns().
         *
         * @return bool True when $confirm is false; otherwise whether the ack handler was invoked.
         * @throws Throwable Anything thrown while declaring or publishing is rethrown
         *                   after the connection has been reconnected.
         */
        private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
        {
            $acknowledged = false;

            // Apply routing key / exchange from the @Producer annotation, if any.
            $this->injectMessageProperty($producerMessage);

            $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());

            /** @var \Hyperf\Amqp\Connection $connection */
            $connection = $this->getConnectionPool($producerMessage->getPoolName())->get();
            $channel = $confirm ? $connection->getConfirmChannel() : $connection->getChannel();

            // The handler flips the flag that is returned in confirm mode.
            $channel->set_ack_handler(function () use (&$acknowledged) {
                $acknowledged = true;
            });

            try {
                // Delay-queue handling: fetch the exchange definition first.
                $exchangeBuilder = $producerMessage->getExchangeBuilder();

                // Declare the queue.
                $channel->queue_declare(
                    $queueBuilder->getQueue(),
                    $queueBuilder->isPassive(),
                    $queueBuilder->isDurable(),
                    $queueBuilder->isExclusive(),
                    $queueBuilder->isAutoDelete(),
                    $queueBuilder->isNowait(),
                    $queueBuilder->getArguments(),
                    $queueBuilder->getTicket()
                );

                // Declare the exchange.
                $channel->exchange_declare(
                    $exchangeBuilder->getExchange(),
                    $exchangeBuilder->getType(),
                    $exchangeBuilder->isPassive(),
                    $exchangeBuilder->isDurable(),
                    $exchangeBuilder->isAutoDelete(),
                    $exchangeBuilder->isInternal(),
                    $exchangeBuilder->isNowait(),
                    $exchangeBuilder->getArguments(),
                    $exchangeBuilder->getTicket()
                );

                // Bind the queue to the message's exchange and routing key.
                $channel->queue_bind(
                    $queueBuilder->getQueue(),
                    $producerMessage->getExchange(),
                    $producerMessage->getRoutingKey()
                );

                // Publish the message, then wait for pending acks/returns.
                $channel->basic_publish(
                    $message,
                    $producerMessage->getExchange(),
                    $producerMessage->getRoutingKey()
                );
                $channel->wait_for_pending_acks_returns($timeout);
            } catch (Throwable $exception) {
                // Reconnect the connection before it is released.
                $connection->reconnect();

                throw $exception;
            } finally {
                $connection->release();
            }

            if (! $confirm) {
                return true;
            }

            return $acknowledged;
        }

        /**
         * Copy routing key and exchange from the message class's @Producer annotation.
         *
         * Does nothing when the AnnotationCollector class is unavailable or the
         * message class carries no Producer annotation. Only non-empty annotation
         * values are applied.
         *
         * @param ProducerMessageInterface $producerMessage Message to update in place.
         */
        private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
        {
            if (! class_exists(AnnotationCollector::class)) {
                return;
            }

            /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
            $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
            if (! $annotation) {
                return;
            }

            if ($annotation->routingKey) {
                $producerMessage->setRoutingKey($annotation->routingKey);
            }

            if ($annotation->exchange) {
                $producerMessage->setExchange($annotation->exchange);
            }
        }
    }
    

      

    处理超时订单

    • OrderQueueConsumer.php
    • OrderQueueProducer.php

    OrderQueueProducer.php

    <?php
    declare(strict_types = 1);
    namespace AppAmqpProducer;
    use HyperfAmqpAnnotationProducer;
    use HyperfAmqpBuilderExchangeBuilder;
    use HyperfAmqpMessageProducerMessage;
    /**
     * @Producer(exchange="order_exchange", routingKey="order_exchange")
     */
    class OrderQueueProducer extends ProducerMessage
    {
        /**
         * Store the given data as the message payload.
         *
         * @param mixed $data Assigned to the payload property unchanged.
         */
        public function __construct($data)
        {
            $this->payload = $data;
        }

        /**
         * Return the exchange definition for this message.
         *
         * Currently a plain pass-through to the parent implementation; it is the
         * place to customise the exchange if that becomes necessary.
         *
         * @return ExchangeBuilder
         */
        public function getExchangeBuilder() : ExchangeBuilder
        {
            $exchangeBuilder = parent::getExchangeBuilder();

            return $exchangeBuilder;
        }
    }
    

      

    OrderQueueConsumer.php
    <?php
    declare(strict_types = 1);
    namespace AppAmqpConsumer;
    use AppServiceCityTransportOrderService;
    use HyperfAmqpResult;
    use HyperfAmqpAnnotationConsumer;
    use HyperfAmqpMessageConsumerMessage;
    /**
     * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)
     */
    class OrderQueueConsumer extends ConsumerMessage
    {
        /**
         * Handle one message delivered to this consumer.
         *
         * The method is declared to return a string, so it must always return a
         * value; falling off the end of the body would raise a TypeError.
         *
         * @param mixed $data Message payload as delivered by the framework.
         *
         * @return string Result::ACK once the message has been handled.
         */
        public function consume($data) : string
        {
            // Business handling for the timed-out order goes here.

            // Report the message as handled.
            return Result::ACK;
        }

        /**
         * Whether this consumer is enabled.
         *
         * @return bool Always true.
         */
        public function isEnable() : bool
        {
            return true;
        }
    }
    

      

    Demo

    // Describe the queue: name 'order_exchange', TTL argument of 1 (stored as 1000 ms),
    // dead-letter exchange 'delay_exchange' and dead-letter routing key 'delay_route'.
    $builder = new AmqpBuilder();
    $builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');

    // Resolve the producer from the container and publish a message carrying a random order number.
    $que = ApplicationContext::getContainer()->get(DelayProducer::class);
    var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder));
    

      

    更多学习内容请访问:

    腾讯T3-T4标准精品PHP架构师教程目录大全,只要你看完保证薪资上升一个台阶(持续更新)

  • 相关阅读:
    零基础学python-2.15 回到我们的游戏 加入for以及列表
    零基础学python-2.14 for循环语句
    零基础学python-在3.x版本之后的print()不换行
    零基础学python-2.13 回到我们的游戏 加入循环
    CSV文件读取
    jmeter 测试webservice协议soap接口
    jmeter-plugins-manager.jar插件安装
    JDBC协议(jmeter链接mysql)
    xftp连接centos7
    xshell连接centos7
  • 原文地址:https://www.cnblogs.com/a609251438/p/12894970.html
Copyright © 2011-2022 走看看