Completed
Branch master (339a46)
by Damian
04:14
created

AmqpQueue::offer()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 4
nc 1
nop 1
dl 0
loc 8
ccs 5
cts 5
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Initx\Querabilis\Envelope;
6
use Initx\Querabilis\Exception\IllegalStateException;
7
use Initx\Querabilis\Queue;
8
use JMS\Serializer\SerializerInterface;
9
use PhpAmqpLib\Channel\AMQPChannel;
10
use PhpAmqpLib\Connection\AbstractConnection;
11
use PhpAmqpLib\Message\AMQPMessage;
12
13
/**
 * Queue driver that moves envelopes through an AMQP channel.
 *
 * Envelopes are serialized to JSON before being published and are
 * deserialized from the message body when fetched.
 */
final class AmqpQueue implements Queue
{
    // Supplies fallbackSerializer(), used by the constructor.
    use HasFallbackSerializer;
    // NOTE(review): presumably supplies the remove()/element() methods of Queue — confirm in the trait.
    use HasDefaultRemoveAndElement;

    /**
     * Connection the channel was opened on; the methods below only use the channel.
     *
     * @var AbstractConnection
     */
    private $connection;

    /**
     * Channel obtained from the connection at construction time.
     *
     * @var AMQPChannel
     */
    private $channel;

    /**
     * Name of the queue messages are fetched from.
     *
     * @var string
     */
    private $queue;

    /**
     * Exchange messages are published to.
     *
     * @var string
     */
    private $exchange;

    /**
     * Routing key used when publishing.
     *
     * @var string
     */
    private $routingKey;

    /**
     * Serializer used to convert envelopes to and from JSON.
     *
     * @var SerializerInterface
     */
    private $serializer;

    /**
     * @param AbstractConnection       $connection Connection on which a channel is opened.
     * @param string                   $queue      Queue name used by poll() and peek().
     * @param string                   $exchange   Exchange used by offer(); defaults to ''.
     * @param string                   $routingKey Routing key used by offer(); defaults to ''.
     * @param SerializerInterface|null $serializer Optional serializer; passed through
     *                                             fallbackSerializer() to resolve the one actually used.
     */
    public function __construct(
        AbstractConnection $connection,
        string $queue,
        string $exchange = '',
        string $routingKey = '',
        ?SerializerInterface $serializer = null
    ) {
        $this->queue = $queue;
        $this->exchange = $exchange;
        $this->routingKey = $routingKey;
        $this->connection = $connection;
        // The channel is opened before the serializer is resolved, as in the original order.
        $this->channel = $connection->channel();
        $this->serializer = $this->fallbackSerializer($serializer);
    }

    /**
     * Publishes the envelope, throwing if offer() reports failure.
     *
     * @param Envelope $envelope Envelope to publish.
     *
     * @return bool Always true when no exception is thrown.
     *
     * @throws IllegalStateException When offer() returns false.
     */
    public function add(Envelope $envelope): bool
    {
        if ($this->offer($envelope)) {
            return true;
        }

        throw new IllegalStateException("Could not write to AMQP");
    }

    /**
     * Serializes the envelope to JSON and publishes it on the configured
     * exchange with the configured routing key.
     *
     * @param Envelope $envelope Envelope to publish.
     *
     * @return bool True once basic_publish() has returned; this method has no false path.
     */
    public function offer(Envelope $envelope): bool
    {
        $payload = new AMQPMessage($this->serializer->serialize($envelope, 'json'));

        $this->channel->basic_publish($payload, $this->exchange, $this->routingKey);

        return true;
    }

    /**
     * Fetches one message from the queue and returns its envelope.
     *
     * The second argument to basic_get() is true (no-ack in php-amqplib), so
     * no acknowledgement is sent afterwards.
     *
     * @return Envelope|null The deserialized envelope, or null when basic_get() yields nothing.
     */
    public function poll(): ?Envelope
    {
        $fetched = $this->channel->basic_get($this->queue, true);

        return $fetched
            ? $this->serializer->deserialize($fetched->body, Envelope::class, 'json')
            : null;
    }

    /**
     * Fetches one message, hands it back to the broker, and returns its envelope.
     *
     * @return Envelope|null The deserialized envelope, or null when basic_get() yields nothing.
     */
    public function peek(): ?Envelope
    {
        $fetched = $this->channel->basic_get($this->queue);

        if ($fetched) {
            // Negative-acknowledge with the third argument true (requeue) so the message is not consumed.
            $this->channel->basic_nack($fetched->get('delivery_tag'), false, true);

            return $this->serializer->deserialize($fetched->body, Envelope::class, 'json');
        }

        return null;
    }
}
106