Test Failed
Pull Request — master (#111)
by Viktor
15:25
created

Adapter::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 0

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 0
dl 0
loc 5
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 3
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\AMQP;
6
7
use BackedEnum;
8
use PhpAmqpLib\Message\AMQPMessage;
9
use Throwable;
10
use Yiisoft\Queue\Adapter\AdapterInterface;
11
use Yiisoft\Queue\AMQP\Exception\NotImplementedException;
12
use Yiisoft\Queue\Cli\LoopInterface;
13
use Yiisoft\Queue\Enum\JobStatus;
14
use Yiisoft\Queue\Message\IdEnvelope;
15
use Yiisoft\Queue\Message\MessageInterface;
16
17
final class Adapter implements AdapterInterface
18
{
19
    public function __construct(
20
        private QueueProviderInterface $queueProvider,
21
        private readonly MessageSerializerInterface $serializer,
22
        private readonly LoopInterface $loop,
23
    ) {
24
    }
25
26
    public function withChannel(BackedEnum|string $channel): self
27
    {
28
        $instance = clone $this;
29
        $channelName = is_string($channel) ? $channel : (string) $channel->value;
0 ignored issues
show
Bug introduced by
Accessing value on the interface BackedEnum suggest that you code against a concrete implementation. How about adding an instanceof check?
Loading history...
30
        $instance->queueProvider = $this->queueProvider->withChannelName($channelName);
31
32
        return $instance;
33
    }
34
35
    /**
36
     * @param callable(MessageInterface): bool  $handlerCallback
37
     */
38
    public function runExisting(callable $handlerCallback): void
39
    {
40
        $channel = $this->queueProvider->getChannel();
41
        (new ExistingMessagesConsumer($channel, $this->queueProvider
42
            ->getQueueSettings()
43
            ->getName(), $this->serializer))
44
            ->consume($handlerCallback);
45
    }
46
47
    /**
48
     * @return never
49
     */
50
    public function status(int|string $id): JobStatus
51
    {
52
        throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.');
53
    }
54
55
    public function push(MessageInterface $message): MessageInterface
56
    {
57
        $payload = $this->serializer->serialize($message);
58
        $amqpMessage = new AMQPMessage(
59
            $payload,
60
            array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties())
61
        );
62
        $exchangeSettings = $this->queueProvider->getExchangeSettings();
63
        $this->queueProvider
64
            ->getChannel()
65
            ->basic_publish(
66
                $amqpMessage,
67
                $exchangeSettings?->getName() ?? '',
68
                $exchangeSettings ? '' : $this->queueProvider
69
                    ->getQueueSettings()
70
                    ->getName()
71
            );
72
        /** @var string $messageId */
73
        $messageId = $amqpMessage->get('message_id');
74
75
        return new IdEnvelope($message, $messageId);
76
    }
77
78
    public function subscribe(callable $handlerCallback): void
79
    {
80
        $channel = $this->queueProvider->getChannel();
81
        $channel->basic_consume(
82
            $this->queueProvider
83
                ->getQueueSettings()
84
                ->getName(),
85
            $this->queueProvider
86
                ->getQueueSettings()
87
                ->getName(),
88
            false,
89
            false,
90
            false,
91
            true,
92
            function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void {
93
                try {
94
                    $handlerCallback($this->serializer->unserialize($amqpMessage->getBody()));
95
                    $channel->basic_ack($amqpMessage->getDeliveryTag());
96
                } catch (Throwable $exception) {
97
                    $consumerTag = $amqpMessage->getConsumerTag();
98
                    if ($consumerTag !== null) {
99
                        $channel->basic_cancel($consumerTag);
100
                    }
101
102
                    throw $exception;
103
                }
104
            }
105
        );
106
107
        while ($this->loop->canContinue()) {
108
            $channel->wait();
109
        }
110
    }
111
112
    public function getQueueProvider(): QueueProviderInterface
113
    {
114
        return $this->queueProvider;
115
    }
116
117
    public function withQueueProvider(QueueProviderInterface $queueProvider): self
118
    {
119
        $new = clone $this;
120
        $new->queueProvider = $queueProvider;
121
122
        return $new;
123
    }
124
125
    public function getChannel(): string
126
    {
127
        return $this->queueProvider->getQueueSettings()->getName();
128
    }
129
}
130