1 | <?php |
||
2 | /** |
||
3 | * Author: Joker |
||
4 | * Date: 2020-05-08 13:57 |
||
5 | */ |
||
6 | |||
7 | namespace JokerProject\LaravelAliyunAmqp\Entity; |
||
8 | |||
9 | use JokerProject\LaravelAliyunAmqp\AMQPConnection; |
||
10 | use JokerProject\LaravelAliyunAmqp\ConsumerInterface; |
||
11 | use JokerProject\LaravelAliyunAmqp\Processor\AbstractMessageProcessor; |
||
12 | use JokerProject\LaravelAliyunAmqp\Processor\MessageProcessorInterface; |
||
13 | use JokerProject\LaravelAliyunAmqp\PublisherInterface; |
||
14 | use PhpAmqpLib\Channel\AMQPChannel; |
||
15 | use PhpAmqpLib\Exception\AMQPProtocolChannelException; |
||
16 | use PhpAmqpLib\Exception\AMQPTimeoutException; |
||
17 | use PhpAmqpLib\Message\AMQPMessage; |
||
18 | use Psr\Log\LoggerAwareInterface; |
||
19 | use Psr\Log\LoggerAwareTrait; |
||
20 | use PhpAmqpLib\Exception\AMQPChannelClosedException; |
||
21 | |||
/**
 * Class QueueEntity
 *
 * Wraps a single AMQP queue: it can declare, bind and delete the queue, publish
 * messages straight to it and run a long-lived consumer loop on it.
 *
 * The queue definition lives in the `$attributes` array; the `name` key holds the
 * broker-side queue name and the remaining flags are described on DEFAULTS.
 *
 * @package JokerProject\LaravelAliyunAmqp\Entity
 */
class QueueEntity implements PublisherInterface, ConsumerInterface, AMQPEntityInterface, LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * @const int Maximum number of publish attempts made when a Channel Closed exception is thrown
     */
    const MAX_RETRIES = 3;

    /**
     * @const array Default queue parameters
     */
    const DEFAULTS = [
        // Whether to check if it exists or to verify existence using argument types (Throws PRECONDITION_FAILED)
        'passive'                      => false,
        // Entities with durable will be re-created upon server restart
        'durable'                      => false,
        // whether to use it by only one channel, then it gets deleted
        'exclusive'                    => false,
        // Whether to delete it when the queue has no event on it
        'auto_delete'                  => false,
        // Exchange-only flag (blocks direct publishing). It is kept here so existing
        // configuration keeps working, but it is not sent when declaring a queue.
        'internal'                     => false,
        // Whether to skip waiting for the Declare confirmation
        'nowait'                       => false,
        // Whether to auto create the entity before publishing/consuming it
        'auto_create'                  => false,
        // whether to "hide" the exception on re-declare.
        // if the `passive` filter is set true, this is redundant
        'throw_exception_on_redeclare' => true,
        // whether to throw an exception when trying to
        // bind to a non-existent queue/exchange
        'throw_exception_on_bind_fail' => true,
    ];

    /**
     * @var AMQPConnection
     */
    protected $connection;

    /**
     * @var string
     */
    protected $aliasName;

    /**
     * @var array
     */
    protected $attributes;

    /**
     * @var int
     */
    protected $prefetchCount = 1;

    /**
     * Either the container identifier of the processor (as given to
     * setMessageProcessor()) or, once resolved, the processor instance itself.
     *
     * @var null|string|MessageProcessorInterface
     */
    protected $messageProcessor = null;

    /**
     * @var int Stop consuming once this many messages were processed
     */
    protected $limitMessageCount;

    /**
     * @var int Stop consuming after this many seconds of uptime
     */
    protected $limitSecondsUptime;

    /**
     * @var int Stop consuming once peak memory reaches this many megabytes
     */
    protected $limitMemoryConsumption;

    /**
     * @var float Timestamp (microtime) at which the consumer was set up
     */
    protected $startTime = 0;

    /**
     * @var int Number of consecutive failed publish attempts
     */
    protected $retryCount = 0;

    /**
     * Build a queue entity, filling every attribute missing from the given details
     * with its value from DEFAULTS.
     *
     * @param AMQPConnection $connection
     * @param string $aliasName
     * @param array $exchangeDetails Queue attributes (the parameter name is historical)
     * @return QueueEntity
     */
    public static function createQueue(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
    {
        return new static(
            $connection,
            $aliasName,
            array_merge(self::DEFAULTS, $exchangeDetails)
        );
    }

    /**
     * @return string
     */
    public function getAliasName(): string
    {
        return $this->aliasName;
    }

    /**
     * QueueEntity constructor.
     *
     * Attributes are stored as given; DEFAULTS are only merged in by createQueue().
     *
     * @param AMQPConnection $connection
     * @param string $aliasName
     * @param array $attributes
     */
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
    {
        $this->connection = $connection;
        $this->aliasName  = $aliasName;
        $this->attributes = $attributes;
    }

    /**
     * @param int $prefetchCount
     * @return ConsumerInterface
     */
    public function setPrefetchCount(int $prefetchCount): ConsumerInterface
    {
        $this->prefetchCount = $prefetchCount;
        return $this;
    }

    /**
     * @param string $messageProcessor Identifier resolved through app() on first use
     * @return ConsumerInterface
     */
    public function setMessageProcessor(string $messageProcessor): ConsumerInterface
    {
        $this->messageProcessor = $messageProcessor;
        return $this;
    }

    /**
     * @return AMQPConnection
     */
    protected function getConnection(): AMQPConnection
    {
        return $this->connection;
    }

    /**
     * @return AMQPChannel
     */
    protected function getChannel(): AMQPChannel
    {
        return $this->getConnection()->getChannel();
    }

    /**
     * Create the Queue
     *
     * A 406 (precondition failed) reply is swallowed when
     * `throw_exception_on_redeclare` is not true; the connection is then
     * re-established. Any other protocol error is re-thrown.
     *
     * @throws AMQPProtocolChannelException
     */
    public function create()
    {
        try {
            // queue_declare() takes (queue, passive, durable, exclusive, auto_delete, nowait, ...).
            // Queues have no `internal` flag, so `nowait` must be the sixth argument.
            $this->getChannel()
                ->queue_declare(
                    $this->attributes['name'],
                    $this->attributes['passive'],
                    $this->attributes['durable'],
                    $this->attributes['exclusive'],
                    $this->attributes['auto_delete'],
                    $this->attributes['nowait']
                );
        } catch (AMQPProtocolChannelException $e) {
            // 406 is a soft error triggered for precondition failure (when redeclaring with different parameters)
            if (true === $this->attributes['throw_exception_on_redeclare'] || $e->amqp_reply_code !== 406) {
                throw $e;
            }
            // a failure trigger channels closing process
            $this->reconnect();
        }
    }

    /**
     * Bind the queue to every exchange listed under the `bind` attribute.
     *
     * Each bind item is read for its `exchange` and `routing_key` keys. A 404 reply
     * is swallowed when `throw_exception_on_bind_fail` is not true; the connection
     * is then re-established. Any other protocol error is re-thrown.
     *
     * @throws AMQPProtocolChannelException
     */
    public function bind()
    {
        // empty() also covers the "key not set" case
        if (empty($this->attributes['bind'])) {
            return;
        }
        foreach ($this->attributes['bind'] as $bindItem) {
            try {
                $this->getChannel()
                    ->queue_bind(
                        $this->attributes['name'],
                        $bindItem['exchange'],
                        $bindItem['routing_key']
                    );
            } catch (AMQPProtocolChannelException $e) {
                // 404 is the code for trying to bind to a non-existing entity
                if (true === $this->attributes['throw_exception_on_bind_fail'] || $e->amqp_reply_code !== 404) {
                    throw $e;
                }
                $this->reconnect();
            }
        }
    }

    /**
     * Delete the queue
     */
    public function delete()
    {
        $this->getChannel()->queue_delete($this->attributes['name']);
    }

    /**
     * {@inheritdoc}
     */
    public function reconnect()
    {
        $this->getConnection()->reconnect();
    }

    /**
     * Publish a message
     *
     * The message is sent with an empty exchange name and the queue name as routing
     * key, so `$routingKey` is not used for the delivery itself. The connection is
     * closed after a successful publish.
     *
     * When the channel turns out to be closed the publish is retried after a
     * reconnect, up to MAX_RETRIES attempts in total.
     *
     * @param string $message
     * @param string $routingKey
     * @return mixed|void
     * @throws AMQPProtocolChannelException
     * @throws AMQPChannelClosedException When every attempt failed
     */
    public function publish(string $message, string $routingKey = '')
    {
        if ($this->attributes['auto_create'] === true) {
            $this->create();
            $this->bind();
        }

        try {
            if (!$this->getChannel()->is_open()) {
                $this->getConnection()->reconnect();
            }
            $channel = $this->getChannel();
            $channel->basic_publish(
                new AMQPMessage($message),
                '',
                $this->attributes['name'],
                true
            );
        } catch (AMQPChannelClosedException $exception) {
            $this->retryCount++;
            // Retry publishing with re-connect
            if ($this->retryCount < self::MAX_RETRIES) {
                $this->getConnection()->reconnect();
                // The nested call resets the counter and closes the connection
                // itself once it succeeds, so nothing is left to do here.
                $this->publish($message, $routingKey);
                return;
            }
            // Give the next publish() call a fresh set of attempts.
            $this->retryCount = 0;
            throw $exception;
        }

        $this->retryCount = 0;
        // Closed outside the try block: a failure while closing must not make an
        // already delivered message be published a second time.
        $this->connection->close();
    }

    /**
     * {@inheritdoc}
     *
     * Runs until one of the limits is reached or an unexpected error occurs.
     *
     * @param int $messages Maximum number of messages to process
     * @param int $seconds Maximum uptime in seconds
     * @param int $maxMemory Maximum peak memory in megabytes
     * @return int 0 when a limit stopped the consumer, 1 when an error did
     */
    public function startConsuming(int $messages, int $seconds, int $maxMemory)
    {
        $this->setupConsumer($messages, $seconds, $maxMemory);
        while (false === $this->shouldStopConsuming()) {
            try {
                $this->getChannel()->wait();
            } catch (AMQPTimeoutException $e) {
                // A timeout is recoverable: reconnect and subscribe again.
                $this->logger->debug('startConsuming Exception--->'.$e->getMessage());
                usleep(1000);
                $this->getConnection()->reconnect();
                $this->setupChannelConsumer();
            } catch (\Throwable $e) {
                // stop the consumer
                $this->stopConsuming();
                $this->logger->notice(sprintf(
                    "Stopped consuming: %s in %s:%d",
                    get_class($e) . ' - ' . $e->getMessage(),
                    (string)$e->getFile(),
                    (int)$e->getLine()
                ));
                return 1;
            }
        }
        return 0;
    }

    /**
     * Check the uptime, memory and processed-message limits, in that order.
     *
     * @return bool True as soon as one limit is reached
     */
    protected function shouldStopConsuming(): bool
    {
        if ((microtime(true) - $this->startTime) > $this->limitSecondsUptime) {
            $this->logger->debug(
                "Stopped consumer, Please make sure you are using supervisor to auto restart consumer",
                [
                    'limit' => 'time_limit',
                    'value' => sprintf("%.2f", microtime(true) - $this->startTime)
                ]
            );
            return true;
        }

        // The limit is expressed in megabytes, 1048576 bytes each.
        if (memory_get_peak_usage(true) >= ($this->limitMemoryConsumption * 1048576)) {
            $this->logger->debug(
                "Stopped consumer, Please make sure you are using supervisor to auto restart consumer",
                [
                    'limit' => 'memory_limit',
                    'value' => (int)round(memory_get_peak_usage(true) / 1048576, 2)
                ]
            );
            return true;
        }

        if ($this->getMessageProcessor()->getProcessedMessages() >= $this->limitMessageCount) {
            $this->logger->debug(
                "Stopped consumer, Please make sure you are using supervisor to auto restart consumer",
                ['limit' => 'message_count', 'value' => (int)$this->getMessageProcessor()->getProcessedMessages()]
            );
            return true;
        }
        return false;
    }

    /**
     * Stop the consumer
     *
     * Cancels the subscription registered under this consumer's tag. Any failure
     * is logged as a notice and not propagated.
     */
    public function stopConsuming()
    {
        try {
            $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
        } catch (\Throwable $e) {
            $this->logger->notice("Got " . $e->getMessage() . " of type " . get_class($e));
        }
    }

    /**
     * Setup the consumer
     *
     * Stores the limits, records the start time, subscribes to the queue and
     * installs the shutdown and kill-signal handlers.
     *
     * @param int $messages
     * @param int $seconds
     * @param int $maxMemory
     */
    protected function setupConsumer(int $messages, int $seconds, int $maxMemory)
    {
        $this->limitMessageCount      = $messages;
        $this->limitSecondsUptime     = $seconds;
        $this->limitMemoryConsumption = $maxMemory;

        $this->startTime = microtime(true);

        $this->setupChannelConsumer();

        $this->registerShutdownHandler();
        $this->handleKillSignals();
    }

    /**
     * Subscribe to the queue on the current channel.
     *
     * Declares and binds the queue first when `auto_create` is enabled, applies the
     * prefetch count and registers consume() as the delivery callback.
     */
    private function setupChannelConsumer()
    {
        if ($this->attributes['auto_create'] === true) {
            $this->create();
            $this->bind();
        }

        $this->getChannel()
            ->basic_qos(null, $this->prefetchCount, true);

        $this->getChannel()
            ->basic_consume(
                $this->attributes['name'],
                $this->getConsumerTag(),
                false,
                false,
                false,
                false,
                [
                    $this,
                    'consume'
                ]
            );
    }

    /**
     * Handle shutdown - Usually in case "Allowed memory size of x bytes exhausted"
     */
    private function registerShutdownHandler()
    {
        $consumer = $this;
        register_shutdown_function(function () use ($consumer) {
            $consumer->stopConsuming();
        });
    }

    /**
     * Register signals
     *
     * Does nothing when the pcntl extension is not loaded.
     */
    protected function handleKillSignals()
    {
        if (extension_loaded('pcntl')) {
            pcntl_signal(SIGTERM, [$this, 'catchKillSignal']);
            pcntl_signal(SIGINT, [$this, 'catchKillSignal']);

            if (function_exists('pcntl_signal_dispatch')) {
                // let the signal go forward
                pcntl_signal_dispatch();
            }
        }
    }

    /**
     * Handle Kill Signals
     * @param int $signalNumber
     */
    public function catchKillSignal(int $signalNumber)
    {
        $this->stopConsuming();
        $this->logger->debug(sprintf("Caught signal %d", $signalNumber));
    }

    /**
     * It is the tag that is listed in RabbitMQ UI as the consumer "name"
     *
     * Built from the alias name, the host name and the process id.
     *
     * @return string
     */
    private function getConsumerTag(): string
    {
        return sprintf("%s_%s_%s", $this->aliasName, gethostname(), getmypid());
    }

    /**
     * Return the message processor, resolving it through app() on first use.
     *
     * The resolved instance is cached on the entity. Processors extending
     * AbstractMessageProcessor also receive this entity's logger.
     *
     * @return MessageProcessorInterface
     * @throws \LogicException When no processor was configured or the resolved
     *                         object does not implement MessageProcessorInterface
     */
    private function getMessageProcessor(): MessageProcessorInterface
    {
        if ($this->messageProcessor instanceof MessageProcessorInterface) {
            return $this->messageProcessor;
        }

        // app(null) would hand back the container itself, so refuse to resolve
        // anything that is not a processor identifier.
        if (!is_string($this->messageProcessor)) {
            throw new \LogicException(sprintf(
                'No message processor configured for queue "%s"; call setMessageProcessor() first.',
                $this->aliasName
            ));
        }

        $processor = app($this->messageProcessor);
        if (!($processor instanceof MessageProcessorInterface)) {
            throw new \LogicException(sprintf(
                'Message processor "%s" configured for queue "%s" must implement %s.',
                $this->messageProcessor,
                $this->aliasName,
                MessageProcessorInterface::class
            ));
        }

        if ($processor instanceof AbstractMessageProcessor) {
            $processor->setLogger($this->logger);
        }

        // Only a validated instance replaces the identifier.
        $this->messageProcessor = $processor;

        return $this->messageProcessor;
    }

    /**
     * Delivery callback: hand the message to the message processor.
     *
     * @param AMQPMessage $message
     * @throws \Throwable Anything the processor throws is logged and re-thrown
     */
    public function consume(AMQPMessage $message)
    {
        try {
            $this->getMessageProcessor()->consume($message);
            $this->logger->debug("Consumed message", [$message->getBody()]);
        } catch (\Throwable $e) {
            $this->logger->notice(
                sprintf(
                    "Got %s from %s in %d",
                    $e->getMessage(),
                    (string)$e->getFile(),
                    (int)$e->getLine()
                )
            );
            // let the exception slide, the processor should handle
            // exception, this is just a notice that should not
            // ever appear
            throw $e;
        }
    }
}
||
505 |