Test Failed
Pull Request — master (#96)
by Dmitriy
06:18 queued 02:27
created

DelayMiddleware   A

Complexity

Total Complexity 9

Size/Duplication

Total Lines 91
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 9
eloc 36
dl 0
loc 91
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\AMQP\Middleware;
6
7
use InvalidArgumentException;
8
use PhpAmqpLib\Exchange\AMQPExchangeType;
9
use PhpAmqpLib\Message\AMQPMessage;
10
use Yiisoft\Queue\AMQP\Adapter;
11
use Yiisoft\Queue\AMQP\QueueProviderInterface;
12
use Yiisoft\Queue\AMQP\Settings\ExchangeSettingsInterface;
13
use Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface;
14
use Yiisoft\Queue\Middleware\Push\Implementation\DelayMiddlewareInterface;
15
use Yiisoft\Queue\Middleware\Push\MessageHandlerPushInterface;
16
use Yiisoft\Queue\Middleware\Push\PushRequest;
17
18
/**
 * Push middleware that delays delivery of a message sent through the AMQP {@see Adapter}.
 *
 * The adapter of the incoming push request is replaced with a copy whose queue provider targets
 * a dedicated "<name>.dlx" queue (and "<name>.dlx" exchange, when an exchange is configured).
 * That queue is declared with a message TTL equal to the delay and with dead-letter arguments
 * pointing back to the original exchange and the original queue name.
 */
final class DelayMiddleware implements DelayMiddlewareInterface
{
    /**
     * Extra time, in milliseconds, added on top of the delay when computing the `x-expires`
     * argument of the delay queue.
     */
    private const QUEUE_EXPIRATION_MARGIN_MS = 30_000;

    /**
     * @param float $delayInSeconds Delay to apply to pushed messages, in seconds.
     * @param bool $forcePersistentMessages Whether to force the persistent delivery mode on delayed messages.
     */
    public function __construct(private float $delayInSeconds, private bool $forcePersistentMessages = true)
    {
    }

    /**
     * Returns a copy of this middleware configured with another delay. The current instance is not modified.
     *
     * @param float $seconds New delay, in seconds.
     *
     * @return self A new instance with the given delay.
     */
    public function withDelay(float $seconds): self
    {
        $new = clone $this;
        $new->delayInSeconds = $seconds;

        return $new;
    }

    /**
     * @return float The configured delay, in seconds.
     */
    public function getDelay(): float
    {
        return $this->delayInSeconds;
    }

    /**
     * Re-targets the request's adapter to the delay queue/exchange and passes the request on.
     *
     * @throws InvalidArgumentException When the request's adapter is not an AMQP {@see Adapter}.
     */
    public function processPush(PushRequest $request, MessageHandlerPushInterface $handler): PushRequest
    {
        $adapter = $request->getAdapter();
        if (!$adapter instanceof Adapter) {
            throw new InvalidArgumentException(
                sprintf(
                    'This middleware works only with the %s. %s given.',
                    Adapter::class,
                    get_debug_type($adapter)
                )
            );
        }

        $queueProvider = $adapter->getQueueProvider();
        $originalExchangeSettings = $queueProvider->getExchangeSettings();

        // Both derived settings are built from the ORIGINAL settings, before the provider is reconfigured.
        $delayedExchangeSettings = $this->getExchangeSettings($originalExchangeSettings);
        $delayedQueueSettings = $this->getQueueSettings(
            $queueProvider->getQueueSettings(),
            $originalExchangeSettings
        );

        $delayedQueueProvider = $queueProvider
            ->withMessageProperties($this->getMessageProperties($queueProvider))
            ->withExchangeSettings($delayedExchangeSettings)
            ->withQueueSettings($delayedQueueSettings);

        return $handler->handlePush(
            $request->withAdapter($adapter->withQueueProvider($delayedQueueProvider))
        );
    }

    /**
     * Builds the message properties for a delayed message.
     *
     * The properties already configured on the queue provider are kept; `expiration` (the delay in
     * milliseconds) and, when persistent messages are forced, `delivery_mode` override them.
     *
     * @psalm-return array{expiration: int|float, delivery_mode?: int}&array
     */
    private function getMessageProperties(QueueProviderInterface $queueProvider): array
    {
        $messageProperties = ['expiration' => $this->delayInSeconds * 1000];
        if ($this->forcePersistentMessages === true) {
            $messageProperties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
        }

        return array_merge($queueProvider->getMessageProperties(), $messageProperties);
    }

    /**
     * Builds the settings of the delay queue from the settings of the original queue.
     *
     * The delay queue is named "<original name>.dlx" and is given arguments that dead-letter its
     * messages to the original exchange (the default exchange "" when none is configured), using
     * the original queue name as the routing key.
     */
    private function getQueueSettings(
        QueueSettingsInterface $queueSettings,
        ?ExchangeSettingsInterface $originalExchangeSettings
    ): QueueSettingsInterface {
        $originalQueueName = $queueSettings->getName();
        $delayInMilliseconds = $this->delayInSeconds * 1000;

        $arguments = [
            'x-dead-letter-exchange' => ['S', $originalExchangeSettings?->getName() ?? ''],
            // The argument name must be exactly "x-dead-letter-routing-key"; a misspelled key
            // does not set the dead-letter routing key.
            'x-dead-letter-routing-key' => ['S', $originalQueueName],
            'x-expires' => ['I', $delayInMilliseconds + self::QUEUE_EXPIRATION_MARGIN_MS],
            'x-message-ttl' => ['I', $delayInMilliseconds],
        ];

        return $queueSettings
            ->withName("{$originalQueueName}.dlx")
            ->withArguments($arguments);
    }

    /**
     * Builds the settings of the delay exchange: the original name suffixed with ".dlx" and the
     * topic exchange type. Returns null when no exchange settings are given.
     *
     * @see https://github.com/vimeo/psalm/issues/9454
     *
     * @psalm-suppress LessSpecificReturnType
     */
    private function getExchangeSettings(?ExchangeSettingsInterface $exchangeSettings): ?ExchangeSettingsInterface
    {
        // The nullsafe call short-circuits the whole chain, so the interpolated getName()
        // argument is never evaluated when $exchangeSettings is null.
        /** @noinspection NullPointerExceptionInspection */
        return $exchangeSettings
            ?->withName("{$exchangeSettings->getName()}.dlx")
            ->withType(AMQPExchangeType::TOPIC);
    }
}
114