Passed
Branch master (ae13e1)
by Damian
03:05
created

SqsQueue::peek()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 14
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0175

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 14
ccs 7
cts 8
cp 0.875
rs 10
c 0
b 0
f 0
cc 3
nc 2
nop 0
crap 3.0175
1
<?php declare(strict_types=1);
2
3
namespace Initx\Driver;
4
5
use Aws\Result;
6
use Aws\Sqs\SqsClient;
7
use Initx\Envelope;
8
use Initx\Exception\IllegalStateException;
9
use Initx\Exception\NoSuchElementException;
10
use Initx\Queue;
11
use JMS\Serializer\SerializerInterface;
12
use Ramsey\Uuid\Uuid;
13
14
/**
 * Queue implementation backed by an Amazon SQS queue.
 *
 * Envelopes are serialized to JSON before being sent and deserialized back
 * into Envelope instances when received. The queue URL is looked up from the
 * queue name on first use and cached on the instance afterwards.
 */
class SqsQueue implements Queue
{
    use HasFallbackSerializer;

    /**
     * @var SqsClient
     */
    private $client;

    /**
     * @var string
     */
    private $queueName;

    /**
     * Resolved lazily by resolveQueueUrl(); stays null until a lookup succeeds.
     *
     * @var string|null
     */
    private $queueUrl;

    /**
     * NOTE(review): the constructor may store null here and then calls
     * fallbackSerializer() from the HasFallbackSerializer trait, which is not
     * visible in this file; presumably it installs a default serializer.
     *
     * @var SerializerInterface
     */
    private $serializer;

    /**
     * @param SqsClient                $client     Client used for every SQS call.
     * @param string                   $queueName  Queue name; a ".fifo" suffix enables FIFO-specific arguments.
     * @param SerializerInterface|null $serializer Optional serializer for envelopes.
     */
    public function __construct(SqsClient $client, string $queueName, ?SerializerInterface $serializer = null)
    {
        $this->client = $client;
        $this->queueName = $queueName;
        $this->serializer = $serializer;
        $this->fallbackSerializer();
    }

    /**
     * Adds an envelope to the queue, failing loudly when it is not accepted.
     *
     * @throws IllegalStateException When offer() reports that the envelope was not added.
     */
    public function add(Envelope $envelope): void
    {
        if (!$this->offer($envelope)) {
            throw new IllegalStateException("Could not add to queue $this->queueName");
        }
    }

    /**
     * Sends an envelope to the queue.
     *
     * @return bool True when the client returned a truthy result, false otherwise.
     *
     * @throws IllegalStateException When the queue URL cannot be resolved.
     */
    public function offer(Envelope $envelope): bool
    {
        $this->resolveQueueUrl();

        $args = [
            'QueueUrl' => $this->queueUrl,
            'MessageBody' => $this->serializer->serialize($envelope, 'json'),
        ];

        if ($this->isFifo()) {
            // Every message shares one group; a random deduplication id is
            // generated per message.
            $args['MessageGroupId'] = __CLASS__;
            $args['MessageDeduplicationId'] = Uuid::uuid4()->toString();
        }

        $result = $this->client->sendMessage($args);

        // Static analysis reports that $result is an Aws\Result and therefore
        // always truthy; the cast is kept so a falsy return value (for example
        // from a test double) still maps to false exactly as before.
        return (bool) $result;
    }

    /**
     * Retrieves and removes the head of the queue.
     *
     * @throws NoSuchElementException When no message is available.
     * @throws IllegalStateException  When the queue URL cannot be resolved.
     */
    public function remove(): Envelope
    {
        $envelope = $this->poll();

        if ($envelope === null) {
            throw new NoSuchElementException();
        }

        return $envelope;
    }

    /**
     * Retrieves and removes the head of the queue, or returns null when no
     * message was received.
     *
     * @throws IllegalStateException When the queue URL cannot be resolved.
     */
    public function poll(): ?Envelope
    {
        $this->resolveQueueUrl();
        $message = $this->firstMessage($this->queryForOneMessage());

        if ($message === null) {
            return null;
        }

        // Deserialize first so a malformed body does not cause the message to
        // be deleted.
        $envelope = $this->serializer->deserialize($message['Body'], Envelope::class, 'json');

        $this->client->deleteMessage([
            'QueueUrl' => $this->queueUrl,
            'ReceiptHandle' => $message['ReceiptHandle'],
        ]);

        return $envelope;
    }

    /**
     * Retrieves, but does not remove, the head of the queue.
     *
     * @throws NoSuchElementException When no message is available.
     * @throws IllegalStateException  When the queue URL cannot be resolved.
     */
    public function element(): Envelope
    {
        $envelope = $this->peek();

        if ($envelope === null) {
            throw new NoSuchElementException();
        }

        return $envelope;
    }

    /**
     * Retrieves, but does not remove, the head of the queue, or returns null
     * when no message was received.
     *
     * @throws IllegalStateException When the queue URL cannot be resolved.
     */
    public function peek(): ?Envelope
    {
        $this->resolveQueueUrl();

        // Receive with a visibility timeout of zero: otherwise the message
        // would be hidden for the queue's default visibility timeout and an
        // immediately following peek()/poll() would not see it.
        $message = $this->firstMessage($this->queryForOneMessage(0));

        if ($message === null) {
            return null;
        }

        return $this->serializer->deserialize($message['Body'], Envelope::class, 'json');
    }

    /**
     * Looks up the queue URL for the configured queue name once and caches it.
     *
     * @throws IllegalStateException When the lookup yields no usable URL.
     */
    private function resolveQueueUrl(): void
    {
        if ($this->queueUrl !== null) {
            return;
        }

        $result = $this->client->getQueueUrl(['QueueName' => $this->queueName]);
        $queueUrl = $result ? $result->get('QueueUrl') : null;

        if (!$queueUrl) {
            throw new IllegalStateException("Could not resolve queue url from queue name '{$this->queueName}'");
        }

        // Only a validated URL is cached, so a failed lookup is retried on the
        // next call instead of leaving a falsy non-null value behind.
        $this->queueUrl = $queueUrl;
    }

    /**
     * Asks SQS for at most one message from the resolved queue.
     *
     * @param int|null $visibilityTimeout Seconds the received message stays hidden from
     *                                    other receives; null leaves the argument out of
     *                                    the request.
     */
    private function queryForOneMessage(?int $visibilityTimeout = null): Result
    {
        $args = [
            'MaxNumberOfMessages' => 1,
            'QueueUrl' => $this->queueUrl,
        ];

        if ($visibilityTimeout !== null) {
            $args['VisibilityTimeout'] = $visibilityTimeout;
        }

        return $this->client->receiveMessage($args);
    }

    /**
     * Extracts the first message from a receive result.
     *
     * @return array|null The first entry of 'Messages', or null when that entry is missing or empty.
     */
    private function firstMessage(Result $result): ?array
    {
        $messages = $result->get('Messages');

        if (empty($messages)) {
            return null;
        }

        return $messages[0];
    }

    /**
     * Tells whether the configured queue name carries the ".fifo" suffix.
     */
    private function isFifo(): bool
    {
        return $this->endsWith($this->queueName, '.fifo');
    }

    /**
     * Byte-wise check that $haystack ends with $needle.
     */
    private function endsWith(string $haystack, string $needle): bool
    {
        // substr($haystack, -0) would return the whole haystack, so the empty
        // needle is handled explicitly: every string ends with ''.
        if ($needle === '') {
            return true;
        }

        return substr($haystack, -strlen($needle)) === $needle;
    }
}
165