1
|
|
|
<?php |
2
|
|
|
namespace NeedleProject\LaravelRabbitMq\Entity; |
3
|
|
|
|
4
|
|
|
use NeedleProject\LaravelRabbitMq\AMQPConnection; |
5
|
|
|
use NeedleProject\LaravelRabbitMq\ConsumerInterface; |
6
|
|
|
use NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface; |
7
|
|
|
use NeedleProject\LaravelRabbitMq\PublisherInterface; |
8
|
|
|
use PhpAmqpLib\Channel\AMQPChannel; |
9
|
|
|
use PhpAmqpLib\Exception\AMQPTimeoutException; |
10
|
|
|
use PhpAmqpLib\Message\AMQPMessage; |
11
|
|
|
use Psr\Log\LoggerAwareInterface; |
12
|
|
|
use Psr\Log\LoggerAwareTrait; |
13
|
|
|
|
14
|
|
|
/** |
15
|
|
|
* Class QueueEntity |
16
|
|
|
* |
17
|
|
|
* @package NeedleProject\LaravelRabbitMq\Entity |
18
|
|
|
* @author Adrian Tilita <[email protected]> |
19
|
|
|
*/ |
20
|
|
|
class QueueEntity implements PublisherInterface, ConsumerInterface, LoggerAwareInterface |
21
|
|
|
{ |
22
|
|
|
use LoggerAwareTrait; |
23
|
|
|
|
24
|
|
|
/** |
25
|
|
|
* @const array Default connections parameters |
26
|
|
|
*/ |
27
|
|
|
const DEFAULTS = [ |
28
|
|
|
'passive' => false, |
29
|
|
|
'durable' => false, |
30
|
|
|
'exclusive' => false, |
31
|
|
|
'auto_delete' => false, |
32
|
|
|
'internal' => false, |
33
|
|
|
'nowait' => false, |
34
|
|
|
]; |
35
|
|
|
|
36
|
|
|
/** |
37
|
|
|
* @var AMQPConnection |
38
|
|
|
*/ |
39
|
|
|
protected $connection; |
40
|
|
|
|
41
|
|
|
/** |
42
|
|
|
* @var string |
43
|
|
|
*/ |
44
|
|
|
protected $aliasName; |
45
|
|
|
|
46
|
|
|
/** |
47
|
|
|
* @var array |
48
|
|
|
*/ |
49
|
|
|
protected $attributes; |
50
|
|
|
|
51
|
|
|
/** |
52
|
|
|
* @var int |
53
|
|
|
*/ |
54
|
|
|
protected $prefetchCount = 1; |
55
|
|
|
|
56
|
|
|
/** |
57
|
|
|
* @var null|string|MessageProcessorInterface |
58
|
|
|
*/ |
59
|
|
|
protected $messageProcessor = null; |
60
|
|
|
|
61
|
|
|
/** |
62
|
|
|
* @var int |
63
|
|
|
*/ |
64
|
|
|
protected $limitMessageCount; |
65
|
|
|
|
66
|
|
|
/** |
67
|
|
|
* @var int |
68
|
|
|
*/ |
69
|
|
|
protected $limitSecondsUptime; |
70
|
|
|
|
71
|
|
|
/** |
72
|
|
|
* @var int |
73
|
|
|
*/ |
74
|
|
|
protected $limitMemoryConsumption; |
75
|
|
|
|
76
|
|
|
/** |
77
|
|
|
* @var double |
78
|
|
|
*/ |
79
|
|
|
protected $startTime = 0; |
80
|
|
|
|
81
|
|
|
/** |
82
|
|
|
* @param AMQPConnection $connection |
83
|
|
|
* @param string $aliasName |
84
|
|
|
* @param array $exchangeDetails |
85
|
|
|
* @return QueueEntity |
86
|
|
|
*/ |
87
|
11 |
|
public static function createQueue(AMQPConnection $connection, string $aliasName, array $exchangeDetails) |
88
|
|
|
{ |
89
|
11 |
|
return new static( |
90
|
|
|
$connection, |
91
|
|
|
$aliasName, |
92
|
11 |
|
array_merge(self::DEFAULTS, $exchangeDetails) |
93
|
|
|
); |
94
|
|
|
} |
95
|
|
|
|
96
|
|
|
/** |
97
|
|
|
* @return string |
98
|
|
|
*/ |
99
|
3 |
|
public function getAliasName(): string |
100
|
|
|
{ |
101
|
3 |
|
return $this->aliasName; |
102
|
|
|
} |
103
|
|
|
|
104
|
|
|
/** |
105
|
|
|
* ExchangeEntity constructor. |
106
|
|
|
* |
107
|
|
|
* @param AMQPConnection $connection |
108
|
|
|
* @param string $aliasName |
109
|
|
|
* @param array $attributes |
110
|
|
|
*/ |
111
|
11 |
|
public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = []) |
112
|
|
|
{ |
113
|
11 |
|
$this->connection = $connection; |
114
|
11 |
|
$this->aliasName = $aliasName; |
115
|
11 |
|
$this->attributes = $attributes; |
116
|
11 |
|
} |
117
|
|
|
|
118
|
|
|
/** |
119
|
|
|
* @param int $prefetchCount |
120
|
|
|
* @return ConsumerInterface |
121
|
|
|
*/ |
122
|
2 |
|
public function setPrefetchCount(int $prefetchCount): ConsumerInterface |
123
|
|
|
{ |
124
|
2 |
|
$this->prefetchCount = $prefetchCount; |
125
|
2 |
|
return $this; |
126
|
|
|
} |
127
|
|
|
|
128
|
|
|
/** |
129
|
|
|
* @param string $messageProcessor |
130
|
|
|
* @return ConsumerInterface |
131
|
|
|
*/ |
132
|
2 |
|
public function setMessageProcessor(string $messageProcessor): ConsumerInterface |
133
|
|
|
{ |
134
|
2 |
|
$this->messageProcessor = $messageProcessor; |
135
|
2 |
|
return $this; |
136
|
|
|
} |
137
|
|
|
|
138
|
|
|
/** |
139
|
|
|
* @return AMQPConnection |
140
|
|
|
*/ |
141
|
4 |
|
protected function getConnection(): AMQPConnection |
142
|
|
|
{ |
143
|
4 |
|
return $this->connection; |
144
|
|
|
} |
145
|
|
|
|
146
|
|
|
/** |
147
|
|
|
* @return AMQPChannel |
148
|
|
|
*/ |
149
|
4 |
|
protected function getChannel(): AMQPChannel |
150
|
|
|
{ |
151
|
4 |
|
return $this->getConnection()->getChannel(); |
152
|
|
|
} |
153
|
|
|
|
154
|
|
|
/** |
155
|
|
|
* Create the Queue |
156
|
|
|
*/ |
157
|
1 |
|
public function create() |
158
|
|
|
{ |
159
|
1 |
|
$this->getChannel() |
160
|
1 |
|
->queue_declare( |
161
|
1 |
|
$this->attributes['name'], |
162
|
1 |
|
$this->attributes['passive'], |
163
|
1 |
|
$this->attributes['durable'], |
164
|
1 |
|
$this->attributes['exclusive'], |
165
|
1 |
|
$this->attributes['auto_delete'], |
166
|
1 |
|
$this->attributes['internal'], |
167
|
1 |
|
$this->attributes['nowait'] |
168
|
|
|
); |
169
|
1 |
|
} |
170
|
|
|
|
171
|
1 |
View Code Duplication |
public function bind() |
|
|
|
|
172
|
|
|
{ |
173
|
1 |
|
if (isset($this->attributes['bind'])) { |
174
|
1 |
|
foreach ($this->attributes['bind'] as $bindItem) { |
175
|
1 |
|
$this->getChannel() |
176
|
1 |
|
->queue_bind( |
177
|
1 |
|
$this->attributes['name'], |
178
|
1 |
|
$bindItem['exchange'], |
179
|
1 |
|
$bindItem['routing_key'] |
180
|
|
|
); |
181
|
|
|
} |
182
|
|
|
} |
183
|
1 |
|
} |
184
|
|
|
|
185
|
|
|
/** |
186
|
|
|
* Delete the queue |
187
|
|
|
*/ |
188
|
1 |
|
public function delete() |
189
|
|
|
{ |
190
|
1 |
|
$this->getChannel()->queue_delete($this->attributes['name']); |
191
|
1 |
|
} |
192
|
|
|
|
193
|
|
|
|
194
|
|
|
/** |
195
|
|
|
* Publish a message |
196
|
|
|
* |
197
|
|
|
* @param string $message |
198
|
|
|
* @param string $routingKey |
199
|
|
|
* @return void |
200
|
|
|
*/ |
201
|
1 |
|
public function publish(string $message, string $routingKey = '') |
202
|
|
|
{ |
203
|
1 |
|
$this->getChannel() |
204
|
1 |
|
->basic_publish( |
205
|
1 |
|
new AMQPMessage($message), |
206
|
1 |
|
'', |
207
|
1 |
|
$this->attributes['name'], |
208
|
1 |
|
true |
209
|
|
|
); |
210
|
1 |
|
} |
211
|
|
|
|
212
|
|
|
/** |
213
|
|
|
* {@inheritdoc} |
214
|
|
|
* |
215
|
|
|
* @param int $messages |
216
|
|
|
* @param int $seconds |
217
|
|
|
* @param int $maxMemory |
218
|
|
|
* @return int |
219
|
|
|
*/ |
220
|
|
|
public function startConsuming(int $messages, int $seconds, int $maxMemory) |
221
|
|
|
{ |
222
|
|
|
$this->setupConsumer($messages, $seconds, $maxMemory); |
223
|
|
|
while (false === $this->shouldStopConsuming()) { |
224
|
|
|
try { |
225
|
|
|
$this->getChannel()->wait(null, false, 1); |
226
|
|
|
} catch (AMQPTimeoutException $e) { |
227
|
|
|
$this->logger->debug("Timeout exceeded, reconnecting!"); |
228
|
|
|
usleep(1000); |
229
|
|
|
$this->getConnection()->reconnect(); |
230
|
|
|
$this->setupChannelConsumer(); |
231
|
|
|
} catch (\Throwable $e) { |
232
|
|
|
// stop the consumer |
233
|
|
|
$this->stopConsuming(); |
234
|
|
|
$this->logger->critical(sprintf( |
235
|
|
|
"Stopped consuming: %s in %s:%d", |
236
|
|
|
$e->getMessage(), |
237
|
|
|
(string)$e->getFile(), |
238
|
|
|
(int)$e->getLine() |
239
|
|
|
)); |
240
|
|
|
return 1; |
241
|
|
|
} |
242
|
|
|
} |
243
|
|
|
return 0; |
244
|
|
|
} |
245
|
|
|
|
246
|
|
|
/** |
247
|
|
|
* @return bool |
248
|
|
|
*/ |
249
|
|
|
protected function shouldStopConsuming(): bool |
250
|
|
|
{ |
251
|
|
|
if ((microtime(true) - $this->startTime) > $this->limitSecondsUptime) { |
252
|
|
|
$this->logger->debug( |
253
|
|
|
"Stopped consumer", |
254
|
|
|
[ |
255
|
|
|
'limit' => 'time_limit', |
256
|
|
|
'value' => sprintf("%.2f", microtime(true) - $this->startTime) |
257
|
|
|
] |
258
|
|
|
); |
259
|
|
|
return true; |
260
|
|
|
} |
261
|
|
|
if (memory_get_peak_usage(true) >= ($this->limitMemoryConsumption * 1048576)) { |
262
|
|
|
$this->logger->debug( |
263
|
|
|
"Stopped consumer", |
264
|
|
|
[ |
265
|
|
|
'limit' => 'memory_limit', |
266
|
|
|
'value' => (int)round(memory_get_peak_usage(true) / 1048576, 2) |
267
|
|
|
] |
268
|
|
|
); |
269
|
|
|
return true; |
270
|
|
|
} |
271
|
|
|
|
272
|
|
|
if ($this->getMessageProcessor()->getProcessedMessages() >= $this->limitMessageCount) { |
273
|
|
|
$this->logger->debug( |
274
|
|
|
"Stopped consumer", |
275
|
|
|
['limit' => 'message_count', 'value' => (int)$this->getMessageProcessor()->getProcessedMessages()] |
276
|
|
|
); |
277
|
|
|
return true; |
278
|
|
|
} |
279
|
|
|
return false; |
280
|
|
|
} |
281
|
|
|
|
282
|
|
|
/** |
283
|
|
|
* Stop the consumer |
284
|
|
|
*/ |
285
|
|
|
public function stopConsuming() |
286
|
|
|
{ |
287
|
|
|
$this->logger->debug("Stopping consumer!"); |
288
|
|
|
$this->getChannel()->basic_cancel($this->getConsumerTag(), false, true); |
289
|
|
|
} |
290
|
|
|
|
291
|
|
|
/** |
292
|
|
|
* Setup the consumer |
293
|
|
|
* |
294
|
|
|
* @param int $messages |
295
|
|
|
* @param int $seconds |
296
|
|
|
* @param int $maxMemory |
297
|
|
|
*/ |
298
|
|
|
protected function setupConsumer(int $messages, int $seconds, int $maxMemory) |
299
|
|
|
{ |
300
|
|
|
$this->limitMessageCount = $messages; |
301
|
|
|
$this->limitSecondsUptime = $seconds; |
302
|
|
|
$this->limitMemoryConsumption = $maxMemory; |
303
|
|
|
|
304
|
|
|
$this->startTime = microtime(true); |
305
|
|
|
|
306
|
|
|
$this->setupChannelConsumer(); |
307
|
|
|
|
308
|
|
|
$this->registerShutdownHandler(); |
309
|
|
|
$this->handleKillSignals(); |
310
|
|
|
} |
311
|
|
|
|
312
|
|
|
private function setupChannelConsumer() |
313
|
|
|
{ |
314
|
|
|
$this->getChannel() |
315
|
|
|
->basic_qos(null, $this->prefetchCount, true); |
316
|
|
|
|
317
|
|
|
$this->getChannel() |
318
|
|
|
->basic_consume( |
319
|
|
|
$this->attributes['name'], |
320
|
|
|
$this->getConsumerTag(), |
321
|
|
|
false, |
322
|
|
|
false, |
323
|
|
|
false, |
324
|
|
|
false, |
325
|
|
|
[ |
326
|
|
|
$this, |
327
|
|
|
'consume' |
328
|
|
|
] |
329
|
|
|
); |
330
|
|
|
} |
331
|
|
|
|
332
|
|
|
/** |
333
|
|
|
* Handle shutdown - Usually in case "Allowed memory size of x bytes exhausted" |
334
|
|
|
*/ |
335
|
|
|
private function registerShutdownHandler() |
336
|
|
|
{ |
337
|
|
|
$consumer = $this; |
338
|
|
|
register_shutdown_function(function () use ($consumer) { |
339
|
|
|
$consumer->stopConsuming(); |
340
|
|
|
}); |
341
|
|
|
} |
342
|
|
|
|
343
|
|
|
/** |
344
|
|
|
* Register signals |
345
|
|
|
*/ |
346
|
|
|
private function handleKillSignals() |
347
|
|
|
{ |
348
|
|
|
if (extension_loaded('pcntl')) { |
349
|
|
|
pcntl_signal(SIGTERM, [$this, 'catchKillSignal']); |
350
|
|
|
pcntl_signal(SIGKILL, [$this, 'catchKillSignal']); |
351
|
|
|
pcntl_signal(SIGSTOP, [$this, 'catchKillSignal']); |
352
|
|
|
|
353
|
|
|
if (function_exists('pcntl_signal_dispatch')) { |
354
|
|
|
// let the signal go forward |
355
|
|
|
pcntl_signal_dispatch(); |
356
|
|
|
} |
357
|
|
|
} |
358
|
|
|
} |
359
|
|
|
|
360
|
|
|
/** |
361
|
|
|
* Handle Kill Signals |
362
|
|
|
* @param int $signalNumber |
363
|
|
|
*/ |
364
|
|
|
private function catchKillSignal(int $signalNumber) |
365
|
|
|
{ |
366
|
|
|
$this->stopConsuming(); |
367
|
|
|
$this->logger->debug(sprintf("Caught signal %d", $signalNumber)); |
368
|
|
|
} |
369
|
|
|
|
370
|
|
|
/** |
371
|
|
|
* It is the tag that is listed in RabbitMQ UI as the consumer "name" |
372
|
|
|
* |
373
|
|
|
* @return string |
374
|
|
|
*/ |
375
|
|
|
private function getConsumerTag(): string |
376
|
|
|
{ |
377
|
|
|
return sprintf("%s_%s_%s", $this->aliasName, gethostname(), getmypid()); |
378
|
|
|
} |
379
|
|
|
|
380
|
|
|
/** |
381
|
|
|
* @return MessageProcessorInterface |
382
|
|
|
*/ |
383
|
|
|
private function getMessageProcessor(): MessageProcessorInterface |
384
|
|
|
{ |
385
|
|
|
if (!($this->messageProcessor instanceof MessageProcessorInterface)) { |
386
|
|
|
$this->messageProcessor = app($this->messageProcessor); |
387
|
|
|
} |
388
|
|
|
return $this->messageProcessor; |
389
|
|
|
} |
390
|
|
|
|
391
|
|
|
/** |
392
|
|
|
* @param AMQPMessage $message |
393
|
|
|
* @throws \Throwable |
394
|
|
|
*/ |
395
|
|
|
public function consume(AMQPMessage $message) |
396
|
|
|
{ |
397
|
|
|
$this->logger->debug("Consumed message", [$message->getBody()]); |
398
|
|
|
try { |
399
|
|
|
$this->getMessageProcessor()->consume($message); |
400
|
|
|
} catch (\Throwable $e) { |
401
|
|
|
$this->logger->notice( |
402
|
|
|
sprintf( |
403
|
|
|
"Got %s from %s in %d", |
404
|
|
|
$e->getMessage(), |
405
|
|
|
(string)$e->getFile(), |
406
|
|
|
(int)$e->getLine() |
407
|
|
|
) |
408
|
|
|
); |
409
|
|
|
// let the exception slide, the processor should handle |
410
|
|
|
// exception, this is just a notice that should not |
411
|
|
|
// ever appear |
412
|
|
|
throw $e; |
413
|
|
|
} |
414
|
|
|
} |
415
|
|
|
} |
416
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.