Test Failed
Pull Request — master (#110)
by unknown
04:23
created

Adapter::withQueueProvider()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric  Value
eloc    3
c       0
b       0
f       0
dl      0
loc     5
rs      10
cc      1
nc      1
nop     1
<?php

declare(strict_types=1);

namespace App;

use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\Exception\NotImplementedException;
use Yiisoft\Queue\AMQP\ExistingMessagesConsumer;
use Yiisoft\Queue\AMQP\MessageSerializerInterface;
use Yiisoft\Queue\AMQP\QueueProviderInterface;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;

final class Adapter implements AdapterInterface {
    /**
     * @param QueueProviderInterface $queueProvider
     * @param MessageSerializerInterface $serializer
     * @param LoopInterface $loop
     */
    public function __construct(
        private QueueProviderInterface              $queueProvider,
        private readonly MessageSerializerInterface $serializer,
        private readonly LoopInterface              $loop,
    ) {
    }

    /**
     * @param string $channel
     * @return $this
     */
    public function withChannel(string $channel): self {
        $instance = clone $this;
        $instance->queueProvider = $this->queueProvider->withChannelName($channel);

        return $instance;
    }

    /**
     * @param callable(MessageInterface): bool $handlerCallback
     */
    public function runExisting(callable $handlerCallback): void {
        $channel = $this->queueProvider->getChannel();
        (new ExistingMessagesConsumer($channel, $this->queueProvider
            ->getQueueSettings()
            ->getName(), $this->serializer))
            ->consume($handlerCallback);
    }

    /**
     * @param string|int $id
     * @return JobStatus
     */
    public function status(string|int $id): JobStatus {
        throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.');
    }

    /**
     * @param MessageInterface $message
     * @return MessageInterface
     */
    public function push(MessageInterface $message): MessageInterface {
        $payload = $this->serializer->serialize($message);
        $amqpMessage = new AMQPMessage(
            $payload,
            array_merge(['message_id' => uniqid('', true)], $this->queueProvider->getMessageProperties())
        );
        $exchangeSettings = $this->queueProvider->getExchangeSettings();
        $this->queueProvider
            ->getChannel()
            ->basic_publish(
                $amqpMessage,
                $exchangeSettings?->getName() ?? '',
                $exchangeSettings ? '' : $this->queueProvider
                    ->getQueueSettings()
                    ->getName()
            );
        /** @var string $messageId */
        $messageId = $amqpMessage->get('message_id');
        $message->setId($messageId);
Bug: The method setId() does not exist on Yiisoft\Queue\Message\MessageInterface. It seems like you are coding against a sub-type of Yiisoft\Queue\Message\MessageInterface such as Yiisoft\Queue\Message\IdEnvelope. (Ignorable by annotation.)

If this is a false positive, you can also ignore this issue in your code via the ignore-call annotation: $message->/** @scrutinizer ignore-call */ setId($messageId);
        return $message;
    }

    /**
     * @param callable $handlerCallback
     * @return void
     */
    public function subscribe(callable $handlerCallback): void {
        $channel = $this->queueProvider->getChannel();
        $channel->basic_consume(
            $this->queueProvider
                ->getQueueSettings()
                ->getName(),
            $this->queueProvider
                ->getQueueSettings()
                ->getName(),
            false,
            false,
            false,
            true,
            function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void {
                try {
                    $handlerCallback($this->serializer->unserialize($amqpMessage->body));
Deprecated Code: The property PhpAmqpLib\Message\AMQPMessage::$body has been deprecated: Will be removed in version 4.0, use getBody() instead. (Ignorable by annotation.)

If this is a false positive, you can also ignore this issue in your code via the ignore-deprecated annotation:

                    $handlerCallback($this->serializer->unserialize(/** @scrutinizer ignore-deprecated */ $amqpMessage->body));

This property has been deprecated. The supplier of the class has supplied an explanatory message, which should give you some clue as to whether and when the property will be removed from the class and what to use instead.
                    $channel->basic_ack($amqpMessage->getDeliveryTag());
                } catch (Throwable $exception) {
                    $consumerTag = $amqpMessage->getConsumerTag();
                    if ($consumerTag !== null) {
                        $channel->basic_cancel($consumerTag);
                    }

                    throw $exception;
                }
            }
        );

        while ($this->loop->canContinue()) {
            $channel->wait();
        }
    }

    /**
     * @return QueueProviderInterface
     */
    public function getQueueProvider(): QueueProviderInterface {
        return $this->queueProvider;
    }

    /**
     * @param QueueProviderInterface $queueProvider
     * @return $this
     */
    public function withQueueProvider(QueueProviderInterface $queueProvider): self {
        $new = clone $this;
        $new->queueProvider = $queueProvider;

        return $new;
    }
}
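
The setId() issue flagged in push() points at Yiisoft\Queue\Message\IdEnvelope as the sub-type that actually carries an ID. One possible way to address it, sketched below, is to return a wrapper around the message instead of mutating the argument. The types here (MessageInterface, SimpleMessage, IdEnvelopeSketch) and the attachId() helper are simplified stand-ins written for this illustration, not the library's classes; the real IdEnvelope constructor and accessors should be checked against the installed yiisoft/queue version.

```php
<?php

declare(strict_types=1);

// Stand-in for Yiisoft\Queue\Message\MessageInterface, reduced to one method.
interface MessageInterface
{
    public function getData(): mixed;
}

// Hypothetical plain message used only to drive the example.
final class SimpleMessage implements MessageInterface
{
    public function __construct(private mixed $data) {}

    public function getData(): mixed
    {
        return $this->data;
    }
}

// Hypothetical envelope: wraps a message and carries an ID alongside it.
final class IdEnvelopeSketch implements MessageInterface
{
    public function __construct(
        private MessageInterface $message,
        private string|int|null $id = null,
    ) {}

    public function getId(): string|int|null
    {
        return $this->id;
    }

    // Delegates to the wrapped message so callers still see the payload.
    public function getData(): mixed
    {
        return $this->message->getData();
    }
}

// Shape of the tail of push(): wrap and return, rather than calling setId().
function attachId(MessageInterface $message, string $messageId): MessageInterface
{
    return new IdEnvelopeSketch($message, $messageId);
}

$pushed = attachId(new SimpleMessage(['task' => 'demo']), 'abc123');
```

With this shape the declared return type MessageInterface still holds, and no method outside the interface is called on the argument, which is what the analyzer objected to.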
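
For the deprecation flagged in subscribe(), the analyzer's own message names getBody() as the replacement for the $body property. The sketch below shows the consumer callback reading the payload through that accessor. AmqpMessageStandIn, handleDelivery(), and the JSON decoder are assumptions made so the example runs on its own; in the reviewed code the object is a PhpAmqpLib\Message\AMQPMessage and decoding goes through the injected serializer.

```php
<?php

declare(strict_types=1);

// Stand-in for PhpAmqpLib\Message\AMQPMessage, reduced to the one accessor used here.
final class AmqpMessageStandIn
{
    public function __construct(private string $body) {}

    public function getBody(): string
    {
        return $this->body;
    }
}

/**
 * Mirrors the first statement of the consumer closure: read the payload via
 * the accessor, decode it, and hand the result to the handler.
 *
 * @param callable(string): mixed $unserialize hypothetical decoder
 * @param callable(mixed): void   $handler     hypothetical message handler
 */
function handleDelivery(AmqpMessageStandIn $amqpMessage, callable $unserialize, callable $handler): void
{
    $handler($unserialize($amqpMessage->getBody()));
}

$received = [];
handleDelivery(
    new AmqpMessageStandIn('{"task":"demo"}'),
    // Hypothetical decoder; the reviewed adapter uses its serializer instead.
    static fn (string $raw): array => json_decode($raw, true, 512, JSON_THROW_ON_ERROR),
    static function (array $data) use (&$received): void {
        $received[] = $data;
    },
);
```

In the adapter itself the change would likely be limited to swapping the property access for the method call on the same line; the acknowledgement and cancellation logic around it would stay as it is.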