1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
/** |
4
|
|
|
* This file is part of graze/queue. |
5
|
|
|
* |
6
|
|
|
* Copyright (c) 2015 Nature Delivered Ltd. <https://www.graze.com> |
7
|
|
|
* |
8
|
|
|
* For the full copyright and license information, please view the LICENSE |
9
|
|
|
* file that was distributed with this source code. |
10
|
|
|
* |
11
|
|
|
* @license https://github.com/graze/queue/blob/master/LICENSE MIT |
12
|
|
|
* |
13
|
|
|
* @link https://github.com/graze/queue |
14
|
|
|
*/ |
15
|
|
|
|
16
|
|
|
namespace Graze\Queue\Adapter; |
17
|
|
|
|
18
|
|
|
use Aws\Sqs\SqsClient; |
19
|
|
|
use Graze\Queue\Adapter\Exception\FailedAcknowledgementException; |
20
|
|
|
use Graze\Queue\Adapter\Exception\FailedEnqueueException; |
21
|
|
|
use Graze\Queue\Message\MessageFactoryInterface; |
22
|
|
|
use Graze\Queue\Message\MessageInterface; |
23
|
|
|
|
24
|
|
|
/** |
25
|
|
|
* Amazon AWS SQS Adapter. |
26
|
|
|
* |
27
|
|
|
* By default this adapter uses standard polling, which may return an empty response |
28
|
|
|
* even if messages exist on the queue. |
29
|
|
|
* |
30
|
|
|
* > This happens when Amazon SQS uses short (standard) polling, the default behavior, |
31
|
|
|
* > where only a subset of the servers (based on a weighted random distribution) are |
32
|
|
|
* > queried to see if any messages are available to include in the response. |
33
|
|
|
* |
34
|
|
|
* You may want to consider setting the `ReceiveMessageWaitTimeSeconds` |
35
|
|
|
* option to enable long polling the queue, which queries all of the servers. |
36
|
|
|
* |
37
|
|
|
* @link https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html |
38
|
|
|
* @link http://docs.aws.amazon.com/aws-sdk-php/guide/latest/service-sqs.html |
39
|
|
|
* @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_createQueue |
40
|
|
|
* @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_deleteMessageBatch |
41
|
|
|
* @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_receiveMessage |
42
|
|
|
* @link http://docs.aws.amazon.com/aws-sdk-php/latest/class-Aws.Sqs.SqsClient.html#_sendMessageBatch |
43
|
|
|
*/ |
44
|
|
|
final class SqsAdapter implements AdapterInterface, NamedInterface |
45
|
|
|
{ |
46
|
|
|
const BATCHSIZE_DELETE = 10; |
47
|
|
|
const BATCHSIZE_RECEIVE = 10; |
48
|
|
|
const BATCHSIZE_SEND = 10; |
49
|
|
|
|
50
|
|
|
/** @var SqsClient */ |
51
|
|
|
protected $client; |
52
|
|
|
|
53
|
|
|
/** @var array */ |
54
|
|
|
protected $options; |
55
|
|
|
|
56
|
|
|
/** @var string */ |
57
|
|
|
protected $name; |
58
|
|
|
|
59
|
|
|
/** @var string */ |
60
|
|
|
protected $url; |
61
|
|
|
|
62
|
|
|
/** |
63
|
|
|
* @param SqsClient $client |
64
|
|
|
* @param string $name |
65
|
|
|
* @param array $options - DelaySeconds <integer> The time in seconds that the delivery of all |
66
|
|
|
* messages in the queue will be delayed. |
67
|
|
|
* - MaximumMessageSize <integer> The limit of how many bytes a message |
68
|
|
|
* can contain before Amazon SQS rejects it. |
69
|
|
|
* - MessageRetentionPeriod <integer> The number of seconds Amazon SQS |
70
|
|
|
* retains a message. |
71
|
|
|
* - Policy <string> The queue's policy. A valid form-url-encoded policy. |
72
|
|
|
* - ReceiveMessageWaitTimeSeconds <integer> The time for which a |
73
|
|
|
* ReceiveMessage call will wait for a message to arrive. |
74
|
|
|
* - VisibilityTimeout <integer> The visibility timeout for the queue. |
75
|
|
|
*/ |
76
|
18 |
|
public function __construct(SqsClient $client, $name, array $options = []) |
77
|
|
|
{ |
78
|
18 |
|
$this->client = $client; |
79
|
18 |
|
$this->name = $name; |
80
|
18 |
|
$this->options = $options; |
81
|
18 |
|
} |
82
|
|
|
|
83
|
|
|
/** |
84
|
|
|
* @param MessageInterface[] $messages |
85
|
|
|
*/ |
86
|
5 |
View Code Duplication |
public function acknowledge(array $messages) |
|
|
|
|
87
|
|
|
{ |
88
|
5 |
|
$url = $this->getQueueUrl(); |
89
|
5 |
|
$failed = []; |
90
|
5 |
|
$batches = array_chunk($this->createDeleteEntries($messages), self::BATCHSIZE_DELETE); |
91
|
|
|
|
92
|
5 |
|
foreach ($batches as $batch) { |
93
|
5 |
|
$results = $this->client->deleteMessageBatch([ |
94
|
5 |
|
'QueueUrl' => $url, |
95
|
5 |
|
'Entries' => $batch, |
96
|
5 |
|
]); |
97
|
|
|
|
98
|
|
|
$map = function ($result) use ($messages) { |
99
|
|
|
return $messages[$result['Id']]; |
100
|
5 |
|
}; |
101
|
|
|
|
102
|
5 |
|
$failed = array_merge($failed, array_map($map, $results->get('Failed') ?: [])); |
103
|
5 |
|
} |
104
|
|
|
|
105
|
5 |
|
if (!empty($failed)) { |
106
|
|
|
throw new FailedAcknowledgementException($this, $failed); |
107
|
|
|
} |
108
|
5 |
|
} |
109
|
|
|
|
110
|
|
|
/** |
111
|
|
|
* @param MessageInterface[] $messages |
112
|
|
|
* |
113
|
|
|
* @throws FailedAcknowledgementException|void |
114
|
|
|
*/ |
115
|
1 |
View Code Duplication |
public function reject(array $messages) |
|
|
|
|
116
|
|
|
{ |
117
|
1 |
|
$url = $this->getQueueUrl(); |
118
|
1 |
|
$failed = []; |
119
|
1 |
|
$batches = array_chunk($this->createRejectEntries($messages), self::BATCHSIZE_DELETE); |
120
|
|
|
|
121
|
1 |
|
foreach ($batches as $batch) { |
122
|
1 |
|
$results = $this->client->changeMessageVisibilityBatch([ |
123
|
1 |
|
'QueueUrl' => $url, |
124
|
1 |
|
'Entries' => $batch, |
125
|
1 |
|
]); |
126
|
|
|
|
127
|
|
|
$map = function ($result) use ($messages) { |
128
|
|
|
return $messages[$result['Id']]; |
129
|
1 |
|
}; |
130
|
|
|
|
131
|
1 |
|
$failed = array_merge($failed, array_map($map, $results->get('Failed') ?: [])); |
132
|
1 |
|
} |
133
|
|
|
|
134
|
1 |
|
if (!empty($failed)) { |
135
|
|
|
throw new FailedAcknowledgementException($this, $failed); |
136
|
|
|
} |
137
|
1 |
|
} |
138
|
|
|
|
139
|
|
|
/** |
140
|
|
|
* @param MessageFactoryInterface $factory |
141
|
|
|
* @param int $limit |
142
|
|
|
* |
143
|
|
|
* @return \Generator |
144
|
|
|
*/ |
145
|
8 |
|
public function dequeue(MessageFactoryInterface $factory, $limit) |
146
|
|
|
{ |
147
|
8 |
|
$remaining = $limit ?: 0; |
148
|
|
|
|
149
|
8 |
|
while (null === $limit || $remaining > 0) { |
150
|
|
|
/** |
151
|
|
|
* If a limit has been specified, set {@see $size} so that we don't return more |
152
|
|
|
* than the requested number of messages if it's less than the batch size. |
153
|
|
|
*/ |
154
|
8 |
|
$size = ($limit !== null) ? min($remaining, self::BATCHSIZE_RECEIVE) : self::BATCHSIZE_RECEIVE; |
155
|
|
|
|
156
|
8 |
|
$timestamp = time() + $this->getQueueVisibilityTimeout(); |
157
|
|
|
$validator = function () use ($timestamp) { |
158
|
4 |
|
return time() < $timestamp; |
159
|
8 |
|
}; |
160
|
|
|
|
161
|
8 |
|
$results = $this->client->receiveMessage(array_filter([ |
162
|
8 |
|
'QueueUrl' => $this->getQueueUrl(), |
163
|
8 |
|
'AttributeNames' => ['All'], |
164
|
8 |
|
'MaxNumberOfMessages' => $size, |
165
|
8 |
|
'VisibilityTimeout' => $this->getOption('VisibilityTimeout'), |
166
|
8 |
|
'WaitTimeSeconds' => $this->getOption('ReceiveMessageWaitTimeSeconds'), |
167
|
8 |
|
])); |
168
|
|
|
|
169
|
8 |
|
$messages = $results->get('Messages') ?: []; |
170
|
|
|
|
171
|
8 |
|
if (count($messages) === 0) { |
172
|
1 |
|
break; |
173
|
|
|
} |
174
|
|
|
|
175
|
7 |
|
foreach ($messages as $result) { |
176
|
7 |
|
yield $factory->createMessage( |
177
|
7 |
|
$result['Body'], |
178
|
|
|
[ |
179
|
7 |
|
'metadata' => $this->createMessageMetadata($result), |
180
|
7 |
|
'validator' => $validator, |
181
|
|
|
] |
182
|
7 |
|
); |
183
|
5 |
|
} |
184
|
|
|
|
185
|
|
|
// Decrement the number of messages remaining. |
186
|
5 |
|
$remaining -= count($messages); |
187
|
5 |
|
} |
188
|
6 |
|
} |
189
|
|
|
|
190
|
|
|
/** |
191
|
|
|
* @param MessageInterface[] $messages |
192
|
|
|
*/ |
193
|
4 |
View Code Duplication |
public function enqueue(array $messages) |
|
|
|
|
194
|
|
|
{ |
195
|
4 |
|
$url = $this->getQueueUrl(); |
196
|
4 |
|
$failed = []; |
197
|
4 |
|
$batches = array_chunk($this->createEnqueueEntries($messages), self::BATCHSIZE_SEND); |
198
|
|
|
|
199
|
4 |
|
foreach ($batches as $batch) { |
200
|
4 |
|
$results = $this->client->sendMessageBatch([ |
201
|
4 |
|
'QueueUrl' => $url, |
202
|
4 |
|
'Entries' => $batch, |
203
|
4 |
|
]); |
204
|
|
|
|
205
|
|
|
$map = function ($result) use ($messages) { |
206
|
|
|
return $messages[$result['Id']]; |
207
|
4 |
|
}; |
208
|
|
|
|
209
|
4 |
|
$failed = array_merge($failed, array_map($map, $results->get('Failed') ?: [])); |
210
|
4 |
|
} |
211
|
|
|
|
212
|
4 |
|
if (!empty($failed)) { |
213
|
|
|
throw new FailedEnqueueException($this, $failed); |
214
|
|
|
} |
215
|
4 |
|
} |
216
|
|
|
|
217
|
|
|
/** |
218
|
|
|
* {@inheritdoc} |
219
|
|
|
*/ |
220
|
2 |
|
public function purge() |
221
|
|
|
{ |
222
|
2 |
|
$this->client->purgeQueue(['QueueUrl' => $this->getQueueUrl()]); |
223
|
2 |
|
} |
224
|
|
|
|
225
|
|
|
/** |
226
|
|
|
* {@inheritdoc} |
227
|
|
|
*/ |
228
|
2 |
|
public function delete() |
229
|
|
|
{ |
230
|
2 |
|
$this->client->deleteQueue(['QueueUrl' => $this->getQueueUrl()]); |
231
|
2 |
|
} |
232
|
|
|
|
233
|
|
|
/** |
234
|
|
|
* @param MessageInterface[] $messages |
235
|
|
|
* |
236
|
|
|
* @return array |
237
|
|
|
*/ |
238
|
5 |
View Code Duplication |
protected function createDeleteEntries(array $messages) |
|
|
|
|
239
|
|
|
{ |
240
|
5 |
|
array_walk( |
241
|
5 |
|
$messages, |
242
|
|
|
function (MessageInterface &$message, $id) { |
243
|
5 |
|
$metadata = $message->getMetadata(); |
244
|
|
|
$message = [ |
245
|
5 |
|
'Id' => $id, |
246
|
5 |
|
'ReceiptHandle' => $metadata->get('ReceiptHandle'), |
247
|
5 |
|
]; |
248
|
5 |
|
} |
249
|
5 |
|
); |
250
|
|
|
|
251
|
5 |
|
return $messages; |
252
|
|
|
} |
253
|
|
|
|
254
|
|
|
/** |
255
|
|
|
* @param MessageInterface[] $messages |
256
|
|
|
* |
257
|
|
|
* @return array |
258
|
|
|
*/ |
259
|
1 |
View Code Duplication |
protected function createRejectEntries(array $messages) |
|
|
|
|
260
|
|
|
{ |
261
|
1 |
|
array_walk( |
262
|
1 |
|
$messages, |
263
|
|
|
function (MessageInterface &$message, $id) { |
264
|
1 |
|
$metadata = $message->getMetadata(); |
265
|
|
|
$message = [ |
266
|
1 |
|
'Id' => $id, |
267
|
1 |
|
'ReceiptHandle' => $metadata->get('ReceiptHandle'), |
268
|
1 |
|
'VisibilityTimeout' => 0, |
269
|
1 |
|
]; |
270
|
1 |
|
} |
271
|
1 |
|
); |
272
|
|
|
|
273
|
1 |
|
return $messages; |
274
|
|
|
} |
275
|
|
|
|
276
|
|
|
/** |
277
|
|
|
* @param MessageInterface[] $messages |
278
|
|
|
* |
279
|
|
|
* @return array |
280
|
|
|
*/ |
281
|
4 |
|
protected function createEnqueueEntries(array $messages) |
282
|
|
|
{ |
283
|
4 |
|
array_walk( |
284
|
4 |
|
$messages, |
285
|
4 |
|
function (MessageInterface &$message, $id) { |
286
|
4 |
|
$metadata = $message->getMetadata(); |
287
|
|
|
$message = [ |
288
|
4 |
|
'Id' => $id, |
289
|
4 |
|
'MessageBody' => $message->getBody(), |
290
|
4 |
|
'MessageAttributes' => $metadata->get('MessageAttributes') ?: [], |
291
|
4 |
|
]; |
292
|
4 |
|
if (!is_null($metadata->get('DelaySeconds'))) { |
293
|
2 |
|
$message['DelaySeconds'] = $metadata->get('DelaySeconds'); |
294
|
2 |
|
} |
295
|
4 |
|
} |
296
|
4 |
|
); |
297
|
|
|
|
298
|
4 |
|
return $messages; |
299
|
|
|
} |
300
|
|
|
|
301
|
|
|
/** |
302
|
|
|
* @param array $result |
303
|
|
|
* |
304
|
|
|
* @return array |
305
|
|
|
*/ |
306
|
7 |
|
protected function createMessageMetadata(array $result) |
307
|
|
|
{ |
308
|
7 |
|
return array_intersect_key( |
309
|
7 |
|
$result, |
310
|
|
|
[ |
311
|
7 |
|
'Attributes' => [], |
312
|
7 |
|
'MessageAttributes' => [], |
313
|
7 |
|
'MessageId' => null, |
314
|
7 |
|
'ReceiptHandle' => null, |
315
|
|
|
] |
316
|
7 |
|
); |
317
|
|
|
} |
318
|
|
|
|
319
|
|
|
/** |
320
|
|
|
* @param string $name |
321
|
|
|
* @param mixed $default |
322
|
|
|
* |
323
|
|
|
* @return mixed |
324
|
|
|
*/ |
325
|
8 |
|
protected function getOption($name, $default = null) |
326
|
|
|
{ |
327
|
8 |
|
return isset($this->options[$name]) ? $this->options[$name] : $default; |
328
|
|
|
} |
329
|
|
|
|
330
|
|
|
/** |
331
|
|
|
* @return string |
332
|
|
|
*/ |
333
|
17 |
|
protected function getQueueUrl() |
334
|
|
|
{ |
335
|
17 |
|
if (!$this->url) { |
336
|
17 |
|
$result = $this->client->createQueue([ |
337
|
17 |
|
'QueueName' => $this->name, |
338
|
17 |
|
'Attributes' => $this->options, |
339
|
17 |
|
]); |
340
|
|
|
|
341
|
17 |
|
$this->url = $result->get('QueueUrl'); |
342
|
17 |
|
} |
343
|
|
|
|
344
|
17 |
|
return $this->url; |
345
|
|
|
} |
346
|
|
|
|
347
|
|
|
/** |
348
|
|
|
* @return int |
349
|
|
|
*/ |
350
|
8 |
|
protected function getQueueVisibilityTimeout() |
351
|
|
|
{ |
352
|
8 |
|
if (!isset($this->options['VisibilityTimeout'])) { |
353
|
8 |
|
$result = $this->client->getQueueAttributes([ |
354
|
8 |
|
'QueueUrl' => $this->getQueueUrl(), |
355
|
8 |
|
'AttributeNames' => ['VisibilityTimeout'], |
356
|
8 |
|
]); |
357
|
|
|
|
358
|
8 |
|
$attributes = $result->get('Attributes'); |
359
|
8 |
|
$this->options['VisibilityTimeout'] = $attributes['VisibilityTimeout']; |
360
|
8 |
|
} |
361
|
|
|
|
362
|
8 |
|
return $this->options['VisibilityTimeout']; |
363
|
|
|
} |
364
|
|
|
|
365
|
|
|
/** |
366
|
|
|
* @return string |
367
|
|
|
*/ |
368
|
|
|
public function getQueueName() |
369
|
|
|
{ |
370
|
|
|
return $this->name; |
371
|
|
|
} |
372
|
|
|
} |
373
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.