Issues (9)

src/Adapter.php (2 issues)

<?php

declare(strict_types=1);

namespace Yiisoft\Queue\AMQP;

use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\Exception\NotImplementedException;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;

final class Adapter implements AdapterInterface
{
    public function __construct(
        private QueueProviderInterface $queueProvider,
        private MessageSerializerInterface $serializer,
        private LoopInterface $loop,
    ) {
    }

    public function withChannel(string $channel): self
    {
        $instance = clone $this;
        $instance->queueProvider = $this->queueProvider->withChannelName($channel);

        return $instance;
    }

    /**
     * @param callable(MessageInterface): bool  $handlerCallback
     */
    public function runExisting(callable $handlerCallback): void
    {
        $channel = $this->queueProvider->getChannel();
        (new ExistingMessagesConsumer($channel, $this->queueProvider
            ->getQueueSettings()
            ->getName(), $this->serializer))
            ->consume($handlerCallback);
    }

    /**
     * @return never
     */
    public function status(string $id): JobStatus
    {
        throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.');
    }

    public function push(MessageInterface $message): void
    {
        $payload = $this->serializer->serialize($message);
        $amqpMessage = new AMQPMessage(
            $payload,
            array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties())
        );
        $exchangeSettings = $this->queueProvider->getExchangeSettings();
        $this->queueProvider
            ->getChannel()
            ->basic_publish(
                $amqpMessage,
                $exchangeSettings?->getName() ?? '',
                $exchangeSettings ? '' : $this->queueProvider
                    ->getQueueSettings()
                    ->getName()
            );
        /** @var string $messageId */
        $messageId = $amqpMessage->get('message_id');
        $message->setId($messageId);
Bug, Best Practice
The expression ImplicitReturnNode returns the type null which is incompatible with the return type mandated by Yiisoft\Queue\Adapter\AdapterInterface::push() of Yiisoft\Queue\Message\MessageInterface.

In the issue above, the returned value is violating the contract defined by the mentioned interface.

Let's take a look at an example:

interface HasName {
    /** @return string */
    public function getName();
}

class Name {
    public $name;

    public function __construct($name) {
        $this->name = $name;
    }
}

class User implements HasName {
    /** @return string|Name */
    public function getName() {
        return new Name('foo'); // This is a violation of the HasName interface,
                                // which only allows a string value to be returned.
    }
}
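
Applied to the push() finding above, one possible fix is to return the message instead of falling off the end of the method. The following is a minimal sketch, assuming the analysed version of AdapterInterface::push() declares MessageInterface as its return type. Every Sketch* name is a hypothetical stand-in rather than part of Yiisoft\Queue, and the JSON encoding only stands in for the real serializer and broker call.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the message contract, reduced to what the sketch needs.
interface SketchMessageInterface
{
    public function getData(): mixed;
}

// Hypothetical stand-in for the adapter contract: push() must return a message.
interface SketchAdapterInterface
{
    public function push(SketchMessageInterface $message): SketchMessageInterface;
}

final class SketchMessage implements SketchMessageInterface
{
    public function __construct(private mixed $data)
    {
    }

    public function getData(): mixed
    {
        return $this->data;
    }
}

final class SketchAdapter implements SketchAdapterInterface
{
    /** @var list<string> Encoded payloads, standing in for the broker. */
    public array $published = [];

    public function push(SketchMessageInterface $message): SketchMessageInterface
    {
        // Stand-in for serializing and publishing the message.
        $this->published[] = json_encode($message->getData(), JSON_THROW_ON_ERROR);

        // An explicit return satisfies the declared return type;
        // an implicit return would yield null.
        return $message;
    }
}

$adapter = new SketchAdapter();
$message = new SketchMessage(['task' => 'resize']);
$returned = $adapter->push($message);
```

Whether the real adapter should return the original message or a wrapped copy depends on the interface version in use, so treat the return value here as illustrative only.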
The method setId() does not exist on Yiisoft\Queue\Message\MessageInterface. It seems the code is written against a sub-type of Yiisoft\Queue\Message\MessageInterface such as Yiisoft\Queue\Message\IdEnvelope. (Ignorable by annotation.)

If this is a false positive, you can ignore this issue in your code via the ignore-call annotation:

        $message->/** @scrutinizer ignore-call */
                  setId($messageId);
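
Rather than suppressing the warning, the ID could be attached by wrapping the message in an envelope, which is the direction the issue text hints at when it mentions IdEnvelope. The sketch below is an assumption-laden illustration: the Demo* classes and pushWithId() are hypothetical, and the real Yiisoft\Queue\Message\IdEnvelope may have a different constructor and methods.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the message contract; note that it has no setId().
interface DemoMessageInterface
{
    public function getData(): mixed;
}

final class DemoMessage implements DemoMessageInterface
{
    public function __construct(private mixed $data)
    {
    }

    public function getData(): mixed
    {
        return $this->data;
    }
}

// Hypothetical decorator that carries an ID alongside the wrapped message.
final class DemoIdEnvelope implements DemoMessageInterface
{
    public function __construct(
        private DemoMessageInterface $message,
        private string $id,
    ) {
    }

    public function getId(): string
    {
        return $this->id;
    }

    public function getData(): mixed
    {
        // Delegate to the wrapped message so the payload stays untouched.
        return $this->message->getData();
    }
}

// Hypothetical helper: generates an ID the same way the adapter does
// and returns the message wrapped with it.
function pushWithId(DemoMessageInterface $message): DemoIdEnvelope
{
    $messageId = uniqid(more_entropy: true);

    return new DemoIdEnvelope($message, $messageId);
}

$envelope = pushWithId(new DemoMessage('payload'));
```

This keeps the code typed against the interface only, so the analyser has no undeclared method call to flag; the trade-off is that callers must read the ID from the returned envelope rather than from the object they passed in.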
    }

    public function subscribe(callable $handlerCallback): void
    {
        $channel = $this->queueProvider->getChannel();
        $channel->basic_consume(
            $this->queueProvider
                ->getQueueSettings()
                ->getName(),
            $this->queueProvider
                ->getQueueSettings()
                ->getName(),
            false,
            false,
            false,
            true,
            function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void {
                try {
                    $handlerCallback($this->serializer->unserialize($amqpMessage->body));
                    $channel->basic_ack($amqpMessage->getDeliveryTag());
                } catch (Throwable $exception) {
                    $consumerTag = $amqpMessage->getConsumerTag();
                    if ($consumerTag !== null) {
                        $channel->basic_cancel($consumerTag);
                    }

                    throw $exception;
                }
            }
        );

        while ($this->loop->canContinue()) {
            $channel->wait();
        }
    }

    public function getQueueProvider(): QueueProviderInterface
    {
        return $this->queueProvider;
    }

    public function withQueueProvider(QueueProviderInterface $queueProvider): self
    {
        $new = clone $this;
        $new->queueProvider = $queueProvider;

        return $new;
    }
}