1 | <?php |
||
2 | |||
3 | declare(strict_types=1); |
||
4 | |||
5 | namespace Yiisoft\Queue\AMQP; |
||
6 | |||
7 | use PhpAmqpLib\Channel\AMQPChannel; |
||
8 | use PhpAmqpLib\Message\AMQPMessage; |
||
9 | use Throwable; |
||
10 | use Yiisoft\Queue\Message\MessageInterface; |
||
11 | |||
12 | /** |
||
13 | * @internal |
||
14 | */ |
||
15 | final class ExistingMessagesConsumer |
||
16 | { |
||
17 | private bool $messageConsumed = false; |
||
18 | |||
19 | public function __construct( |
||
20 | private AMQPChannel $channel, |
||
21 | private string $queueName, |
||
22 | private MessageSerializerInterface $serializer |
||
23 | ) { |
||
24 | } |
||
25 | |||
26 | /** |
||
27 | * @param callable(MessageInterface): bool $callback |
||
28 | */ |
||
29 | public function consume(callable $callback): void |
||
30 | { |
||
31 | $this->channel->basic_consume( |
||
32 | $this->queueName, |
||
33 | '', |
||
34 | false, |
||
35 | false, |
||
36 | false, |
||
37 | false, |
||
38 | function (AMQPMessage $amqpMessage) use ($callback): void { |
||
39 | try { |
||
40 | $message = $this->serializer->unserialize($amqpMessage->body); |
||
0 ignored issues
–
show
|
|||
41 | if ($this->messageConsumed = $callback($message)) { |
||
42 | $this->channel->basic_ack($amqpMessage->getDeliveryTag()); |
||
43 | } |
||
44 | } catch (Throwable $exception) { |
||
45 | $this->messageConsumed = false; |
||
46 | $consumerTag = $amqpMessage->getConsumerTag(); |
||
47 | if ($consumerTag !== null) { |
||
48 | $this->channel->basic_cancel($consumerTag); |
||
49 | } |
||
50 | |||
51 | throw $exception; |
||
52 | } |
||
53 | } |
||
54 | ); |
||
55 | |||
56 | do { |
||
57 | $this->messageConsumed = false; |
||
58 | $this->channel->wait(null, true); |
||
59 | } while ($this->messageConsumed === true); |
||
60 | } |
||
61 | } |
||
62 |
This property has been deprecated. The supplier of the class has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the property will be removed from the class and what other property to use instead.