eMAGTechLabs /
RabbitMqBundle
| 1 | <?php |
||||
| 2 | |||||
| 3 | namespace OldSound\RabbitMqBundle\Command; |
||||
| 4 | |||||
| 5 | use OldSound\RabbitMqBundle\Consumer\ConsumersRegistry; |
||||
|
0 ignored issues
–
show
|
|||||
| 6 | use OldSound\RabbitMqBundle\Declarations\DeclarationsRegistry; |
||||
| 7 | 1 | use OldSound\RabbitMqBundle\Declarations\Declarator; |
|||
| 8 | use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent; |
||||
|
0 ignored issues
–
show
The type
OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent was not found. Maybe you did not declare it correctly or list all dependencies?
The issue could also be caused by a filter entry in the build configuration.
If the path has been excluded in your configuration, e.g. filter:
dependency_paths: ["lib/*"]
For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths Loading history...
|
|||||
| 9 | 1 | use OldSound\RabbitMqBundle\Event\AfterProcessingMessagesEvent; |
|||
| 10 | 1 | use OldSound\RabbitMqBundle\Event\OnConsumeEvent; |
|||
| 11 | 1 | use OldSound\RabbitMqBundle\EventListener\MemoryLimitListener; |
|||
| 12 | 1 | use OldSound\RabbitMqBundle\EventListener\PcntlSignalDispatchSubscriber; |
|||
| 13 | use OldSound\RabbitMqBundle\RabbitMq\Consumer; |
||||
| 14 | use PhpAmqpLib\Exception\AMQPTimeoutException; |
||||
| 15 | use Symfony\Component\Console\Command\Command; |
||||
| 16 | use Symfony\Component\Console\Exception\InvalidArgumentException; |
||||
| 17 | use Symfony\Component\Console\Input\InputArgument; |
||||
| 18 | use Symfony\Component\Console\Input\InputInterface; |
||||
| 19 | use Symfony\Component\Console\Input\InputOption; |
||||
| 20 | use Symfony\Component\Console\Logger\ConsoleLogger; |
||||
| 21 | use Symfony\Component\Console\Output\OutputInterface; |
||||
| 22 | use Symfony\Component\DependencyInjection\ContainerAwareTrait; |
||||
| 23 | |||||
/**
 * Console command `rabbitmq:consumer`.
 *
 * Resolves the consumer service named on the command line from the container,
 * wires optional memory-limit and POSIX-signal listeners onto its event
 * dispatcher, optionally declares the queues it consumes from, and then starts
 * consuming.
 */
class ConsumerCommand extends Command
{
    use ContainerAwareTrait;

    /** @var iterable|Consumer[] */
    protected $consumers;

    /**
     * Raw value of the `--messages` option for the current run.
     *
     * Declared explicitly: execute() assigns it, and writing to an undeclared
     * property creates a dynamic property (deprecated as of PHP 8.2).
     *
     * @var int|string|null
     */
    protected $amount;

    /**
     * Registers the command name, description, the required `name` argument
     * and the `messages`, `memory-limit`, `debug`, `skip-declare` and
     * `without-signals` options.
     */
    protected function configure()
    {
        $this
            ->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
            ->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0)
            ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process (MB)', null)
            ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
            ->addOption('skip-declare', null, InputOption::VALUE_NONE, 'Skip declare exhanges, queues and bindings')
            ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
        ;
        $this->setDescription('Executes a consumer');
        $this->setName('rabbitmq:consumer');
    }

    /**
     * Executes the current command.
     *
     * @param InputInterface  $input  An InputInterface instance
     * @param OutputInterface $output An OutputInterface instance
     *
     * @return mixed Whatever Consumer::consume() returns; it is handed back as the
     *               command result (presumably an integer exit code - confirm
     *               against Consumer::consume())
     *
     * @throws InvalidArgumentException  When the consumer name is not a string or no
     *                                   consumer service is registered under that name
     * @throws \InvalidArgumentException When the number of messages to consume is less than 0
     * @throws \BadFunctionCallException When signal handling is enabled and the pcntl extension is
     *                                   loaded, but pcntl_signal() or pcntl_signal_dispatch() is
     *                                   not callable
     * @throws \LogicException           When queues must be declared but the declaration registry
     *                                   service is not a DeclarationsRegistry
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $consumerName = $input->getArgument('name');
        // getArgument() is not typed as string; reject anything else before it is
        // interpolated into the service id.
        if (!is_string($consumerName)) {
            throw new InvalidArgumentException('The consumer name must be a string.');
        }

        $alias = sprintf('old_sound_rabbit_mq.consumer.%s', $consumerName);
        if (!$this->container->has($alias)) {
            $containerNames = $this->container->getParameter('old_sound_rabbit_mq.allowed_consumer_names');
            throw new InvalidArgumentException(sprintf('Consumer %s is undefined. Allowed ones: %s', $consumerName, implode(', ', $containerNames)));
        }

        /** @var Consumer $consumer */
        $consumer = $this->container->get($alias);

        // Only a strictly positive, all-digit value enables the memory limit listener.
        $memoryLimit = $input->getOption('memory-limit');
        if (null !== $memoryLimit && ctype_digit((string) $memoryLimit) && $memoryLimit > 0) {
            $consumer->getEventDispatcher()->addListener(
                AfterProcessingMessagesEvent::NAME,
                new MemoryLimitListener($memoryLimit)
            );
        }

        // The constant may already have been defined elsewhere; never redefine it.
        if (defined('AMQP_WITHOUT_SIGNALS') === false) {
            define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals'));
        }

        if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) {
            if (!function_exists('pcntl_signal')) {
                throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called.");
            }
            if (!function_exists('pcntl_signal_dispatch')) {
                throw new \BadFunctionCallException("Function 'pcntl_signal_dispatch' is referenced in the php.ini 'disable_functions' and can't be called.");
            }
            $consumer->getEventDispatcher()->addSubscriber(new PcntlSignalDispatchSubscriber($consumer));
        }

        if (defined('AMQP_DEBUG') === false) { // TODO remove?!
            define('AMQP_DEBUG', (bool) $input->getOption('debug'));
        }

        $this->amount = $input->getOption('messages');

        if (0 > (int) $this->amount) {
            throw new \InvalidArgumentException("The -m option should be null or greater than 0");
        }

        if (!$input->getOption('skip-declare')) {
            $this->declareForConsumer($consumer, $output);
        }

        // The raw option value is forwarded unchanged to the consumer.
        return $consumer->consume($this->amount);
    }

    /**
     * Declares, on the consumer's channel, everything registered for each queue
     * the consumer consumes from. Declarator log output goes to the console.
     *
     * @param Consumer        $consumer Consumer whose queue consumings are declared
     * @param OutputInterface $output   Console output wrapped in a ConsoleLogger
     *
     * @throws \LogicException When the `old_sound_rabbit_mq.declaration_registry`
     *                         service is not a DeclarationsRegistry
     */
    private function declareForConsumer(Consumer $consumer, OutputInterface $output)
    {
        $declarator = new Declarator($consumer->getChannel());
        $declarator->setLogger(
            new ConsoleLogger($output)
        );

        $declarationRegistry = $this->container->get('old_sound_rabbit_mq.declaration_registry');
        // Fail with a clear message instead of passing null or a foreign object
        // to the declarator.
        if (!$declarationRegistry instanceof DeclarationsRegistry) {
            throw new \LogicException(sprintf(
                'Service "old_sound_rabbit_mq.declaration_registry" must be an instance of %s.',
                DeclarationsRegistry::class
            ));
        }

        foreach ($consumer->getQueueConsumings() as $queueConsuming) {
            $declarator->declareForQueueDeclaration($queueConsuming->queueName, $declarationRegistry);
        }
    }
}
||||
| 121 |
The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g.
excluded_paths: ["lib/*"], you can move it to the dependency path list as follows: dependency_paths: ["lib/*"]. For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths