Issues (48)

Command/BaseConsumerCommand.php (2 issues)

1
<?php
2
3
namespace OldSound\RabbitMqBundle\Command;
4
5
use OldSound\RabbitMqBundle\RabbitMq\BaseConsumer as Consumer;
6
use PhpAmqpLib\Exception\AMQPTimeoutException;
7
use Symfony\Component\Console\Input\InputArgument;
8
use Symfony\Component\Console\Input\InputInterface;
9
use Symfony\Component\Console\Input\InputOption;
10
use Symfony\Component\Console\Output\OutputInterface;
11
12
/**
 * Base console command for running a RabbitMQ consumer.
 *
 * Subclasses supply the container service-id pattern through
 * getConsumerService(); this class registers the common CLI arguments and
 * options, installs POSIX signal handlers when available, resolves the
 * consumer from the container and starts consuming.
 */
abstract class BaseConsumerCommand extends BaseRabbitMqCommand
{
    /**
     * Consumer resolved from the container by initConsumer(); null until then.
     *
     * NOTE(review): the concrete type depends on the service a subclass
     * points at — stopConsumer() only acts when it is a BaseConsumer.
     *
     * @var Consumer|null
     */
    protected $consumer;

    /**
     * Number of messages to consume, taken from the "messages" option.
     *
     * @var int
     */
    protected $amount;

    /**
     * Returns the service-id pattern of the consumer.
     *
     * The value is used as a sprintf() format whose single placeholder is
     * filled with the "name" command argument.
     *
     * @return string
     */
    abstract protected function getConsumerService();

    /**
     * Signal handler (SIGTERM / SIGINT): stops the running consumer.
     *
     * Does nothing when no BaseConsumer instance has been initialised yet.
     */
    public function stopConsumer()
    {
        if (!$this->consumer instanceof Consumer) {
            return;
        }

        // Process current message, then halt consumer
        $this->consumer->forceStopConsumer();

        // Halt consumer if waiting for a new message from the queue
        try {
            $this->consumer->stopConsuming();
        } catch (AMQPTimeoutException $e) {
            // Deliberately ignored: the consumer has already been told to stop
            // above, so a timeout raised while halting changes nothing and must
            // not escape from a signal handler.
        }
    }

    /**
     * Signal handler (SIGHUP): intended to restart the consumer.
     *
     * Currently a no-op.
     */
    public function restartConsumer()
    {
        // TODO: Implement restarting of consumer
    }

    /**
     * Registers the consumer name argument and the shared consumer options.
     */
    protected function configure(): void
    {
        parent::configure();

        $this
            ->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
            ->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', '0')
            ->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
            ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process (MB)')
            ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
            ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
        ;
    }

    /**
     * Reads and validates the "messages" option before execution.
     *
     * @param InputInterface  $input  An InputInterface instance
     * @param OutputInterface $output An OutputInterface instance
     *
     * @throws \InvalidArgumentException When the number of messages to consume is less than 0
     */
    protected function initialize(InputInterface $input, OutputInterface $output): void
    {
        $this->amount = (int) $input->getOption('messages');

        // Zero (the option default) is accepted; only negative values are rejected.
        if ($this->amount < 0) {
            throw new \InvalidArgumentException("The -m option should be null or greater than 0");
        }
    }

    /**
     * Executes the current command.
     *
     * Defines the AMQP_WITHOUT_SIGNALS and AMQP_DEBUG constants when they are
     * not defined yet, installs the signal handlers when signal handling is
     * enabled and the pcntl extension is loaded, initialises the consumer and
     * starts consuming.
     *
     * @param InputInterface  $input  An InputInterface instance
     * @param OutputInterface $output An OutputInterface instance
     *
     * @return int The value returned by the consumer's consume() call
     *
     * @throws \BadFunctionCallException When signal handling is enabled and the pcntl extension
     *                                   is loaded, but pcntl_signal() cannot be called
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        // Constants cannot be redefined, so a value defined earlier in the process wins.
        if (defined('AMQP_WITHOUT_SIGNALS') === false) {
            define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals'));
        }

        if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) {
            if (!function_exists('pcntl_signal')) {
                throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called.");
            }

            // Objects are passed by handle, so no reference to $this is needed in the callables.
            pcntl_signal(SIGTERM, [$this, 'stopConsumer']);
            pcntl_signal(SIGINT, [$this, 'stopConsumer']);
            pcntl_signal(SIGHUP, [$this, 'restartConsumer']);
        }

        if (defined('AMQP_DEBUG') === false) {
            define('AMQP_DEBUG', (bool) $input->getOption('debug'));
        }

        $this->initConsumer($input);

        return $this->consumer->consume($this->amount);
    }

    /**
     * Resolves the consumer service and applies the CLI options to it.
     *
     * The service id is built by feeding the "name" argument into the pattern
     * returned by getConsumerService(). A memory limit is only applied when the
     * "memory-limit" option is a positive integer; the routing key is always set.
     *
     * @param InputInterface $input An InputInterface instance
     */
    protected function initConsumer(InputInterface $input)
    {
        $serviceId = sprintf($this->getConsumerService(), $input->getArgument('name'));
        $this->consumer = $this->getContainer()->get($serviceId);

        if ($input->hasOption('memory-limit')) {
            $memoryLimit = (int) $input->getOption('memory-limit');
            if ($memoryLimit > 0) {
                $this->consumer->setMemoryLimit($memoryLimit);
            }
        }

        $this->consumer->setRoutingKey($input->getOption('route'));
    }
}
112