AmqpProducer   A
last analyzed

Complexity

Total Complexity 4

Size/Duplication

Total Lines 52
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 4
dl 0
loc 52
ccs 24
cts 24
cp 1
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A send() 0 10 1
A __construct() 0 4 1
A setupChannel() 0 17 2
1
<?php
2
3
namespace Werkspot\MessageQueue\DeliveryQueue\Amqp;
4
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Connection\AMQPLazyConnection;
7
use PhpAmqpLib\Message\AMQPMessage;
8
use PhpAmqpLib\Wire\AMQPTable;
9
use Werkspot\MessageQueue\DeliveryQueue\ProducerInterface;
10
use Werkspot\MessageQueue\Message\MessageInterface;
11
12
/**
 * Publishes messages to AMQP queues through a channel that is opened lazily.
 *
 * Each send() makes sure the channel exists, declares the target queue and
 * then publishes the PHP-serialized message with the queue name as routing key.
 */
class AmqpProducer implements ProducerInterface
{
    /**
     * Value of the "x-max-priority" argument passed when declaring a queue.
     */
    private const MAX_PRIORITY = 10;

    /**
     * Channel used for declaring queues and publishing; null until the first send().
     *
     * @var AMQPChannel|null
     */
    private $channel;

    /**
     * Connection the channel is obtained from on first use.
     *
     * @var AMQPLazyConnection
     */
    private $connection;

    /**
     * Source of the "message_id" property set on every published message.
     *
     * @var MessageIdGeneratorInterface
     */
    private $idGenerator;

    /**
     * Stores the collaborators only; no channel is opened here so the
     * connection stays lazy until a message is actually sent.
     */
    public function __construct(AMQPLazyConnection $connection, MessageIdGeneratorInterface $idGenerator)
    {
        $this->connection = $connection;
        $this->idGenerator = $idGenerator;
    }

    /**
     * Serializes the message and publishes it to the given queue.
     *
     * The AMQP message carries a generated "message_id", the message's own
     * priority and a "timestamp" property.
     *
     * @param MessageInterface $message   Message to serialize() into the AMQP body.
     * @param string           $queueName Queue to declare and to use as routing key.
     */
    public function send(MessageInterface $message, string $queueName): void
    {
        $this->setupChannel($queueName);

        $amqpMessage = new AMQPMessage(serialize($message));
        $amqpMessage->set('message_id', $this->idGenerator->generateId());
        $amqpMessage->set('priority', $message->getPriority());
        // Float number of milliseconds since the Unix epoch.
        // NOTE(review): AMQP timestamps are conventionally whole seconds - confirm consumers expect milliseconds.
        $amqpMessage->set('timestamp', microtime(true) * 1000);

        // An empty exchange name with the queue name as routing key.
        $this->channel->basic_publish($amqpMessage, '', $queueName);
    }

    /**
     * Opens the channel on first use and declares the queue on every call.
     *
     * @param string $queueName Name of the queue to declare.
     */
    private function setupChannel(string $queueName): void
    {
        if (null === $this->channel) {
            // Deliberately not done in the constructor: channel() connects to the
            // rabbit server, which would make the lazy connection non-lazy.
            $this->channel = $this->connection->channel();
        }

        // NOTE(review): static analysis reports $arguments as expecting an array;
        // the AMQPTable is kept as in the original - confirm against the installed php-amqplib.
        $this->channel->queue_declare(
            $queueName,
            false, // passive
            true,  // durable
            false, // exclusive
            false, // auto_delete
            false, // nowait
            /** @scrutinizer ignore-type */ new AMQPTable([
                'x-max-priority' => self::MAX_PRIORITY,
            ])
        );
    }
}
68