Test Failed
Pull Request — master (#100)
by Dmitriy
27:31 queued 13:25
created

DelayMiddleware::process()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 20
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 14
nc 1
nop 2
dl 0
loc 20
rs 9.7998
c 1
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\AMQP\Middleware;
6
7
use InvalidArgumentException;
8
use PhpAmqpLib\Exchange\AMQPExchangeType;
9
use PhpAmqpLib\Message\AMQPMessage;
10
use Yiisoft\Queue\Adapter\AdapterInterface;
11
use Yiisoft\Queue\AMQP\Adapter;
12
use Yiisoft\Queue\AMQP\QueueProviderInterface;
13
use Yiisoft\Queue\AMQP\Settings\ExchangeSettingsInterface;
14
use Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface;
15
use Yiisoft\Queue\Middleware\DelayMiddlewareInterface;
16
use Yiisoft\Queue\Middleware\MessageHandlerInterface;
17
use Yiisoft\Queue\Middleware\MiddlewareInterface;
18
use Yiisoft\Queue\Middleware\Request;
19
20
/**
 * Push middleware that delays message delivery for the AMQP adapter.
 *
 * The message is routed through a derived "<exchange>.dlx" exchange into a derived
 * "<queue>.dlx.<delivery time>" queue whose arguments dead-letter the message back
 * to the original exchange once the configured delay (used as TTL) has passed.
 */
final class DelayMiddleware implements MiddlewareInterface, DelayMiddlewareInterface
{
    /**
     * Milliseconds added on top of the delay when computing the `x-expires` argument of the delay queue.
     */
    private const QUEUE_EXPIRATION_RESERVE_MS = 30000;

    /**
     * Stored with the concrete type (validated in the constructor) because {@see process()} relies on
     * `getQueueProvider()` and `withQueueProvider()`, which are not part of {@see AdapterInterface}.
     */
    private Adapter $adapter;

    /**
     * @param AdapterInterface $adapter Adapter to wrap; it must be an instance of {@see Adapter}.
     * @param float $delayInSeconds Delivery delay in seconds.
     * @param bool $forcePersistentMessages Whether to force the persistent delivery mode on delayed messages.
     *
     * @throws InvalidArgumentException If the given adapter is not an instance of {@see Adapter}.
     */
    public function __construct(
        AdapterInterface $adapter,
        private float $delayInSeconds,
        private bool $forcePersistentMessages = true,
    ) {
        if (!$adapter instanceof Adapter) {
            throw new InvalidArgumentException(
                sprintf(
                    'This middleware works only with the %s. %s given.',
                    Adapter::class,
                    get_debug_type($adapter)
                )
            );
        }

        $this->adapter = $adapter;
    }

    /**
     * Returns a copy of this middleware configured with another delay; the current instance is not modified.
     *
     * @param float $seconds New delay in seconds.
     *
     * @return self The cloned middleware.
     */
    public function withDelay(float $seconds): self
    {
        $new = clone $this;
        $new->delayInSeconds = $seconds;

        return $new;
    }

    /**
     * @return float The configured delay in seconds.
     */
    public function getDelay(): float
    {
        return $this->delayInSeconds;
    }

    /**
     * Replaces the queue's adapter with a copy whose queue provider targets the delay exchange and queue,
     * then passes the modified request on to the next handler.
     */
    public function process(Request $request, MessageHandlerInterface $handler): Request
    {
        $queueProvider = $this->adapter->getQueueProvider();
        $originalExchangeSettings = $queueProvider->getExchangeSettings();
        $delayedExchangeSettings = $this->getExchangeSettings($originalExchangeSettings);
        $queueSettings = $this->getQueueSettings(
            $queueProvider->getQueueSettings(),
            $originalExchangeSettings
        );

        $adapter = $this->adapter->withQueueProvider(
            $queueProvider
                ->withMessageProperties($this->getMessageProperties($queueProvider))
                ->withExchangeSettings($delayedExchangeSettings)
                ->withQueueSettings($queueSettings)
        );

        return $handler->handle(
            $request->withQueue(
                $request->getQueue()->withAdapter($adapter)
            )
        );
    }

    /**
     * Builds the message properties for a delayed message: the provider's own properties overridden with
     * the expiration (in milliseconds) and, when forced, the persistent delivery mode.
     *
     * @psalm-return array{expiration: int, delivery_mode?: int}&array
     */
    private function getMessageProperties(QueueProviderInterface $queueProvider): array
    {
        $messageProperties = ['expiration' => $this->getDelayInMilliseconds()];
        if ($this->forcePersistentMessages === true) {
            $messageProperties['delivery_mode'] = AMQPMessage::DELIVERY_MODE_PERSISTENT;
        }

        // Later keys win in array_merge(), so the delay-specific values override the provider's defaults.
        return array_merge($queueProvider->getMessageProperties(), $messageProperties);
    }

    /**
     * Derives the settings of the auto-deletable delay queue from the original queue settings.
     *
     * @param QueueSettingsInterface $queueSettings Settings of the original queue.
     * @param ExchangeSettingsInterface|null $exchangeSettings Settings of the original exchange, used as the
     * dead-letter target; an empty exchange name is used when there are none.
     */
    private function getQueueSettings(
        QueueSettingsInterface $queueSettings,
        ?ExchangeSettingsInterface $exchangeSettings
    ): QueueSettingsInterface {
        $deliveryTime = time() + $this->delayInSeconds;
        $delayInMilliseconds = $this->getDelayInMilliseconds();

        // NOTE(review): the queue name depends only on the delivery time while the arguments depend on the
        // delay — confirm that two different delays resolving to the same delivery time cannot end up
        // redeclaring an existing queue with different arguments.
        return $queueSettings
            ->withName("{$queueSettings->getName()}.dlx.$deliveryTime")
            ->withAutoDeletable(true)
            ->withArguments(
                [
                    // The first element of each pair is the table value type tag: 'S' string, 'I' integer.
                    'x-dead-letter-exchange' => ['S', $exchangeSettings?->getName() ?? ''],
                    'x-expires' => ['I', $delayInMilliseconds + self::QUEUE_EXPIRATION_RESERVE_MS],
                    'x-message-ttl' => ['I', $delayInMilliseconds],
                ]
            );
    }

    /**
     * Derives the settings of the delay exchange ("<name>.dlx", topic type, not auto-deleted) from the
     * original exchange settings, or returns null when there are no exchange settings.
     *
     * @see https://github.com/vimeo/psalm/issues/9454
     *
     * @psalm-suppress LessSpecificReturnType
     */
    private function getExchangeSettings(?ExchangeSettingsInterface $exchangeSettings): ?ExchangeSettingsInterface
    {
        // The nullsafe operator short-circuits the whole chain, including evaluation of the withName()
        // argument, so getName() is never called when $exchangeSettings is null.
        /** @noinspection NullPointerExceptionInspection */
        return $exchangeSettings
            ?->withName("{$exchangeSettings->getName()}.dlx")
            ->withAutoDelete(false)
            ->withType(AMQPExchangeType::TOPIC);
    }

    /**
     * Converts the configured delay to whole milliseconds.
     *
     * Rounding (rather than truncating) keeps products such as 0.57 * 1000 from losing a millisecond to
     * floating point error, and guarantees an integer for the integer-typed AMQP arguments.
     */
    private function getDelayInMilliseconds(): int
    {
        return (int) round($this->delayInSeconds * 1000);
    }
}
125