1 | <?php |
||||||
2 | |||||||
3 | declare(strict_types=1); |
||||||
4 | |||||||
5 | namespace Yiisoft\Queue\AMQP; |
||||||
6 | |||||||
7 | use PhpAmqpLib\Message\AMQPMessage; |
||||||
8 | use Throwable; |
||||||
9 | use Yiisoft\Queue\Adapter\AdapterInterface; |
||||||
10 | use Yiisoft\Queue\AMQP\Exception\NotImplementedException; |
||||||
11 | use Yiisoft\Queue\Cli\LoopInterface; |
||||||
12 | use Yiisoft\Queue\Enum\JobStatus; |
||||||
13 | use Yiisoft\Queue\Message\MessageInterface; |
||||||
14 | |||||||
/**
 * Queue adapter that publishes and consumes messages over AMQP.
 *
 * The channel, queue settings, exchange settings and default message properties all come from the
 * injected {@see QueueProviderInterface}; message bodies are converted by the injected
 * {@see MessageSerializerInterface}.
 */
final class Adapter implements AdapterInterface
{
    /**
     * @param QueueProviderInterface $queueProvider Source of the AMQP channel and queue/exchange settings.
     * @param MessageSerializerInterface $serializer Converts messages to and from the AMQP message body.
     * @param LoopInterface $loop Decides whether {@see subscribe()} keeps waiting for deliveries.
     */
    public function __construct(
        private QueueProviderInterface $queueProvider,
        private MessageSerializerInterface $serializer,
        private LoopInterface $loop,
    ) {
    }

    /**
     * Returns a copy of this adapter whose queue provider has been switched to the given channel name.
     * The current instance is not modified.
     */
    public function withChannel(string $channel): self
    {
        return $this->withQueueProvider($this->queueProvider->withChannelName($channel));
    }

    /**
     * Hands the configured queue over to an {@see ExistingMessagesConsumer} and lets it feed
     * messages to the handler.
     *
     * @param callable(MessageInterface): bool $handlerCallback
     */
    public function runExisting(callable $handlerCallback): void
    {
        $channel = $this->queueProvider->getChannel();
        $queueName = $this->queueProvider->getQueueSettings()->getName();

        $consumer = new ExistingMessagesConsumer($channel, $queueName, $this->serializer);
        $consumer->consume($handlerCallback);
    }

    /**
     * Status lookup is not available for this adapter; the method always throws.
     *
     * @throws NotImplementedException Always.
     *
     * @return never
     */
    public function status(string $id): JobStatus
    {
        throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.');
    }

    /**
     * Serializes the message and publishes it.
     *
     * When the provider has exchange settings, the message is published to that exchange with an
     * empty routing key; otherwise it is published to the default (empty-named) exchange with the
     * queue name as the routing key.
     *
     * A generated `message_id` is used unless the provider's message properties supply their own.
     */
    public function push(MessageInterface $message): void
    {
        $amqpMessage = new AMQPMessage(
            $this->serializer->serialize($message),
            // Provider properties come last so they can override the generated message_id.
            array_merge(
                ['message_id' => uniqid(more_entropy: true)],
                $this->queueProvider->getMessageProperties(),
            ),
        );

        $exchangeSettings = $this->queueProvider->getExchangeSettings();
        $channel = $this->queueProvider->getChannel();

        $exchangeName = $exchangeSettings?->getName() ?? '';
        $routingKey = $exchangeSettings === null
            ? $this->queueProvider->getQueueSettings()->getName()
            : '';

        $channel->basic_publish($amqpMessage, $exchangeName, $routingKey);

        /** @var string $messageId */
        $messageId = $amqpMessage->get('message_id');

        // setId() is not declared on MessageInterface, so only call it on implementations that
        // provide it. Failing here would raise an error after the message was already published.
        if (method_exists($message, 'setId')) {
            $message->setId($messageId);
        }
    }

    /**
     * Registers a consumer on the configured queue and keeps waiting on the channel for as long
     * as the loop allows.
     *
     * Each delivery is unserialized and passed to the handler, then acknowledged. If the handler
     * (or the acknowledgement) throws, the consumer is cancelled and the exception is rethrown.
     *
     * @param callable $handlerCallback Invoked with the unserialized message of every delivery.
     *
     * @throws Throwable Whatever the handler or the channel throws while processing a delivery.
     */
    public function subscribe(callable $handlerCallback): void
    {
        $channel = $this->queueProvider->getChannel();
        $queueName = $this->queueProvider->getQueueSettings()->getName();

        $channel->basic_consume(
            $queueName,
            // The queue name doubles as the consumer tag.
            $queueName,
            // NOTE(review): the four flags below are presumably no_local, no_ack, exclusive and
            // nowait, in that order — confirm against php-amqplib's basic_consume signature.
            false,
            false,
            false,
            true,
            function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void {
                try {
                    // getBody() replaces the deprecated public $body property.
                    $handlerCallback($this->serializer->unserialize($amqpMessage->getBody()));
                    $channel->basic_ack($amqpMessage->getDeliveryTag());
                } catch (Throwable $exception) {
                    $consumerTag = $amqpMessage->getConsumerTag();
                    if ($consumerTag !== null) {
                        $channel->basic_cancel($consumerTag);
                    }

                    throw $exception;
                }
            }
        );

        while ($this->loop->canContinue()) {
            $channel->wait();
        }
    }

    /**
     * Returns the queue provider currently used by this adapter.
     */
    public function getQueueProvider(): QueueProviderInterface
    {
        return $this->queueProvider;
    }

    /**
     * Returns a copy of this adapter that uses the given queue provider.
     * The current instance is not modified.
     */
    public function withQueueProvider(QueueProviderInterface $queueProvider): self
    {
        $new = clone $this;
        $new->queueProvider = $queueProvider;

        return $new;
    }
}
||||||
121 |
In the issue above, the returned value is violating the contract defined by the mentioned interface.
Let's take a look at an example: