Passed
Push — master ( 554209...998883 )
by Mihai
06:53
created

BatchConsumerCommand::execute()   B

Complexity

Conditions 7
Paths 18

Size

Total Lines 28
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 56

Importance

Changes 0
Metric Value
cc 7
eloc 14
c 0
b 0
f 0
nc 18
nop 2
dl 0
loc 28
ccs 0
cts 18
cp 0
crap 56
rs 8.8333
1
<?php
2
3
namespace OldSound\RabbitMqBundle\Command;
4
5
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumer;
6
use PhpAmqpLib\Exception\AMQPTimeoutException;
7
use Symfony\Component\Console\Input\InputArgument;
8
use Symfony\Component\Console\Input\InputInterface;
9
use Symfony\Component\Console\Input\InputOption;
10
use Symfony\Component\Console\Output\OutputInterface;
11
12
/**
 * Console command ("rabbitmq:batch:consumer") that runs a batch consumer.
 *
 * The consumer is fetched from the container under the service id produced by
 * getConsumerService() and the "name" argument, configured from the command
 * options, and then asked to consume the requested number of batches.
 */
final class BatchConsumerCommand extends BaseRabbitMqCommand
{
    /**
     * Consumer resolved by initConsumer(); not set before that call.
     *
     * @var BatchConsumer
     */
    protected $consumer;

    /**
     * Value of the "batches" option cast to int (0 is the option's default).
     *
     * @var int
     */
    protected $amount;

    /**
     * Asks the consumer to stop.
     *
     * Registered by execute() as the SIGTERM / SIGINT handler, which is why it
     * has to stay public.
     */
    public function stopConsumer()
    {
        // The property is only assigned in initConsumer(), so it may still be
        // unset if this is invoked earlier; the type check doubles as a null guard.
        if ($this->consumer instanceof BatchConsumer) {
            // Process current message, then halt consumer
            $this->consumer->forceStopConsumer();

            // Halt consumer if waiting for a new message from the queue
            try {
                $this->consumer->stopConsuming();
            } catch (AMQPTimeoutException $e) {
                // Intentionally ignored: stopping is best-effort and a timeout
                // raised while halting must not escape the signal handler.
                // NOTE(review): presumably the timeout only means no message
                // was pending - confirm against BatchConsumer::stopConsuming().
            }
        }
    }

    /**
     * Declares the command name, its "name" argument and its options.
     */
    protected function configure()
    {
        parent::configure();

        $this
            ->setName('rabbitmq:batch:consumer')
            ->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
            ->addOption('batches', 'b', InputOption::VALUE_OPTIONAL, 'Number of batches to consume', 0)
            ->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
            ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null)
            ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
            ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
            ->setDescription('Executes a Batch Consumer');
    }

    /**
     * Executes the current command.
     *
     * Defines the AMQP_WITHOUT_SIGNALS and AMQP_DEBUG constants (unless they
     * already exist), installs the signal handlers when signals are enabled
     * and the pcntl extension is loaded, then starts consuming.
     *
     * @param   InputInterface      $input      An InputInterface instance
     * @param   OutputInterface     $output     An OutputInterface instance
     *
     * @return  int                             Whatever BatchConsumer::consume() returns
     *
     * @throws  \InvalidArgumentException       When the number of batches to consume is less than 0
     * @throws  \BadFunctionCallException       When signals are enabled (no -w option), pcntl is loaded,
     *                                          but the pcntl_signal function is not callable
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        if (defined('AMQP_WITHOUT_SIGNALS') === false) {
            define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals'));
        }

        if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) {
            if (!function_exists('pcntl_signal')) {
                throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called.");
            }

            // Objects are passed by handle, so no reference to $this is needed.
            pcntl_signal(SIGTERM, array($this, 'stopConsumer'));
            pcntl_signal(SIGINT, array($this, 'stopConsumer'));
        }

        if (defined('AMQP_DEBUG') === false) {
            define('AMQP_DEBUG', (bool) $input->getOption('debug'));
        }

        $this->amount = (int) $input->getOption('batches');

        // Only negative values are rejected; 0 (the default) is passed through.
        if (0 > $this->amount) {
            throw new \InvalidArgumentException("The -b option should be null or greater than 0");
        }

        $this->initConsumer($input);

        return $this->consumer->consume($this->amount);
    }

    /**
     * Fetches the consumer service and applies the memory-limit and route options.
     *
     * @param   InputInterface  $input
     */
    protected function initConsumer(InputInterface $input)
    {
        $this->consumer = $this->getContainer()
            ->get(sprintf($this->getConsumerService(), $input->getArgument('name')));

        // Read the option once; it is applied only when it is a positive,
        // all-digit value, and handed over as the integer that was validated.
        $memoryLimit = $input->getOption('memory-limit');
        if (null !== $memoryLimit &&
            ctype_digit((string) $memoryLimit) &&
            (int) $memoryLimit > 0
        ) {
            $this->consumer->setMemoryLimit((int) $memoryLimit);
        }
        $this->consumer->setRoutingKey($input->getOption('route'));
    }

    /**
     * Returns the sprintf() pattern of the consumer service id; "%s" is
     * replaced with the "name" argument.
     *
     * @return  string
     */
    protected function getConsumerService()
    {
        return 'old_sound_rabbit_mq.%s_batch';
    }
}
117