Issues (9)

src/Middleware/DelayMiddleware.php (1 issue)

Labels
Severity
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\AMQP\Middleware;
6
7
use InvalidArgumentException;
8
use PhpAmqpLib\Exchange\AMQPExchangeType;
9
use PhpAmqpLib\Message\AMQPMessage;
10
use Yiisoft\Queue\AMQP\Adapter;
11
use Yiisoft\Queue\AMQP\QueueProviderInterface;
12
use Yiisoft\Queue\AMQP\Settings\ExchangeSettingsInterface;
13
use Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface;
14
use Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface;
15
use Yiisoft\Queue\Middleware\Push\MessageHandlerPushInterface;
16
use Yiisoft\Queue\Middleware\Push\PushRequest;
17
18
/**
 * Push middleware that delays message delivery for the AMQP adapter.
 *
 * It reconfigures the adapter's queue provider so that the message is published to a temporary
 * "dlx" queue/exchange pair carrying a TTL equal to the requested delay and a dead-letter exchange
 * pointing at the originally configured exchange.
 */
final class DelayMiddleware implements DelayMiddlewareInterface
{
    /**
     * Extra time, in milliseconds, added on top of the delay when computing the `x-expires`
     * argument of the temporary delay queue.
     */
    private const QUEUE_EXPIRATION_MARGIN_MS = 30000;

    /**
     * @param float $delayInSeconds Delay to apply to pushed messages, in seconds.
     * @param bool $forcePersistentMessages Whether to force the persistent delivery mode on delayed messages.
     */
    public function __construct(private float $delayInSeconds, private bool $forcePersistentMessages = true)
    {
    }

    /**
     * Returns a copy of this middleware configured with another delay; the current instance is not modified.
     *
     * @param float $seconds New delay, in seconds.
     *
     * @return self A new instance with the given delay.
     */
    public function withDelay(float $seconds): self
    {
        $new = clone $this;
        $new->delayInSeconds = $seconds;

        return $new;
    }

    /**
     * @return float The configured delay, in seconds.
     */
    public function getDelay(): float
    {
        return $this->delayInSeconds;
    }

    /**
     * Swaps the request's adapter for one whose queue provider targets the delay queue and exchange,
     * then passes the request on to the next handler.
     *
     * @throws InvalidArgumentException When the request's adapter is not the AMQP {@see Adapter}.
     */
    public function processPush(PushRequest $request, MessageHandlerPushInterface $handler): PushRequest
    {
        $adapter = $request->getAdapter();
        if (!$adapter instanceof Adapter) {
            $type = get_debug_type($adapter);
            $class = Adapter::class;
            throw new InvalidArgumentException(
                "This middleware works only with the $class. $type given."
            );
        }

        $queueProvider = $adapter->getQueueProvider();
        // The original exchange settings are needed twice: as the base for the delay exchange
        // and as the dead-letter target of the delay queue.
        $originalExchangeSettings = $queueProvider->getExchangeSettings();

        $delayedProvider = $queueProvider
            ->withMessageProperties($this->getMessageProperties($queueProvider))
            ->withExchangeSettings($this->getExchangeSettings($originalExchangeSettings))
            ->withQueueSettings(
                $this->getQueueSettings($queueProvider->getQueueSettings(), $originalExchangeSettings)
            );

        return $handler->handlePush(
            $request->withAdapter($adapter->withQueueProvider($delayedProvider))
        );
    }

    /**
     * Builds the message properties for a delayed message: the provider's own properties,
     * overridden by the expiration and (optionally) the persistent delivery mode.
     *
     * @psalm-return array{expiration: int|float, delivery_mode?: int}&array
     */
    private function getMessageProperties(QueueProviderInterface $queueProvider): array
    {
        $messageProperties = ['expiration' => $this->getDelayInMilliseconds()];
        if ($this->forcePersistentMessages === true) {
            $messageProperties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
        }

        // Later arrays win in array_merge(), so the delay-specific values take precedence.
        return array_merge($queueProvider->getMessageProperties(), $messageProperties);
    }

    /**
     * Derives the settings of the temporary delay queue from the original queue settings.
     *
     * @param QueueSettingsInterface $queueSettings Settings of the originally configured queue.
     * @param ExchangeSettingsInterface|null $exchangeSettings Settings of the originally configured exchange;
     * its name becomes the dead-letter exchange (an empty string when there is none).
     */
    private function getQueueSettings(
        QueueSettingsInterface $queueSettings,
        ?ExchangeSettingsInterface $exchangeSettings
    ): QueueSettingsInterface {
        $deliveryTime = time() + $this->delayInSeconds;
        $delayInMilliseconds = $this->getDelayInMilliseconds();

        return $queueSettings
            ->withName("{$queueSettings->getName()}.dlx.$deliveryTime")
            ->withAutoDeletable(true)
            ->withArguments(
                [
                    'x-dead-letter-exchange' => ['S', $exchangeSettings?->getName() ?? ''],
                    'x-expires' => ['I', $delayInMilliseconds + self::QUEUE_EXPIRATION_MARGIN_MS],
                    'x-message-ttl' => ['I', $delayInMilliseconds],
                ]
            );
    }

    /**
     * Derives the settings of the delay exchange (original name suffixed with ".dlx", auto-delete,
     * topic type) from the original exchange settings.
     *
     * @return ExchangeSettingsInterface|null The delay exchange settings, or null when no exchange is configured.
     */
    private function getExchangeSettings(?ExchangeSettingsInterface $exchangeSettings): ?ExchangeSettingsInterface
    {
        // An explicit null check is used instead of a nullsafe chain: the previous form called
        // getName() on the nullable value inside the chain's argument, which was runtime-safe thanks
        // to short-circuiting but was reported by static analysis as
        // "The method getName() does not exist on null".
        if ($exchangeSettings === null) {
            return null;
        }

        return $exchangeSettings
            ->withName("{$exchangeSettings->getName()}.dlx")
            ->withAutoDelete(true)
            ->withType(AMQPExchangeType::TOPIC);
    }

    /**
     * Converts the configured delay to whole milliseconds.
     *
     * The value is rounded to an integer because it is used for the message expiration and for
     * integer-typed ('I') queue arguments; a fractional number of milliseconds is not a valid value there.
     */
    private function getDelayInMilliseconds(): int
    {
        return (int) round($this->delayInSeconds * 1000);
    }
}
111