SqsAdapter   B
last analyzed

Complexity

Total Complexity 41

Size/Duplication

Total Lines 370
Duplicated Lines 0 %

Test Coverage

Coverage 98.86%

Importance

Changes 0
Metric Value
dl 0
loc 370
ccs 173
cts 175
cp 0.9886
rs 8.2769
c 0
b 0
f 0
wmc 41

17 Methods

Rating   Name   Duplication   Size   Complexity  
A delete() 0 3 1
A __construct() 0 5 1
A getOption() 0 3 2
A getQueueName() 0 3 1
A createRejectEntries() 0 3 1
A purge() 0 3 1
A enqueue() 0 21 4
A getQueueVisibilityTimeout() 0 13 2
A createVisibilityTimeoutEntries() 0 15 1
A createMessageMetadata() 0 9 1
C dequeue() 0 42 8
A reject() 0 21 4
A extend() 0 21 4
A acknowledge() 0 21 4
A getQueueUrl() 0 12 2
A createDeleteEntries() 0 14 1
A createEnqueueEntries() 0 18 3

How to fix   Complexity   

Complex Class

Complex classes like SqsAdapter often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use SqsAdapter, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
/**
4
 * This file is part of graze/queue.
5
 *
6
 * Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 *
11
 * @license https://github.com/graze/queue/blob/master/LICENSE MIT
12
 *
13
 * @link    https://github.com/graze/queue
14
 */
15
16
namespace Graze\Queue\Adapter;
17
18
use Aws\Sqs\SqsClient;
19
use Graze\Queue\Adapter\Exception\FailedAcknowledgementException;
20
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
21
use Graze\Queue\Adapter\Exception\FailedExtensionException;
22
use Graze\Queue\Adapter\Exception\FailedRejectionException;
23
use Graze\Queue\Message\MessageFactoryInterface;
24
use Graze\Queue\Message\MessageInterface;
25
26
/**
27
 * Amazon AWS SQS Adapter.
28
 *
29
 * By default this adapter uses standard polling, which may return an empty response
30
 * even if messages exist on the queue.
31
 *
32
 * > This happens when Amazon SQS uses short (standard) polling, the default behavior,
33
 * > where only a subset of the servers (based on a weighted random distribution) are
34
 * > queried to see if any messages are available to include in the response.
35
 *
36
 * You may want to consider setting the `ReceiveMessageWaitTimeSeconds`
37
 * option to enable long polling the queue, which queries all of the servers.
38
 *
39
 * @link https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html
40
 * @link http://docs.aws.amazon.com/aws-sdk-php/guide/latest/service-sqs.html
41
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_createQueue
42
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_deleteMessageBatch
43
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_receiveMessage
44
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_sendMessageBatch
45
 */
46
final class SqsAdapter implements AdapterInterface, NamedInterface
47
{
48
    const BATCHSIZE_DELETE  = 10;
49
    const BATCHSIZE_RECEIVE = 10;
50
    const BATCHSIZE_SEND    = 10;
51
    const BATCHSIZE_EXTEND  = 10;
52
53
    /** @var SqsClient */
54
    protected $client;
55
56
    /** @var array */
57
    protected $options;
58
59
    /** @var string */
60
    protected $name;
61
62
    /** @var string */
63
    protected $url;
64
65
    /**
66
     * @param SqsClient $client
67
     * @param string    $name
68
     * @param array     $options - DelaySeconds <integer> The time in seconds that the delivery of all
69
     *                           messages in the queue will be delayed.
70
     *                           - MaximumMessageSize <integer> The limit of how many bytes a message
71
     *                           can contain before Amazon SQS rejects it.
72
     *                           - MessageRetentionPeriod <integer> The number of seconds Amazon SQS
73
     *                           retains a message.
74
     *                           - Policy <string> The queue's policy. A valid form-url-encoded policy.
75
     *                           - ReceiveMessageWaitTimeSeconds <integer> The time for which a
76
     *                           ReceiveMessage call will wait for a message to arrive.
77
     *                           - VisibilityTimeout <integer> The visibility timeout for the queue.
78
     */
79 22
    public function __construct(SqsClient $client, $name, array $options = [])
80
    {
81 22
        $this->client = $client;
82 22
        $this->name = $name;
83 22
        $this->options = $options;
84 22
    }
85
86
    /**
87
     * @param MessageInterface[] $messages
88
     */
89 6
    public function acknowledge(array $messages)
90
    {
91 6
        $url = $this->getQueueUrl();
92 6
        $errors = [];
93 6
        $batches = array_chunk($this->createDeleteEntries($messages), self::BATCHSIZE_DELETE);
94
95 6
        foreach ($batches as $batch) {
96 6
            $results = $this->client->deleteMessageBatch([
97 6
                'QueueUrl' => $url,
98 6
                'Entries'  => $batch,
99 6
            ]);
100
101 6
            $errors = array_merge($errors, $results->get('Failed') ?: []);
102 6
        }
103
        $map = function ($result) use ($messages) {
104 1
            return $messages[$result['Id']];
105 6
        };
106 6
        $failed = array_map($map, $errors);
107
108 6
        if (!empty($failed)) {
109 1
            throw new FailedAcknowledgementException($this, $failed, $errors);
110
        }
111 5
    }
112
113
    /**
114
     * @param MessageInterface[] $messages
115
     * @param int                $duration Number of seconds to ensure that this message stays being processed and not
116
     *                                     put back on the queue
117
     *
118
     * @return void
119
     */
120 2
    public function extend(array $messages, $duration)
121
    {
122 2
        $url = $this->getQueueUrl();
123 2
        $errors = [];
124 2
        $batches = array_chunk($this->createVisibilityTimeoutEntries($messages, $duration), self::BATCHSIZE_EXTEND);
125
126 2
        foreach ($batches as $batch) {
127 2
            $results = $this->client->changeMessageVisibilityBatch([
128 2
                'QueueUrl' => $url,
129 2
                'Entries'  => $batch,
130 2
            ]);
131
132 2
            $errors = array_merge($errors, $results->get('Failed') ?: []);
133 2
        }
134
        $map = function ($result) use ($messages) {
135 1
            return $messages[$result['Id']];
136 2
        };
137 2
        $failed = array_map($map, $errors);
138
139 2
        if (!empty($failed)) {
140 1
            throw new FailedExtensionException($this, $failed, $errors);
141
        }
142 1
    }
143
144
    /**
145
     * @param MessageInterface[] $messages
146
     *
147
     * @throws FailedAcknowledgementException|void
148
     */
149 2
    public function reject(array $messages)
150
    {
151 2
        $url = $this->getQueueUrl();
152 2
        $errors = [];
153 2
        $batches = array_chunk($this->createRejectEntries($messages), self::BATCHSIZE_DELETE);
154
155 2
        foreach ($batches as $batch) {
156 2
            $results = $this->client->changeMessageVisibilityBatch([
157 2
                'QueueUrl' => $url,
158 2
                'Entries'  => $batch,
159 2
            ]);
160
161 2
            $errors = array_merge($errors, $results->get('Failed') ?: []);
162 2
        }
163
        $map = function ($result) use ($messages) {
164 1
            return $messages[$result['Id']];
165 2
        };
166 2
        $failed = array_map($map, $errors);
167
168 2
        if (!empty($failed)) {
169 1
            throw new FailedRejectionException($this, $failed, $errors);
170
        }
171 1
    }
172
173
    /**
174
     * @param MessageFactoryInterface $factory
175
     * @param int                     $limit
176
     *
177
     * @return \Generator
178
     */
179 8
    public function dequeue(MessageFactoryInterface $factory, $limit)
180
    {
181 8
        $remaining = $limit ?: 0;
182
183 8
        while (null === $limit || $remaining > 0) {
184
            /**
185
             * If a limit has been specified, set {@see $size} so that we don't return more
186
             * than the requested number of messages if it's less than the batch size.
187
             */
188 8
            $size = ($limit !== null) ? min($remaining, self::BATCHSIZE_RECEIVE) : self::BATCHSIZE_RECEIVE;
189
190 8
            $timestamp = time() + $this->getQueueVisibilityTimeout();
191
            $validator = function () use ($timestamp) {
192 4
                return time() < $timestamp;
193 8
            };
194
195 8
            $results = $this->client->receiveMessage(array_filter([
196 8
                'QueueUrl'            => $this->getQueueUrl(),
197 8
                'AttributeNames'      => ['All'],
198 8
                'MaxNumberOfMessages' => $size,
199 8
                'VisibilityTimeout'   => $this->getOption('VisibilityTimeout'),
200 8
                'WaitTimeSeconds'     => $this->getOption('ReceiveMessageWaitTimeSeconds'),
201 8
            ]));
202
203 8
            $messages = $results->get('Messages') ?: [];
204
205 8
            if (count($messages) === 0) {
206 1
                break;
207
            }
208
209 7
            foreach ($messages as $result) {
210 7
                yield $factory->createMessage(
211 7
                    $result['Body'],
212
                    [
213 7
                        'metadata'  => $this->createMessageMetadata($result),
214 7
                        'validator' => $validator,
215
                    ]
216 7
                );
217 5
            }
218
219
            // Decrement the number of messages remaining.
220 5
            $remaining -= count($messages);
221 5
        }
222 6
    }
223
224
    /**
225
     * @param MessageInterface[] $messages
226
     */
227 4
    public function enqueue(array $messages)
228
    {
229 4
        $url = $this->getQueueUrl();
230 4
        $errors = [];
231 4
        $batches = array_chunk($this->createEnqueueEntries($messages), self::BATCHSIZE_SEND);
232
233 4
        foreach ($batches as $batch) {
234 4
            $results = $this->client->sendMessageBatch([
235 4
                'QueueUrl' => $url,
236 4
                'Entries'  => $batch,
237 4
            ]);
238
239 4
            $errors = array_merge($errors, $results->get('Failed') ?: []);
240 4
        }
241
        $map = function ($result) use ($messages) {
242
            return $messages[$result['Id']];
243 4
        };
244 4
        $failed = array_map($map, $errors);
245
246 4
        if (!empty($failed)) {
247
            throw new FailedEnqueueException($this, $failed, $errors);
248
        }
249 4
    }
250
251
    /**
252
     * {@inheritdoc}
253
     */
254 2
    public function purge()
255
    {
256 2
        $this->client->purgeQueue(['QueueUrl' => $this->getQueueUrl()]);
257 2
    }
258
259
    /**
260
     * {@inheritdoc}
261
     */
262 2
    public function delete()
263
    {
264 2
        $this->client->deleteQueue(['QueueUrl' => $this->getQueueUrl()]);
265 2
    }
266
267
    /**
268
     * @param MessageInterface[] $messages
269
     *
270
     * @return array
271
     */
272 6
    protected function createDeleteEntries(array $messages)
273
    {
274 6
        array_walk(
275 6
            $messages,
276
            function (MessageInterface &$message, $id) {
277 6
                $metadata = $message->getMetadata();
278
                $message = [
279 6
                    'Id'            => $id,
280 6
                    'ReceiptHandle' => $metadata->get('ReceiptHandle'),
281 6
                ];
282 6
            }
283 6
        );
284
285 6
        return $messages;
286
    }
287
288
    /**
289
     * @param MessageInterface[] $messages
290
     *
291
     * @return array
292
     */
293 2
    protected function createRejectEntries(array $messages)
294
    {
295 2
        return $this->createVisibilityTimeoutEntries($messages, 0);
296
    }
297
298
    /**
299
     * @param MessageInterface[] $messages
300
     * @param int                $timeout
301
     *
302
     * @return array
303
     */
304 4
    private function createVisibilityTimeoutEntries(array $messages, $timeout)
305
    {
306 4
        array_walk(
307 4
            $messages,
308
            function (MessageInterface &$message, $id) use ($timeout) {
309 4
                $metadata = $message->getMetadata();
310
                $message = [
311 4
                    'Id'                => $id,
312 4
                    'ReceiptHandle'     => $metadata->get('ReceiptHandle'),
313 4
                    'VisibilityTimeout' => $timeout,
314 4
                ];
315 4
            }
316 4
        );
317
318 4
        return $messages;
319
    }
320
321
    /**
322
     * @param MessageInterface[] $messages
323
     *
324
     * @return array
325
     */
326 4
    protected function createEnqueueEntries(array $messages)
327
    {
328 4
        array_walk(
329 4
            $messages,
330 4
            function (MessageInterface &$message, $id) {
331 4
                $metadata = $message->getMetadata();
332
                $message = [
333 4
                    'Id'                => $id,
334 4
                    'MessageBody'       => $message->getBody(),
335 4
                    'MessageAttributes' => $metadata->get('MessageAttributes') ?: [],
336 4
                ];
337 4
                if (!is_null($metadata->get('DelaySeconds'))) {
338 2
                    $message['DelaySeconds'] = $metadata->get('DelaySeconds');
339 2
                }
340 4
            }
341 4
        );
342
343 4
        return $messages;
344
    }
345
346
    /**
347
     * @param array $result
348
     *
349
     * @return array
350
     */
351 7
    protected function createMessageMetadata(array $result)
352
    {
353 7
        return array_intersect_key(
354 7
            $result,
355
            [
356 7
                'Attributes'        => [],
357 7
                'MessageAttributes' => [],
358 7
                'MessageId'         => null,
359 7
                'ReceiptHandle'     => null,
360
            ]
361 7
        );
362
    }
363
364
    /**
365
     * @param string $name
366
     * @param mixed  $default
367
     *
368
     * @return mixed
369
     */
370 8
    protected function getOption($name, $default = null)
371
    {
372 8
        return isset($this->options[$name]) ? $this->options[$name] : $default;
373
    }
374
375
    /**
376
     * @return string
377
     */
378 21
    protected function getQueueUrl()
379
    {
380 21
        if (!$this->url) {
381 21
            $result = $this->client->createQueue([
382 21
                'QueueName'  => $this->name,
383 21
                'Attributes' => $this->options,
384 21
            ]);
385
386 21
            $this->url = $result->get('QueueUrl');
387 21
        }
388
389 21
        return $this->url;
390
    }
391
392
    /**
393
     * @return int
394
     */
395 8
    protected function getQueueVisibilityTimeout()
396
    {
397 8
        if (!isset($this->options['VisibilityTimeout'])) {
398 8
            $result = $this->client->getQueueAttributes([
399 8
                'QueueUrl'       => $this->getQueueUrl(),
400 8
                'AttributeNames' => ['VisibilityTimeout'],
401 8
            ]);
402
403 8
            $attributes = $result->get('Attributes');
404 8
            $this->options['VisibilityTimeout'] = $attributes['VisibilityTimeout'];
405 8
        }
406
407 8
        return $this->options['VisibilityTimeout'];
408
    }
409
410
    /**
411
     * @return string
412
     */
413 3
    public function getQueueName()
414
    {
415 3
        return $this->name;
416
    }
417
}
418