Issues (9)

src/Adapter.php (1 issue)

1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\AMQP;
6
7
use PhpAmqpLib\Message\AMQPMessage;
8
use Throwable;
9
use Yiisoft\Queue\Adapter\AdapterInterface;
10
use Yiisoft\Queue\AMQP\Exception\NotImplementedException;
11
use Yiisoft\Queue\Cli\LoopInterface;
12
use Yiisoft\Queue\Enum\JobStatus;
13
use Yiisoft\Queue\Message\MessageInterface;
14
15
/**
 * Queue adapter that talks to an AMQP broker through the channel supplied by the queue provider.
 *
 * Messages are converted to and from their wire form by the injected serializer; the injected loop
 * decides how long {@see subscribe()} keeps waiting on the channel.
 */
final class Adapter implements AdapterInterface
{
    public function __construct(
        private QueueProviderInterface $queueProvider,
        private MessageSerializerInterface $serializer,
        private LoopInterface $loop,
    ) {
    }

    /**
     * Returns a copy of this adapter whose queue provider is bound to the given channel name.
     * The current instance is not modified.
     */
    public function withChannel(string $channel): self
    {
        return $this->withQueueProvider($this->queueProvider->withChannelName($channel));
    }

    /**
     * Delegates to {@see ExistingMessagesConsumer}, built for the provider's channel and queue name,
     * and lets it feed messages to the given callback.
     *
     * @param callable(MessageInterface): bool $handlerCallback
     */
    public function runExisting(callable $handlerCallback): void
    {
        $channel = $this->queueProvider->getChannel();
        $queueName = $this->queueProvider
            ->getQueueSettings()
            ->getName();

        (new ExistingMessagesConsumer($channel, $queueName, $this->serializer))
            ->consume($handlerCallback);
    }

    /**
     * Status lookup is not available in this adapter; the method always throws.
     *
     * @throws NotImplementedException Always.
     *
     * @return never
     */
    public function status(string $id): JobStatus
    {
        throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.');
    }

    /**
     * Serializes the message, publishes it, and stores the resulting AMQP message id on the message.
     *
     * When exchange settings are configured the message is published to that exchange with an empty
     * routing key; otherwise it goes to the default ('') exchange with the queue name as routing key.
     */
    public function push(MessageInterface $message): void
    {
        $payload = $this->serializer->serialize($message);

        // The generated id is only a default: array_merge() lets a 'message_id' coming from the
        // provider's message properties override it.
        $amqpMessage = new AMQPMessage(
            $payload,
            array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties()),
        );

        $exchangeSettings = $this->queueProvider->getExchangeSettings();
        $channel = $this->queueProvider->getChannel();

        $exchangeName = $exchangeSettings?->getName() ?? '';
        $routingKey = $exchangeSettings === null
            ? $this->queueProvider
                ->getQueueSettings()
                ->getName()
            : '';

        $channel->basic_publish($amqpMessage, $exchangeName, $routingKey);

        // Read the id back from the AMQP message so an overridden 'message_id' is the one reported.
        /** @var string $messageId */
        $messageId = $amqpMessage->get('message_id');
        $message->setId($messageId);
    }

    /**
     * Registers a consumer on the configured queue and keeps waiting on the channel for as long as
     * the loop allows.
     *
     * Each delivery is unserialized and passed to the callback, then acknowledged. The callback's
     * return value is not inspected here: a delivery is acknowledged whenever the callback returns
     * without throwing. If anything is thrown, the consumer is cancelled (when the delivery carries
     * a consumer tag) and the exception is rethrown without acknowledging the delivery.
     *
     * @param callable $handlerCallback Receives the unserialized message.
     *
     * @throws Throwable Whatever the serializer, the callback or the channel throws.
     */
    public function subscribe(callable $handlerCallback): void
    {
        $channel = $this->queueProvider->getChannel();
        $queueName = $this->queueProvider
            ->getQueueSettings()
            ->getName();

        $channel->basic_consume(
            $queueName, // queue
            $queueName, // consumer tag: the queue name is reused as the tag
            false, // no_local
            false, // no_ack: deliveries are acknowledged explicitly below
            false, // exclusive
            true, // nowait
            function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void {
                try {
                    // getBody() replaces the deprecated public $body property of AMQPMessage.
                    $handlerCallback($this->serializer->unserialize($amqpMessage->getBody()));
                    $channel->basic_ack($amqpMessage->getDeliveryTag());
                } catch (Throwable $exception) {
                    $consumerTag = $amqpMessage->getConsumerTag();
                    if ($consumerTag !== null) {
                        $channel->basic_cancel($consumerTag);
                    }

                    throw $exception;
                }
            },
        );

        while ($this->loop->canContinue()) {
            $channel->wait();
        }
    }

    /**
     * Returns the queue provider this adapter currently uses.
     */
    public function getQueueProvider(): QueueProviderInterface
    {
        return $this->queueProvider;
    }

    /**
     * Returns a copy of this adapter that uses the given queue provider.
     * The current instance is not modified.
     */
    public function withQueueProvider(QueueProviderInterface $queueProvider): self
    {
        $new = clone $this;
        $new->queueProvider = $queueProvider;

        return $new;
    }
}
121