Adapter   A
last analyzed

Complexity

Total Complexity 12

Size/Duplication

Total Lines 104
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 12
eloc 52
dl 0
loc 104
rs 10
c 0
b 0
f 0

8 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 1
A withChannel() 0 6 1
A getQueueProvider() 0 3 1
A push() 0 20 2
A status() 0 3 1
A runExisting() 0 7 1
A withQueueProvider() 0 6 1
A subscribe() 0 31 4
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\AMQP;
6
7
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\Exception\NotImplementedException;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Message\MessageInterface;
14
15
final class Adapter implements AdapterInterface
16
{
17
    /**
     * @param QueueProviderInterface $queueProvider Supplies the AMQP channel together with the queue,
     * exchange and message-property settings used by this adapter.
     * @param MessageSerializerInterface $serializer Converts messages to AMQP payloads and back.
     * @param LoopInterface $loop Asked by {@see subscribe()} whether it may keep waiting for deliveries.
     */
    public function __construct(
        private QueueProviderInterface $queueProvider,
        private MessageSerializerInterface $serializer,
        private LoopInterface $loop,
    ) {}
23
24
    /**
     * Returns a clone of this adapter whose queue provider is the result of
     * calling `withChannelName($channel)` on the current provider.
     *
     * The adapter this method is called on is not modified.
     */
    public function withChannel(string $channel): self
    {
        return $this->withQueueProvider(
            $this->queueProvider->withChannelName($channel),
        );
    }
31
32
    /**
     * Delegates consumption to an {@see ExistingMessagesConsumer} built from the provider's channel,
     * the configured queue name and the serializer.
     *
     * @param callable(MessageInterface): bool $handlerCallback Passed through to the consumer unchanged.
     */
    public function runExisting(callable $handlerCallback): void
    {
        $channel = $this->queueProvider->getChannel();
        $queueName = $this->queueProvider->getQueueSettings()->getName();

        $consumer = new ExistingMessagesConsumer($channel, $queueName, $this->serializer);
        $consumer->consume($handlerCallback);
    }
43
44
    /**
     * Status lookup is not available in this adapter: every call throws.
     *
     * @throws NotImplementedException Always.
     *
     * @return never
     */
    public function status(string $id): JobStatus
    {
        $reason = 'Status check is not supported by the adapter ' . self::class . '.';

        throw new NotImplementedException($reason);
    }
51
52
    /**
     * Serializes the message and publishes it through the provider's channel.
     *
     * When exchange settings are configured the message is published to that exchange with an empty
     * routing key; otherwise it is published to the default exchange (`''`) with the queue name as
     * the routing key.
     *
     * A `message_id` generated with `uniqid()` is used unless the provider's message properties
     * supply their own `message_id`, which takes precedence.
     *
     * @param MessageInterface $message Message to publish. It is not modified.
     *
     * @return MessageInterface The given message wrapped in an {@see IdEnvelope} carrying the AMQP `message_id`.
     */
    public function push(MessageInterface $message): MessageInterface
    {
        $payload = $this->serializer->serialize($message);

        // Provider properties are merged last so that they override the generated id.
        $properties = array_merge(
            ['message_id' => uniqid(more_entropy: true)],
            $this->queueProvider->getMessageProperties(),
        );
        $amqpMessage = new AMQPMessage($payload, $properties);

        $exchangeSettings = $this->queueProvider->getExchangeSettings();
        $exchangeName = $exchangeSettings?->getName() ?? '';
        $routingKey = $exchangeSettings === null
            ? $this->queueProvider->getQueueSettings()->getName()
            : '';

        $this->queueProvider
            ->getChannel()
            ->basic_publish($amqpMessage, $exchangeName, $routingKey);

        /** @var string $messageId */
        $messageId = $amqpMessage->get('message_id');

        // MessageInterface has no setId(); the id is attached by wrapping the message instead.
        return new IdEnvelope($message, $messageId);
    }
73
74
    /**
     * Registers a consumer on the configured queue and keeps waiting on the channel
     * for as long as the loop allows it.
     *
     * Each delivery is unserialized and passed to the handler, then acknowledged. If unserializing
     * or handling throws, the consumer is cancelled (when the delivery carries a consumer tag) and
     * the exception is rethrown without acknowledging the delivery.
     *
     * @param callable $handlerCallback Called with the unserialized message of every delivery;
     * its return value is ignored.
     *
     * @throws Throwable Whatever the serializer or the handler throws.
     */
    public function subscribe(callable $handlerCallback): void
    {
        $channel = $this->queueProvider->getChannel();
        $queueName = $this->queueProvider->getQueueSettings()->getName();

        $onDelivery = function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void {
            try {
                // getBody() replaces the deprecated public $body property.
                $handlerCallback($this->serializer->unserialize($amqpMessage->getBody()));
                $channel->basic_ack($amqpMessage->getDeliveryTag());
            } catch (Throwable $exception) {
                $consumerTag = $amqpMessage->getConsumerTag();
                if ($consumerTag !== null) {
                    $channel->basic_cancel($consumerTag);
                }

                throw $exception;
            }
        };

        $channel->basic_consume(
            $queueName,
            $queueName, // the queue name doubles as the consumer tag
            false, // no_local
            false, // no_ack: deliveries are acknowledged explicitly in the callback
            false, // exclusive
            true, // nowait
            $onDelivery,
        );

        while ($this->loop->canContinue()) {
            $channel->wait();
        }
    }
107
108
    /**
     * Returns the queue provider this adapter currently uses.
     */
    public function getQueueProvider(): QueueProviderInterface
    {
        return $this->queueProvider;
    }
112
113
    /**
     * Returns a clone of this adapter that uses the given queue provider.
     *
     * The adapter this method is called on keeps its own provider.
     */
    public function withQueueProvider(QueueProviderInterface $queueProvider): self
    {
        $copy = clone $this;
        $copy->queueProvider = $queueProvider;

        return $copy;
    }
120
}
121