最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
当前位置: 首页 - 科技 - 知识百科 - 正文

PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

来源:动视网 责编:小采 时间:2020-11-03 12:30:52
文档

PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

PHP 框架 Hyperf 实现处理超时未支付订单和延时队列:延时队列Delayproducer.PhpAmqpbuilder.PhpAmqpBuilder.php<php declare(strict_types = 1); namespace App\Components\Amqp; use Hyperf\Amqp\Builder\Builder; use Hyperf\Amqp\Builder\Que
推荐度:
导读PHP 框架 Hyperf 实现处理超时未支付订单和延时队列:延时队列Delayproducer.PhpAmqpbuilder.PhpAmqpBuilder.php<php declare(strict_types = 1); namespace App\Components\Amqp; use Hyperf\Amqp\Builder\Builder; use Hyperf\Amqp\Builder\Que


延时队列

  • DelayProducer.php

  • AmqpBuilder.php

  • AmqpBuilder.php

    <?php
    declare(strict_types=1);

    namespace App\Components\Amqp;

    use Hyperf\Amqp\Builder\Builder;
    use Hyperf\Amqp\Builder\QueueBuilder;

    /**
     * Queue builder that can describe a delay queue: a queue whose messages
     * carry a TTL and a dead-letter exchange / routing key.
     */
    class AmqpBuilder extends QueueBuilder
    {
        /**
         * Merge the given arguments into the arguments already held by this builder.
         *
         * NOTE(review): array_merge() only accepts arrays, so passing an AMQPTable
         * (as the @param below allows) will fail — confirm whether AMQPTable input
         * must really be supported here.
         *
         * @param array|\PhpAmqpLib\Wire\AMQPTable $arguments
         *
         * @return \Hyperf\Amqp\Builder\Builder
         */
        public function setArguments($arguments): Builder
        {
            $this->arguments = array_merge($this->arguments, $arguments);

            return $this;
        }

        /**
         * Set the arguments that make this queue a delay queue.
         *
         * @param string $queueName             Name of the queue to declare.
         * @param int    $xMessageTtl           Message time-to-live in seconds (converted to milliseconds below).
         * @param string $xDeadLetterExchange   Value for the `x-dead-letter-exchange` argument.
         * @param string $xDeadLetterRoutingKey Value for the `x-dead-letter-routing-key` argument.
         *
         * @return $this
         */
        public function setDelayedQueue(
            string $queueName,
            int $xMessageTtl,
            string $xDeadLetterExchange,
            string $xDeadLetterRoutingKey
        ): self {
            $this->setArguments([
                'x-message-ttl' => ['I', $xMessageTtl * 1000], // milliseconds
                'x-dead-letter-exchange' => ['S', $xDeadLetterExchange],
                'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
            ]);
            $this->setQueue($queueName);

            return $this;
        }
    }

    DelayProducer.php

    <?php
    declare(strict_types=1);

    namespace App\Components\Amqp;

    use Hyperf\Amqp\Annotation\Producer;
    use Hyperf\Amqp\Builder;
    use Hyperf\Amqp\Message\ProducerMessageInterface;
    use Hyperf\Di\Annotation\AnnotationCollector;
    use PhpAmqpLib\Message\AMQPMessage;
    use Throwable;

    /**
     * Producer that declares a caller-supplied queue (typically a delay queue
     * built with AmqpBuilder), binds it to the message's exchange and publishes
     * the message to it.
     */
    class DelayProducer extends Builder
    {
        /**
         * Publish a message, delegating to produceMessage() through the `retry`
         * helper with a retry count of 1.
         *
         * @param ProducerMessageInterface $producerMessage Message to publish.
         * @param AmqpBuilder              $queueBuilder    Definition of the queue to declare and bind.
         * @param bool                     $confirm         Use a confirm channel and report the broker ack.
         * @param int                      $timeout         Timeout passed to wait_for_pending_acks_returns().
         *
         * @return bool
         * @throws Throwable
         */
        public function produce(
            ProducerMessageInterface $producerMessage,
            AmqpBuilder $queueBuilder,
            bool $confirm = false,
            int $timeout = 5
        ): bool {
            return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
                return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
            });
        }

        /**
         * Declare the queue and exchange, bind them, and publish the message.
         *
         * @param ProducerMessageInterface $producerMessage Message to publish.
         * @param AmqpBuilder              $queueBuilder    Definition of the queue to declare and bind.
         * @param bool                     $confirm         Use a confirm channel and report the broker ack.
         * @param int                      $timeout         Timeout passed to wait_for_pending_acks_returns().
         *
         * @return bool The ack flag when $confirm is true; otherwise always true.
         * @throws Throwable Any failure is rethrown after the connection is reconnected.
         */
        private function produceMessage(
            ProducerMessageInterface $producerMessage,
            AmqpBuilder $queueBuilder,
            bool $confirm = false,
            int $timeout = 5
        ): bool {
            $result = false;
            $this->injectMessageProperty($producerMessage);
            $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
            $pool = $this->getConnectionPool($producerMessage->getPoolName());
            /** @var \Hyperf\Amqp\Connection $connection */
            $connection = $pool->get();
            if ($confirm) {
                $channel = $connection->getConfirmChannel();
            } else {
                $channel = $connection->getChannel();
            }
            // Flip the result flag once the ack handler is invoked.
            $channel->set_ack_handler(function () use (&$result) {
                $result = true;
            });
            try {
                // Delay-queue handling starts here.
                $exchangeBuilder = $producerMessage->getExchangeBuilder();
                // Declare the queue.
                $channel->queue_declare(
                    $queueBuilder->getQueue(),
                    $queueBuilder->isPassive(),
                    $queueBuilder->isDurable(),
                    $queueBuilder->isExclusive(),
                    $queueBuilder->isAutoDelete(),
                    $queueBuilder->isNowait(),
                    $queueBuilder->getArguments(),
                    $queueBuilder->getTicket()
                );
                // Declare the exchange.
                $channel->exchange_declare(
                    $exchangeBuilder->getExchange(),
                    $exchangeBuilder->getType(),
                    $exchangeBuilder->isPassive(),
                    $exchangeBuilder->isDurable(),
                    $exchangeBuilder->isAutoDelete(),
                    $exchangeBuilder->isInternal(),
                    $exchangeBuilder->isNowait(),
                    $exchangeBuilder->getArguments(),
                    $exchangeBuilder->getTicket()
                );
                // Bind the queue to the exchange.
                $channel->queue_bind(
                    $queueBuilder->getQueue(),
                    $producerMessage->getExchange(),
                    $producerMessage->getRoutingKey()
                );
                // Publish the message.
                $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
                $channel->wait_for_pending_acks_returns($timeout);
            } catch (Throwable $exception) {
                // Reconnect the connection before release.
                $connection->reconnect();
                throw $exception;
            } finally {
                $connection->release();
            }

            return $confirm ? $result : true;
        }

        /**
         * Copy the routing key and exchange from the message class's Producer
         * annotation onto the message, when the annotation defines them.
         *
         * @param ProducerMessageInterface $producerMessage
         */
        private function injectMessageProperty(ProducerMessageInterface $producerMessage): void
        {
            if (class_exists(AnnotationCollector::class)) {
                /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
                $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
                if ($annotation) {
                    $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey);
                    $annotation->exchange && $producerMessage->setExchange($annotation->exchange);
                }
            }
        }
    }

    处理超时订单

  • OrderQueueConsumer.php

  • OrderQueueProducer.php

  • OrderQueueProducer.php

    <?php
    declare(strict_types=1);

    namespace App\Amqp\Producer;

    use Hyperf\Amqp\Annotation\Producer;
    use Hyperf\Amqp\Builder\ExchangeBuilder;
    use Hyperf\Amqp\Message\ProducerMessage;

    /**
     * Producer message for order events; the exchange and routing key come
     * from the annotation below.
     *
     * @Producer(exchange="order_exchange", routingKey="order_exchange")
     */
    class OrderQueueProducer extends ProducerMessage
    {
        /**
         * @param mixed $data Payload stored on the message as-is.
         */
        public function __construct($data)
        {
            $this->payload = $data;
        }

        /**
         * Return the exchange definition; currently just the parent's builder.
         * Kept as an extension point for customising the exchange.
         */
        public function getExchangeBuilder(): ExchangeBuilder
        {
            return parent::getExchangeBuilder();
        }
    }

    OrderQueueConsumer.php

    <?php
    declare(strict_types=1);

    namespace App\Amqp\Consumer;

    // NOTE(review): the namespace separators of this import were lost in the
    // published listing; the segment boundaries below are a best-guess
    // reconstruction — confirm against the real project layout.
    use App\Service\CityTransport\OrderService;
    use Hyperf\Amqp\Annotation\Consumer;
    use Hyperf\Amqp\Message\ConsumerMessage;
    use Hyperf\Amqp\Result;

    /**
     * Consumer bound to the dead-letter target named in the annotation below.
     *
     * @Consumer(exchange="delay_exchange", routingKey="delay_route", queue="delay_queue", name ="OrderQueueConsumer", nums=1)
     */
    class OrderQueueConsumer extends ConsumerMessage
    {
        /**
         * Handle one message.
         *
         * @param mixed $data Decoded message payload.
         *
         * @return string One of the Result::* constants.
         */
        public function consume($data): string
        {
            // Business logic goes here (e.g. handling the timed-out order).

            // The method declares a string return type, so it must return a
            // result; without this the call fails with a TypeError.
            return Result::ACK;
        }

        /**
         * This consumer is always enabled.
         */
        public function isEnable(): bool
        {
            return true;
        }
    }

    Demo

    // Build a delay queue named 'order_exchange' with a 1-second TTL whose
    // expired messages are dead-lettered to delay_exchange / delay_route.
    $builder = new AmqpBuilder();
    $builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');
    // Resolve the producer from the container and publish a sample order message.
    $que = ApplicationContext::getContainer()->get(DelayProducer::class);
    var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder));

    推荐教程:《PHP教程》

    文档

    PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

    PHP 框架 Hyperf 实现处理超时未支付订单和延时队列:延时队列Delayproducer.PhpAmqpbuilder.PhpAmqpBuilder.php<php declare(strict_types = 1); namespace App\Components\Amqp; use Hyperf\Amqp\Builder\Builder; use Hyperf\Amqp\Builder\Que
    推荐度:
    标签: php 框架 Hyperf
    • 热门焦点

    最新推荐

    猜你喜欢

    热门推荐

    专题
    Top