This project does not seem to handle request data directly as such no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace Drupal\rabbitmq; |
||
4 | |||
5 | use Drupal\Core\Logger\LoggerChannelInterface; |
||
6 | use Drupal\Core\Queue\QueueFactory; |
||
7 | use Drupal\Core\Queue\QueueWorkerInterface; |
||
8 | use Drupal\Core\Queue\QueueWorkerManagerInterface; |
||
9 | use Drupal\Core\Queue\RequeueException; |
||
10 | use Drupal\Core\Queue\SuspendQueueException; |
||
11 | use Drupal\Core\StringTranslation\StringTranslationTrait; |
||
12 | use Drupal\Core\Url; |
||
13 | use Drupal\rabbitmq\Exception\Exception; |
||
14 | use Drupal\rabbitmq\Exception\InvalidArgumentException; |
||
15 | use Drupal\rabbitmq\Exception\InvalidWorkerException; |
||
16 | use Drupal\rabbitmq\Exception\OutOfRangeException; |
||
17 | use Drupal\rabbitmq\Exception\RuntimeException; |
||
18 | use Drupal\rabbitmq\Queue\Queue; |
||
19 | use Drupal\rabbitmq\Queue\QueueBase; |
||
20 | use PhpAmqpLib\Channel\AMQPChannel; |
||
21 | use PhpAmqpLib\Exception\AMQPIOWaitException; |
||
22 | use PhpAmqpLib\Exception\AMQPOutOfRangeException; |
||
23 | use PhpAmqpLib\Exception\AMQPRuntimeException; |
||
24 | use PhpAmqpLib\Exception\AMQPTimeoutException; |
||
25 | use PhpAmqpLib\Message\AMQPMessage; |
||
26 | |||
27 | /** |
||
28 | * Class Consumer provides a service wrapping queue consuming operations. |
||
29 | * |
||
30 | * Note that it does not carray the value of its options, but getters for them, |
||
31 | * to support multiple ways of accessing options, e.g. Drush vs Console vs Web. |
||
32 | */ |
||
33 | class Consumer { |
||
34 | use StringTranslationTrait; |
||
35 | |||
36 | const EXTENSION_PCNTL = 'pcntl'; |
||
37 | |||
38 | const OPTION_MAX_ITERATIONS = 'max_iterations'; |
||
39 | const OPTION_MEMORY_LIMIT = 'memory_limit'; |
||
40 | const OPTION_TIMEOUT = 'rabbitmq_timeout'; |
||
41 | |||
42 | // Known option names and their default value. |
||
43 | const OPTIONS = [ |
||
44 | self::OPTION_MAX_ITERATIONS => 0, |
||
45 | self::OPTION_MEMORY_LIMIT => -1, |
||
46 | self::OPTION_TIMEOUT => NULL, |
||
47 | ]; |
||
48 | |||
49 | /** |
||
50 | * Continue listening ? |
||
51 | * |
||
52 | * @var bool |
||
53 | */ |
||
54 | protected $continueListening = FALSE; |
||
55 | |||
56 | /** |
||
57 | * The rabbitmq logger channel. |
||
58 | * |
||
59 | * @var \Drupal\Core\Logger\LoggerChannelInterface |
||
60 | */ |
||
61 | protected $logger; |
||
62 | |||
63 | /** |
||
64 | * A callback providing the ability to read service runtime options. |
||
65 | * |
||
66 | * This is needed to support non-Drush use scenarios. |
||
67 | * |
||
68 | * @var callable |
||
69 | */ |
||
70 | protected $optionGetter; |
||
71 | |||
72 | /** |
||
73 | * Was the Pre-Flight Check successful ? Yes | No |Â Not yet run. |
||
74 | * |
||
75 | * @var bool|null |
||
76 | */ |
||
77 | protected $pfcOk = NULL; |
||
78 | |||
79 | /** |
||
80 | * The queue service. |
||
81 | * |
||
82 | * @var \Drupal\Core\Queue\QueueFactory |
||
83 | */ |
||
84 | protected $queueFactory; |
||
85 | |||
86 | /** |
||
87 | * The plugin.manager.queue_worker service. |
||
88 | * |
||
89 | * @var \Drupal\Core\Queue\QueueWorkerManagerInterface |
||
90 | */ |
||
91 | protected $workerManager; |
||
92 | |||
93 | /** |
||
94 | * Consumer constructor. |
||
95 | * |
||
96 | * @param \Drupal\Core\Queue\QueueWorkerManagerInterface $workerManager |
||
97 | * The plugin.manager.queue_worker service. |
||
98 | * @param \Drupal\Core\Queue\QueueFactory $queueFactory |
||
99 | * The queue service. |
||
100 | * @param \Drupal\Core\Logger\LoggerChannelInterface $logger |
||
101 | * The rabbitmq logger channel. |
||
102 | */ |
||
103 | public function __construct( |
||
104 | QueueWorkerManagerInterface $workerManager, |
||
105 | QueueFactory $queueFactory, |
||
106 | LoggerChannelInterface $logger |
||
107 | ) { |
||
108 | $this->logger = $logger; |
||
109 | $this->queueFactory = $queueFactory; |
||
110 | $this->workerManager = $workerManager; |
||
111 | } |
||
112 | |||
113 | /** |
||
114 | * Is the queue name valid ? |
||
115 | * |
||
116 | * @param string $queueName |
||
117 | * The requested name. |
||
118 | * |
||
119 | * @return bool |
||
120 | * Is is valid? |
||
121 | */ |
||
122 | public function isQueueNameValid(string $queueName): bool { |
||
123 | $workers = $this->workerManager->getDefinitions(); |
||
124 | if (!isset($workers[$queueName])) { |
||
125 | return drush_set_error('rabbitmq', $this->t('No known worker for queue @queue', [ |
||
126 | '@queue' => $queueName, |
||
127 | ])); |
||
128 | } |
||
129 | } |
||
130 | |||
131 | /** |
||
132 | * Decode the data received from the queue using a chain of decoder choices. |
||
133 | * |
||
134 | * - 1st/2nd choices: the one already set on the service instance |
||
135 | * - 1st: set on the service instance manually during or after construction. |
||
136 | * - 2nd: the one set on the service instance within consume() if the |
||
137 | * worker implements DecoderAwareInterface. |
||
138 | * - 3rd choice: a legacy-compatible JSON decoder. |
||
139 | * |
||
140 | * @param mixed $data |
||
141 | * The message payload to decode. |
||
142 | * |
||
143 | * @return mixed |
||
144 | * The decoded value. |
||
145 | */ |
||
146 | public function decode($data) { |
||
147 | if (isset($this->decoder)) { |
||
148 | return $this->decoder($data); |
||
149 | } |
||
150 | else { |
||
151 | return json_decode($data, TRUE); |
||
152 | } |
||
153 | } |
||
154 | |||
155 | /** |
||
156 | * Get the value of a queue consumer option. |
||
157 | * |
||
158 | * @param string $name |
||
159 | * The name of the option. |
||
160 | * |
||
161 | * @return mixed |
||
162 | * The value returned by the configured option getter, or NULL if the option |
||
163 | * is unknown. |
||
164 | */ |
||
165 | public function getOption(string $name) { |
||
166 | if (!array_key_exists($name, static::OPTIONS)) { |
||
167 | return NULL; |
||
168 | } |
||
169 | $getter = $this->optionGetter; |
||
170 | return is_callable($getter) ? $getter($name) : NULL; |
||
171 | } |
||
172 | |||
173 | /** |
||
174 | * Log an event about the queue run. |
||
175 | */ |
||
176 | public function logStart() { |
||
177 | $this->preFlightCheck(); |
||
178 | $maxIterations = $this->getOption(self::OPTION_MAX_ITERATIONS); |
||
179 | if ($maxIterations > 0) { |
||
180 | $readyMessage = "RabbitMQ worker ready to receive up to @count messages."; |
||
181 | $readyArgs = ['@count' => $maxIterations]; |
||
182 | } |
||
183 | else { |
||
184 | $readyMessage = "RabbitMQ worker ready to receive an unlimited number of messages."; |
||
185 | $readyArgs = []; |
||
186 | } |
||
187 | $this->logger->debug($readyMessage, $readyArgs); |
||
188 | } |
||
189 | |||
190 | /** |
||
191 | * Signal handler. |
||
192 | * |
||
193 | * @see \Drupal\rabbitmq\Consumer::consume() |
||
194 | * |
||
195 | * On a timeout signal, the connections is already closed, so do not attempt |
||
196 | * to shutdown the queue. |
||
197 | */ |
||
198 | public function onTimeout() { |
||
199 | $this->logger->info('Timeout reached'); |
||
200 | $this->stopListening(); |
||
201 | } |
||
202 | |||
203 | /** |
||
204 | * Main logic: consume the specified queue using AMQP. |
||
205 | * |
||
206 | * @param string $queueName |
||
207 | * The name of the queue to consume. |
||
208 | * |
||
209 | * @throws \Exception |
||
210 | */ |
||
211 | public function consume(string $queueName) { |
||
212 | $this->preFlightCheck(); |
||
213 | $this->startListening(); |
||
214 | $worker = $this->getWorker($queueName); |
||
215 | // Allow obtaining a decoder from the worker to have a sane default, while |
||
216 | // being able to override it on service instantiation. |
||
217 | if ($worker instanceof DecoderAwareWorkerInterface && !isset($this->decoder)) { |
||
218 | $this->setDecoder($worker->getDecoder()); |
||
219 | } |
||
220 | |||
221 | /* @var \Drupal\rabbitmq\Queue\queue $queue */ |
||
222 | $queue = $this->queueFactory->get($queueName); |
||
223 | assert($queue instanceof Queue); |
||
224 | |||
225 | $channel = $this->getChannel($queue); |
||
226 | assert($channel instanceof AMQPChannel); |
||
227 | $channel->basic_qos(NULL, 1, FALSE); |
||
228 | |||
229 | $maxIterations = $this->getOption(self::OPTION_MAX_ITERATIONS); |
||
230 | $memoryLimit = $this->getOption(self::OPTION_MEMORY_LIMIT); |
||
231 | $timeout = $this->getOption(self::OPTION_TIMEOUT); |
||
232 | if ($timeout) { |
||
233 | pcntl_signal(SIGALRM, [$this, 'onTimeout']); |
||
234 | } |
||
235 | $callback = $this->getCallback($worker, $queueName, $timeout); |
||
236 | |||
237 | while ($this->continueListening) { |
||
238 | try { |
||
239 | $channel->basic_consume($queueName, '', FALSE, FALSE, FALSE, FALSE, $callback); |
||
240 | |||
241 | // Begin listening for messages to process. |
||
242 | $iteration = 0; |
||
243 | while (count($channel->callbacks) && $this->continueListening) { |
||
244 | if ($timeout) { |
||
245 | pcntl_alarm($timeout); |
||
246 | } |
||
247 | $channel->wait(NULL, FALSE, $timeout); |
||
248 | if ($timeout) { |
||
249 | pcntl_alarm(0); |
||
250 | } |
||
251 | |||
252 | // Break on memory_limit reached. |
||
253 | if ($this->hitMemoryLimit($memoryLimit)) { |
||
254 | $this->stopListening(); |
||
255 | break; |
||
256 | } |
||
257 | |||
258 | // Break on max_iterations reached. |
||
259 | $iteration++; |
||
260 | if ($this->hitIterationsLimit($maxIterations, $iteration)) { |
||
261 | $this->stopListening(); |
||
262 | } |
||
263 | } |
||
264 | $this->stopListening(); |
||
265 | } |
||
266 | catch (AMQPIOWaitException $e) { |
||
267 | $this->stopListening(); |
||
268 | $channel->close(); |
||
269 | } |
||
270 | catch (AMQPTimeoutException $e) { |
||
271 | $this->startListening(); |
||
272 | } |
||
273 | catch (Exception $e) { |
||
274 | throw new Exception('Could not obtain channel for queue.', 0, $e); |
||
275 | } |
||
276 | } |
||
277 | } |
||
278 | |||
279 | /** |
||
280 | * Main logic: consume the specified queue using Queue API. |
||
281 | * |
||
282 | * @param string $queueName |
||
283 | * The name of the queue to consume. |
||
284 | * |
||
285 | * @throws \Exception |
||
286 | * |
||
287 | * @TODO Probably needs to do more on SuspendQueueException. |
||
288 | */ |
||
289 | public function consumeQueueApi(string $queueName) { |
||
290 | $this->preFlightCheck(); |
||
291 | $this->startListening(); |
||
292 | $worker = $this->getWorker($queueName); |
||
293 | // Allow obtaining a decoder from the worker to have a sane default, while |
||
294 | // being able to override it on service instantiation. |
||
295 | if ($worker instanceof DecoderAwareWorkerInterface && !isset($this->decoder)) { |
||
296 | $this->setDecoder($worker->getDecoder()); |
||
297 | } |
||
298 | |||
299 | /* @var \Drupal\rabbitmq\Queue\queue $queue */ |
||
300 | $queue = $this->queueFactory->get($queueName); |
||
301 | assert($queue instanceof Queue); |
||
302 | |||
303 | $maxIterations = $this->getOption(self::OPTION_MAX_ITERATIONS); |
||
304 | $memoryLimit = $this->getOption(self::OPTION_MEMORY_LIMIT); |
||
305 | $timeout = $this->getOption(self::OPTION_TIMEOUT); |
||
306 | if (!empty($timeout)) { |
||
307 | pcntl_signal(SIGALRM, [$this, 'onTimeout']); |
||
308 | } |
||
309 | else { |
||
310 | $timeout = 0; |
||
311 | } |
||
312 | |||
313 | $iteration = 0; |
||
314 | $startTime = microtime(TRUE); |
||
315 | do { |
||
316 | $item = NULL; |
||
0 ignored issues
–
show
|
|||
317 | if ($timeout) { |
||
318 | pcntl_alarm($timeout); |
||
319 | $item = $queue->claimItem(); |
||
320 | pcntl_alarm(0); |
||
321 | } |
||
322 | else { |
||
323 | $item = $queue->claimItem(); |
||
324 | } |
||
325 | |||
326 | // Break on memory_limit reached before process. |
||
327 | if ($this->hitMemoryLimit($memoryLimit)) { |
||
328 | $this->stopListening(); |
||
329 | break; |
||
330 | } |
||
331 | |||
332 | $currentTime = microtime(TRUE); |
||
333 | // If we did not get an object, do not try to process it. |
||
334 | if (!is_object($item)) { |
||
335 | usleep(10); |
||
336 | // Only loop if the current continuous wait did not exceed timeout. |
||
337 | if ($currentTime > $startTime + $timeout) { |
||
338 | break; |
||
339 | } |
||
340 | else { |
||
341 | continue; |
||
342 | } |
||
343 | } |
||
344 | |||
345 | // We got a normal item, try to handle it. |
||
346 | try { |
||
347 | // Call the queue worker. |
||
348 | $worker->processItem($item->data); |
||
349 | |||
350 | // Remove the item from the queue. |
||
351 | $queue->deleteItem($item); |
||
352 | $this->logger->debug('(Drush) Item @id acknowledged from @queue', [ |
||
353 | '@id' => $item->id, |
||
354 | '@queue' => $queueName, |
||
355 | ]); |
||
356 | } |
||
357 | // Reserved QueueAPI exception: releaseItem and continue work. |
||
358 | catch (RequeueException $e) { |
||
0 ignored issues
–
show
The class
Drupal\Core\Queue\RequeueException does not exist. Did you forget a USE statement, or did you not list all dependencies?
Scrutinizer analyzes your It seems like the listed class was neither found in your dependencies, nor was it found in the analyzed files in your repository. If you are using some other form of dependency management, you might want to disable this analysis. ![]() |
|||
359 | $queue->releaseItem($item); |
||
360 | $this->logger->debug('(Drush) Item @id put back on @queue', [ |
||
361 | '@id' => $item->id, |
||
362 | '@queue' => $queueName, |
||
363 | ]); |
||
364 | } |
||
365 | // Reserved QueueAPI exception: stop working on this queue. |
||
366 | catch (SuspendQueueException $e) { |
||
0 ignored issues
–
show
The class
Drupal\Core\Queue\SuspendQueueException does not exist. Did you forget a USE statement, or did you not list all dependencies?
Scrutinizer analyzes your It seems like the listed class was neither found in your dependencies, nor was it found in the analyzed files in your repository. If you are using some other form of dependency management, you might want to disable this analysis. ![]() |
|||
367 | $queue->releaseItem($item); |
||
368 | $this->stopListening(); |
||
369 | } |
||
370 | // Restart wait period: we handled a valid item. |
||
371 | $startTime = microtime(TRUE); |
||
372 | |||
373 | // Break on memory_limit reached after process. |
||
374 | if ($this->hitMemoryLimit($memoryLimit)) { |
||
375 | $this->stopListening(); |
||
376 | break; |
||
377 | } |
||
378 | |||
379 | // Break on max_iterations reached. Only count actual items. |
||
380 | $iteration++; |
||
381 | if ($this->hitIterationsLimit($maxIterations, $iteration)) { |
||
382 | $this->stopListening(); |
||
383 | } |
||
384 | } while ($this->continueListening); |
||
385 | } |
||
386 | |||
387 | /** |
||
388 | * Provide a message callback for events. |
||
389 | * |
||
390 | * @param \Drupal\Core\Queue\QueueWorkerInterface $worker |
||
391 | * The worker plugin. |
||
392 | * @param string $queueName |
||
393 | * The queue name. |
||
394 | * @param int $timeout |
||
395 | * The queue wait timeout. Since it is only for queue wait, not worker wait, |
||
396 | * it has to be reset before starting work, and reinitialized when ending |
||
397 | * work. |
||
398 | * |
||
399 | * @return \Closure |
||
400 | * The callback. |
||
401 | */ |
||
402 | protected function getCallback( |
||
403 | QueueWorkerInterface $worker, |
||
404 | string $queueName, |
||
405 | int $timeout = 0 |
||
406 | ): \Closure { |
||
407 | $callback = function (AMQPMessage $msg) use ($worker, $queueName, $timeout) { |
||
408 | if ($timeout) { |
||
409 | pcntl_alarm(0); |
||
410 | } |
||
411 | $this->logger->info('(Drush) Received queued message: @id', [ |
||
412 | '@id' => $msg->delivery_info['delivery_tag'], |
||
413 | ]); |
||
414 | |||
415 | try { |
||
416 | // Build the item to pass to the queue worker. |
||
417 | $item = (object) [ |
||
418 | 'id' => $msg->delivery_info['delivery_tag'], |
||
419 | 'data' => $this->decode($msg->body), |
||
420 | ]; |
||
421 | |||
422 | // Call the queue worker. |
||
423 | $worker->processItem($item->data); |
||
424 | |||
425 | // Remove the item from the queue. |
||
426 | $msg->delivery_info['channel']->basic_ack($item->id); |
||
427 | $this->logger->info('(Drush) Item @id acknowledged from @queue', [ |
||
428 | '@id' => $item->id, |
||
429 | '@queue' => $queueName, |
||
430 | ]); |
||
431 | } |
||
432 | catch (Exception $e) { |
||
433 | watchdog_exception('rabbitmq', $e); |
||
434 | $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], |
||
435 | TRUE); |
||
436 | } |
||
437 | if ($timeout) { |
||
438 | pcntl_alarm($timeout); |
||
439 | } |
||
440 | }; |
||
441 | |||
442 | return $callback; |
||
443 | } |
||
444 | |||
445 | /** |
||
446 | * Get the channel instance for a given queue. |
||
447 | * |
||
448 | * Convert the various low-level known exceptions to module-level ones to make |
||
449 | * it easier to catch cleanly. |
||
450 | * |
||
451 | * @param \Drupal\rabbitmq\Queue\Queue $queue |
||
452 | * The queue from which to obtain a channel. |
||
453 | * |
||
454 | * @return \PhpAmqpLib\Channel\AMQPChannel |
||
455 | * The channel instance. |
||
456 | * |
||
457 | * @throws \Drupal\rabbitmq\Exception\InvalidArgumentException |
||
458 | * @throws \Drupal\rabbitmq\Exception\OutOfRangeException |
||
459 | * @throws \Drupal\rabbitmq\Exception\RuntimeException |
||
460 | */ |
||
461 | protected function getChannel(Queue $queue) { |
||
462 | try { |
||
463 | $channel = $queue->getChannel(); |
||
464 | } |
||
465 | // May be thrown by StreamIO::__construct() |
||
466 | catch (\InvalidArgumentException $e) { |
||
467 | throw new InvalidArgumentException($e->getMessage()); |
||
468 | } |
||
469 | // May be thrown during getChannel() |
||
470 | catch (AMQPRuntimeException $e) { |
||
471 | throw new RuntimeException($e->getMessage()); |
||
472 | } |
||
473 | // May be thrown during getChannel() |
||
474 | catch (AMQPOutOfRangeException $e) { |
||
475 | throw new OutOfRangeException($e->getMessage()); |
||
476 | } |
||
477 | |||
478 | return $channel; |
||
479 | } |
||
480 | |||
481 | /** |
||
482 | * Get a worker instance for a queue name. |
||
483 | * |
||
484 | * @param string $queueName |
||
485 | * The name of the queue for which to get a worker. |
||
486 | * |
||
487 | * @return \Drupal\Core\Queue\QueueWorkerInterface |
||
488 | * The worker instance. |
||
489 | * |
||
490 | * @throws \Drupal\rabbitmq\Exception\InvalidWorkerException |
||
491 | */ |
||
492 | protected function getWorker(string $queueName): QueueWorkerInterface { |
||
493 | // Before we start listening for messages, make sure the worker is valid. |
||
494 | $worker = $this->workerManager->createInstance($queueName); |
||
495 | if (!($worker instanceof QueueWorkerInterface)) { |
||
0 ignored issues
–
show
The class
Drupal\Core\Queue\QueueWorkerInterface does not exist. Did you forget a USE statement, or did you not list all dependencies?
This error could be the result of: 1. Missing dependenciesPHP Analyzer uses your Are you sure this class is defined by one of your dependencies, or did you maybe
not list a dependency in either the 2. Missing use statementPHP does not complain about undefined classes in if ($x instanceof DoesNotExist) {
// Do something.
}
If you have not tested against this specific condition, such errors might go unnoticed. ![]() |
|||
496 | throw new InvalidWorkerException('Invalid worker for requested queue.'); |
||
497 | } |
||
498 | return $worker; |
||
499 | } |
||
500 | |||
501 | /** |
||
502 | * Did consume() hit the max_iterations limit ? |
||
503 | * |
||
504 | * @param int $maxIterations |
||
505 | * The value of the max_iterations option. |
||
506 | * @param int $iteration |
||
507 | * The current number of iterations in the consume() loop. |
||
508 | * |
||
509 | * @return bool |
||
510 | * Did it ? |
||
511 | */ |
||
512 | protected function hitIterationsLimit(int $maxIterations, int $iteration) { |
||
513 | if ($maxIterations > 0 && $maxIterations <= $iteration) { |
||
514 | $this->logger->notice('RabbitMQ worker has reached max number of iterations: @count. Exiting.', |
||
515 | [ |
||
516 | '@count' => $maxIterations, |
||
517 | ]); |
||
518 | return TRUE; |
||
519 | } |
||
520 | |||
521 | return FALSE; |
||
522 | } |
||
523 | |||
524 | /** |
||
525 | * Evaluate whether worker should exit. |
||
526 | * |
||
527 | * If the --memory_limit option is set, check the memory usage |
||
528 | * and exit if the limit has been exceeded or met. |
||
529 | * |
||
530 | * @param int $memoryLimit |
||
531 | * The maximum memory the service may consume, or -1 for unlimited. |
||
532 | * |
||
533 | * @return bool |
||
534 | * - TRUE: consume() should stop, |
||
535 | * - FALSE: consume() may continue. |
||
536 | */ |
||
537 | protected function hitMemoryLimit(int $memoryLimit) { |
||
538 | // Evaluate whether worker should exit. |
||
539 | // If the --memory_limit option is set, check the memory usage |
||
540 | // and exit if the limit has been exceeded or met. |
||
541 | if ($memoryLimit > 0) { |
||
542 | $memoryUsage = memory_get_peak_usage() / 1024 / 1024; |
||
543 | if ($memoryUsage >= $memoryLimit) { |
||
544 | $this->logger->notice('RabbitMQ worker has reached or exceeded set memory limit of @limitMB and will now exit.', [ |
||
545 | '@limit' => $memoryLimit, |
||
546 | ]); |
||
547 | return TRUE; |
||
548 | } |
||
549 | } |
||
550 | |||
551 | return FALSE; |
||
552 | } |
||
553 | |||
554 | /** |
||
555 | * Implements hook_requirements(). |
||
556 | */ |
||
557 | public static function hookRequirements($phase, array &$req) { |
||
558 | $key = QueueBase::MODULE . '-consumer'; |
||
559 | $req[$key]['title'] = t('RabbitMQ Consumer'); |
||
560 | $options = [ |
||
561 | ':ext' => Url::fromUri('http://php.net/pcntl')->toString(), |
||
562 | '%option' => static::OPTION_TIMEOUT, |
||
563 | ]; |
||
564 | if (!extension_loaded(static::EXTENSION_PCNTL)) { |
||
565 | $req[$key]['description'] = t('Extension <a href=":ext">PCNTL</a> not present in PHP. Option %option is not available in the RabbitMQ consumer.', $options); |
||
566 | $req[$key]['severity'] = REQUIREMENT_WARNING; |
||
567 | } |
||
568 | else { |
||
569 | $req[$key]['description'] = t('Extension <a href=":ext">PCNTL</a> is present in PHP. Option %option is available in the RabbitMQ consumer.', $options); |
||
570 | $req[$key]['severity'] = REQUIREMENT_OK; |
||
571 | } |
||
572 | } |
||
573 | |||
574 | /** |
||
575 | * Ensures options are consistent with configuration. |
||
576 | * |
||
577 | * @throws \Drupal\rabbitmq\Exception\InvalidArgumentException |
||
578 | * Options are not compatible with configuration. |
||
579 | */ |
||
580 | protected function preFlightCheck() { |
||
581 | if ($this->pfcOk) { |
||
582 | return; |
||
583 | } |
||
584 | $this->pfcOk = FALSE; |
||
585 | $timeout = $this->getOption(self::OPTION_TIMEOUT); |
||
586 | if (!empty($timeout) && !extension_loaded(static::EXTENSION_PCNTL)) { |
||
587 | $message = $this->t('Option @option is not available without the @ext extension.', [ |
||
588 | '@option' => static::OPTION_TIMEOUT, |
||
589 | '@ext' => static::EXTENSION_PCNTL, |
||
590 | ]); |
||
591 | throw new InvalidArgumentException($message); |
||
592 | } |
||
593 | $this->pfcOk = TRUE; |
||
594 | } |
||
595 | |||
596 | /** |
||
597 | * Shutdown a queue. |
||
598 | * |
||
599 | * @param string $queueName |
||
600 | * The name of the queue, also the name of the QueueWorker plugin processing |
||
601 | * its items. |
||
602 | */ |
||
603 | public function shutdownQueue(string $queueName) { |
||
604 | $queue = $this->queueFactory->get($queueName); |
||
605 | if ($queue instanceof Queue) { |
||
606 | $queue->shutdown(); |
||
607 | } |
||
608 | } |
||
609 | |||
610 | /** |
||
611 | * Register a decoder for message payloads. |
||
612 | * |
||
613 | * @param callable $decoder |
||
614 | * The decoder. |
||
615 | */ |
||
616 | public function setDecoder(callable $decoder) { |
||
617 | $this->decoder = $decoder; |
||
618 | } |
||
619 | |||
620 | /** |
||
621 | * Register a method able to get option values. |
||
622 | * |
||
623 | * @param callable $optionGetter |
||
624 | * The getter. |
||
625 | */ |
||
626 | public function setOptionGetter(callable $optionGetter) { |
||
627 | $this->optionGetter = $optionGetter; |
||
628 | } |
||
629 | |||
630 | /** |
||
631 | * Mark listening as active. |
||
632 | */ |
||
633 | public function startListening() { |
||
634 | $this->continueListening = TRUE; |
||
635 | } |
||
636 | |||
637 | /** |
||
638 | * Mark listening as inactive. |
||
639 | */ |
||
640 | public function stopListening() { |
||
641 | $this->continueListening = FALSE; |
||
642 | } |
||
643 | |||
644 | } |
||
645 |
This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.
Both the
$myVar
assignment in line 1 and the$higher
assignment in line 2 are dead. The first because$myVar
is never used and the second because$higher
is always overwritten for every possible time line.