AmqpQueue   A
last analyzed

Complexity

Total Complexity 8

Size/Duplication

Total Lines 85
Duplicated Lines 0 %

Test Coverage

Coverage 96.3%

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 29
c 1
b 0
f 1
dl 0
loc 85
ccs 26
cts 27
cp 0.963
rs 10
wmc 8

5 Methods

Rating   Name   Duplication   Size   Complexity  
A add() 0 7 2
A poll() 0 9 2
A peek() 0 11 2
A __construct() 0 12 1
A offer() 0 8 1
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Initx\Querabilis\Envelope;
6
use Initx\Querabilis\Exception\IllegalStateException;
7
use Initx\Querabilis\Queue;
8
use JMS\Serializer\SerializerInterface;
9
use PhpAmqpLib\Channel\AMQPChannel;
10
use PhpAmqpLib\Connection\AbstractConnection;
11
use PhpAmqpLib\Message\AMQPMessage;
12
13
/**
 * Queue implementation that publishes to and fetches from an AMQP channel.
 *
 * Envelopes are serialized to JSON before publishing and deserialized from
 * the message body when fetched.
 */
final class AmqpQueue implements Queue
{
    use HasFallbackSerializer;
    use HasDefaultRemoveAndElement;

    /** @var SerializerInterface Converts envelopes to and from their JSON form. */
    private $serializer;

    /** @var string Exchange handed to basic_publish() when writing. */
    private $exchange;

    /** @var string Routing key handed to basic_publish() when writing. */
    private $routingKey;

    /** @var string Queue name handed to basic_get() when reading. */
    private $queue;

    /** @var AMQPChannel Channel opened on the injected connection. */
    private $channel;

    /**
     * Opens a channel on the given connection and stores the addressing details.
     *
     * NOTE(review): with both $exchange and $routingKey left at '', messages are
     * published with an empty routing key while reads target $queue — confirm the
     * broker topology actually routes those publishes to $queue.
     *
     * @param AbstractConnection       $connection connection the channel is opened on
     * @param string                   $queue      queue name used by poll() and peek()
     * @param string                   $exchange   exchange used by offer()
     * @param string                   $routingKey routing key used by offer()
     * @param SerializerInterface|null $serializer passed through fallbackSerializer();
     *                                             presumably replaced by a default when null
     */
    public function __construct(
        AbstractConnection $connection,
        string $queue,
        string $exchange = '',
        string $routingKey = '',
        ?SerializerInterface $serializer = null
    ) {
        $this->channel = $connection->channel();
        $this->routingKey = $routingKey;
        $this->exchange = $exchange;
        $this->queue = $queue;
        $this->serializer = $this->fallbackSerializer($serializer);
    }

    /**
     * Publishes the envelope, throwing when offer() reports failure.
     *
     * @param Envelope $envelope envelope to publish
     *
     * @return bool always true when no exception is thrown
     *
     * @throws IllegalStateException when offer() returns false
     */
    public function add(Envelope $envelope): bool
    {
        if ($this->offer($envelope)) {
            return true;
        }

        throw new IllegalStateException("Could not write to AMQP");
    }

    /**
     * Serializes the envelope to JSON and publishes it on the channel.
     *
     * @param Envelope $envelope envelope to publish
     *
     * @return bool always true; errors raised by the serializer or the channel propagate
     */
    public function offer(Envelope $envelope): bool
    {
        $payload = $this->serializer->serialize($envelope, 'json');

        $this->channel->basic_publish(
            new AMQPMessage($payload),
            $this->exchange,
            $this->routingKey
        );

        return true;
    }

    /**
     * Fetches the next message from the queue and returns its envelope.
     *
     * @return Envelope|null the deserialized envelope, or null when basic_get() yields nothing
     */
    public function poll(): ?Envelope
    {
        // Second argument is php-amqplib's no_ack flag: no explicit ack is sent afterwards.
        $fetched = $this->channel->basic_get($this->queue, true);

        if ($fetched) {
            return $this->toEnvelope($fetched);
        }

        return null;
    }

    /**
     * Fetches the next message, hands it back to the broker, and returns its envelope.
     *
     * @return Envelope|null the deserialized envelope, or null when basic_get() yields nothing
     */
    public function peek(): ?Envelope
    {
        $fetched = $this->channel->basic_get($this->queue);

        if ($fetched) {
            // Negative-acknowledge this single delivery (multiple = false) with
            // requeue = true so the message is returned to the queue.
            $this->channel->basic_nack($fetched->get('delivery_tag'), false, true);

            return $this->toEnvelope($fetched);
        }

        return null;
    }

    /**
     * Deserializes the JSON body of a fetched message into an envelope.
     *
     * @param AMQPMessage $message message obtained from basic_get()
     *
     * @return Envelope|null whatever the serializer produces for the body
     */
    private function toEnvelope($message): ?Envelope
    {
        return $this->serializer->deserialize($message->body, Envelope::class, 'json');
    }
}
100