Issues (48)

Command/BatchConsumerCommand.php (3 issues)

1
<?php
2
3
namespace OldSound\RabbitMqBundle\Command;
4
5
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumer;
6
use PhpAmqpLib\Exception\AMQPTimeoutException;
7
use Symfony\Component\Console\Input\InputArgument;
8
use Symfony\Component\Console\Input\InputInterface;
9
use Symfony\Component\Console\Input\InputOption;
10
use Symfony\Component\Console\Output\OutputInterface;
11
12
final class BatchConsumerCommand extends BaseRabbitMqCommand
13
{
14
    /**
15
     * @var BatchConsumer
16
     */
17
    protected $consumer;
18
19
    public function stopConsumer()
20
    {
21
        if ($this->consumer instanceof BatchConsumer) {
0 ignored issues
show
$this->consumer is always a sub-type of OldSound\RabbitMqBundle\RabbitMq\BatchConsumer.
Loading history...
22
            // Process current message, then halt consumer
23
            $this->consumer->forceStopConsumer();
24
25
            // Halt consumer if waiting for a new message from the queue
26
            try {
27
                $this->consumer->stopConsuming();
28
            } catch (AMQPTimeoutException $e) {
0 ignored issues
show
Coding Style Comprehensibility introduced by
Consider adding a comment why this CATCH block is empty.
Loading history...
29
            }
30
        }
31
    }
32
33
    protected function configure(): void
34
    {
35
        parent::configure();
36
37
        $this
38
            ->setName('rabbitmq:batch:consumer')
39
            ->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
40
            ->addOption('batches', 'b', InputOption::VALUE_OPTIONAL, 'Number of batches to consume', '0')
41
            ->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
42
            ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process')
43
            ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
44
            ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
45
            ->setDescription('Executes a Batch Consumer');
46
    }
47
48
    /**
49
     * Executes the current command.
50
     *
51
     * @param   InputInterface      $input      An InputInterface instance
52
     * @param   OutputInterface     $output     An OutputInterface instance
53
     *
54
     * @return  integer                         0 if everything went fine, or an error code
55
     *
56
     * @throws  \InvalidArgumentException       When the number of batches to consume is less than 0
57
     * @throws  \BadFunctionCallException       When the pcntl is not installed and option -s is true
58
     */
59
    protected function execute(InputInterface $input, OutputInterface $output): int
60
    {
61
        if (defined('AMQP_WITHOUT_SIGNALS') === false) {
62
            define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals'));
63
        }
64
65
        if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) {
66
            if (!function_exists('pcntl_signal')) {
67
                throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called.");
68
            }
69
70
            pcntl_signal(SIGTERM, [&$this, 'stopConsumer']);
71
            pcntl_signal(SIGINT, [&$this, 'stopConsumer']);
72
        }
73
74
        if (defined('AMQP_DEBUG') === false) {
75
            define('AMQP_DEBUG', (bool) $input->getOption('debug'));
76
        }
77
78
        $batchAmountTarget = (int)$input->getOption('batches');
79
        if (0 > $batchAmountTarget) {
80
            throw new \InvalidArgumentException("The -b option should be greater than 0");
81
        }
82
83
        $this->initConsumer($input);
84
85
        return $this->consumer->consume($batchAmountTarget);
0 ignored issues
show
The method consume() does not exist on null. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

85
        return $this->consumer->/** @scrutinizer ignore-call */ consume($batchAmountTarget);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
86
    }
87
88
    /**
89
     * @param   InputInterface  $input
90
     */
91
    protected function initConsumer(InputInterface $input)
92
    {
93
        $this->consumer = $this->getContainer()
94
            ->get(sprintf($this->getConsumerService(), $input->getArgument('name')));
95
96
        if (null !== $input->getOption('memory-limit') &&
97
            ctype_digit((string) $input->getOption('memory-limit')) &&
98
            (int) $input->getOption('memory-limit') > 0
99
        ) {
100
            $this->consumer->setMemoryLimit($input->getOption('memory-limit'));
101
        }
102
        $this->consumer->setRoutingKey($input->getOption('route'));
103
    }
104
105
    /**
106
     * @return  string
107
     */
108
    protected function getConsumerService()
109
    {
110
        return 'old_sound_rabbit_mq.%s_batch';
111
    }
112
}
113