Completed
Pull Request — master (#43)
by Harry
11:28 queued 08:52
created

SqsAdapter::reject()   B

Complexity

Conditions 4
Paths 4

Size

Total Lines 23
Code Lines 13

Duplication

Lines 23
Ratio 100 %

Code Coverage

Tests 14
CRAP Score 4.0312

Importance

Changes 0
Metric Value
dl 23
loc 23
c 0
b 0
f 0
ccs 14
cts 16
cp 0.875
rs 8.7972
cc 4
eloc 13
nc 4
nop 1
crap 4.0312
1
<?php
2
3
/**
4
 * This file is part of graze/queue.
5
 *
6
 * Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 *
11
 * @license https://github.com/graze/queue/blob/master/LICENSE MIT
12
 *
13
 * @link    https://github.com/graze/queue
14
 */
15
16
namespace Graze\Queue\Adapter;
17
18
use Aws\Sqs\SqsClient;
19
use Graze\Queue\Adapter\Exception\FailedAcknowledgementException;
20
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
21
use Graze\Queue\Message\MessageFactoryInterface;
22
use Graze\Queue\Message\MessageInterface;
23
24
/**
25
 * Amazon AWS SQS Adapter.
26
 *
27
 * By default this adapter uses standard polling, which may return an empty response
28
 * even if messages exist on the queue.
29
 *
30
 * > This happens when Amazon SQS uses short (standard) polling, the default behavior,
31
 * > where only a subset of the servers (based on a weighted random distribution) are
32
 * > queried to see if any messages are available to include in the response.
33
 *
34
 * You may want to consider setting the `ReceiveMessageWaitTimeSeconds`
35
 * option to enable long polling the queue, which queries all of the servers.
36
 *
37
 * @link https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html
38
 * @link http://docs.aws.amazon.com/aws-sdk-php/guide/latest/service-sqs.html
39
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_createQueue
40
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_deleteMessageBatch
41
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_receiveMessage
42
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_sendMessageBatch
43
 */
44
final class SqsAdapter implements AdapterInterface, NamedInterface
{
    // Maximum number of entries sent per SQS batch request, one constant per operation.
    const BATCHSIZE_DELETE  = 10;
    const BATCHSIZE_RECEIVE = 10;
    const BATCHSIZE_SEND    = 10;
    const BATCHSIZE_REJECT  = 10;

    /** @var SqsClient */
    protected $client;

    /** @var array */
    protected $options;

    /** @var string */
    protected $name;

    /** @var string */
    protected $url;

    /**
     * @param SqsClient $client
     * @param string    $name
     * @param array     $options - DelaySeconds <integer> The time in seconds that the delivery of all
     *                           messages in the queue will be delayed.
     *                           - MaximumMessageSize <integer> The limit of how many bytes a message
     *                           can contain before Amazon SQS rejects it.
     *                           - MessageRetentionPeriod <integer> The number of seconds Amazon SQS
     *                           retains a message.
     *                           - Policy <string> The queue's policy. A valid form-url-encoded policy.
     *                           - ReceiveMessageWaitTimeSeconds <integer> The time for which a
     *                           ReceiveMessage call will wait for a message to arrive.
     *                           - VisibilityTimeout <integer> The visibility timeout for the queue.
     */
    public function __construct(SqsClient $client, $name, array $options = [])
    {
        $this->client = $client;
        $this->name = $name;
        $this->options = $options;
    }

    /**
     * Acknowledge messages by deleting them from the queue in batches.
     *
     * @param MessageInterface[] $messages
     *
     * @throws FailedAcknowledgementException when one or more messages are reported as failed
     */
    public function acknowledge(array $messages)
    {
        $failed = $this->sendBatches(
            'deleteMessageBatch',
            $this->createDeleteEntries($messages),
            $messages,
            self::BATCHSIZE_DELETE
        );

        if (!empty($failed)) {
            throw new FailedAcknowledgementException($this, $failed);
        }
    }

    /**
     * Reject messages by changing their visibility timeout to 0 in batches.
     *
     * @param MessageInterface[] $messages
     *
     * @throws FailedAcknowledgementException when one or more messages are reported as failed
     */
    public function reject(array $messages)
    {
        $failed = $this->sendBatches(
            'changeMessageVisibilityBatch',
            $this->createRejectEntries($messages),
            $messages,
            self::BATCHSIZE_REJECT
        );

        if (!empty($failed)) {
            throw new FailedAcknowledgementException($this, $failed);
        }
    }

    /**
     * Lazily receive messages from the queue and yield them as message objects.
     *
     * Polling stops as soon as a receive call returns no messages, or once
     * {@see $limit} messages have been requested.
     *
     * @param MessageFactoryInterface $factory
     * @param int|null                $limit   Maximum number of messages to receive; with `null`
     *                                         polling continues until an empty response is returned.
     *
     * @return \Generator
     */
    public function dequeue(MessageFactoryInterface $factory, $limit)
    {
        $remaining = $limit ?: 0;

        while (null === $limit || $remaining > 0) {
            /**
             * If a limit has been specified, set {@see $size} so that we don't return more
             * than the requested number of messages if it's less than the batch size.
             */
            $size = ($limit !== null) ? min($remaining, self::BATCHSIZE_RECEIVE) : self::BATCHSIZE_RECEIVE;

            // The deadline is computed before the receive call, so messages of this
            // batch stop validating once the visibility timeout has elapsed since then.
            $timestamp = time() + $this->getQueueVisibilityTimeout();
            $validator = function () use ($timestamp) {
                return time() < $timestamp;
            };

            // array_filter() drops every falsy entry, so unset (null) options are not sent.
            $results = $this->client->receiveMessage(array_filter([
                'QueueUrl'            => $this->getQueueUrl(),
                'AttributeNames'      => ['All'],
                'MaxNumberOfMessages' => $size,
                'VisibilityTimeout'   => $this->getOption('VisibilityTimeout'),
                'WaitTimeSeconds'     => $this->getOption('ReceiveMessageWaitTimeSeconds'),
            ]));

            $messages = $results->get('Messages') ?: [];

            if (count($messages) === 0) {
                break;
            }

            foreach ($messages as $result) {
                yield $factory->createMessage(
                    $result['Body'],
                    [
                        'metadata'  => $this->createMessageMetadata($result),
                        'validator' => $validator,
                    ]
                );
            }

            // Decrement the number of messages remaining.
            $remaining -= count($messages);
        }
    }

    /**
     * Send messages to the queue in batches.
     *
     * @param MessageInterface[] $messages
     *
     * @throws FailedEnqueueException when one or more messages are reported as failed
     */
    public function enqueue(array $messages)
    {
        $failed = $this->sendBatches(
            'sendMessageBatch',
            $this->createEnqueueEntries($messages),
            $messages,
            self::BATCHSIZE_SEND
        );

        if (!empty($failed)) {
            throw new FailedEnqueueException($this, $failed);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function purge()
    {
        $this->client->purgeQueue(['QueueUrl' => $this->getQueueUrl()]);
    }

    /**
     * {@inheritdoc}
     */
    public function delete()
    {
        $this->client->deleteQueue(['QueueUrl' => $this->getQueueUrl()]);
    }

    /**
     * Build the batch entries used to delete the given messages.
     *
     * @param MessageInterface[] $messages
     *
     * @return array Entries keyed like {@see $messages}, each carrying that key as its `Id`.
     */
    protected function createDeleteEntries(array $messages)
    {
        return $this->buildEntries($messages, function (MessageInterface $message, $id) {
            return [
                'Id'            => $id,
                'ReceiptHandle' => $message->getMetadata()->get('ReceiptHandle'),
            ];
        });
    }

    /**
     * Build the batch entries used to reset the visibility timeout of the given messages to 0.
     *
     * @param MessageInterface[] $messages
     *
     * @return array Entries keyed like {@see $messages}, each carrying that key as its `Id`.
     */
    protected function createRejectEntries(array $messages)
    {
        return $this->buildEntries($messages, function (MessageInterface $message, $id) {
            return [
                'Id'                => $id,
                'ReceiptHandle'     => $message->getMetadata()->get('ReceiptHandle'),
                'VisibilityTimeout' => 0,
            ];
        });
    }

    /**
     * Build the batch entries used to send the given messages.
     *
     * @param MessageInterface[] $messages
     *
     * @return array Entries keyed like {@see $messages}, each carrying that key as its `Id`.
     */
    protected function createEnqueueEntries(array $messages)
    {
        return $this->buildEntries($messages, function (MessageInterface $message, $id) {
            $metadata = $message->getMetadata();

            $entry = [
                'Id'                => $id,
                'MessageBody'       => $message->getBody(),
                'MessageAttributes' => $metadata->get('MessageAttributes') ?: [],
            ];

            // Only include a delay when one was set on the message metadata.
            $delay = $metadata->get('DelaySeconds');
            if (!is_null($delay)) {
                $entry['DelaySeconds'] = $delay;
            }

            return $entry;
        });
    }

    /**
     * Reduce a received message result to the keys stored as message metadata.
     *
     * @param array $result
     *
     * @return array
     */
    protected function createMessageMetadata(array $result)
    {
        // Only the keys of the second array matter; its values are never used.
        return array_intersect_key(
            $result,
            [
                'Attributes'        => [],
                'MessageAttributes' => [],
                'MessageId'         => null,
                'ReceiptHandle'     => null,
            ]
        );
    }

    /**
     * Read a single adapter option.
     *
     * @param string $name
     * @param mixed  $default Returned when the option is missing or null.
     *
     * @return mixed
     */
    protected function getOption($name, $default = null)
    {
        return isset($this->options[$name]) ? $this->options[$name] : $default;
    }

    /**
     * Resolve the queue URL, calling createQueue on first use and caching the result.
     *
     * @return string
     */
    protected function getQueueUrl()
    {
        if (!$this->url) {
            $result = $this->client->createQueue([
                'QueueName'  => $this->name,
                'Attributes' => $this->options,
            ]);

            $this->url = $result->get('QueueUrl');
        }

        return $this->url;
    }

    /**
     * Get the visibility timeout, fetching it from the queue attributes when it
     * was not supplied as an option. The fetched value is cached in the options.
     *
     * @return int
     */
    protected function getQueueVisibilityTimeout()
    {
        if (!isset($this->options['VisibilityTimeout'])) {
            $result = $this->client->getQueueAttributes([
                'QueueUrl'       => $this->getQueueUrl(),
                'AttributeNames' => ['VisibilityTimeout'],
            ]);

            $attributes = $result->get('Attributes');
            $this->options['VisibilityTimeout'] = $attributes['VisibilityTimeout'];
        }

        // Cast so the documented int return type holds even if the stored value is a string.
        return (int) $this->options['VisibilityTimeout'];
    }

    /**
     * @return string
     */
    public function getQueueName()
    {
        return $this->name;
    }

    /**
     * Send entries to a batch operation of the client in chunks and collect the
     * messages whose entries are listed under `Failed` in the responses.
     *
     * @param string             $operation Name of the client batch method to call.
     * @param array              $entries   Batch entries, each with an `Id` that is a key of $messages.
     * @param MessageInterface[] $messages  The messages the entries were built from.
     * @param int                $batchSize Maximum number of entries per request.
     *
     * @return MessageInterface[] The messages that were reported as failed.
     */
    private function sendBatches($operation, array $entries, array $messages, $batchSize)
    {
        $url = $this->getQueueUrl();
        $failed = [];

        foreach (array_chunk($entries, $batchSize) as $batch) {
            $results = $this->client->{$operation}([
                'QueueUrl' => $url,
                'Entries'  => $batch,
            ]);

            // Map each failed entry back to its message through the entry `Id`.
            foreach ($results->get('Failed') ?: [] as $failure) {
                $failed[] = $messages[$failure['Id']];
            }
        }

        return $failed;
    }

    /**
     * Convert messages into batch entries while preserving the array keys.
     *
     * @param MessageInterface[] $messages
     * @param callable           $builder  Receives the message and its key, returns the entry array.
     *
     * @return array
     */
    private function buildEntries(array $messages, callable $builder)
    {
        $entries = [];

        foreach ($messages as $id => $message) {
            $entries[$id] = $builder($message, $id);
        }

        return $entries;
    }
}
373