This project does not seem to handle request data directly as such no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace Drupal\rabbitmq; |
||
4 | |||
5 | use Drupal\Core\Logger\LoggerChannelInterface; |
||
6 | use Drupal\Core\Queue\QueueFactory; |
||
7 | use Drupal\Core\Queue\QueueWorkerInterface; |
||
8 | use Drupal\Core\Queue\QueueWorkerManagerInterface; |
||
9 | use Drupal\Core\Queue\RequeueException; |
||
10 | use Drupal\Core\Queue\SuspendQueueException; |
||
11 | use Drupal\Core\StringTranslation\StringTranslationTrait; |
||
12 | use Drupal\Core\Url; |
||
13 | use Drupal\rabbitmq\Exception\Exception; |
||
14 | use Drupal\rabbitmq\Exception\InvalidArgumentException; |
||
15 | use Drupal\rabbitmq\Exception\InvalidWorkerException; |
||
16 | use Drupal\rabbitmq\Exception\OutOfRangeException; |
||
17 | use Drupal\rabbitmq\Exception\RuntimeException; |
||
18 | use Drupal\rabbitmq\Queue\Queue; |
||
19 | use Drupal\rabbitmq\Queue\QueueBase; |
||
20 | use PhpAmqpLib\Channel\AMQPChannel; |
||
21 | use PhpAmqpLib\Exception\AMQPIOWaitException; |
||
22 | use PhpAmqpLib\Exception\AMQPOutOfRangeException; |
||
23 | use PhpAmqpLib\Exception\AMQPRuntimeException; |
||
24 | use PhpAmqpLib\Exception\AMQPTimeoutException; |
||
25 | use PhpAmqpLib\Message\AMQPMessage; |
||
26 | |||
27 | /** |
||
28 | * Class Consumer provides a service wrapping queue consuming operations. |
||
29 | * |
||
30 | * Note that it does not carray the value of its options, but getters for them, |
||
31 | * to support multiple ways of accessing options, e.g. Drush vs Console vs Web. |
||
32 | */ |
||
33 | class Consumer { |
||
34 | use StringTranslationTrait; |
||
35 | |||
36 | const EXTENSION_PCNTL = 'pcntl'; |
||
37 | |||
38 | const OPTION_MAX_ITERATIONS = 'max_iterations'; |
||
39 | const OPTION_MEMORY_LIMIT = 'memory_limit'; |
||
40 | const OPTION_TIMEOUT = 'rabbitmq_timeout'; |
||
41 | |||
42 | // Known option names and their default value. |
||
43 | const OPTIONS = [ |
||
44 | self::OPTION_MAX_ITERATIONS => 0, |
||
45 | self::OPTION_MEMORY_LIMIT => -1, |
||
46 | self::OPTION_TIMEOUT => NULL, |
||
47 | ]; |
||
48 | |||
49 | /** |
||
50 | * Continue listening ? |
||
51 | * |
||
52 | * @var bool |
||
53 | */ |
||
54 | protected $continueListening = FALSE; |
||
55 | |||
56 | /** |
||
57 | * The rabbitmq logger channel. |
||
58 | * |
||
59 | * @var \Drupal\Core\Logger\LoggerChannelInterface |
||
60 | */ |
||
61 | protected $logger; |
||
62 | |||
63 | /** |
||
64 | * A callback providing the ability to read service runtime options. |
||
65 | * |
||
66 | * This is needed to support non-Drush use scenarios. |
||
67 | * |
||
68 | * @var callable |
||
69 | */ |
||
70 | protected $optionGetter; |
||
71 | |||
72 | /** |
||
73 | * Was the Pre-Flight Check successful ? Yes | No |Â Not yet run. |
||
74 | * |
||
75 | * @var bool|null |
||
76 | */ |
||
77 | protected $pfcOk = NULL; |
||
78 | |||
79 | /** |
||
80 | * The queue service. |
||
81 | * |
||
82 | * @var \Drupal\Core\Queue\QueueFactory |
||
83 | */ |
||
84 | protected $queueFactory; |
||
85 | |||
86 | /** |
||
87 | * The plugin.manager.queue_worker service. |
||
88 | * |
||
89 | * @var \Drupal\Core\Queue\QueueWorkerManagerInterface |
||
90 | */ |
||
91 | protected $workerManager; |
||
92 | |||
93 | /** |
||
94 | * Consumer constructor. |
||
95 | * |
||
96 | * @param \Drupal\Core\Queue\QueueWorkerManagerInterface $workerManager |
||
97 | * The plugin.manager.queue_worker service. |
||
98 | * @param \Drupal\Core\Queue\QueueFactory $queueFactory |
||
99 | * The queue service. |
||
100 | * @param \Drupal\Core\Logger\LoggerChannelInterface $logger |
||
101 | * The rabbitmq logger channel. |
||
102 | */ |
||
103 | public function __construct( |
||
104 | QueueWorkerManagerInterface $workerManager, |
||
105 | QueueFactory $queueFactory, |
||
106 | LoggerChannelInterface $logger |
||
107 | ) { |
||
108 | $this->logger = $logger; |
||
109 | $this->queueFactory = $queueFactory; |
||
110 | $this->workerManager = $workerManager; |
||
111 | } |
||
112 | |||
113 | /** |
||
114 | * Is the queue name valid ? |
||
115 | * |
||
116 | * @param string $queueName |
||
117 | * The requested name. |
||
118 | * |
||
119 | * @return bool |
||
120 | * Is is valid? |
||
121 | */ |
||
122 | public function isQueueNameValid(string $queueName): bool { |
||
123 | $workers = $this->workerManager->getDefinitions(); |
||
124 | if (!isset($workers[$queueName])) { |
||
125 | return drush_set_error('rabbitmq', $this->t('No known worker for queue @queue', [ |
||
126 | '@queue' => $queueName, |
||
127 | ])); |
||
128 | } |
||
129 | } |
||
130 | |||
131 | /** |
||
132 | * Decode the data received from the queue using a chain of decoder choices. |
||
133 | * |
||
134 | * - 1st/2nd choices: the one already set on the service instance |
||
135 | * - 1st: set on the service instance manually during or after construction. |
||
136 | * - 2nd: the one set on the service instance within consume() if the |
||
137 | * worker implements DecoderAwareInterface. |
||
138 | * - 3rd choice: a legacy-compatible JSON decoder. |
||
139 | * |
||
140 | * @param mixed $data |
||
141 | * The message payload to decode. |
||
142 | * |
||
143 | * @return mixed |
||
144 | * The decoded value. |
||
145 | */ |
||
146 | public function decode($data) { |
||
147 | if (isset($this->decoder)) { |
||
148 | return $this->decoder($data); |
||
149 | } |
||
150 | else { |
||
151 | return json_decode($data, TRUE); |
||
152 | } |
||
153 | } |
||
154 | |||
155 | /** |
||
156 | * Get the value of a queue consumer option. |
||
157 | * |
||
158 | * @param string $name |
||
159 | * The name of the option. |
||
160 | * |
||
161 | * @return mixed |
||
162 | * The value returned by the configured option getter, or NULL if the option |
||
163 | * is unknown. |
||
164 | */ |
||
165 | public function getOption(string $name) { |
||
166 | if (!array_key_exists($name, static::OPTIONS)) { |
||
167 | return NULL; |
||
168 | } |
||
169 | $getter = $this->optionGetter; |
||
170 | return is_callable($getter) ? $getter($name) : NULL; |
||
171 | } |
||
172 | |||
173 | /** |
||
174 | * Log an event about the queue run. |
||
175 | */ |
||
176 | public function logStart() { |
||
177 | $this->preFlightCheck(); |
||
178 | $maxIterations = $this->getOption(self::OPTION_MAX_ITERATIONS); |
||
179 | if ($maxIterations > 0) { |
||
180 | $readyMessage = "RabbitMQ worker ready to receive up to @count messages."; |
||
181 | $readyArgs = ['@count' => $maxIterations]; |
||
182 | } |
||
183 | else { |
||
184 | $readyMessage = "RabbitMQ worker ready to receive an unlimited number of messages."; |
||
185 | $readyArgs = []; |
||
186 | } |
||
187 | $this->logger->debug($readyMessage, $readyArgs); |
||
188 | } |
||
189 | |||
190 | /** |
||
191 | * Signal handler. |
||
192 | * |
||
193 | * @see \Drupal\rabbitmq\Consumer::consume() |
||
194 | * |
||
195 | * On a timeout signal, the connections is already closed, so do not attempt |
||
196 | * to shutdown the queue. |
||
197 | */ |
||
198 | public function onTimeout() { |
||
199 | $this->logger->info('Timeout reached'); |
||
200 | $this->stopListening(); |
||
201 | } |
||
202 | |||
203 | /** |
||
204 | * Main logic: consume the specified queue using AMQP. |
||
205 | * |
||
206 | * @param string $queueName |
||
207 | * The name of the queue to consume. |
||
208 | * |
||
209 | * @throws \Exception |
||
210 | */ |
||
211 | public function consume(string $queueName) { |
||
212 | $this->preFlightCheck(); |
||
213 | $this->startListening(); |
||
214 | $worker = $this->getWorker($queueName); |
||
215 | // Allow obtaining a decoder from the worker to have a sane default, while |
||
216 | // being able to override it on service instantiation. |
||
217 | if ($worker instanceof DecoderAwareWorkerInterface && !isset($this->decoder)) { |
||
218 | $this->setDecoder($worker->getDecoder()); |
||
219 | } |
||
220 | |||
221 | /* @var \Drupal\rabbitmq\Queue\queue $queue */ |
||
222 | $queue = $this->queueFactory->get($queueName); |
||
223 | assert($queue instanceof Queue); |
||
224 | |||
225 | $channel = $this->getChannel($queue); |
||
226 | assert($channel instanceof AMQPChannel); |
||
227 | $channel->basic_qos(NULL, 1, FALSE); |
||
228 | |||
229 | $maxIterations = $this->getOption(self::OPTION_MAX_ITERATIONS); |
||
230 | $memoryLimit = $this->getOption(self::OPTION_MEMORY_LIMIT); |
||
231 | $timeout = $this->getOption(self::OPTION_TIMEOUT); |
||
232 | if ($timeout) { |
||
233 | pcntl_signal(SIGALRM, [$this, 'onTimeout']); |
||
234 | } |
||
235 | $callback = $this->getCallback($worker, $queueName, $timeout); |
||
236 | |||
237 | while ($this->continueListening) { |
||
238 | try { |
||
239 | $channel->basic_consume($queueName, '', FALSE, FALSE, FALSE, FALSE, $callback); |
||
240 | |||
241 | // Begin listening for messages to process. |
||
242 | $iteration = 0; |
||
243 | while (count($channel->callbacks) && $this->continueListening) { |
||
244 | if ($timeout) { |
||
245 | pcntl_alarm($timeout); |
||
246 | } |
||
247 | $channel->wait(NULL, FALSE, $timeout); |
||
248 | if ($timeout) { |
||
249 | pcntl_alarm(0); |
||
250 | } |
||
251 | |||
252 | // Break on memory_limit reached. |
||
253 | if ($this->hitMemoryLimit($memoryLimit)) { |
||
254 | $this->stopListening(); |
||
255 | break; |
||
256 | } |
||
257 | |||
258 | // Break on max_iterations reached. |
||
259 | $iteration++; |
||
260 | if ($this->hitIterationsLimit($maxIterations, $iteration)) { |
||
261 | $this->stopListening(); |
||
262 | } |
||
263 | } |
||
264 | $this->stopListening(); |
||
265 | } |
||
266 | catch (AMQPIOWaitException $e) { |
||
267 | $this->stopListening(); |
||
268 | $channel->close(); |
||
269 | } |
||
270 | catch (AMQPTimeoutException $e) { |
||
271 | $this->startListening(); |
||
272 | } |
||
273 | catch (Exception $e) { |
||
274 | throw new Exception('Could not obtain channel for queue.', 0, $e); |
||
275 | } |
||
276 | } |
||
277 | } |
||
278 | |||
279 | /** |
||
280 | * Main logic: consume the specified queue using Queue API. |
||
281 | * |
||
282 | * @param string $queueName |
||
283 | * The name of the queue to consume. |
||
284 | * |
||
285 | * @throws \Exception |
||
286 | * |
||
287 | * @TODO Probably needs to do more on SuspendQueueException. |
||
288 | */ |
||
289 | public function consumeQueueApi(string $queueName) { |
||
290 | $this->preFlightCheck(); |
||
291 | $this->startListening(); |
||
292 | $worker = $this->getWorker($queueName); |
||
293 | // Allow obtaining a decoder from the worker to have a sane default, while |
||
294 | // being able to override it on service instantiation. |
||
295 | if ($worker instanceof DecoderAwareWorkerInterface && !isset($this->decoder)) { |
||
296 | $this->setDecoder($worker->getDecoder()); |
||
297 | } |
||
298 | |||
299 | /* @var \Drupal\rabbitmq\Queue\queue $queue */ |
||
300 | $queue = $this->queueFactory->get($queueName); |
||
301 | assert($queue instanceof Queue); |
||
302 | |||
303 | $maxIterations = $this->getOption(self::OPTION_MAX_ITERATIONS); |
||
304 | $memoryLimit = $this->getOption(self::OPTION_MEMORY_LIMIT); |
||
305 | $timeout = $this->getOption(self::OPTION_TIMEOUT); |
||
306 | if (!empty($timeout)) { |
||
307 | pcntl_signal(SIGALRM, [$this, 'onTimeout']); |
||
308 | } |
||
309 | else { |
||
310 | $timeout = 0; |
||
311 | } |
||
312 | |||
313 | $iteration = 0; |
||
314 | $startTime = microtime(TRUE); |
||
315 | do { |
||
316 | $item = NULL; |
||
0 ignored issues
–
show
|
|||
317 | if ($timeout) { |
||
318 | pcntl_alarm($timeout); |
||
319 | $item = $queue->claimItem(); |
||
320 | pcntl_alarm(0); |
||
321 | } |
||
322 | else { |
||
323 | $item = $queue->claimItem(); |
||
324 | } |
||
325 | |||
326 | // Break on memory_limit reached before process. |
||
327 | if ($this->hitMemoryLimit($memoryLimit)) { |
||
328 | $this->stopListening(); |
||
329 | break; |
||
330 | } |
||
331 | |||
332 | $currentTime = microtime(TRUE); |
||
333 | // If we did not get an object, do not try to process it. |
||
334 | if (!is_object($item)) { |
||
335 | usleep(10); |
||
336 | // Only loop if the current continuous wait did not exceed timeout. |
||
337 | if ($currentTime > $startTime + $timeout) { |
||
338 | break; |
||
339 | } |
||
340 | else { |
||
341 | continue; |
||
342 | } |
||
343 | } |
||
344 | |||
345 | // We got a normal item, try to handle it. |
||
346 | try { |
||
347 | // Call the queue worker. |
||
348 | $worker->processItem($item->data); |
||
349 | |||
350 | // Remove the item from the queue. |
||
351 | $queue->deleteItem($item); |
||
352 | $this->logger->debug('(Drush) Item @id acknowledged from @queue', [ |
||
353 | '@id' => $item->id, |
||
354 | '@queue' => $queueName, |
||
355 | ]); |
||
356 | } |
||
357 | // Reserved QueueAPI exception: releaseItem and continue work. |
||
358 | catch (RequeueException $e) { |
||
359 | $queue->releaseItem($item); |
||
360 | $this->logger->debug('(Drush) Item @id put back on @queue', [ |
||
361 | '@id' => $item->id, |
||
362 | '@queue' => $queueName, |
||
363 | ]); |
||
364 | } |
||
365 | // Reserved QueueAPI exception: stop working on this queue. |
||
366 | catch (SuspendQueueException $e) { |
||
367 | $queue->releaseItem($item); |
||
368 | $this->stopListening(); |
||
369 | } |
||
370 | // Restart wait period: we handled a valid item. |
||
371 | $startTime = microtime(TRUE); |
||
372 | |||
373 | // Break on memory_limit reached after process. |
||
374 | if ($this->hitMemoryLimit($memoryLimit)) { |
||
375 | $this->stopListening(); |
||
376 | break; |
||
377 | } |
||
378 | |||
379 | // Break on max_iterations reached. Only count actual items. |
||
380 | $iteration++; |
||
381 | if ($this->hitIterationsLimit($maxIterations, $iteration)) { |
||
382 | $this->stopListening(); |
||
383 | } |
||
384 | } while ($this->continueListening); |
||
385 | } |
||
386 | |||
387 | /** |
||
388 | * Provide a message callback for events. |
||
389 | * |
||
390 | * @param \Drupal\Core\Queue\QueueWorkerInterface $worker |
||
391 | * The worker plugin. |
||
392 | * @param string $queueName |
||
393 | * The queue name. |
||
394 | * @param int $timeout |
||
395 | * The queue wait timeout. Since it is only for queue wait, not worker wait, |
||
396 | * it has to be reset before starting work, and reinitialized when ending |
||
397 | * work. |
||
398 | * |
||
399 | * @return \Closure |
||
400 | * The callback. |
||
401 | */ |
||
402 | protected function getCallback( |
||
403 | QueueWorkerInterface $worker, |
||
404 | string $queueName, |
||
405 | int $timeout = 0 |
||
406 | ): \Closure { |
||
407 | $callback = function (AMQPMessage $msg) use ($worker, $queueName, $timeout) { |
||
408 | if ($timeout) { |
||
409 | pcntl_alarm(0); |
||
410 | } |
||
411 | $this->logger->info('(Drush) Received queued message: @id', [ |
||
412 | '@id' => $msg->delivery_info['delivery_tag'], |
||
413 | ]); |
||
414 | |||
415 | try { |
||
416 | // Build the item to pass to the queue worker. |
||
417 | $item = (object) [ |
||
418 | 'id' => $msg->delivery_info['delivery_tag'], |
||
419 | 'data' => $this->decode($msg->body), |
||
420 | ]; |
||
421 | |||
422 | // Call the queue worker. |
||
423 | $worker->processItem($item->data); |
||
424 | |||
425 | // Remove the item from the queue. |
||
426 | $msg->delivery_info['channel']->basic_ack($item->id); |
||
427 | $this->logger->info('(Drush) Item @id acknowledged from @queue', [ |
||
428 | '@id' => $item->id, |
||
429 | '@queue' => $queueName, |
||
430 | ]); |
||
431 | } |
||
432 | catch (Exception $e) { |
||
433 | watchdog_exception('rabbitmq', $e); |
||
434 | $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], |
||
435 | TRUE); |
||
436 | } |
||
437 | if ($timeout) { |
||
438 | pcntl_alarm($timeout); |
||
439 | } |
||
440 | }; |
||
441 | |||
442 | return $callback; |
||
443 | } |
||
444 | |||
445 | /** |
||
446 | * Get the channel instance for a given queue. |
||
447 | * |
||
448 | * Convert the various low-level known exceptions to module-level ones to make |
||
449 | * it easier to catch cleanly. |
||
450 | * |
||
451 | * @param \Drupal\rabbitmq\Queue\Queue $queue |
||
452 | * The queue from which to obtain a channel. |
||
453 | * |
||
454 | * @return \PhpAmqpLib\Channel\AMQPChannel |
||
455 | * The channel instance. |
||
456 | * |
||
457 | * @throws \Drupal\rabbitmq\Exception\InvalidArgumentException |
||
458 | * @throws \Drupal\rabbitmq\Exception\OutOfRangeException |
||
459 | * @throws \Drupal\rabbitmq\Exception\RuntimeException |
||
460 | */ |
||
461 | protected function getChannel(Queue $queue) { |
||
462 | try { |
||
463 | $channel = $queue->getChannel(); |
||
464 | } |
||
465 | // May be thrown by StreamIO::__construct() |
||
466 | catch (\InvalidArgumentException $e) { |
||
467 | throw new InvalidArgumentException($e->getMessage()); |
||
468 | } |
||
469 | // May be thrown during getChannel() |
||
470 | catch (AMQPRuntimeException $e) { |
||
471 | throw new RuntimeException($e->getMessage()); |
||
472 | } |
||
473 | // May be thrown during getChannel() |
||
474 | catch (AMQPOutOfRangeException $e) { |
||
475 | throw new OutOfRangeException($e->getMessage()); |
||
476 | } |
||
477 | |||
478 | return $channel; |
||
479 | } |
||
480 | |||
481 | /** |
||
482 | * Get a worker instance for a queue name. |
||
483 | * |
||
484 | * @param string $queueName |
||
485 | * The name of the queue for which to get a worker. |
||
486 | * |
||
487 | * @return \Drupal\Core\Queue\QueueWorkerInterface |
||
488 | * The worker instance. |
||
489 | * |
||
490 | * @throws \Drupal\rabbitmq\Exception\InvalidWorkerException |
||
491 | */ |
||
492 | protected function getWorker(string $queueName): QueueWorkerInterface { |
||
493 | // Before we start listening for messages, make sure the worker is valid. |
||
494 | $worker = $this->workerManager->createInstance($queueName); |
||
495 | if (!($worker instanceof QueueWorkerInterface)) { |
||
496 | throw new InvalidWorkerException('Invalid worker for requested queue.'); |
||
497 | } |
||
498 | return $worker; |
||
499 | } |
||
500 | |||
501 | /** |
||
502 | * Did consume() hit the max_iterations limit ? |
||
503 | * |
||
504 | * @param int $maxIterations |
||
505 | * The value of the max_iterations option. |
||
506 | * @param int $iteration |
||
507 | * The current number of iterations in the consume() loop. |
||
508 | * |
||
509 | * @return bool |
||
510 | * Did it ? |
||
511 | */ |
||
512 | protected function hitIterationsLimit(int $maxIterations, int $iteration) { |
||
513 | if ($maxIterations > 0 && $maxIterations <= $iteration) { |
||
514 | $this->logger->notice('RabbitMQ worker has reached max number of iterations: @count. Exiting.', |
||
515 | [ |
||
516 | '@count' => $maxIterations, |
||
517 | ]); |
||
518 | return TRUE; |
||
519 | } |
||
520 | |||
521 | return FALSE; |
||
522 | } |
||
523 | |||
524 | /** |
||
525 | * Evaluate whether worker should exit. |
||
526 | * |
||
527 | * If the --memory_limit option is set, check the memory usage |
||
528 | * and exit if the limit has been exceeded or met. |
||
529 | * |
||
530 | * @param int $memoryLimit |
||
531 | * The maximum memory the service may consume, or -1 for unlimited. |
||
532 | * |
||
533 | * @return bool |
||
534 | * - TRUE: consume() should stop, |
||
535 | * - FALSE: consume() may continue. |
||
536 | */ |
||
537 | protected function hitMemoryLimit(int $memoryLimit) { |
||
538 | // Evaluate whether worker should exit. |
||
539 | // If the --memory_limit option is set, check the memory usage |
||
540 | // and exit if the limit has been exceeded or met. |
||
541 | if ($memoryLimit > 0) { |
||
542 | $memoryUsage = memory_get_peak_usage() / 1024 / 1024; |
||
543 | if ($memoryUsage >= $memoryLimit) { |
||
544 | $this->logger->notice('RabbitMQ worker has reached or exceeded set memory limit of @limitMB and will now exit.', [ |
||
545 | '@limit' => $memoryLimit, |
||
546 | ]); |
||
547 | return TRUE; |
||
548 | } |
||
549 | } |
||
550 | |||
551 | return FALSE; |
||
552 | } |
||
553 | |||
554 | /** |
||
555 | * Implements hook_requirements(). |
||
556 | */ |
||
557 | public static function hookRequirements($phase, array &$req) { |
||
558 | $key = QueueBase::MODULE . '-consumer'; |
||
559 | $req[$key]['title'] = t('RabbitMQ Consumer'); |
||
560 | $options = [ |
||
561 | ':ext' => Url::fromUri('http://php.net/pcntl')->toString(), |
||
562 | '%option' => static::OPTION_TIMEOUT, |
||
563 | ]; |
||
564 | if (!extension_loaded(static::EXTENSION_PCNTL)) { |
||
565 | $req[$key]['description'] = t('Extension <a href=":ext">PCNTL</a> not present in PHP. Option %option is not available in the RabbitMQ consumer.', $options); |
||
566 | $req[$key]['severity'] = REQUIREMENT_WARNING; |
||
567 | } |
||
568 | else { |
||
569 | $req[$key]['description'] = t('Extension <a href=":ext">PCNTL</a> is present in PHP. Option %option is available in the RabbitMQ consumer.', $options); |
||
570 | $req[$key]['severity'] = REQUIREMENT_OK; |
||
571 | } |
||
572 | } |
||
573 | |||
574 | /** |
||
575 | * Ensures options are consistent with configuration. |
||
576 | * |
||
577 | * @throws \Drupal\rabbitmq\Exception\InvalidArgumentException |
||
578 | * Options are not compatible with configuration. |
||
579 | */ |
||
580 | protected function preFlightCheck() { |
||
581 | if ($this->pfcOk) { |
||
582 | return; |
||
583 | } |
||
584 | $this->pfcOk = FALSE; |
||
585 | $timeout = $this->getOption(self::OPTION_TIMEOUT); |
||
586 | if (!empty($timeout) && !extension_loaded(static::EXTENSION_PCNTL)) { |
||
587 | $message = $this->t('Option @option is not available without the @ext extension.', [ |
||
588 | '@option' => static::OPTION_TIMEOUT, |
||
589 | '@ext' => static::EXTENSION_PCNTL, |
||
590 | ]); |
||
591 | throw new InvalidArgumentException($message); |
||
592 | } |
||
593 | $this->pfcOk = TRUE; |
||
594 | } |
||
595 | |||
596 | /** |
||
597 | * Shutdown a queue. |
||
598 | * |
||
599 | * @param string $queueName |
||
600 | * The name of the queue, also the name of the QueueWorker plugin processing |
||
601 | * its items. |
||
602 | */ |
||
603 | public function shutdownQueue(string $queueName) { |
||
604 | $queue = $this->queueFactory->get($queueName); |
||
605 | if ($queue instanceof Queue) { |
||
606 | $queue->shutdown(); |
||
607 | } |
||
608 | } |
||
609 | |||
610 | /** |
||
611 | * Register a decoder for message payloads. |
||
612 | * |
||
613 | * @param callable $decoder |
||
614 | * The decoder. |
||
615 | */ |
||
616 | public function setDecoder(callable $decoder) { |
||
617 | $this->decoder = $decoder; |
||
618 | } |
||
619 | |||
620 | /** |
||
621 | * Register a method able to get option values. |
||
622 | * |
||
623 | * @param callable $optionGetter |
||
624 | * The getter. |
||
625 | */ |
||
626 | public function setOptionGetter(callable $optionGetter) { |
||
627 | $this->optionGetter = $optionGetter; |
||
628 | } |
||
629 | |||
630 | /** |
||
631 | * Mark listening as active. |
||
632 | */ |
||
633 | public function startListening() { |
||
634 | $this->continueListening = TRUE; |
||
635 | } |
||
636 | |||
637 | /** |
||
638 | * Mark listening as inactive. |
||
639 | */ |
||
640 | public function stopListening() { |
||
641 | $this->continueListening = FALSE; |
||
642 | } |
||
643 | |||
644 | } |
||
645 |
This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.
Both the
$myVar
assignment in line 1 and the$higher
assignment in line 2 are dead. The first because$myVar
is never used and the second because$higher
is always overwritten for every possible time line.