Issues (9)

Security Analysis: no request data

This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.

  Cross-Site Scripting
Cross-Site Scripting enables an attacker to inject code into the response of a web-request that is viewed by other users. It can for example be used to bypass access controls, or even to take over other users' accounts.
  File Exposure
File Exposure allows an attacker to gain access to local files that they should not be able to access. These files can, for example, include database credentials or other configuration files.
  File Manipulation
File Manipulation enables an attacker to write custom data to files. This potentially leads to injection of arbitrary code on the server.
  Object Injection
Object Injection enables an attacker to inject an object into PHP code, and can lead to arbitrary code execution, file exposure, or file manipulation attacks.
  Code Injection
Code Injection enables an attacker to execute arbitrary code on the server.
  Response Splitting
Response Splitting can be used to send arbitrary responses.
  File Inclusion
File Inclusion enables an attacker to inject custom files into PHP's file loading mechanism, either explicitly passed to include, or for example via PHP's auto-loading mechanism.
  Command Injection
Command Injection enables an attacker to inject a shell command that is executed with the privileges of the web-server. This can be used to expose sensitive data, or gain access to your server.
  SQL Injection
SQL Injection enables an attacker to execute arbitrary SQL code on your database server, gaining access to or manipulating user data.
  XPath Injection
XPath Injection enables an attacker to modify which parts of an XML document are read. If that XML document is, for example, used for authentication, this can lead to further vulnerabilities similar to SQL Injection.
  LDAP Injection
LDAP Injection enables an attacker to inject LDAP statements potentially granting permission to run unauthorized queries, or modify content inside the LDAP tree.
  Header Injection
  Other Vulnerability
This category comprises other attack vectors such as manipulating the PHP runtime, loading custom extensions, freezing the runtime, or similar.
  Regex Injection
Regex Injection enables an attacker to execute arbitrary code in your PHP process.
  XML Injection
XML Injection enables an attacker to read files on your local filesystem including configuration files, or can be abused to freeze your web-server process.
  Variable Injection
Variable Injection enables an attacker to overwrite program variables with custom data, and can lead to further vulnerabilities.
Unfortunately, the security analysis is currently not available for your project. If you are a non-commercial open-source project, please contact support to gain access.

src/Consumer.php (3 issues)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead.

<?php

namespace Drupal\rabbitmq;

use Drupal\Core\Logger\LoggerChannelInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueWorkerInterface;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\Queue\RequeueException;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\StringTranslation\StringTranslationTrait;
use Drupal\Core\Url;
use Drupal\rabbitmq\Exception\Exception;
use Drupal\rabbitmq\Exception\InvalidArgumentException;
use Drupal\rabbitmq\Exception\InvalidWorkerException;
use Drupal\rabbitmq\Exception\OutOfRangeException;
use Drupal\rabbitmq\Exception\RuntimeException;
use Drupal\rabbitmq\Queue\Queue;
use Drupal\rabbitmq\Queue\QueueBase;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPIOWaitException;
use PhpAmqpLib\Exception\AMQPOutOfRangeException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Class Consumer provides a service wrapping queue consuming operations.
 *
 * Note that it does not carry the value of its options, but getters for them,
 * to support multiple ways of accessing options, e.g. Drush vs Console vs Web.
 */
class Consumer {
  use StringTranslationTrait;

  const EXTENSION_PCNTL = 'pcntl';

  const OPTION_MAX_ITERATIONS = 'max_iterations';
  const OPTION_MEMORY_LIMIT = 'memory_limit';
  const OPTION_TIMEOUT = 'rabbitmq_timeout';

  // Known option names and their default value.
  const OPTIONS = [
    self::OPTION_MAX_ITERATIONS => 0,
    self::OPTION_MEMORY_LIMIT => -1,
    self::OPTION_TIMEOUT => NULL,
  ];

  /**
   * Continue listening ?
   *
   * @var bool
   */
  protected $continueListening = FALSE;

  /**
   * The rabbitmq logger channel.
   *
   * @var \Drupal\Core\Logger\LoggerChannelInterface
   */
  protected $logger;

  /**
   * A callback providing the ability to read service runtime options.
   *
   * This is needed to support non-Drush use scenarios.
   *
   * @var callable
   */
  protected $optionGetter;

  /**
   * Was the Pre-Flight Check successful ? Yes | No | Not yet run.
   *
   * @var bool|null
   */
  protected $pfcOk = NULL;

  /**
   * The queue service.
   *
   * @var \Drupal\Core\Queue\QueueFactory
   */
  protected $queueFactory;

  /**
   * The plugin.manager.queue_worker service.
   *
   * @var \Drupal\Core\Queue\QueueWorkerManagerInterface
   */
  protected $workerManager;

  /**
   * Consumer constructor.
   *
   * @param \Drupal\Core\Queue\QueueWorkerManagerInterface $workerManager
   *   The plugin.manager.queue_worker service.
   * @param \Drupal\Core\Queue\QueueFactory $queueFactory
   *   The queue service.
   * @param \Drupal\Core\Logger\LoggerChannelInterface $logger
   *   The rabbitmq logger channel.
   */
  public function __construct(
    QueueWorkerManagerInterface $workerManager,
    QueueFactory $queueFactory,
    LoggerChannelInterface $logger
  ) {
    $this->logger = $logger;
    $this->queueFactory = $queueFactory;
    $this->workerManager = $workerManager;
  }

  /**
   * Is the queue name valid ?
   *
   * @param string $queueName
   *   The requested name.
   *
   * @return bool
   *   Is it valid?
   */
  public function isQueueNameValid(string $queueName): bool {
    $workers = $this->workerManager->getDefinitions();
    if (!isset($workers[$queueName])) {
      return drush_set_error('rabbitmq', $this->t('No known worker for queue @queue', [
        '@queue' => $queueName,
      ]));
    }
    // A worker exists. Without this explicit return, the declared bool return
    // type would raise a TypeError for valid queue names.
    return TRUE;
  }

  /**
   * Decode the data received from the queue using a chain of decoder choices.
   *
   * - 1st/2nd choices: the one already set on the service instance
   *   - 1st: set on the service instance manually during or after construction.
   *   - 2nd: the one set on the service instance within consume() if the
   *     worker implements DecoderAwareInterface.
   * - 3rd choice: a legacy-compatible JSON decoder.
   *
   * @param mixed $data
   *   The message payload to decode.
   *
   * @return mixed
   *   The decoded value.
   */
  public function decode($data) {
    if (isset($this->decoder)) {
      // Invoke the callable stored in the property. Writing
      // $this->decoder($data) would look for a decoder() method instead.
      return call_user_func($this->decoder, $data);
    }
    else {
      return json_decode($data, TRUE);
    }
  }

  /**
   * Get the value of a queue consumer option.
   *
   * @param string $name
   *   The name of the option.
   *
   * @return mixed
   *   The value returned by the configured option getter, or NULL if the option
   *   is unknown.
   */
  public function getOption(string $name) {
    if (!array_key_exists($name, static::OPTIONS)) {
      return NULL;
    }
    $getter = $this->optionGetter;
    return is_callable($getter) ? $getter($name) : NULL;
  }

  /**
   * Log an event about the queue run.
   */
  public function logStart() {
    $this->preFlightCheck();
    $maxIterations = $this->getOption(self::OPTION_MAX_ITERATIONS);
    if ($maxIterations > 0) {
      $readyMessage = "RabbitMQ worker ready to receive up to @count messages.";
      $readyArgs = ['@count' => $maxIterations];
    }
    else {
      $readyMessage = "RabbitMQ worker ready to receive an unlimited number of messages.";
      $readyArgs = [];
    }
    $this->logger->debug($readyMessage, $readyArgs);
  }

  /**
   * Signal handler.
   *
   * @see \Drupal\rabbitmq\Consumer::consume()
   *
   * On a timeout signal, the connection is already closed, so do not attempt
   * to shut down the queue.
   */
  public function onTimeout() {
    $this->logger->info('Timeout reached');
    $this->stopListening();
  }

  /**
   * Main logic: consume the specified queue using AMQP.
   *
   * @param string $queueName
   *   The name of the queue to consume.
   *
   * @throws \Exception
   */
  public function consume(string $queueName) {
    $this->preFlightCheck();
    $this->startListening();
    $worker = $this->getWorker($queueName);
    // Allow obtaining a decoder from the worker to have a sane default, while
    // being able to override it on service instantiation.
    if ($worker instanceof DecoderAwareWorkerInterface && !isset($this->decoder)) {
      $this->setDecoder($worker->getDecoder());
    }

    /** @var \Drupal\rabbitmq\Queue\Queue $queue */
    $queue = $this->queueFactory->get($queueName);
    assert($queue instanceof Queue);

    $channel = $this->getChannel($queue);
    assert($channel instanceof AMQPChannel);
    $channel->basic_qos(NULL, 1, FALSE);

    $maxIterations = $this->getOption(self::OPTION_MAX_ITERATIONS);
    $memoryLimit = $this->getOption(self::OPTION_MEMORY_LIMIT);
    $timeout = $this->getOption(self::OPTION_TIMEOUT);
    if ($timeout) {
      pcntl_signal(SIGALRM, [$this, 'onTimeout']);
    }
    $callback = $this->getCallback($worker, $queueName, $timeout);

    while ($this->continueListening) {
      try {
        $channel->basic_consume($queueName, '', FALSE, FALSE, FALSE, FALSE, $callback);

        // Begin listening for messages to process.
        $iteration = 0;
        while (count($channel->callbacks) && $this->continueListening) {
          if ($timeout) {
            pcntl_alarm($timeout);
          }
          $channel->wait(NULL, FALSE, $timeout);
          if ($timeout) {
            pcntl_alarm(0);
          }

          // Break on memory_limit reached.
          if ($this->hitMemoryLimit($memoryLimit)) {
            $this->stopListening();
            break;
          }

          // Break on max_iterations reached.
          $iteration++;
          if ($this->hitIterationsLimit($maxIterations, $iteration)) {
            $this->stopListening();
          }
        }
        $this->stopListening();
      }
      catch (AMQPIOWaitException $e) {
        $this->stopListening();
        $channel->close();
      }
      catch (AMQPTimeoutException $e) {
        $this->startListening();
      }
      catch (Exception $e) {
        throw new Exception('Could not obtain channel for queue.', 0, $e);
      }
    }
  }

  /**
   * Main logic: consume the specified queue using Queue API.
   *
   * @param string $queueName
   *   The name of the queue to consume.
   *
   * @throws \Exception
   *
   * @TODO Probably needs to do more on SuspendQueueException.
   */
  public function consumeQueueApi(string $queueName) {
    $this->preFlightCheck();
    $this->startListening();
    $worker = $this->getWorker($queueName);
    // Allow obtaining a decoder from the worker to have a sane default, while
    // being able to override it on service instantiation.
    if ($worker instanceof DecoderAwareWorkerInterface && !isset($this->decoder)) {
      $this->setDecoder($worker->getDecoder());
    }

    /** @var \Drupal\rabbitmq\Queue\Queue $queue */
    $queue = $this->queueFactory->get($queueName);
    assert($queue instanceof Queue);

    $maxIterations = $this->getOption(self::OPTION_MAX_ITERATIONS);
    $memoryLimit = $this->getOption(self::OPTION_MEMORY_LIMIT);
    $timeout = $this->getOption(self::OPTION_TIMEOUT);
    if (!empty($timeout)) {
      pcntl_signal(SIGALRM, [$this, 'onTimeout']);
    }
    else {
      $timeout = 0;
    }

    $iteration = 0;
    $startTime = microtime(TRUE);
    do {
      $item = NULL;
      if ($timeout) {
        pcntl_alarm($timeout);
        $item = $queue->claimItem();
        pcntl_alarm(0);
      }
      else {
        $item = $queue->claimItem();
      }

      // Break on memory_limit reached before process.
      if ($this->hitMemoryLimit($memoryLimit)) {
        $this->stopListening();
        break;
      }

      $currentTime = microtime(TRUE);
      // If we did not get an object, do not try to process it.
      if (!is_object($item)) {
        usleep(10);
        // Only loop if the current continuous wait did not exceed timeout.
        if ($currentTime > $startTime + $timeout) {
          break;
        }
        else {
          continue;
        }
      }

      // We got a normal item, try to handle it.
      try {
        // Call the queue worker.
        $worker->processItem($item->data);

        // Remove the item from the queue.
        $queue->deleteItem($item);
        $this->logger->debug('(Drush) Item @id acknowledged from @queue', [
          '@id' => $item->id,
          '@queue' => $queueName,
        ]);
      }
      // Reserved QueueAPI exception: releaseItem and continue work.
      catch (RequeueException $e) {
The class Drupal\Core\Queue\RequeueException does not exist. Did you forget a USE statement, or did you not list all dependencies?

Scrutinizer analyzes your composer.json/composer.lock file if available to determine the classes, and functions that are defined by your dependencies.

It seems like the listed class was neither found in your dependencies, nor was it found in the analyzed files in your repository. If you are using some other form of dependency management, you might want to disable this analysis.

        $queue->releaseItem($item);
        $this->logger->debug('(Drush) Item @id put back on @queue', [
          '@id' => $item->id,
          '@queue' => $queueName,
        ]);
      }
      // Reserved QueueAPI exception: stop working on this queue.
      catch (SuspendQueueException $e) {
The class Drupal\Core\Queue\SuspendQueueException does not exist. Did you forget a USE statement, or did you not list all dependencies?

Scrutinizer analyzes your composer.json/composer.lock file if available to determine the classes, and functions that are defined by your dependencies.

It seems like the listed class was neither found in your dependencies, nor was it found in the analyzed files in your repository. If you are using some other form of dependency management, you might want to disable this analysis.

        $queue->releaseItem($item);
        $this->stopListening();
      }
      // Restart wait period: we handled a valid item.
      $startTime = microtime(TRUE);

      // Break on memory_limit reached after process.
      if ($this->hitMemoryLimit($memoryLimit)) {
        $this->stopListening();
        break;
      }

      // Break on max_iterations reached. Only count actual items.
      $iteration++;
      if ($this->hitIterationsLimit($maxIterations, $iteration)) {
        $this->stopListening();
      }
    } while ($this->continueListening);
  }

  /**
   * Provide a message callback for events.
   *
   * @param \Drupal\Core\Queue\QueueWorkerInterface $worker
   *   The worker plugin.
   * @param string $queueName
   *   The queue name.
   * @param int $timeout
   *   The queue wait timeout. Since it is only for queue wait, not worker wait,
   *   it has to be reset before starting work, and reinitialized when ending
   *   work.
   *
   * @return \Closure
   *   The callback.
   */
  protected function getCallback(
    QueueWorkerInterface $worker,
    string $queueName,
    int $timeout = 0
  ): \Closure {
    $callback = function (AMQPMessage $msg) use ($worker, $queueName, $timeout) {
      if ($timeout) {
        pcntl_alarm(0);
      }
      $this->logger->info('(Drush) Received queued message: @id', [
        '@id' => $msg->delivery_info['delivery_tag'],
      ]);

      try {
        // Build the item to pass to the queue worker.
        $item = (object) [
          'id' => $msg->delivery_info['delivery_tag'],
          'data' => $this->decode($msg->body),
        ];

        // Call the queue worker.
        $worker->processItem($item->data);

        // Remove the item from the queue.
        $msg->delivery_info['channel']->basic_ack($item->id);
        $this->logger->info('(Drush) Item @id acknowledged from @queue', [
          '@id' => $item->id,
          '@queue' => $queueName,
        ]);
      }
      catch (Exception $e) {
        watchdog_exception('rabbitmq', $e);
        $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], TRUE);
      }
      if ($timeout) {
        pcntl_alarm($timeout);
      }
    };

    return $callback;
  }

  /**
   * Get the channel instance for a given queue.
   *
   * Convert the various low-level known exceptions to module-level ones to make
   * it easier to catch cleanly.
   *
   * @param \Drupal\rabbitmq\Queue\Queue $queue
   *   The queue from which to obtain a channel.
   *
   * @return \PhpAmqpLib\Channel\AMQPChannel
   *   The channel instance.
   *
   * @throws \Drupal\rabbitmq\Exception\InvalidArgumentException
   * @throws \Drupal\rabbitmq\Exception\OutOfRangeException
   * @throws \Drupal\rabbitmq\Exception\RuntimeException
   */
  protected function getChannel(Queue $queue) {
    try {
      $channel = $queue->getChannel();
    }
    // May be thrown by StreamIO::__construct()
    catch (\InvalidArgumentException $e) {
      throw new InvalidArgumentException($e->getMessage());
    }
    // May be thrown during getChannel()
    catch (AMQPRuntimeException $e) {
      throw new RuntimeException($e->getMessage());
    }
    // May be thrown during getChannel()
    catch (AMQPOutOfRangeException $e) {
      throw new OutOfRangeException($e->getMessage());
    }

    return $channel;
  }

  /**
   * Get a worker instance for a queue name.
   *
   * @param string $queueName
   *   The name of the queue for which to get a worker.
   *
   * @return \Drupal\Core\Queue\QueueWorkerInterface
   *   The worker instance.
   *
   * @throws \Drupal\rabbitmq\Exception\InvalidWorkerException
   */
  protected function getWorker(string $queueName): QueueWorkerInterface {
    // Before we start listening for messages, make sure the worker is valid.
    $worker = $this->workerManager->createInstance($queueName);
    if (!($worker instanceof QueueWorkerInterface)) {
The class Drupal\Core\Queue\QueueWorkerInterface does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

      throw new InvalidWorkerException('Invalid worker for requested queue.');
    }
    return $worker;
  }

  /**
   * Did consume() hit the max_iterations limit ?
   *
   * @param int $maxIterations
   *   The value of the max_iterations option.
   * @param int $iteration
   *   The current number of iterations in the consume() loop.
   *
   * @return bool
   *   Did it ?
   */
  protected function hitIterationsLimit(int $maxIterations, int $iteration) {
    if ($maxIterations > 0 && $maxIterations <= $iteration) {
      $this->logger->notice('RabbitMQ worker has reached max number of iterations: @count. Exiting.', [
        '@count' => $maxIterations,
      ]);
      return TRUE;
    }

    return FALSE;
  }

  /**
   * Evaluate whether worker should exit.
   *
   * If the --memory_limit option is set, check the memory usage
   * and exit if the limit has been exceeded or met.
   *
   * @param int $memoryLimit
   *   The maximum memory the service may consume, or -1 for unlimited.
   *
   * @return bool
   *   - TRUE: consume() should stop,
   *   - FALSE: consume() may continue.
   */
  protected function hitMemoryLimit(int $memoryLimit) {
    // Evaluate whether worker should exit.
    // If the --memory_limit option is set, check the memory usage
    // and exit if the limit has been exceeded or met.
    if ($memoryLimit > 0) {
      $memoryUsage = memory_get_peak_usage() / 1024 / 1024;
      if ($memoryUsage >= $memoryLimit) {
        $this->logger->notice('RabbitMQ worker has reached or exceeded set memory limit of @limitMB and will now exit.', [
          '@limit' => $memoryLimit,
        ]);
        return TRUE;
      }
    }

    return FALSE;
  }

  /**
   * Implements hook_requirements().
   */
  public static function hookRequirements($phase, array &$req) {
    $key = QueueBase::MODULE . '-consumer';
    $req[$key]['title'] = t('RabbitMQ Consumer');
    $options = [
      ':ext' => Url::fromUri('http://php.net/pcntl')->toString(),
      '%option' => static::OPTION_TIMEOUT,
    ];
    if (!extension_loaded(static::EXTENSION_PCNTL)) {
      $req[$key]['description'] = t('Extension <a href=":ext">PCNTL</a> not present in PHP. Option  %option is not available in the RabbitMQ consumer.', $options);
      $req[$key]['severity'] = REQUIREMENT_WARNING;
    }
    else {
      $req[$key]['description'] = t('Extension <a href=":ext">PCNTL</a> is present in PHP. Option   %option is available in the RabbitMQ consumer.', $options);
      $req[$key]['severity'] = REQUIREMENT_OK;
    }
  }

  /**
   * Ensures options are consistent with configuration.
   *
   * @throws \Drupal\rabbitmq\Exception\InvalidArgumentException
   *   Options are not compatible with configuration.
   */
  protected function preFlightCheck() {
    if ($this->pfcOk) {
      return;
    }
    $this->pfcOk = FALSE;
    $timeout = $this->getOption(self::OPTION_TIMEOUT);
    if (!empty($timeout) && !extension_loaded(static::EXTENSION_PCNTL)) {
      $message = $this->t('Option @option is not available without the @ext extension.', [
        '@option' => static::OPTION_TIMEOUT,
        '@ext' => static::EXTENSION_PCNTL,
      ]);
      throw new InvalidArgumentException($message);
    }
    $this->pfcOk = TRUE;
  }

  /**
   * Shutdown a queue.
   *
   * @param string $queueName
   *   The name of the queue, also the name of the QueueWorker plugin processing
   *   its items.
   */
  public function shutdownQueue(string $queueName) {
    $queue = $this->queueFactory->get($queueName);
    if ($queue instanceof Queue) {
      $queue->shutdown();
    }
  }

  /**
   * Register a decoder for message payloads.
   *
   * @param callable $decoder
   *   The decoder.
   */
  public function setDecoder(callable $decoder) {
    $this->decoder = $decoder;
  }

  /**
   * Register a method able to get option values.
   *
   * @param callable $optionGetter
   *   The getter.
   */
  public function setOptionGetter(callable $optionGetter) {
    $this->optionGetter = $optionGetter;
  }

  /**
   * Mark listening as active.
   */
  public function startListening() {
    $this->continueListening = TRUE;
  }

  /**
   * Mark listening as inactive.
   */
  public function stopListening() {
    $this->continueListening = FALSE;
  }

}