Completed
Branch master (339a46)
by Damian
04:14
created

AmqpQueue   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 91
Duplicated Lines 0 %

Test Coverage

Coverage 96.43%

Importance

Changes 0
Metric Value
eloc 31
dl 0
loc 91
ccs 27
cts 28
cp 0.9643
rs 10
c 0
b 0
f 0
wmc 8

5 Methods

Rating   Name   Duplication   Size   Complexity  
A add() 0 7 2
A poll() 0 9 2
A peek() 0 11 2
A __construct() 0 13 1
A offer() 0 8 1
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Initx\Querabilis\Envelope;
6
use Initx\Querabilis\Exception\IllegalStateException;
7
use Initx\Querabilis\Queue;
8
use JMS\Serializer\SerializerInterface;
9
use PhpAmqpLib\Channel\AMQPChannel;
10
use PhpAmqpLib\Connection\AbstractConnection;
11
use PhpAmqpLib\Message\AMQPMessage;
12
13
/**
 * Queue driver that stores envelopes in an AMQP broker via php-amqplib.
 *
 * Envelopes are serialized to JSON and published to the configured exchange
 * and routing key; they are read back from the configured queue with
 * basic.get. The class does not declare the queue, the exchange or any
 * binding itself, so those must already exist on the broker.
 */
final class AmqpQueue implements Queue
{
    use HasFallbackSerializer;
    use HasDefaultRemoveAndElement;

    /**
     * @var SerializerInterface
     */
    private $serializer;

    /**
     * @var string
     */
    private $exchange;

    /**
     * @var AbstractConnection
     */
    private $connection;

    /**
     * @var string
     */
    private $routingKey;

    /**
     * @var string
     */
    private $queue;

    /**
     * @var AMQPChannel
     */
    private $channel;

    /**
     * Opens a channel on the given connection and remembers the routing setup.
     *
     * @param AbstractConnection       $connection Connection the channel is opened on.
     * @param string                   $queue      Name of the queue read by poll() and peek().
     * @param string                   $exchange   Exchange published to; '' is the AMQP default exchange.
     * @param string                   $routingKey Routing key used when publishing. When both $exchange
     *                                             and $routingKey are empty, the queue name is used.
     * @param SerializerInterface|null $serializer Serializer to use; passed through fallbackSerializer()
     *                                             (from HasFallbackSerializer) to resolve the final instance.
     */
    public function __construct(
        AbstractConnection $connection,
        string $queue,
        string $exchange = '',
        string $routingKey = '',
        ?SerializerInterface $serializer = null
    ) {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->queue = $queue;
        $this->exchange = $exchange;
        // The default exchange routes only by queue name. Publishing to it with
        // an empty routing key would make the broker silently discard the
        // message, so fall back to the queue name in that one case.
        $this->routingKey = ($exchange === '' && $routingKey === '') ? $queue : $routingKey;
        $this->serializer = $this->fallbackSerializer($serializer);
    }

    /**
     * Publishes the envelope, throwing if offer() reports failure.
     *
     * Note: offer() as written either returns true or lets the underlying
     * exception propagate, so the throw below is only a safeguard.
     *
     * @throws IllegalStateException When offer() returns false.
     */
    public function add(Envelope $envelope): bool
    {
        if (!$this->offer($envelope)) {
            throw new IllegalStateException('Could not write to AMQP');
        }

        return true;
    }

    /**
     * Serializes the envelope to JSON and publishes it to the configured
     * exchange with the configured routing key.
     *
     * @return bool Always true; serializer or broker errors surface as exceptions.
     */
    public function offer(Envelope $envelope): bool
    {
        $serialized = $this->serializer->serialize($envelope, 'json');
        $message = new AMQPMessage($serialized);

        $this->channel->basic_publish($message, $this->exchange, $this->routingKey);

        return true;
    }

    /**
     * Fetches the next message from the queue and removes it.
     *
     * @return Envelope|null The deserialized envelope, or null when the queue is empty.
     */
    public function poll(): ?Envelope
    {
        // Second argument is no_ack: the broker treats the message as
        // acknowledged on delivery, so it is removed from the queue.
        $message = $this->channel->basic_get($this->queue, true);

        if ($message === null) {
            return null;
        }

        return $this->serializer->deserialize($message->body, Envelope::class, 'json');
    }

    /**
     * Fetches the next message from the queue and hands it back to the broker.
     *
     * @return Envelope|null The deserialized envelope, or null when the queue is empty.
     */
    public function peek(): ?Envelope
    {
        $message = $this->channel->basic_get($this->queue);

        if ($message === null) {
            return null;
        }

        // Negative-acknowledge this single delivery with requeue=true so the
        // message stays in the queue. Done before deserializing so a decoding
        // error cannot leave the message unacknowledged on the channel.
        $this->channel->basic_nack($message->get('delivery_tag'), false, true);

        return $this->serializer->deserialize($message->body, Envelope::class, 'json');
    }
}
106