Completed
Pull Request — master (#42)
by
unknown
10:19
created

SqsAdapter::getQueueUrl()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 13
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 13
ccs 9
cts 9
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 7
nc 2
nop 0
crap 2
1
<?php
2
3
/**
4
 * This file is part of graze/queue.
5
 *
6
 * Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 *
11
 * @license https://github.com/graze/queue/blob/master/LICENSE MIT
12
 *
13
 * @link    https://github.com/graze/queue
14
 */
15
16
namespace Graze\Queue\Adapter;
17
18
use Aws\Sqs\SqsClient;
19
use Graze\Queue\Adapter\Exception\FailedAcknowledgementException;
20
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
21
use Graze\Queue\Message\MessageFactoryInterface;
22
use Graze\Queue\Message\MessageInterface;
23
24
/**
25
 * Amazon AWS SQS Adapter.
26
 *
27
 * By default this adapter uses standard polling, which may return an empty response
28
 * even if messages exist on the queue.
29
 *
30
 * > This happens when Amazon SQS uses short (standard) polling, the default behavior,
31
 * > where only a subset of the servers (based on a weighted random distribution) are
32
 * > queried to see if any messages are available to include in the response.
33
 *
34
 * You may want to consider setting the `ReceiveMessageWaitTimeSeconds`
35
 * option to enable long polling on the queue, which queries all of the servers.
36
 *
37
 * @link https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html
38
 * @link http://docs.aws.amazon.com/aws-sdk-php/guide/latest/service-sqs.html
39
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_createQueue
40
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_deleteMessageBatch
41
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_receiveMessage
42
 * @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_sendMessageBatch
43
 */
44
final class SqsAdapter implements AdapterInterface, NamedInterface
45
{
46
    const BATCHSIZE_DELETE  = 10;
47
    const BATCHSIZE_RECEIVE = 10;
48
    const BATCHSIZE_SEND    = 10;
49
50
    /** @var SqsClient */
51
    protected $client;
52
53
    /** @var array */
54
    protected $options;
55
56
    /** @var string */
57
    protected $name;
58
59
    /** @var string */
60
    protected $url;
61
62
    /**
     * Build an adapter for a single named SQS queue.
     *
     * The constructor only stores its arguments on the instance; it does not
     * call the client.
     *
     * @param SqsClient $client  Client used for the SQS calls made by this adapter.
     * @param string    $name    Name of the queue.
     * @param array     $options Queue options, keyed by attribute name:
     *                           - DelaySeconds <integer> The time in seconds that the delivery of all
     *                           messages in the queue will be delayed.
     *                           - MaximumMessageSize <integer> The limit of how many bytes a message
     *                           can contain before Amazon SQS rejects it.
     *                           - MessageRetentionPeriod <integer> The number of seconds Amazon SQS
     *                           retains a message.
     *                           - Policy <string> The queue's policy. A valid form-url-encoded policy.
     *                           - ReceiveMessageWaitTimeSeconds <integer> The time for which a
     *                           ReceiveMessage call will wait for a message to arrive.
     *                           - VisibilityTimeout <integer> The visibility timeout for the queue.
     */
    public function __construct(SqsClient $client, $name, array $options = [])
    {
        $this->options = $options;
        $this->name = $name;
        $this->client = $client;
    }
82
83
    /**
     * Acknowledge messages by deleting them from the queue in batches.
     *
     * The delete entries are split into chunks of at most BATCHSIZE_DELETE and
     * one deleteMessageBatch call is made per chunk. Every entry reported under
     * the `Failed` key of a response is mapped back to the message it was built
     * from, using the entry `Id` as the index into $messages.
     *
     * @param MessageInterface[] $messages
     *
     * @throws FailedAcknowledgementException When at least one entry was reported as failed.
     */
    public function acknowledge(array $messages)
    {
        $queueUrl = $this->getQueueUrl();
        $entries = $this->createDeleteEntries($messages);
        $unacknowledged = [];

        foreach (array_chunk($entries, self::BATCHSIZE_DELETE) as $chunk) {
            $response = $this->client->deleteMessageBatch([
                'QueueUrl' => $queueUrl,
                'Entries'  => $chunk,
            ]);

            // Collect the original message behind each failed entry.
            foreach ($response->get('Failed') ?: [] as $failure) {
                $unacknowledged[] = $messages[$failure['Id']];
            }
        }

        if (count($unacknowledged) > 0) {
            throw new FailedAcknowledgementException($this, $unacknowledged);
        }
    }
109
110
    /**
     * Lazily receive messages from the queue and yield them one at a time.
     *
     * Messages are requested in batches of at most BATCHSIZE_RECEIVE. With a
     * null $limit the generator keeps requesting batches until a response
     * contains no messages; otherwise it stops once $limit messages have been
     * received or a response comes back empty, whichever happens first.
     *
     * Each yielded message is created with a `validator` callback that returns
     * true only while the current time is earlier than the time of the request
     * plus the queue visibility timeout.
     *
     * @param MessageFactoryInterface $factory
     * @param int                     $limit
     *
     * @return \Generator
     */
    public function dequeue(MessageFactoryInterface $factory, $limit)
    {
        $unbounded = (null === $limit);
        $outstanding = $limit ?: 0;

        while ($unbounded || $outstanding > 0) {
            // Never ask for more than is still wanted when a limit applies.
            $batchSize = self::BATCHSIZE_RECEIVE;
            if (!$unbounded) {
                $batchSize = min($outstanding, self::BATCHSIZE_RECEIVE);
            }

            $expiresAt = time() + $this->getQueueVisibilityTimeout();
            $isStillValid = function () use ($expiresAt) {
                return time() < $expiresAt;
            };

            // array_filter() drops options that are unset (or otherwise falsy).
            $request = array_filter([
                'QueueUrl'            => $this->getQueueUrl(),
                'AttributeNames'      => ['All'],
                'MaxNumberOfMessages' => $batchSize,
                'VisibilityTimeout'   => $this->getOption('VisibilityTimeout'),
                'WaitTimeSeconds'     => $this->getOption('ReceiveMessageWaitTimeSeconds'),
            ]);

            $response = $this->client->receiveMessage($request);
            $received = $response->get('Messages') ?: [];

            if (count($received) === 0) {
                break;
            }

            foreach ($received as $item) {
                yield $factory->createMessage($item['Body'], [
                    'metadata'  => $this->createMessageMetadata($item),
                    'validator' => $isStillValid,
                ]);
            }

            // Account for the batch that has just been handed out.
            $outstanding -= count($received);
        }
    }
157
158
    /**
     * Send messages to the queue in batches.
     *
     * The send entries are split into chunks of at most BATCHSIZE_SEND and one
     * sendMessageBatch call is made per chunk. Every entry reported under the
     * `Failed` key of a response is mapped back to the message it was built
     * from, using the entry `Id` as the index into $messages.
     *
     * @param MessageInterface[] $messages
     *
     * @throws FailedEnqueueException When at least one entry was reported as failed.
     */
    public function enqueue(array $messages)
    {
        $queueUrl = $this->getQueueUrl();
        $entries = $this->createEnqueueEntries($messages);
        $rejected = [];

        foreach (array_chunk($entries, self::BATCHSIZE_SEND) as $chunk) {
            $response = $this->client->sendMessageBatch([
                'QueueUrl' => $queueUrl,
                'Entries'  => $chunk,
            ]);

            // Collect the original message behind each failed entry.
            foreach ($response->get('Failed') ?: [] as $failure) {
                $rejected[] = $messages[$failure['Id']];
            }
        }

        if (count($rejected) > 0) {
            throw new FailedEnqueueException($this, $rejected);
        }
    }
184
185
    /**
     * Issue a purgeQueue request for this adapter's queue URL.
     *
     * {@inheritdoc}
     */
    public function purge()
    {
        $request = ['QueueUrl' => $this->getQueueUrl()];

        $this->client->purgeQueue($request);
    }
192
193
    /**
     * Issue a deleteQueue request for this adapter's queue URL.
     *
     * {@inheritdoc}
     */
    public function delete()
    {
        $request = ['QueueUrl' => $this->getQueueUrl()];

        $this->client->deleteQueue($request);
    }
200
201
    /**
     * Turn messages into the entry arrays used for a deleteMessageBatch call.
     *
     * The keys of $messages are preserved and reused as each entry's `Id`, so a
     * result entry can later be matched back to its message by that key.
     *
     * @param MessageInterface[] $messages
     *
     * @return array Entries with `Id` and `ReceiptHandle`, keyed like $messages.
     */
    protected function createDeleteEntries(array $messages)
    {
        $toEntry = function (MessageInterface $message, $id) {
            $metadata = $message->getMetadata();

            return [
                'Id'            => $id,
                'ReceiptHandle' => $metadata->get('ReceiptHandle'),
            ];
        };

        $entries = [];
        foreach ($messages as $id => $message) {
            $entries[$id] = $toEntry($message, $id);
        }

        return $entries;
    }
218
219
    /**
     * Turn messages into the entry arrays used for a sendMessageBatch call.
     *
     * The keys of $messages are preserved and reused as each entry's `Id`.
     * `DelaySeconds` is only added to an entry when the message metadata holds
     * a non-null value for it.
     *
     * @param MessageInterface[] $messages
     *
     * @return array Entries with `Id`, `MessageBody`, `MessageAttributes` and,
     *               optionally, `DelaySeconds`, keyed like $messages.
     */
    protected function createEnqueueEntries(array $messages)
    {
        $toEntry = function (MessageInterface $message, $id) {
            $metadata = $message->getMetadata();

            $entry = [
                'Id'                => $id,
                'MessageBody'       => $message->getBody(),
                'MessageAttributes' => $metadata->get('MessageAttributes') ?: [],
            ];

            if (null !== $metadata->get('DelaySeconds')) {
                $entry['DelaySeconds'] = $metadata->get('DelaySeconds');
            }

            return $entry;
        };

        $entries = [];
        foreach ($messages as $id => $message) {
            $entries[$id] = $toEntry($message, $id);
        }

        return $entries;
    }
240
241
    /**
     * Reduce a received message to the fields kept as message metadata.
     *
     * Only the `Attributes`, `MessageAttributes`, `MessageId` and
     * `ReceiptHandle` keys are retained; keys missing from $result are simply
     * absent from the returned array.
     *
     * @param array $result A single entry from a receiveMessage response.
     *
     * @return array
     */
    protected function createMessageMetadata(array $result)
    {
        $allowed = array_flip([
            'Attributes',
            'MessageAttributes',
            'MessageId',
            'ReceiptHandle',
        ]);

        return array_intersect_key($result, $allowed);
    }
255
256
    /**
     * Look up a single adapter option.
     *
     * An option that is missing, or present with a null value, yields $default.
     *
     * @param string $name    Option key.
     * @param mixed  $default Value returned when the option is not set.
     *
     * @return mixed
     */
    protected function getOption($name, $default = null)
    {
        if (isset($this->options[$name])) {
            return $this->options[$name];
        }

        return $default;
    }
266
267
    /**
     * Return the queue URL, resolving it on first use.
     *
     * While no URL is stored on the instance, createQueue is called with the
     * queue name and the adapter options as attributes, and the `QueueUrl` from
     * its result is stored for subsequent calls.
     *
     * @return string
     */
    protected function getQueueUrl()
    {
        if ($this->url) {
            return $this->url;
        }

        $response = $this->client->createQueue([
            'QueueName'  => $this->name,
            'Attributes' => $this->options,
        ]);

        $this->url = $response->get('QueueUrl');

        return $this->url;
    }
283
284
    /**
     * Resolve the visibility timeout, in seconds, used for this queue.
     *
     * A `VisibilityTimeout` option that is already set is used as-is. Otherwise
     * the value is read with getQueueAttributes and stored back into the
     * adapter options, so the lookup is not repeated on later calls.
     *
     * The value is normalised to an integer: SQS reports attribute values as
     * strings, and the result is used in timestamp arithmetic by the caller.
     *
     * @return int
     */
    protected function getQueueVisibilityTimeout()
    {
        if (!isset($this->options['VisibilityTimeout'])) {
            $result = $this->client->getQueueAttributes([
                'QueueUrl'       => $this->getQueueUrl(),
                'AttributeNames' => ['VisibilityTimeout'],
            ]);

            $attributes = $result->get('Attributes');

            // Cache as an integer so later reads of the option get a proper number.
            $this->options['VisibilityTimeout'] = (int) $attributes['VisibilityTimeout'];
        }

        // Cast again to cover a value supplied by the caller as a numeric string.
        return (int) $this->options['VisibilityTimeout'];
    }
301
302
    /**
     * Return the queue name this adapter was constructed with.
     *
     * @return string
     */
    public function getQueueName()
    {
        $queueName = $this->name;

        return $queueName;
    }
309
}
310