AmqpProducer::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 2
crap 1
1
<?php
2
3
namespace Werkspot\MessageQueue\DeliveryQueue\Amqp;
4
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Connection\AMQPLazyConnection;
7
use PhpAmqpLib\Message\AMQPMessage;
8
use PhpAmqpLib\Wire\AMQPTable;
9
use Werkspot\MessageQueue\DeliveryQueue\ProducerInterface;
10
use Werkspot\MessageQueue\Message\MessageInterface;
11
12
class AmqpProducer implements ProducerInterface
13
{
14
    /**
15
     * @var AMQPChannel
16
     */
17
    private $channel;
18
19
    /**
20
     * @var AMQPLazyConnection
21
     */
22
    private $connection;
23
24
    /**
25
     * @var MessageIdGeneratorInterface
26
     */
27
    private $idGenerator;
28
29 3
    public function __construct(AMQPLazyConnection $connection, MessageIdGeneratorInterface $idGenerator)
30
    {
31 3
        $this->connection = $connection;
32 3
        $this->idGenerator = $idGenerator;
33 3
    }
34
35 2
    public function send(MessageInterface $message, string $queueName): void
36
    {
37 2
        $this->setupChannel($queueName);
38
39 2
        $amqpMessage = new AMQPMessage(serialize($message));
40 2
        $amqpMessage->set('message_id', $this->idGenerator->generateId());
41 2
        $amqpMessage->set('priority', $message->getPriority());
42 2
        $amqpMessage->set('timestamp', microtime(true) * 1000);
43
44 2
        $this->channel->basic_publish($amqpMessage, '', $queueName);
45 2
    }
46
47 2
    private function setupChannel(string $queueName): void
48
    {
49 2
        if ($this->channel === null) {
50
            // Don't do this in the constructor, as it will connect() to the rabbit server, making it non-lazy, so
51
            // only channel() when we really need it
52 2
            $this->channel = $this->connection->channel();
53
        }
54
55 2
        $this->channel->queue_declare(
56 2
            $queueName,
57 2
            false,
58 2
            true,
59 2
            false,
60 2
            false,
61 2
            false,
62 2
            new AMQPTable(array(
0 ignored issues
show
Bug introduced by
new PhpAmqpLib\Wire\AMQP...x-max-priority' => 10)) of type PhpAmqpLib\Wire\AMQPTable is incompatible with the type array expected by parameter $arguments of PhpAmqpLib\Channel\AMQPChannel::queue_declare(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

62
            /** @scrutinizer ignore-type */ new AMQPTable(array(
Loading history...
63 2
                "x-max-priority" => 10
64
            ))
65
        );
66 2
    }
67
}
68