1
|
|
|
<?php |
2
|
|
|
namespace NeedleProject\LaravelRabbitMq\Entity; |
3
|
|
|
|
4
|
|
|
use NeedleProject\LaravelRabbitMq\AMQPConnection; |
5
|
|
|
use NeedleProject\LaravelRabbitMq\ConsumerInterface; |
6
|
|
|
use NeedleProject\LaravelRabbitMq\Processor\AbstractMessageProcessor; |
7
|
|
|
use NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface; |
8
|
|
|
use NeedleProject\LaravelRabbitMq\PublisherInterface; |
9
|
|
|
use PhpAmqpLib\Channel\AMQPChannel; |
10
|
|
|
use PhpAmqpLib\Exception\AMQPProtocolChannelException; |
11
|
|
|
use PhpAmqpLib\Exception\AMQPTimeoutException; |
12
|
|
|
use PhpAmqpLib\Message\AMQPMessage; |
13
|
|
|
use Psr\Log\LoggerAwareInterface; |
14
|
|
|
use Psr\Log\LoggerAwareTrait; |
15
|
|
|
|
16
|
|
|
/** |
17
|
|
|
* Class QueueEntity |
18
|
|
|
* |
19
|
|
|
* @package NeedleProject\LaravelRabbitMq\Entity |
20
|
|
|
* @author Adrian Tilita <[email protected]> |
21
|
|
|
*/ |
22
|
|
|
class QueueEntity implements PublisherInterface, ConsumerInterface, AMQPEntityInterface, LoggerAwareInterface |
23
|
|
|
{ |
24
|
|
|
use LoggerAwareTrait; |
25
|
|
|
|
26
|
|
|
/**
 * Default attribute values; createQueue() merges the caller-supplied details over these.
 *
 * @const array
 */
const DEFAULTS = [
    // Whether to only check that the entity exists, or to verify its existence
    // using the argument types (throws PRECONDITION_FAILED)
    'passive'                      => false,
    // Durable entities are re-created upon server restart
    'durable'                      => false,
    // Whether it may be used by only one channel, after which it gets deleted
    'exclusive'                    => false,
    // Whether to delete it when the queue has no event on it
    'auto_delete'                  => false,
    // Exchange-oriented flag: whether a publisher may use the exchange or is
    // blocked from it (declared just for internal "wiring")
    'internal'                     => false,
    // Whether to receive a Declare confirmation
    'nowait'                       => false,
    // Whether to create the entity automatically before publishing to / consuming from it
    'auto_create'                  => false,
    // Whether to "hide" the exception raised on re-declare;
    // redundant when the `passive` filter is set to true
    'throw_exception_on_redeclare' => true,
    // Whether to throw an exception when trying to bind
    // to a non-existent queue/exchange
    'throw_exception_on_bind_fail' => true,
];
51
|
|
|
|
52
|
|
|
/**
 * Connection wrapper used to obtain the channel and to reconnect.
 *
 * @var AMQPConnection
 */
protected $connection;

/**
 * Alias of this entity; returned by getAliasName() and used as the first part of the consumer tag.
 *
 * @var string
 */
protected $aliasName;

/**
 * Entity attributes (name, declare flags, optional bind list, behaviour switches).
 *
 * @var array
 */
protected $attributes;

/**
 * Prefetch count handed to basic_qos() when the consumer is set up.
 *
 * @var int
 */
protected $prefetchCount = 1;

/**
 * Either the container identifier of the processor (string) or, once resolved, the processor itself.
 *
 * @var null|string|MessageProcessorInterface
 */
protected $messageProcessor = null;

/**
 * Consumer stops once the processor reports at least this many processed messages.
 *
 * @var int
 */
protected $limitMessageCount;

/**
 * Consumer stops once it has been running for more than this many seconds.
 *
 * @var int
 */
protected $limitSecondsUptime;

/**
 * Consumer stops once peak memory reaches this value; it is multiplied by 1048576 (bytes) when compared.
 *
 * @var int
 */
protected $limitMemoryConsumption;

/**
 * microtime(true) value captured when the consumer was set up.
 *
 * @var double
 */
protected $startTime = 0;
96
|
|
|
|
97
|
|
|
/**
 * Named constructor: builds the entity with the supplied details layered over self::DEFAULTS
 * (keys present in $exchangeDetails override the defaults).
 *
 * @param AMQPConnection $connection
 * @param string         $aliasName
 * @param array          $exchangeDetails Attributes of the queue
 * @return QueueEntity
 */
public static function createQueue(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
{
    $attributes = array_merge(self::DEFAULTS, $exchangeDetails);

    return new static($connection, $aliasName, $attributes);
}
111
|
|
|
|
112
|
|
|
/**
 * Get the alias this entity was constructed with.
 *
 * @return string
 */
public function getAliasName(): string
{
    return $this->aliasName;
}
119
|
|
|
|
120
|
|
|
/**
 * QueueEntity constructor.
 *
 * Stores the arguments as-is; no defaults are applied here (see createQueue()).
 *
 * @param AMQPConnection $connection
 * @param string         $aliasName
 * @param array          $attributes
 */
public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
{
    $this->attributes = $attributes;
    $this->aliasName  = $aliasName;
    $this->connection = $connection;
}
133
|
|
|
|
134
|
|
|
/**
 * Set the prefetch count used when the consumer is set up (fluent).
 *
 * @param int $prefetchCount
 * @return ConsumerInterface This instance
 */
public function setPrefetchCount(int $prefetchCount): ConsumerInterface
{
    $this->prefetchCount = $prefetchCount;

    return $this;
}
143
|
|
|
|
144
|
|
|
/**
 * Set the identifier of the message processor (fluent).
 *
 * The string is resolved to an object lazily, the first time the processor is needed.
 *
 * @param string $messageProcessor
 * @return ConsumerInterface This instance
 */
public function setMessageProcessor(string $messageProcessor): ConsumerInterface
{
    $this->messageProcessor = $messageProcessor;

    return $this;
}
153
|
|
|
|
154
|
|
|
/**
 * Get the connection wrapper this entity operates on.
 *
 * @return AMQPConnection
 */
protected function getConnection(): AMQPConnection
{
    return $this->connection;
}
161
|
|
|
|
162
|
|
|
/**
 * Get the channel exposed by the connection wrapper.
 *
 * @return AMQPChannel
 */
protected function getChannel(): AMQPChannel
{
    $connection = $this->getConnection();

    return $connection->getChannel();
}
169
|
|
|
|
170
|
|
|
/**
 * Create (declare) the queue using the configured attributes.
 *
 * A channel exception with reply code 406 (precondition failure, raised when re-declaring
 * with different parameters) is swallowed when `throw_exception_on_redeclare` is not true;
 * in that case the connection is re-established. Any other channel exception is re-thrown.
 *
 * An optional `arguments` attribute (array) is forwarded as the queue's arguments table.
 *
 * @throws AMQPProtocolChannelException
 */
public function create()
{
    try {
        $this->getChannel()
            ->queue_declare(
                $this->attributes['name'],
                $this->attributes['passive'],
                $this->attributes['durable'],
                $this->attributes['exclusive'],
                $this->attributes['auto_delete'],
                // queue_declare() has no "internal" flag (that is an exchange option):
                // its sixth parameter is `nowait` and its seventh is the arguments table.
                $this->attributes['nowait'],
                $this->attributes['arguments'] ?? []
            );
    } catch (AMQPProtocolChannelException $e) {
        // 406 is a soft error triggered for precondition failure (when redeclaring with different parameters)
        if (true === $this->attributes['throw_exception_on_redeclare'] || $e->amqp_reply_code !== 406) {
            throw $e;
        }
        // a failure triggers the channel's closing process, so reconnect before going on
        $this->reconnect();
    }
}
195
|
|
|
|
196
|
4 |
View Code Duplication |
/**
 * Bind the queue to every entry of the `bind` attribute (each entry provides
 * an `exchange` and a `routing_key`). Does nothing when the list is missing or empty.
 *
 * A 404 channel exception is tolerated when `throw_exception_on_bind_fail` is not true;
 * the connection is then re-established and the remaining entries are processed.
 *
 * @throws AMQPProtocolChannelException
 */
public function bind()
{
    if (empty($this->attributes['bind'])) {
        return;
    }

    foreach ($this->attributes['bind'] as $binding) {
        try {
            $this->getChannel()->queue_bind(
                $this->attributes['name'],
                $binding['exchange'],
                $binding['routing_key']
            );
        } catch (AMQPProtocolChannelException $e) {
            // 404 is the code for trying to bind to a non-existing entity
            $tolerated = 404 === $e->amqp_reply_code
                && true !== $this->attributes['throw_exception_on_bind_fail'];
            if (!$tolerated) {
                throw $e;
            }
            $this->reconnect();
        }
    }
}
218
|
|
|
|
219
|
|
|
/**
 * Delete the queue identified by the `name` attribute.
 */
public function delete()
{
    $channel = $this->getChannel();
    $channel->queue_delete($this->attributes['name']);
}
226
|
|
|
|
227
|
|
|
/**
 * {@inheritdoc}
 *
 * Delegates to the connection wrapper's reconnect().
 */
public function reconnect()
{
    $connection = $this->getConnection();
    $connection->reconnect();
}
234
|
|
|
|
235
|
|
|
/**
 * Publish a message straight to this queue.
 *
 * The message is sent with an empty exchange name and the queue's own name as the
 * routing key, with the `mandatory` flag set. $routingKey is not used by this
 * implementation. When `auto_create` is true the queue is created and bound first.
 *
 * @param string $message
 * @param string $routingKey
 * @return mixed|void
 * @throws AMQPProtocolChannelException
 */
public function publish(string $message, string $routingKey = '')
{
    if (true === $this->attributes['auto_create']) {
        $this->create();
        $this->bind();
    }

    $channel = $this->getChannel();
    $channel->basic_publish(new AMQPMessage($message), '', $this->attributes['name'], true);
}
257
|
|
|
|
258
|
|
|
/**
 * {@inheritdoc}
 *
 * Sets the consumer up and waits on the channel until one of the limits is reached.
 *
 * @param int $messages  Stop after this many processed messages
 * @param int $seconds   Stop after this many seconds of uptime
 * @param int $maxMemory Stop when peak memory reaches this value
 * @return int 0 when a limit ended the loop, 1 when an unexpected error stopped the consumer
 */
public function startConsuming(int $messages, int $seconds, int $maxMemory)
{
    $this->setupConsumer($messages, $seconds, $maxMemory);

    while (!$this->shouldStopConsuming()) {
        try {
            // blocking wait with a one second timeout
            $this->getChannel()->wait(null, false, 1);
        } catch (AMQPTimeoutException $timeout) {
            // nothing arrived in time: pause briefly, then reconnect and subscribe again
            usleep(1000);
            $this->getConnection()->reconnect();
            $this->setupChannelConsumer();
        } catch (\Throwable $error) {
            // any other failure ends the consumer
            $this->stopConsuming();

            $reason = get_class($error) . ' - ' . $error->getMessage();
            $this->logger->notice(
                sprintf(
                    "Stopped consuming: %s in %s:%d",
                    $reason,
                    (string)$error->getFile(),
                    (int)$error->getLine()
                )
            );

            return 1;
        }
    }

    return 0;
}
290
|
|
|
|
291
|
|
|
/**
 * Check the uptime, peak-memory and processed-message limits, in that order.
 * The first limit that is hit is logged at debug level.
 *
 * @return bool True when any limit has been reached
 */
protected function shouldStopConsuming(): bool
{
    // uptime limit (seconds since the consumer was set up)
    if ($this->limitSecondsUptime < (microtime(true) - $this->startTime)) {
        $context = [
            'limit' => 'time_limit',
            'value' => sprintf("%.2f", microtime(true) - $this->startTime),
        ];
        $this->logger->debug("Stopped consumer", $context);

        return true;
    }

    // memory limit: the configured value is scaled by 1048576 to compare against bytes
    $memoryLimitBytes = $this->limitMemoryConsumption * 1048576;
    if (memory_get_peak_usage(true) >= $memoryLimitBytes) {
        $context = [
            'limit' => 'memory_limit',
            'value' => (int)round(memory_get_peak_usage(true) / 1048576, 2),
        ];
        $this->logger->debug("Stopped consumer", $context);

        return true;
    }

    // processed-message limit, as reported by the message processor
    $processor = $this->getMessageProcessor();
    if ($processor->getProcessedMessages() >= $this->limitMessageCount) {
        $context = [
            'limit' => 'message_count',
            'value' => (int)$processor->getProcessedMessages(),
        ];
        $this->logger->debug("Stopped consumer", $context);

        return true;
    }

    return false;
}
326
|
|
|
|
327
|
|
|
/**
 * Stop the consumer by cancelling its consumer tag on the channel.
 *
 * Best effort: any failure is logged as a notice and not propagated.
 */
public function stopConsuming()
{
    try {
        $channel = $this->getChannel();
        $channel->basic_cancel($this->getConsumerTag(), false, true);
    } catch (\Throwable $error) {
        $this->logger->notice(
            sprintf("Got %s of type %s", $error->getMessage(), get_class($error))
        );
    }
}
338
|
|
|
|
339
|
|
|
/**
 * Setup the consumer: record the limits and the start time, subscribe on the channel,
 * then install the shutdown and kill-signal handlers.
 *
 * @param int $messages
 * @param int $seconds
 * @param int $maxMemory
 */
protected function setupConsumer(int $messages, int $seconds, int $maxMemory)
{
    $this->startTime = microtime(true);

    $this->limitMemoryConsumption = $maxMemory;
    $this->limitSecondsUptime     = $seconds;
    $this->limitMessageCount      = $messages;

    $this->setupChannelConsumer();

    $this->registerShutdownHandler();
    $this->handleKillSignals();
}
359
|
|
|
|
360
|
|
|
/**
 * Apply the prefetch setting and subscribe consume() as the callback for this queue.
 * When `auto_create` is true the queue is created and bound first.
 */
private function setupChannelConsumer()
{
    if (true === $this->attributes['auto_create']) {
        $this->create();
        $this->bind();
    }

    // no prefetch size, configured prefetch count, global flag set
    $this->getChannel()->basic_qos(null, $this->prefetchCount, true);

    $onMessage = [$this, 'consume'];
    $this->getChannel()->basic_consume(
        $this->attributes['name'],
        $this->getConsumerTag(),
        false, // no_local
        false, // no_ack
        false, // exclusive
        false, // nowait
        $onMessage
    );
}
384
|
|
|
|
385
|
|
|
/**
 * Handle shutdown - usually in case of "Allowed memory size of x bytes exhausted":
 * stopConsuming() is invoked when the script shuts down.
 */
private function registerShutdownHandler()
{
    register_shutdown_function([$this, 'stopConsuming']);
}
395
|
|
|
|
396
|
|
|
/**
 * Register catchKillSignal() for SIGTERM and SIGINT.
 * Does nothing when the pcntl extension is not loaded.
 */
protected function handleKillSignals()
{
    if (!extension_loaded('pcntl')) {
        return;
    }

    $handler = [$this, 'catchKillSignal'];
    pcntl_signal(SIGTERM, $handler);
    pcntl_signal(SIGINT, $handler);

    if (function_exists('pcntl_signal_dispatch')) {
        // let the signal go forward
        pcntl_signal_dispatch();
    }
}
411
|
|
|
|
412
|
|
|
/**
 * Handle kill signals: stop the consumer, then log the received signal at debug level.
 *
 * @param int $signalNumber
 */
public function catchKillSignal(int $signalNumber)
{
    $this->stopConsuming();

    $note = sprintf("Caught signal %d", $signalNumber);
    $this->logger->debug($note);
}
421
|
|
|
|
422
|
|
|
/**
 * It is the tag that is listed in the RabbitMQ UI as the consumer "name":
 * alias, host name and process id joined with underscores.
 *
 * @return string
 */
private function getConsumerTag(): string
{
    $parts = [$this->aliasName, gethostname(), getmypid()];

    return implode('_', $parts);
}
431
|
|
|
|
432
|
|
|
/**
 * Get the message processor, resolving it through app() the first time it is needed.
 * A resolved AbstractMessageProcessor also receives this entity's logger.
 *
 * @return MessageProcessorInterface
 */
private function getMessageProcessor(): MessageProcessorInterface
{
    // already resolved on an earlier call
    if ($this->messageProcessor instanceof MessageProcessorInterface) {
        return $this->messageProcessor;
    }

    $this->messageProcessor = app($this->messageProcessor);
    if ($this->messageProcessor instanceof AbstractMessageProcessor) {
        $this->messageProcessor->setLogger($this->logger);
    }

    return $this->messageProcessor;
}
445
|
|
|
|
446
|
|
|
/**
 * Channel callback: hand the message to the message processor and log its body at debug level.
 *
 * @param AMQPMessage $message
 * @throws \Throwable Anything the processor lets escape is logged as a notice and re-thrown
 */
public function consume(AMQPMessage $message)
{
    try {
        $processor = $this->getMessageProcessor();
        $processor->consume($message);
        $this->logger->debug("Consumed message", [$message->getBody()]);
    } catch (\Throwable $error) {
        $notice = sprintf(
            "Got %s from %s in %d",
            $error->getMessage(),
            (string)$error->getFile(),
            (int)$error->getLine()
        );
        $this->logger->notice($notice);
        // The processor is expected to deal with its own exceptions, so this notice
        // should never appear; the error is passed on unchanged.
        throw $error;
    }
}
470
|
|
|
} |
471
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.