Completed
Pull Request — master (#43)
by Harry
10:13
created

SqsAdapter   B

Complexity

Total Complexity 36

Size/Duplication

Total Lines 329
Duplicated Lines 30.4 %

Coupling/Cohesion

Components 1
Dependencies 7

Test Coverage

Coverage 94.87%

Importance

Changes 0
Metric Value
wmc 36
lcom 1
cbo 7
dl 100
loc 329
ccs 111
cts 117
cp 0.9487
rs 8.8
c 0
b 0
f 0

15 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
B acknowledge() 23 23 4
A delete() 0 4 1
B reject() 23 23 4
C dequeue() 0 44 8
B enqueue() 23 23 4
A purge() 0 4 1
A createDeleteEntries() 15 15 1
A createRejectEntries() 16 16 1
A createEnqueueEntries() 0 19 3
A createMessageMetadata() 0 12 1
A getOption() 0 4 2
A getQueueUrl() 0 13 2
A getQueueVisibilityTimeout() 0 14 2
A getQueueName() 0 4 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
<?php
2
3
/**
4
 * This file is part of graze/queue.
5
 *
6
 * Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 *
11
 * @license https://github.com/graze/queue/blob/master/LICENSE MIT
12
 *
13
 * @link    https://github.com/graze/queue
14
 */
15
16
namespace Graze\Queue\Adapter;
17
18
use Aws\Sqs\SqsClient;
19
use Graze\Queue\Adapter\Exception\FailedAcknowledgementException;
20
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
21
use Graze\Queue\Message\MessageFactoryInterface;
22
use Graze\Queue\Message\MessageInterface;
23
24
/**
25
 * Amazon AWS SQS Adapter.
26
 *
27
 * By default this adapter uses standard polling, which may return an empty response
28
 * even if messages exist on the queue.
29
 *
30
 * > This happens when Amazon SQS uses short (standard) polling, the default behavior,
31
 * > where only a subset of the servers (based on a weighted random distribution) are
32
 * > queried to see if any messages are available to include in the response.
33
 *
34
 * You may want to consider setting the `ReceiveMessageWaitTimeSeconds`
35
 * option to enable long polling the queue, which queries all of the servers.
36
 *
37
 * @link https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html
38
 * @link http://docs.aws.amazon.com/aws-sdk-php/guide/latest/service-sqs.html
39
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_createQueue
40
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_deleteMessageBatch
41
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_receiveMessage
42
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_sendMessageBatch
43
 */
44
final class SqsAdapter implements AdapterInterface, NamedInterface
45
{
46
    const BATCHSIZE_DELETE  = 10;
47
    const BATCHSIZE_RECEIVE = 10;
48
    const BATCHSIZE_SEND    = 10;
49
50
    /** @var SqsClient */
51
    protected $client;
52
53
    /** @var array */
54
    protected $options;
55
56
    /** @var string */
57
    protected $name;
58
59
    /** @var string */
60
    protected $url;
61
62
    /**
63
     * @param SqsClient $client
64
     * @param string    $name
65
     * @param array     $options - DelaySeconds <integer> The time in seconds that the delivery of all
66
     *                           messages in the queue will be delayed.
67
     *                           - MaximumMessageSize <integer> The limit of how many bytes a message
68
     *                           can contain before Amazon SQS rejects it.
69
     *                           - MessageRetentionPeriod <integer> The number of seconds Amazon SQS
70
     *                           retains a message.
71
     *                           - Policy <string> The queue's policy. A valid form-url-encoded policy.
72
     *                           - ReceiveMessageWaitTimeSeconds <integer> The time for which a
73
     *                           ReceiveMessage call will wait for a message to arrive.
74
     *                           - VisibilityTimeout <integer> The visibility timeout for the queue.
75
     */
76 17
    public function __construct(SqsClient $client, $name, array $options = [])
77
    {
78 17
        $this->client = $client;
79 17
        $this->name = $name;
80 17
        $this->options = $options;
81 17
    }
82
83
    /**
84
     * @param MessageInterface[] $messages
85
     */
86 5 View Code Duplication
    public function acknowledge(array $messages)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
87
    {
88 5
        $url = $this->getQueueUrl();
89 5
        $failed = [];
90 5
        $batches = array_chunk($this->createDeleteEntries($messages), self::BATCHSIZE_DELETE);
91
92 5
        foreach ($batches as $batch) {
93 5
            $results = $this->client->deleteMessageBatch([
94 5
                'QueueUrl' => $url,
95 5
                'Entries'  => $batch,
96 5
            ]);
97
98
            $map = function ($result) use ($messages) {
99
                return $messages[$result['Id']];
100 5
            };
101
102 5
            $failed = array_merge($failed, array_map($map, $results->get('Failed') ?: []));
103 5
        }
104
105 5
        if (!empty($failed)) {
106
            throw new FailedAcknowledgementException($this, $failed);
107
        }
108 5
    }
109
110
    /**
111
     * @param MessageInterface[] $messages
112
     *
113
     * @throws FailedAcknowledgementException|void
114
     */
115 View Code Duplication
    public function reject(array $messages)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
116 8
    {
117
        $url = $this->getQueueUrl();
118 8
        $failed = [];
119
        $batches = array_chunk($this->createRejectEntries($messages), self::BATCHSIZE_DELETE);
120 8
121
        foreach ($batches as $batch) {
122
            $results = $this->client->changeMessageVisibilityBatch([
123
                'QueueUrl' => $url,
124
                'Entries'  => $batch,
125 8
            ]);
126
127 8
            $map = function ($result) use ($messages) {
128
                return $messages[$result['Id']];
129 4
            };
130 8
131
            $failed = array_merge($failed, array_map($map, $results->get('Failed') ?: []));
132 8
        }
133 8
134 8
        if (!empty($failed)) {
135 8
            throw new FailedAcknowledgementException($this, $failed);
136 8
        }
137 8
    }
138 8
139
    /**
140 8
     * @param MessageFactoryInterface $factory
141
     * @param int                     $limit
142 8
     *
143 1
     * @return \Generator
144
     */
145
    public function dequeue(MessageFactoryInterface $factory, $limit)
146 7
    {
147 7
        $remaining = $limit ?: 0;
148 7
149 7
        while (null === $limit || $remaining > 0) {
150 7
            /**
151 5
             * If a limit has been specified, set {@see $size} so that we don't return more
152
             * than the requested number of messages if it's less than the batch size.
153
             */
154 5
            $size = ($limit !== null) ? min($remaining, self::BATCHSIZE_RECEIVE) : self::BATCHSIZE_RECEIVE;
155 5
156 6
            $timestamp = time() + $this->getQueueVisibilityTimeout();
157
            $validator = function () use ($timestamp) {
158
                return time() < $timestamp;
159
            };
160
161 4
            $results = $this->client->receiveMessage(array_filter([
162
                'QueueUrl'            => $this->getQueueUrl(),
163 4
                'AttributeNames'      => ['All'],
164 4
                'MaxNumberOfMessages' => $size,
165 4
                'VisibilityTimeout'   => $this->getOption('VisibilityTimeout'),
166
                'WaitTimeSeconds'     => $this->getOption('ReceiveMessageWaitTimeSeconds'),
167 4
            ]));
168 4
169 4
            $messages = $results->get('Messages') ?: [];
170 4
171 4
            if (count($messages) === 0) {
172
                break;
173
            }
174
175 4
            foreach ($messages as $result) {
176
                yield $factory->createMessage(
177 4
                    $result['Body'],
178 4
                    [
179
                        'metadata'  => $this->createMessageMetadata($result),
180 4
                        'validator' => $validator,
181
                    ]
182
                );
183 4
            }
184
185
            // Decrement the number of messages remaining.
186
            $remaining -= count($messages);
187
        }
188 2
    }
189
190 2
    /**
191 2
     * @param MessageInterface[] $messages
192
     */
193 View Code Duplication
    public function enqueue(array $messages)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
194
    {
195
        $url = $this->getQueueUrl();
196 2
        $failed = [];
197
        $batches = array_chunk($this->createEnqueueEntries($messages), self::BATCHSIZE_SEND);
198 2
199 2
        foreach ($batches as $batch) {
200
            $results = $this->client->sendMessageBatch([
201
                'QueueUrl' => $url,
202
                'Entries'  => $batch,
203
            ]);
204
205
            $map = function ($result) use ($messages) {
206 5
                return $messages[$result['Id']];
207
            };
208
209 5
            $failed = array_merge($failed, array_map($map, $results->get('Failed') ?: []));
210
        }
211 5
212 5
        if (!empty($failed)) {
213 5
            throw new FailedEnqueueException($this, $failed);
214 5
        }
215
    }
216 5
217
    /**
218
     * {@inheritdoc}
219
     */
220
    public function purge()
221
    {
222
        $this->client->purgeQueue(['QueueUrl' => $this->getQueueUrl()]);
223
    }
224
225
    /**
226 4
     * {@inheritdoc}
227 4
     */
228
    public function delete()
229 4
    {
230 4
        $this->client->deleteQueue(['QueueUrl' => $this->getQueueUrl()]);
231 4
    }
232 4
233 4
    /**
234 2
     * @param MessageInterface[] $messages
235 2
     *
236 4
     * @return array
237
     */
238 4 View Code Duplication
    protected function createDeleteEntries(array $messages)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
239
    {
240
        array_walk(
241
            $messages,
242
            function (MessageInterface &$message, $id) {
243
                $metadata = $message->getMetadata();
244
                $message = [
245
                    'Id'            => $id,
246 7
                    'ReceiptHandle' => $metadata->get('ReceiptHandle'),
247
                ];
248 7
            }
249 7
        );
250 7
251 7
        return $messages;
252 7
    }
253 7
254
    /**
255
     * @param MessageInterface[] $messages
256
     *
257
     * @return array
258
     */
259 View Code Duplication
    protected function createRejectEntries(array $messages)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
260
    {
261
        array_walk(
262 8
            $messages,
263
            function (MessageInterface &$message, $id) {
264 8
                $metadata = $message->getMetadata();
265
                $message = [
266
                    'Id'                => $id,
267
                    'ReceiptHandle'     => $metadata->get('ReceiptHandle'),
268
                    'VisibilityTimeout' => 0,
269
                ];
270 16
            }
271
        );
272 16
273 16
        return $messages;
274 16
    }
275 16
276 16
    /**
277
     * @param MessageInterface[] $messages
278 16
     *
279 16
     * @return array
280
     */
281 16
    protected function createEnqueueEntries(array $messages)
282
    {
283
        array_walk(
284
            $messages,
285
            function (MessageInterface &$message, $id) {
286
                $metadata = $message->getMetadata();
287 8
                $message = [
288
                    'Id'                => $id,
289 8
                    'MessageBody'       => $message->getBody(),
290 8
                    'MessageAttributes' => $metadata->get('MessageAttributes') ?: [],
291 8
                ];
292 8
                if (!is_null($metadata->get('DelaySeconds'))) {
293 8
                    $message['DelaySeconds'] = $metadata->get('DelaySeconds');
294
                }
295 8
            }
296 8
        );
297 8
298
        return $messages;
299 8
    }
300
301
    /**
302
     * @param array $result
303
     *
304
     * @return array
305
     */
306
    protected function createMessageMetadata(array $result)
307
    {
308
        return array_intersect_key(
309
            $result,
310
            [
311
                'Attributes'        => [],
312
                'MessageAttributes' => [],
313
                'MessageId'         => null,
314
                'ReceiptHandle'     => null,
315
            ]
316
        );
317
    }
318
319
    /**
320
     * @param string $name
321
     * @param mixed  $default
322
     *
323
     * @return mixed
324
     */
325
    protected function getOption($name, $default = null)
326
    {
327
        return isset($this->options[$name]) ? $this->options[$name] : $default;
328
    }
329
330
    /**
331
     * @return string
332
     */
333
    protected function getQueueUrl()
334
    {
335
        if (!$this->url) {
336
            $result = $this->client->createQueue([
337
                'QueueName'  => $this->name,
338
                'Attributes' => $this->options,
339
            ]);
340
341
            $this->url = $result->get('QueueUrl');
342
        }
343
344
        return $this->url;
345
    }
346
347
    /**
348
     * @return int
349
     */
350
    protected function getQueueVisibilityTimeout()
351
    {
352
        if (!isset($this->options['VisibilityTimeout'])) {
353
            $result = $this->client->getQueueAttributes([
354
                'QueueUrl'       => $this->getQueueUrl(),
355
                'AttributeNames' => ['VisibilityTimeout'],
356
            ]);
357
358
            $attributes = $result->get('Attributes');
359
            $this->options['VisibilityTimeout'] = $attributes['VisibilityTimeout'];
360
        }
361
362
        return $this->options['VisibilityTimeout'];
363
    }
364
365
    /**
366
     * @return string
367
     */
368
    public function getQueueName()
369
    {
370
        return $this->name;
371
    }
372
}
373