1 | <?php |
||
2 | |||
3 | declare(strict_types=1); |
||
4 | |||
5 | namespace Yiisoft\Queue\AMQP; |
||
6 | |||
7 | use PhpAmqpLib\Message\AMQPMessage; |
||
8 | use Throwable; |
||
9 | use Yiisoft\Queue\Adapter\AdapterInterface; |
||
10 | use Yiisoft\Queue\AMQP\Exception\NotImplementedException; |
||
11 | use Yiisoft\Queue\Cli\LoopInterface; |
||
12 | use Yiisoft\Queue\Enum\JobStatus; |
||
13 | use Yiisoft\Queue\Message\MessageInterface; |
||
14 | |||
/**
 * Queue adapter that publishes and consumes messages over an AMQP channel.
 *
 * The channel, the queue/exchange settings and the default message properties all come from the
 * injected {@see QueueProviderInterface}; message bodies are converted with the injected
 * {@see MessageSerializerInterface}.
 */
final class Adapter implements AdapterInterface
{
    /**
     * @param QueueProviderInterface $queueProvider Supplies the channel, queue/exchange settings and
     * message properties used by every operation of this adapter.
     * @param MessageSerializerInterface $serializer Converts messages to and from the AMQP body string.
     * @param LoopInterface $loop Decides whether {@see subscribe()} keeps waiting for more messages.
     */
    public function __construct(
        private QueueProviderInterface $queueProvider,
        private MessageSerializerInterface $serializer,
        private LoopInterface $loop,
    ) {
    }

    /**
     * Returns a copy of this adapter whose queue provider is switched to the given channel name.
     * The current instance is left untouched.
     */
    public function withChannel(string $channel): self
    {
        $instance = clone $this;
        $instance->queueProvider = $this->queueProvider->withChannelName($channel);

        return $instance;
    }

    /**
     * Hands the configured queue to an {@see ExistingMessagesConsumer} and lets it feed messages
     * to the handler.
     *
     * @param callable(MessageInterface): bool $handlerCallback
     */
    public function runExisting(callable $handlerCallback): void
    {
        $channel = $this->queueProvider->getChannel();
        $queueName = $this->queueProvider
            ->getQueueSettings()
            ->getName();

        $consumer = new ExistingMessagesConsumer($channel, $queueName, $this->serializer);
        $consumer->consume($handlerCallback);
    }

    /**
     * Status lookup is not available for this adapter; the method always throws.
     *
     * @throws NotImplementedException Always.
     *
     * @return never
     */
    public function status(string $id): JobStatus
    {
        throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.');
    }

    /**
     * Serializes the message, publishes it and stores the resulting AMQP `message_id` on the message.
     *
     * When exchange settings are configured the message goes to that exchange with an empty routing
     * key; otherwise it goes to the default exchange (`''`) with the queue name as the routing key.
     */
    public function push(MessageInterface $message): void
    {
        $payload = $this->serializer->serialize($message);
        $amqpMessage = new AMQPMessage(
            $payload,
            // Provider properties are merged last, so a provider-supplied `message_id`
            // replaces the generated one.
            array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties())
        );
        $exchangeSettings = $this->queueProvider->getExchangeSettings();
        $this->queueProvider
            ->getChannel()
            ->basic_publish(
                $amqpMessage,
                $exchangeSettings?->getName() ?? '',
                $exchangeSettings ? '' : $this->queueProvider
                    ->getQueueSettings()
                    ->getName()
            );

        // Read the id back from the AMQP message rather than reusing the generated value:
        // the merged properties above may have overridden it.
        /** @var string $messageId */
        $messageId = $amqpMessage->get('message_id');
        $message->setId($messageId);
    }

    /**
     * Registers a consumer on the configured queue and waits on the channel for as long as the
     * loop allows.
     *
     * Each delivery is unserialized, passed to the handler and then acknowledged. If anything in
     * that sequence throws, the consumer is cancelled (when the delivery carries a consumer tag)
     * and the exception is rethrown without acknowledging the message.
     *
     * @param callable $handlerCallback Receives the unserialized message; its return value is not used here.
     */
    public function subscribe(callable $handlerCallback): void
    {
        $channel = $this->queueProvider->getChannel();
        // Resolved once: the queue name doubles as the consumer tag.
        $queueName = $this->queueProvider
            ->getQueueSettings()
            ->getName();

        $channel->basic_consume(
            $queueName, // queue
            $queueName, // consumer_tag
            false, // no_local
            false, // no_ack: deliveries are acknowledged explicitly in the callback
            false, // exclusive
            true, // nowait
            function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void {
                try {
                    // getBody() is used instead of the deprecated public $body property.
                    $handlerCallback($this->serializer->unserialize($amqpMessage->getBody()));
                    $channel->basic_ack($amqpMessage->getDeliveryTag());
                } catch (Throwable $exception) {
                    $consumerTag = $amqpMessage->getConsumerTag();
                    if ($consumerTag !== null) {
                        $channel->basic_cancel($consumerTag);
                    }

                    throw $exception;
                }
            }
        );

        while ($this->loop->canContinue()) {
            $channel->wait();
        }
    }

    /**
     * Returns the queue provider currently used by this adapter.
     */
    public function getQueueProvider(): QueueProviderInterface
    {
        return $this->queueProvider;
    }

    /**
     * Returns a copy of this adapter that uses the given queue provider.
     * The current instance is left untouched.
     */
    public function withQueueProvider(QueueProviderInterface $queueProvider): self
    {
        $new = clone $this;
        $new->queueProvider = $queueProvider;

        return $new;
    }
}
||
121 |
This property has been deprecated. The supplier of the class has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.