1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Kaliop\Queueing\Plugins\SQSBundle\Adapter\SQS; |
4
|
|
|
|
5
|
|
|
use Kaliop\QueueingBundle\Queue\MessageConsumerInterface; |
6
|
|
|
use Kaliop\QueueingBundle\Queue\ConsumerInterface; |
7
|
|
|
use Kaliop\QueueingBundle\Queue\SignalHandlingConsumerInterface; |
8
|
|
|
use Kaliop\QueueingBundle\Adapter\ForcedStopException; |
9
|
|
|
use Aws\Sqs\SqsClient; |
10
|
|
|
use Aws\TraceMiddleware; |
11
|
|
|
use Psr\Log\LoggerInterface; |
12
|
|
|
|
13
|
|
|
/**
 * Consumer which polls an Amazon SQS queue and hands every received message over to a MessageConsumerInterface callback.
 *
 * Messages are deleted from the queue as soon as they have been received, before the callback is invoked.
 * Routing keys are not supported natively by SQS: they are emulated by inspecting a message attribute after download.
 *
 * @todo support using short polling even when given a total timeout - even though it will complicate consume() even more than it already is
 */
class Consumer implements ConsumerInterface, SignalHandlingConsumerInterface
{
    /** @var \Aws\Sqs\SqsClient */
    protected $client;
    /** @var string the complete queue name as used by SQS */
    protected $queueUrl;
    /** @var string the queue name which is passed on to every Message built by this consumer */
    protected $queueName;
    /** @var \Kaliop\QueueingBundle\Queue\MessageConsumerInterface the object whose receive() method gets every message */
    protected $callback;

    /** @var string|null when null or empty, all messages are accepted */
    protected $routingKey;
    /** @var string|null the regular expression built out of $routingKey by setRoutingKey() */
    protected $routingKeyRegexp;
    /** @var LoggerInterface|null */
    protected $logger;
    // The message attribute used to store content-type. To be kept in sync with the Producer
    protected $contentTypeAttribute = 'contentType';
    // The message attribute which is matched against the routing key
    protected $routingAttribute = 'routingKey';
    /** @var bool|array false until debug tracing has been enabled via setDebug() */
    protected $debug = false;
    protected $forceStop = false;
    protected $forceStopReason;
    protected $dispatchSignals = false;
    /** @var int|null in MB. Null or 0 for no limit */
    protected $memoryLimit = null;
    // NB: when changing the defaults below, alter as well KaliopQueueingPluginsSQSExtension::load
    /** @var int $requestBatchSize how many messages to receive in each poll by default */
    protected $requestBatchSize = 1;
    /** @var int $requestTimeout how long to wait for messages in each request. Switches between long and short polling */
    protected $requestTimeout = 0;
    /** @var int the minimum interval between two queue polls - in microseconds */
    protected $pollingIntervalUs = 200000;
    /** @var int $gcProbability the probability (in percent) of calling gc_collect_cycles at the end of every poll */
    protected $gcProbability = 1;

    // Not enforced by this class at the moment: see the commented-out checks in consume()
    const MAX_MESSAGES_PER_REQUEST = 10;
    const MAX_REQUEST_TIMEOUT = 20;

    /**
     * @param array $config passed as-is to the SqsClient constructor
     */
    public function __construct(array $config)
    {
        $this->client = new SqsClient($config);
    }

    /**
     * @param LoggerInterface|null $logger used to emit warnings about messages lacking expected attributes
     * @return $this
     */
    public function setLogger(LoggerInterface $logger = null)
    {
        $this->logger = $logger;

        return $this;
    }

    /**
     * Enables debug. At the moment it can not be disabled once enabled.
     *
     * @param bool|array $debug true to trace with default options, or an array of options for the TraceMiddleware
     * @return $this
     *
     * @todo test if using $handlerList->removeByInstance we can disable debug as well
     */
    public function setDebug($debug)
    {
        if ($debug == $this->debug) {
            return $this;
        }

        if ($debug) {
            $handlerList = $this->client->getHandlerList();
            $handlerList->interpose(new TraceMiddleware($debug === true ? [] : $debug));

            // Remember that tracing is active, so that the check above prevents repeated calls from
            // stacking one more TraceMiddleware on the client each time
            $this->debug = $debug;
        }

        return $this;
    }

    /**
     * @param int $limit MB
     * @return Consumer
     */
    public function setMemoryLimit($limit)
    {
        $this->memoryLimit = $limit;

        return $this;
    }

    /**
     * Sets the routing key and compiles it into the regular expression used by matchRoutingKey().
     * '*' is translated to "any run of characters except dots", '#' to "any run of characters".
     *
     * @param string $key
     * @return Consumer
     */
    public function setRoutingKey($key)
    {
        $this->routingKey = (string)$key;

        // Quote only the literal chunks found between wildcards. Quoting the whole key and replacing the wildcards
        // afterwards is fragile, as preg_quote() escapes '#' as well starting with PHP 7.3
        $pattern = '';
        foreach (preg_split('/([*#])/', $this->routingKey, -1, PREG_SPLIT_DELIM_CAPTURE) as $chunk) {
            if ($chunk === '*') {
                $pattern .= '[^.]*';
            } elseif ($chunk === '#') {
                $pattern .= '.*';
            } else {
                $pattern .= preg_quote($chunk, '/');
            }
        }

        // NOTE(review): the expression is not anchored, so the key matches anywhere inside the routing attribute.
        // This is the historical behaviour - confirm whether anchoring with ^...$ is desired
        $this->routingKeyRegexp = '/' . $pattern . '/';

        return $this;
    }

    /**
     * @param MessageConsumerInterface $callback
     * @return Consumer
     * @throws \RuntimeException if $callback is not a MessageConsumerInterface
     */
    public function setCallback($callback)
    {
        if (! $callback instanceof \Kaliop\QueueingBundle\Queue\MessageConsumerInterface) {
            throw new \RuntimeException('Can not set callback to SQS Consumer, as it is not a MessageConsumerInterface');
        }
        $this->callback = $callback;

        return $this;
    }

    /**
     * @param string $queueName the name passed on to every Message built by this consumer
     * @return Consumer
     */
    public function setQueueName($queueName)
    {
        $this->queueName = $queueName;

        return $this;
    }

    /**
     * The number of messages to download in every request to the SQS API.
     * Bigger numbers are better for performances, but there is a limit on the size of the response which SQS will send.
     * @param int $amount
     * @return Consumer
     */
    public function setRequestBatchSize($amount)
    {
        $this->requestBatchSize = $amount;

        return $this;
    }

    /**
     * How long to wait for messages in each request. Any value bigger than 0 makes consume() send WaitTimeSeconds.
     * @param int $timeout seconds
     * @return Consumer
     */
    public function setRequestTimeout($timeout)
    {
        $this->requestTimeout = $timeout;

        return $this;
    }

    /**
     * The minimum interval between two queue polls.
     * @param int $intervalUs microseconds
     * @return Consumer
     */
    public function setPollingInterval($intervalUs)
    {
        $this->pollingIntervalUs = $intervalUs;

        return $this;
    }

    /**
     * @param int $probability percentage (0-100) of polls after which gc_collect_cycles is called. 0 to disable
     * @return Consumer
     */
    public function setGCProbability($probability)
    {
        $this->gcProbability = $probability;

        return $this;
    }

    /**
     * Polls the queue in a loop, dispatching every matching message to the callback, until either $amount messages
     * have been processed or $timeout seconds have elapsed.
     *
     * @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#receivemessage
     *
     * @param int $amount 0 for unlimited
     * @param int $timeout seconds, 0 for unlimited. NB: each request waits for the smaller of the remaining time and
     *                     the configured request timeout; 'long polling' mode is used only when that value is > 0
     * @return void
     * @throws ForcedStopException when a stop has been requested or the memory limit has been reached
     */
    public function consume($amount, $timeout = 0)
    {
        $endTime = null;
        $remainingTime = null;
        if ($timeout > 0) {
            $endTime = time() + $timeout;
            $remainingTime = $timeout;
        }

        $received = 0;

        $receiveParams = array(
            'QueueUrl' => $this->queueUrl,
            'AttributeNames' => array('All'),
            'MessageAttributeNames' => array('All')
        );

        while (true) {
            $reqTime = microtime(true);

            // never wait longer than what is left of the total timeout
            if ($timeout > 0) {
                $wait = min($remainingTime, $this->requestTimeout);
            } else {
                $wait = $this->requestTimeout;
            }

            // we leave it up to the API to fail
            //if ($wait > static::MAX_REQUEST_TIMEOUT) {
            //    $wait = static::MAX_REQUEST_TIMEOUT;
            //}

            if ($wait > 0) {
                // according to the spec, this is maximum wait time. If messages are available sooner, they get delivered immediately
                $receiveParams['WaitTimeSeconds'] = $wait;
            } else {
                // short polling: make sure no value is left over from a previous iteration
                unset($receiveParams['WaitTimeSeconds']);
            }

            // never ask for more messages than what is left to reach the desired amount
            if ($amount > 0) {
                $limit = min($amount - $received, $this->requestBatchSize);
            } else {
                $limit = $this->requestBatchSize;
            }

            // we leave it up to the API to fail
            //if ($limit > static::MAX_MESSAGES_PER_REQUEST) {
            //    $limit = static::MAX_MESSAGES_PER_REQUEST;
            //}

            $receiveParams['MaxNumberOfMessages'] = $limit;

            $result = $this->client->receiveMessage($receiveParams);
            $messages = $result->get('Messages');

            if (is_array($messages)) {
                foreach ($messages as $message) {

                    // How we implement routing keys with SQS: since it is not supported natively, we check if the route
                    // matches after having downloaded the message. If it does not match, we just skip processing it.
                    // Since we will not call deleteMessage, SQS will requeue the message in a short time.
                    // This is far from optimal, but it might be better than nothing
                    if (! $this->matchRoutingKey($message)) {
                        continue;
                    }

                    $received++;

                    // removing the message from the queue is manual with SQS
                    $this->client->deleteMessage(array(
                        'QueueUrl' => $this->queueUrl,
                        'ReceiptHandle' => $message['ReceiptHandle']
                    ));

                    // the body is handed over separately from the rest of the message data
                    $data = $message['Body'];
                    unset($message['Body']);

                    $contentType = isset($message['MessageAttributes'][$this->contentTypeAttribute]['StringValue']) ?
                        $message['MessageAttributes'][$this->contentTypeAttribute]['StringValue'] : '';

                    if ($contentType != '') {
                        $this->callback->receive(new Message($data, $message, $contentType, $this->queueName));
                    } else {
                        if ($this->logger) {
                            $this->logger->warning('The SQS Consumer received a message with no content-type attribute. Assuming default');
                        }

                        $this->callback->receive(new Message($data, $message, null, $this->queueName));
                    }
                }
            }

            // a safe point to stop: the whole batch has been handed over to the callback
            $this->maybeStopConsumer();

            if ($amount > 0 && $received >= $amount) {
                return;
            }

            if ($timeout > 0 && ($remainingTime = ($endTime - time())) <= 0) {
                return;
            }

            // observe MAX 5 requests per sec per queue by default: sleep for 0.2 secs in between requests
            $passedUs = (microtime(true) - $reqTime) * 1000000;
            if ($passedUs < $this->pollingIntervalUs) {
                // usleep() takes an integer: cast explicitly, as implicit lossy float-to-int conversion is deprecated since PHP 8.1
                usleep((int)($this->pollingIntervalUs - $passedUs));
            }
        }
    }

    /**
     * Adopt the RabbitMQ routing key algorithm:
     * - split on dots
     * - * matches one word (q: also empty ones?)
     * - # matches any words
     *
     * Messages are always accepted when no routing key is set, or when they carry no routing attribute.
     *
     * @todo the current implementation is naive and does probably not match RabbitMq if the routing key is something like aaa.*b.ccc
     *       A better implementation would probably involve usage of a trie
     *       Some pointers on how to implement it fast: http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-June/013564.html
     * @see setRoutingKey
     *
     * @param array $message
     * @return bool
     */
    protected function matchRoutingKey(array $message)
    {
        if ($this->routingKey === null || $this->routingKey === '') {
            return true;
        }

        if (!isset($message['MessageAttributes'][$this->routingAttribute]['StringValue'])) {
            if ($this->logger) {
                $this->logger->warning('The SQS Consumer has a routing key set, and it received a message without routing information. Processing it anyway');
            }
            return true;
        }

        // preg_match returns 1, 0 or false: convert to the boolean promised by the signature
        return preg_match(
            $this->routingKeyRegexp,
            $message['MessageAttributes'][$this->routingAttribute]['StringValue']
        ) === 1;
    }

    /**
     * @param string $queueUrl the complete queue name as used by SQS
     * @return Consumer
     */
    public function setQueueUrl($queueUrl)
    {
        $this->queueUrl = $queueUrl;

        return $this;
    }

    /**
     * @return string the complete queue name as used by SQS
     */
    public function getQueueUrl()
    {
        return $this->queueUrl;
    }

    /**
     * @param bool $doHandle when true, pending signals are dispatched via pcntl_signal_dispatch after every poll
     */
    public function setHandleSignals($doHandle)
    {
        $this->dispatchSignals = $doHandle;
    }

    /**
     * Asks the consumer to stop. The request is honoured the next time maybeStopConsumer() runs.
     *
     * @param string $reason used as message of the ForcedStopException
     */
    public function forceStop($reason = '')
    {
        $this->forceStop = true;
        $this->forceStopReason = $reason;
    }

    /**
     * Dispatches signals and throws an exception if user wants to stop. To be called at execution points when there is no data loss
     *
     * @throws ForcedStopException
     */
    protected function maybeStopConsumer()
    {
        if ($this->dispatchSignals) {
            pcntl_signal_dispatch();
        }

        // collect garbage only once in a while: $gcProbability is a percentage
        if ($this->gcProbability > 0 && rand(1, 100) <= $this->gcProbability) {
            gc_collect_cycles();
        }

        // the memory limit is expressed in MB; it does not override a stop which was already requested
        if ($this->memoryLimit > 0 && !$this->forceStop && memory_get_usage(true) >= ($this->memoryLimit * 1024 * 1024)) {
            $this->forceStop("Memory limit of {$this->memoryLimit} MB reached while consuming messages");
        }

        if ($this->forceStop) {
            throw new ForcedStopException($this->forceStopReason);
        }
    }
}
368
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.