Issues (73)

Command/BaseConsumerCommand.php (2 issues)

1
<?php
2
3
namespace OldSound\RabbitMqBundle\Command;
4
5
use OldSound\RabbitMqBundle\RabbitMq\BaseConsumer as Consumer;
6
use PhpAmqpLib\Exception\AMQPTimeoutException;
7
use Symfony\Component\Console\Input\InputArgument;
8
use Symfony\Component\Console\Input\InputInterface;
9
use Symfony\Component\Console\Input\InputOption;
10
use Symfony\Component\Console\Output\OutputInterface;
11
12
abstract class BaseConsumerCommand extends BaseRabbitMqCommand
13
{
14
    protected $consumer;
15
16
    protected $amount;
17
18
    abstract protected function getConsumerService();
19
20
    public function stopConsumer()
21
    {
22
        if ($this->consumer instanceof Consumer) {
23
            // Process current message, then halt consumer
24
            $this->consumer->forceStopConsumer();
25
26
            // Halt consumer if waiting for a new message from the queue
27
            try {
28
                $this->consumer->stopConsuming();
29
            } catch (AMQPTimeoutException $e) {}
0 ignored issues
show
Coding Style Comprehensibility introduced by
Consider adding a comment why this CATCH block is empty.
Loading history...
30
        }
31
    }
32
33
    public function restartConsumer()
34
    {
35
        // TODO: Implement restarting of consumer
36
    }
37
38 3
    protected function configure()
39
    {
40 3
        parent::configure();
41
42
        $this
43 3
            ->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
44 3
            ->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0)
45 3
            ->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
46 3
            ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process (MB)', null)
47 3
            ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
48 3
            ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
49
        ;
50 3
    }
51
52
    /**
53
     * Executes the current command.
54
     *
55
     * @param InputInterface  $input  An InputInterface instance
56
     * @param OutputInterface $output An OutputInterface instance
57
     *
58
     * @return integer 0 if everything went fine, or an error code
59
     *
60
     * @throws \InvalidArgumentException When the number of messages to consume is less than 0
61
     * @throws \BadFunctionCallException When the pcntl is not installed and option -s is true
62
     */
63
    protected function execute(InputInterface $input, OutputInterface $output)
64
    {
65
        if (defined('AMQP_WITHOUT_SIGNALS') === false) {
66
            define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals'));
67
        }
68
69
        if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) {
70
            if (!function_exists('pcntl_signal')) {
71
                throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called.");
72
            }
73
74
            pcntl_signal(SIGTERM, array(&$this, 'stopConsumer'));
75
            pcntl_signal(SIGINT, array(&$this, 'stopConsumer'));
76
            pcntl_signal(SIGHUP, array(&$this, 'restartConsumer'));
77
        }
78
79
        if (defined('AMQP_DEBUG') === false) {
80
            define('AMQP_DEBUG', (bool) $input->getOption('debug'));
81
        }
82
83
        $this->amount = $input->getOption('messages');
84
85
        if (0 > (int) $this->amount) {
86
            throw new \InvalidArgumentException("The -m option should be null or greater than 0");
87
        }
88
        $this->initConsumer($input);
89
90
        return $this->consumer->consume($this->amount);
0 ignored issues
show
The method consume() does not exist on null. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

90
        return $this->consumer->/** @scrutinizer ignore-call */ consume($this->amount);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
91
    }
92
93
    protected function initConsumer($input)
94
    {
95
        $this->consumer = $this->getContainer()
96
                ->get(sprintf($this->getConsumerService(), $input->getArgument('name')));
97
98
        if (!is_null($input->getOption('memory-limit')) && ctype_digit((string) $input->getOption('memory-limit')) && $input->getOption('memory-limit') > 0) {
99
            $this->consumer->setMemoryLimit($input->getOption('memory-limit'));
100
        }
101
        $this->consumer->setRoutingKey($input->getOption('route'));
102
    }
103
}
104