Completed
Pull Request — master (#47)
by Harry
03:26
created

SqsAdapter::acknowledge()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 21
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 13
nc 4
nop 1
dl 0
loc 21
ccs 16
cts 16
cp 1
crap 4
rs 9.0534
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * This file is part of graze/queue.
5
 *
6
 * Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 *
11
 * @license https://github.com/graze/queue/blob/master/LICENSE MIT
12
 *
13
 * @link    https://github.com/graze/queue
14
 */
15
16
namespace Graze\Queue\Adapter;
17
18
use Aws\Sqs\SqsClient;
19
use Graze\Queue\Adapter\Exception\FailedAcknowledgementException;
20
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
21
use Graze\Queue\Message\MessageFactoryInterface;
22
use Graze\Queue\Message\MessageInterface;
23
24
/**
25
 * Amazon AWS SQS Adapter.
26
 *
27
 * By default this adapter uses standard polling, which may return an empty response
28
 * even if messages exist on the queue.
29
 *
30
 * > This happens when Amazon SQS uses short (standard) polling, the default behavior,
31
 * > where only a subset of the servers (based on a weighted random distribution) are
32
 * > queried to see if any messages are available to include in the response.
33
 *
34
 * You may want to consider setting the `ReceiveMessageWaitTimeSeconds`
35
 * option to enable long polling the queue, which queries all of the servers.
36
 *
37
 * @link https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html
38
 * @link http://docs.aws.amazon.com/aws-sdk-php/guide/latest/service-sqs.html
39
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_createQueue
40
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_deleteMessageBatch
41
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_receiveMessage
42
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_sendMessageBatch
43
 */
44
final class SqsAdapter implements AdapterInterface, NamedInterface
45
{
46
    const BATCHSIZE_DELETE  = 10;
47
    const BATCHSIZE_RECEIVE = 10;
48
    const BATCHSIZE_SEND    = 10;
49
50
    /** @var SqsClient */
51
    protected $client;
52
53
    /** @var array */
54
    protected $options;
55
56
    /** @var string */
57
    protected $name;
58
59
    /** @var string */
60
    protected $url;
61
62
    /**
     * Build an adapter for a single, named SQS queue.
     *
     * The constructor only stores its arguments; no request is sent to AWS here.
     *
     * @param SqsClient $client  Client used for every SQS call this adapter makes.
     * @param string    $name    Queue name, sent as `QueueName` when the queue URL is resolved.
     * @param array     $options Queue attributes, sent as `Attributes` when the queue URL is resolved:
     *                           - DelaySeconds <integer> The time in seconds that the delivery of all
     *                           messages in the queue will be delayed.
     *                           - MaximumMessageSize <integer> The limit of how many bytes a message
     *                           can contain before Amazon SQS rejects it.
     *                           - MessageRetentionPeriod <integer> The number of seconds Amazon SQS
     *                           retains a message.
     *                           - Policy <string> The queue's policy. A valid form-url-encoded policy.
     *                           - ReceiveMessageWaitTimeSeconds <integer> The time for which a
     *                           ReceiveMessage call will wait for a message to arrive.
     *                           - VisibilityTimeout <integer> The visibility timeout for the queue.
     */
    public function __construct(SqsClient $client, $name, array $options = [])
    {
        $this->options = $options;
        $this->name = $name;
        $this->client = $client;
    }
82
83
    /**
     * Delete the given messages from the queue, at most BATCHSIZE_DELETE per request.
     *
     * Every batch entry carries the message's array key as its `Id`, which is how entries
     * reported under `Failed` are matched back to the message they were built from.
     *
     * @param MessageInterface[] $messages
     *
     * @throws FailedAcknowledgementException When any batch response reports failed entries.
     */
    public function acknowledge(array $messages)
    {
        $queueUrl = $this->getQueueUrl();
        $entries = $this->createDeleteEntries($messages);
        $errors = [];

        foreach (array_chunk($entries, self::BATCHSIZE_DELETE) as $chunk) {
            $response = $this->client->deleteMessageBatch([
                'QueueUrl' => $queueUrl,
                'Entries'  => $chunk,
            ]);

            $rejected = $response->get('Failed') ?: [];
            $errors = array_merge($errors, $rejected);
        }

        if (empty($errors)) {
            return;
        }

        // Resolve each failed entry back to its original message through the entry Id.
        $failed = [];
        foreach ($errors as $index => $error) {
            $failed[$index] = $messages[$error['Id']];
        }

        throw new FailedAcknowledgementException($this, $failed, $errors);
    }
109
110
    /**
     * Reject the given messages by changing their visibility, at most BATCHSIZE_DELETE per request.
     *
     * The entries sent to `changeMessageVisibilityBatch` set `VisibilityTimeout` to 0 and use
     * the message's array key as `Id`, so failed entries can be matched back to their message.
     *
     * @param MessageInterface[] $messages
     *
     * @throws FailedAcknowledgementException When any batch response reports failed entries.
     */
    public function reject(array $messages)
    {
        $queueUrl = $this->getQueueUrl();
        $entries = $this->createRejectEntries($messages);
        $errors = [];

        foreach (array_chunk($entries, self::BATCHSIZE_DELETE) as $chunk) {
            $response = $this->client->changeMessageVisibilityBatch([
                'QueueUrl' => $queueUrl,
                'Entries'  => $chunk,
            ]);

            $rejected = $response->get('Failed') ?: [];
            $errors = array_merge($errors, $rejected);
        }

        if (empty($errors)) {
            return;
        }

        // Resolve each failed entry back to its original message through the entry Id.
        $failed = [];
        foreach ($errors as $index => $error) {
            $failed[$index] = $messages[$error['Id']];
        }

        throw new FailedAcknowledgementException($this, $failed, $errors);
    }
138
139
    /**
     * Lazily receive messages from the queue and yield them one at a time.
     *
     * Polling stops once `$limit` messages have been received, or as soon as a receive call
     * comes back without messages. With a `null` limit only the empty response stops the loop.
     *
     * Each yielded message is created with a `validator` callable that returns true while the
     * current time is before the moment (taken just before the receive call) plus the queue's
     * visibility timeout.
     *
     * @param MessageFactoryInterface $factory Builds the yielded messages from the raw results.
     * @param int                     $limit   Maximum number of messages, or null for no maximum.
     *
     * @return \Generator
     */
    public function dequeue(MessageFactoryInterface $factory, $limit)
    {
        $unbounded = (null === $limit);
        $remaining = $limit ?: 0;

        while ($unbounded || $remaining > 0) {
            // Request a full batch, unless fewer messages than that are still wanted.
            $size = self::BATCHSIZE_RECEIVE;
            if (!$unbounded) {
                $size = min($remaining, self::BATCHSIZE_RECEIVE);
            }

            $expiresAt = time() + $this->getQueueVisibilityTimeout();
            $validator = function () use ($expiresAt) {
                return time() < $expiresAt;
            };

            // array_filter() strips the options that were not configured (and any falsy value).
            $request = array_filter([
                'QueueUrl'            => $this->getQueueUrl(),
                'AttributeNames'      => ['All'],
                'MaxNumberOfMessages' => $size,
                'VisibilityTimeout'   => $this->getOption('VisibilityTimeout'),
                'WaitTimeSeconds'     => $this->getOption('ReceiveMessageWaitTimeSeconds'),
            ]);
            $results = $this->client->receiveMessage($request);

            $messages = $results->get('Messages') ?: [];
            $received = count($messages);

            if (0 === $received) {
                break;
            }

            foreach ($messages as $result) {
                $body = $result['Body'];
                $options = [
                    'metadata'  => $this->createMessageMetadata($result),
                    'validator' => $validator,
                ];

                yield $factory->createMessage($body, $options);
            }

            // One whole batch has been handed out; count it against the limit.
            $remaining -= $received;
        }
    }
189
190
    /**
     * Send the given messages to the queue, at most BATCHSIZE_SEND per request.
     *
     * Every batch entry carries the message's array key as its `Id`, which is how entries
     * reported under `Failed` are matched back to the message they were built from.
     *
     * @param MessageInterface[] $messages
     *
     * @throws FailedEnqueueException When any batch response reports failed entries.
     */
    public function enqueue(array $messages)
    {
        $queueUrl = $this->getQueueUrl();
        $entries = $this->createEnqueueEntries($messages);
        $errors = [];

        foreach (array_chunk($entries, self::BATCHSIZE_SEND) as $chunk) {
            $response = $this->client->sendMessageBatch([
                'QueueUrl' => $queueUrl,
                'Entries'  => $chunk,
            ]);

            $rejected = $response->get('Failed') ?: [];
            $errors = array_merge($errors, $rejected);
        }

        if (empty($errors)) {
            return;
        }

        // Resolve each failed entry back to its original message through the entry Id.
        $failed = [];
        foreach ($errors as $index => $error) {
            $failed[$index] = $messages[$error['Id']];
        }

        throw new FailedEnqueueException($this, $failed, $errors);
    }
216
217
    /**
     * {@inheritdoc}
     *
     * Issues a `purgeQueue` call against this adapter's queue URL.
     */
    public function purge()
    {
        $request = ['QueueUrl' => $this->getQueueUrl()];

        $this->client->purgeQueue($request);
    }
224
225
    /**
     * {@inheritdoc}
     *
     * Issues a `deleteQueue` call against this adapter's queue URL.
     */
    public function delete()
    {
        $request = ['QueueUrl' => $this->getQueueUrl()];

        $this->client->deleteQueue($request);
    }
232
233
    /**
     * Turn messages into the entries expected by `deleteMessageBatch`.
     *
     * The returned array keeps the keys of `$messages`; each entry uses that key as its `Id`
     * and takes `ReceiptHandle` from the message metadata.
     *
     * @param MessageInterface[] $messages
     *
     * @return array
     */
    protected function createDeleteEntries(array $messages)
    {
        $toEntry = function (MessageInterface $message, $id) {
            $metadata = $message->getMetadata();

            return [
                'Id'            => $id,
                'ReceiptHandle' => $metadata->get('ReceiptHandle'),
            ];
        };

        $entries = [];
        foreach ($messages as $id => $message) {
            $entries[$id] = $toEntry($message, $id);
        }

        return $entries;
    }
253
254
    /**
     * Turn messages into the entries expected by `changeMessageVisibilityBatch`.
     *
     * The returned array keeps the keys of `$messages`; each entry uses that key as its `Id`,
     * takes `ReceiptHandle` from the message metadata and sets `VisibilityTimeout` to 0.
     *
     * @param MessageInterface[] $messages
     *
     * @return array
     */
    protected function createRejectEntries(array $messages)
    {
        $toEntry = function (MessageInterface $message, $id) {
            $metadata = $message->getMetadata();

            return [
                'Id'                => $id,
                'ReceiptHandle'     => $metadata->get('ReceiptHandle'),
                'VisibilityTimeout' => 0,
            ];
        };

        $entries = [];
        foreach ($messages as $id => $message) {
            $entries[$id] = $toEntry($message, $id);
        }

        return $entries;
    }
275
276
    /**
     * Turn messages into the entries expected by `sendMessageBatch`.
     *
     * The returned array keeps the keys of `$messages`; each entry uses that key as its `Id`,
     * the message body as `MessageBody` and the `MessageAttributes` metadata (or an empty
     * array). `DelaySeconds` is only included when the metadata holds a non-null value for it.
     *
     * @param MessageInterface[] $messages
     *
     * @return array
     */
    protected function createEnqueueEntries(array $messages)
    {
        $toEntry = function (MessageInterface $message, $id) {
            $metadata = $message->getMetadata();

            $entry = [
                'Id'                => $id,
                'MessageBody'       => $message->getBody(),
                'MessageAttributes' => $metadata->get('MessageAttributes') ?: [],
            ];

            if (null !== $metadata->get('DelaySeconds')) {
                $entry['DelaySeconds'] = $metadata->get('DelaySeconds');
            }

            return $entry;
        };

        $entries = [];
        foreach ($messages as $id => $message) {
            $entries[$id] = $toEntry($message, $id);
        }

        return $entries;
    }
300
301
    /**
     * Reduce a raw received-message result to the fields kept as message metadata.
     *
     * Only the `Attributes`, `MessageAttributes`, `MessageId` and `ReceiptHandle` keys are
     * retained, and only when they are present in `$result`.
     *
     * @param array $result
     *
     * @return array
     */
    protected function createMessageMetadata(array $result)
    {
        $wanted = ['Attributes', 'MessageAttributes', 'MessageId', 'ReceiptHandle'];

        return array_intersect_key($result, array_flip($wanted));
    }
318
319
    /**
     * Look up a single adapter option.
     *
     * @param string $name    Option key to read.
     * @param mixed  $default Returned when the option is missing or null.
     *
     * @return mixed
     */
    protected function getOption($name, $default = null)
    {
        if (isset($this->options[$name])) {
            return $this->options[$name];
        }

        return $default;
    }
329
330
    /**
     * Return the queue URL, resolving it on first use.
     *
     * While no URL is stored, `createQueue` is called with the adapter's name and options and
     * the `QueueUrl` from the response is kept for subsequent calls.
     *
     * @return string
     */
    protected function getQueueUrl()
    {
        if ($this->url) {
            return $this->url;
        }

        $response = $this->client->createQueue([
            'QueueName'  => $this->name,
            'Attributes' => $this->options,
        ]);
        $this->url = $response->get('QueueUrl');

        return $this->url;
    }
346
347
    /**
     * Return the queue's visibility timeout in seconds.
     *
     * When the `VisibilityTimeout` option is not set, the value is fetched once with
     * `getQueueAttributes` and stored in the options so later calls reuse it.
     *
     * @return int
     */
    protected function getQueueVisibilityTimeout()
    {
        if (!isset($this->options['VisibilityTimeout'])) {
            $result = $this->client->getQueueAttributes([
                'QueueUrl'       => $this->getQueueUrl(),
                'AttributeNames' => ['VisibilityTimeout'],
            ]);

            $attributes = $result->get('Attributes');

            // The fetched attribute may arrive as a numeric string; store it as an integer.
            $this->options['VisibilityTimeout'] = (int) $attributes['VisibilityTimeout'];
        }

        // Cast here as well so a caller-supplied numeric string still honours `@return int`.
        return (int) $this->options['VisibilityTimeout'];
    }
364
365
    /**
     * Return the queue name this adapter was constructed with.
     *
     * @return string
     */
    public function getQueueName()
    {
        $queueName = $this->name;

        return $queueName;
    }
372
}
373