当前位置:首页 > PHP教程 > php高级应用 > 列表

PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

发布:smiling 来源: PHP粉丝网  添加日期:2022-06-14 08:55:02 浏览: 评论:0 

延时队列

Delayproducer.Php

Amqpbuilder.Php

AmqpBuilder.php

  1. <?php 
  2.  
  3. declare(strict_types = 1); 
  4.  
  5. namespace App\Components\Amqp; 
  6.  
  7. use Hyperf\Amqp\Builder\Builder; 
  8.  
  9. use Hyperf\Amqp\Builder\QueueBuilder; 
  10.  
  11. class AmqpBuilder extends QueueBuilder 
  12.  
  13.  
  14.     /** 
  15.  
  16.      * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments 
  17.  
  18.      * 
  19.  
  20.      * @return \Hyperf\Amqp\Builder\Builder 
  21.  
  22.      */ 
  23.  
  24.     public function setArguments($arguments) : Builder 
  25.  
  26.     { 
  27.  
  28.         $this->arguments = array_merge($this->arguments, $arguments); 
  29.  
  30.         return $this
  31.  
  32.     } 
  33.  
  34.     /** 
  35.  
  36.      * 设置延时队列相关参数 
  37.  
  38.      * 
  39.  
  40.      * @param string $queueName 
  41.  
  42.      * @param int    $xMessageTtl 
  43.  
  44.      * @param string $xDeadLetterExchange 
  45.  
  46.      * @param string $xDeadLetterRoutingKey 
  47.  
  48.      * 
  49.  
  50.      * @return $this 
  51.  
  52.      */ 
  53.  
  54.     public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self 
  55.  
  56.     { 
  57.  
  58.         $this->setArguments([ 
  59.  
  60.             'x-message-ttl'             => ['I'$xMessageTtl * 1000], // 毫秒 
  61.  
  62.             'x-dead-letter-exchange'    => ['S'$xDeadLetterExchange], 
  63.  
  64.             'x-dead-letter-routing-key' => ['S'$xDeadLetterRoutingKey], 
  65.  
  66.         ]); 
  67.  
  68.         $this->setQueue($queueName); 
  69.  
  70.         return $this
  71.  
  72.     } 
  73.  

DelayProducer.php

  1. <?php 
  2.  
  3. declare(strict_types = 1); 
  4.  
  5. namespace App\Components\Amqp; 
  6.  
  7. use Hyperf\Amqp\Annotation\Producer; 
  8.  
  9. use Hyperf\Amqp\Builder; 
  10.  
  11. use Hyperf\Amqp\Message\ProducerMessageInterface; 
  12.  
  13. use Hyperf\Di\Annotation\AnnotationCollector; 
  14.  
  15. use PhpAmqpLib\Message\AMQPMessage; 
  16.  
  17. use Throwable; 
  18.  
  19. class DelayProducer extends Builder 
  20.  
  21.  
  22.     /** 
  23.  
  24.      * @param ProducerMessageInterface $producerMessage 
  25.  
  26.      * @param AmqpBuilder              $queueBuilder 
  27.  
  28.      * @param bool                     $confirm 
  29.  
  30.      * @param int                      $timeout 
  31.  
  32.      * 
  33.  
  34.      * @return bool 
  35.  
  36.      * @throws \Throwable 
  37.  
  38.      */ 
  39.  
  40.     public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool 
  41.  
  42.     { 
  43.  
  44.         return retry(1, function () use ($producerMessage$queueBuilder$confirm$timeout
  45.  
  46.         { 
  47.  
  48.             return $this->produceMessage($producerMessage$queueBuilder$confirm$timeout); 
  49.  
  50.         }); 
  51.  
  52.     } 
  53.  
  54.     /** 
  55.  
  56.      * @param ProducerMessageInterface $producerMessage 
  57.  
  58.      * @param AmqpBuilder              $queueBuilder 
  59.  
  60.      * @param bool                     $confirm 
  61.  
  62.      * @param int                      $timeout 
  63.  
  64.      * 
  65.  
  66.      * @return bool 
  67.  
  68.      * @throws \Throwable 
  69.  
  70.      */ 
  71.  
  72.     private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool 
  73.  
  74.     { 
  75.  
  76.         $result = false; 
  77.  
  78.         $this->injectMessageProperty($producerMessage); 
  79.  
  80.         $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties()); 
  81.  
  82.         $pool    = $this->getConnectionPool($producerMessage->getPoolName()); 
  83.  
  84.         /** @var \Hyperf\Amqp\Connection $connection */ 
  85.  
  86.         $connection = $pool->get(); 
  87.  
  88.         if ($confirm) { 
  89.  
  90.             $channel = $connection->getConfirmChannel(); 
  91.  
  92.         } else { 
  93.  
  94.             $channel = $connection->getChannel(); 
  95.  
  96.         } 
  97.  
  98.         $channel->set_ack_handler(function () use (&$result
  99.  
  100.         { 
  101.  
  102.             $result = true; 
  103.  
  104.         }); 
  105.  
  106.         try { 
  107.  
  108.             // 处理延时队列 
  109.  
  110.             $exchangeBuilder = $producerMessage->getExchangeBuilder(); 
  111.  
  112.             // 队列定义 
  113.  
  114.             $channel->queue_declare($queueBuilder->getQueue(), $queueBuilder->isPassive(), $queueBuilder->isDurable(), $queueBuilder->isExclusive(), $queueBuilder->isAutoDelete(), $queueBuilder->isNowait(), $queueBuilder->getArguments(), $queueBuilder->getTicket()); 
  115.  
  116.             // 路由定义 
  117.  
  118.             $channel->exchange_declare($exchangeBuilder->getExchange(), $exchangeBuilder->getType(), $exchangeBuilder->isPassive(), $exchangeBuilder->isDurable(), $exchangeBuilder->isAutoDelete(), $exchangeBuilder->isInternal(), $exchangeBuilder->isNowait(), $exchangeBuilder->getArguments(), $exchangeBuilder->getTicket()); 
  119.  
  120.             // 队列绑定 
  121.  
  122.             $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey()); 
  123.  
  124.             // 消息发送 
  125.  
  126.             $channel->basic_publish($message$producerMessage->getExchange(), $producerMessage->getRoutingKey()); 
  127.  
  128.             $channel->wait_for_pending_acks_returns($timeout); 
  129.  
  130.         } catch (Throwable $exception) { 
  131.  
  132.             // Reconnect the connection before release. 
  133.  
  134.             $connection->reconnect(); 
  135.  
  136.             throw $exception
  137.  
  138.         } 
  139.  
  140.         finally { 
  141.  
  142.             $connection->release(); 
  143.  
  144.         } 
  145.  
  146.         return $confirm ? $result : true; 
  147.  
  148.     } 
  149.  
  150.     /** 
  151.  
  152.      * @param ProducerMessageInterface $producerMessage 
  153.  
  154.      */ 
  155.  
  156.     private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void 
  157.  
  158.     { 
  159.  
  160.         if (class_exists(AnnotationCollector::class)) { 
  161.  
  162.             /** @var \Hyperf\Amqp\Annotation\Producer $annotation */ 
  163.  
  164.             $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class); 
  165.  
  166.             if ($annotation) { 
  167.  
  168.                 $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey); 
  169.  
  170.                 $annotation->exchange && $producerMessage->setExchange($annotation->exchange); 
  171.  
  172.             } 
  173.  
  174.         } 
  175.  
  176.     } 
  177.  

处理超时订单

Orderqueueconsumer.Php

Orderqueueproducer.Php

Orderqueueproducer.php

  1. <?php 
  2.  
  3. declare(strict_types = 1); 
  4.  
  5. namespace App\Amqp\Producer; 
  6.  
  7. use Hyperf\Amqp\Annotation\Producer; 
  8.  
  9. use Hyperf\Amqp\Builder\ExchangeBuilder; 
  10.  
  11. use Hyperf\Amqp\Message\ProducerMessage; 
  12.  
  13. /** 
  14.  
  15.  * @Producer(exchange="order_exchange", routingKey="order_exchange") 
  16.  
  17.  */ 
  18.  
  19. class OrderQueueProducer extends ProducerMessage 
  20.  
  21.  
  22.     public function __construct($data
  23.  
  24.     { 
  25.  
  26.         $this->payload = $data
  27.  
  28.     } 
  29.  
  30.     public function getExchangeBuilder() : ExchangeBuilder 
  31.  
  32.     { 
  33.  
  34.         return parent::getExchangeBuilder(); // TODO: Change the autogenerated stub 
  35.  
  36.     } 
  37.  

Orderqueueconsumer.php

  1. <?php 
  2.  
  3. declare(strict_types = 1); 
  4.  
  5. namespace App\Amqp\Consumer; 
  6.  
  7. use App\Service\CityTransport\OrderService; 
  8.  
  9. use Hyperf\Amqp\Result; 
  10.  
  11. use Hyperf\Amqp\Annotation\Consumer; 
  12.  
  13. use Hyperf\Amqp\Message\ConsumerMessage; 
  14.  
  15. /** 
  16.  
  17.  * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1) 
  18.  
  19.  */ 
  20.  
  21. class OrderQueueConsumer extends ConsumerMessage 
  22.  
  23.  
  24.     public function consume($data) : string 
  25.  
  26.     { 
  27.  
  28.        ##业务处理 
  29.  
  30.     } 
  31.  
  32.     public function isEnable() : bool 
  33.  
  34.     { 
  35.  
  36.         return true; 
  37.  
  38.     } 
  39.  

Demo

  1. $builder = new AmqpBuilder(); 
  2.  
  3.         $builder->setDelayedQueue('order_exchange', 1, 'delay_exchange''delay_route'); 
  4.  
  5.         $que = ApplicationContext::getContainer()->get(DelayProducer::class); 
  6.  
  7.         var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder))

Tags: Hyperf PHP延时队列

分享到: