Passed
Push — master ( 0d68ae...2d2c36 )
by Damian
04:18
created

SqsQueue::peek()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 14
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0175

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 14
ccs 7
cts 8
cp 0.875
rs 10
c 0
b 0
f 0
cc 3
nc 2
nop 0
crap 3.0175
1
<?php declare(strict_types=1);
2
3
namespace Initx\Driver;
4
5
use Aws\Result;
6
use Aws\Sqs\SqsClient;
7
use Initx\Envelope;
8
use Initx\Exception\IllegalStateException;
9
use Initx\Exception\NoSuchElementException;
10
use Initx\Queue;
11
use JMS\Serializer\SerializerInterface;
12
use Ramsey\Uuid\Uuid;
13
14
final class SqsQueue implements Queue
15
{
16
    use HasFallbackSerializer;
17
18
    /**
19
     * @var SqsClient
20
     */
21
    private $client;
22
23
    /**
24
     * @var string
25
     */
26
    private $queueName;
27
28
    /**
29
     * @var string
30
     */
31
    private $queueUrl = '';
32
33
    /**
34
     * @var SerializerInterface
35
     */
36
    private $serializer;
37
38 3
    public function __construct(SqsClient $client, string $queueName, ?SerializerInterface $serializer = null)
39
    {
40 3
        $this->client = $client;
41 3
        $this->queueName = $queueName;
42 3
        $this->serializer = $this->fallbackSerializer($serializer);
43 3
    }
44
45 2
    public function add(Envelope $envelope): void
46
    {
47 2
        if (!$this->offer($envelope)) {
48 1
            throw new IllegalStateException("Could not add to queue $this->queueName");
49
        }
50 1
    }
51
52 2
    public function offer(Envelope $envelope): bool
53
    {
54 2
        $this->resolveQueueUrl();
55
56 2
        $serialized = $this->serializer->serialize($envelope, 'json');
57
        $args = [
58 2
            'QueueUrl' => $this->queueUrl,
59 2
            'MessageBody' => $serialized,
60
        ];
61
62 2
        if ($this->isFifo()) {
63 1
            $args['MessageGroupId'] = __CLASS__;
64 1
            $args['MessageDeduplicationId'] = Uuid::uuid4()->toString();
65
        }
66
67 2
        return (bool)$this->client->sendMessage($args);
68
    }
69
70 1
    public function remove(): Envelope
71
    {
72 1
        $envelope = $this->poll();
73
74 1
        if (!$envelope) {
75
            throw new NoSuchElementException();
76
        }
77
78 1
        return $envelope;
79
    }
80
81 1
    public function poll(): ?Envelope
82
    {
83 1
        $this->resolveQueueUrl();
84 1
        $result = $this->queryForOneMessage();
85
86 1
        if ($result->get('Messages') && count($result->get('Messages'))) {
87 1
            $message = $result->get('Messages')[0];
88 1
            $envelope = $this->serializer->deserialize($message['Body'], Envelope::class, 'json');
89
90
            // remove message from queue
91 1
            $this->client->deleteMessage([
92 1
                'QueueUrl' => $this->queueUrl,
93 1
                'ReceiptHandle' => $message['ReceiptHandle'],
94
            ]);
95
96 1
            return $envelope;
97
        }
98
99
        // no messages
100
        return null;
101
    }
102
103 1
    public function element(): Envelope
104
    {
105 1
        $envelope = $this->peek();
106
107 1
        if (!$envelope) {
108
            throw new NoSuchElementException();
109
        }
110
111 1
        return $envelope;
112
    }
113
114 1
    public function peek(): ?Envelope
115
    {
116 1
        $this->resolveQueueUrl();
117 1
        $result = $this->queryForOneMessage();
118
119 1
        if ($result->get('Messages') && count($result->get('Messages'))) {
120 1
            $message = $result->get('Messages')[0];
121 1
            $envelope = $this->serializer->deserialize($message['Body'], Envelope::class, 'json');
122
123 1
            return $envelope;
124
        }
125
126
        // no messages
127
        return null;
128
    }
129
130 3
    private function resolveQueueUrl(): void
131
    {
132 3
        if (empty($this->queueUrl)) {
133 3
            $result = $this->client->getQueueUrl(['QueueName' => $this->queueName]);
134 3
            $this->queueUrl = $result->get('QueueUrl');
135 3
            if (empty($this->queueUrl)) {
136
                throw new IllegalStateException("Could not resolve queue url from queue name '{$this->queueName}'");
137
            }
138
        }
139 3
    }
140
141 1
    private function queryForOneMessage(): Result
142
    {
143 1
        return $this->client->receiveMessage([
144 1
            'MaxNumberOfMessages' => 1,
145 1
            'QueueUrl' => $this->queueUrl,
146
        ]);
147
    }
148
149 2
    private function isFifo(): bool
150
    {
151 2
        return $this->endsWith($this->queueName, '.fifo');
152
    }
153
154 2
    private function endsWith(string $haystack, string $needle): bool
155
    {
156 2
        $length = strlen($needle);
157
158 2
        return (substr($haystack, -$length) === $needle);
159
    }
160
}
161