Completed
Push — master ( 129581...a310fa )
by Damian
03:27
created

SqsQueue   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 147
Duplicated Lines 0 %

Test Coverage

Coverage 98.41%

Importance

Changes 0
Metric Value
eloc 57
dl 0
loc 147
ccs 62
cts 63
cp 0.9841
rs 10
c 0
b 0
f 0
wmc 21

11 Methods

Rating   Name   Duplication   Size   Complexity  
A peek() 0 14 3
A queryForOneMessage() 0 5 1
A poll() 0 20 3
A isFifo() 0 3 1
A remove() 0 9 2
A resolveQueueUrl() 0 7 3
A endsWith() 0 5 1
A add() 0 7 2
A offer() 0 16 2
A __construct() 0 5 1
A element() 0 9 2
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Aws\Result;
6
use Aws\Sqs\SqsClient;
7
use Initx\Querabilis\Envelope;
8
use Initx\Querabilis\Exception\IllegalStateException;
9
use Initx\Querabilis\Exception\NoSuchElementException;
10
use Initx\Querabilis\Queue;
11
use JMS\Serializer\SerializerInterface;
12
use Ramsey\Uuid\Uuid;
13
14
final class SqsQueue implements Queue
15
{
16
    use HasFallbackSerializer;
17
18
    /**
19
     * @var SqsClient
20
     */
21
    private $client;
22
23
    /**
24
     * @var string
25
     */
26
    private $queueName;
27
28
    /**
29
     * @var string
30
     */
31
    private $queueUrl = '';
32
33
    /**
34
     * @var SerializerInterface
35
     */
36
    private $serializer;
37
38 4
    public function __construct(SqsClient $client, string $queueName, ?SerializerInterface $serializer = null)
39
    {
40 4
        $this->client = $client;
41 4
        $this->queueName = $queueName;
42 4
        $this->serializer = $this->fallbackSerializer($serializer);
43 4
    }
44
45 2
    public function add(Envelope $envelope): bool
46
    {
47 2
        if (!$this->offer($envelope)) {
48 1
            throw new IllegalStateException("Could not add to queue $this->queueName");
49
        }
50
51 1
        return true;
52
    }
53
54 2
    public function offer(Envelope $envelope): bool
55
    {
56 2
        $this->resolveQueueUrl();
57
58 2
        $serialized = $this->serializer->serialize($envelope, 'json');
59
        $args = [
60 2
            'QueueUrl' => $this->queueUrl,
61 2
            'MessageBody' => $serialized,
62
        ];
63
64 2
        if ($this->isFifo()) {
65 1
            $args['MessageGroupId'] = __CLASS__;
66 1
            $args['MessageDeduplicationId'] = Uuid::uuid4()->toString();
67
        }
68
69 2
        return (bool)$this->client->sendMessage($args);
70
    }
71
72 2
    public function remove(): Envelope
73
    {
74 2
        $envelope = $this->poll();
75
76 2
        if (!$envelope) {
77 1
            throw new NoSuchElementException();
78
        }
79
80 1
        return $envelope;
81
    }
82
83 2
    public function poll(): ?Envelope
84
    {
85 2
        $this->resolveQueueUrl();
86 2
        $result = $this->queryForOneMessage();
87
88 2
        if ($result->get('Messages') && count($result->get('Messages'))) {
89 1
            $message = $result->get('Messages')[0];
90 1
            $envelope = $this->serializer->deserialize($message['Body'], Envelope::class, 'json');
91
92
            // remove message from queue
93 1
            $this->client->deleteMessage([
94 1
                'QueueUrl' => $this->queueUrl,
95 1
                'ReceiptHandle' => $message['ReceiptHandle'],
96
            ]);
97
98 1
            return $envelope;
99
        }
100
101
        // no messages
102 1
        return null;
103
    }
104
105 2
    public function element(): Envelope
106
    {
107 2
        $envelope = $this->peek();
108
109 2
        if (!$envelope) {
110 1
            throw new NoSuchElementException();
111
        }
112
113 1
        return $envelope;
114
    }
115
116 2
    public function peek(): ?Envelope
117
    {
118 2
        $this->resolveQueueUrl();
119 2
        $result = $this->queryForOneMessage();
120
121 2
        if ($result->get('Messages') && count($result->get('Messages'))) {
122 1
            $message = $result->get('Messages')[0];
123 1
            $envelope = $this->serializer->deserialize($message['Body'], Envelope::class, 'json');
124
125 1
            return $envelope;
126
        }
127
128
        // no messages
129 1
        return null;
130
    }
131
132 4
    private function resolveQueueUrl(): void
133
    {
134 4
        if (empty($this->queueUrl)) {
135 4
            $result = $this->client->getQueueUrl(['QueueName' => $this->queueName]);
136 4
            $this->queueUrl = $result->get('QueueUrl');
137 4
            if (empty($this->queueUrl)) {
138
                throw new IllegalStateException("Could not resolve queue url from queue name '{$this->queueName}'");
139
            }
140
        }
141 4
    }
142
143 2
    private function queryForOneMessage(): Result
144
    {
145 2
        return $this->client->receiveMessage([
146 2
            'MaxNumberOfMessages' => 1,
147 2
            'QueueUrl' => $this->queueUrl,
148
        ]);
149
    }
150
151 2
    private function isFifo(): bool
152
    {
153 2
        return $this->endsWith($this->queueName, '.fifo');
154
    }
155
156 2
    private function endsWith(string $haystack, string $needle): bool
157
    {
158 2
        $length = strlen($needle);
159
160 2
        return (substr($haystack, -$length) === $needle);
161
    }
162
}
163