BatchConsumerCommand::execute()   B

Complexity

Conditions 7
Paths 18

Size

Total Lines 28
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 56

Importance

Changes 0
Metric Value
cc 7
eloc 14
nc 18
nop 2
dl 0
loc 28
ccs 0
cts 21
cp 0
crap 56
rs 8.8333
c 0
b 0
f 0
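
The CRAP score of 56 reported above is consistent with the widely used CRAP formula, complexity² × (1 − coverage)³ + complexity, applied to a cyclomatic complexity of 7 and 0 % coverage. The sketch below reproduces that arithmetic. `crapScore()` is a hypothetical helper written for illustration, and it is an assumption that the analysis tool computes the value in exactly this way.

```php
<?php

/**
 * Hypothetical helper (not part of the bundle or of any analysis tool).
 *
 * @param int   $complexity Cyclomatic complexity of the method ("cc" above)
 * @param float $coverage   Covered fraction of the method, from 0.0 to 1.0
 */
function crapScore(int $complexity, float $coverage): float
{
    // Untested complexity is penalised quadratically; full coverage
    // reduces the score to the plain complexity.
    return $complexity ** 2 * (1 - $coverage) ** 3 + $complexity;
}

echo crapScore(7, 0.0), PHP_EOL; // 56, matching the "crap" row above
```

With full coverage the same method would score 7, so adding tests is the cheapest way to lower the reported value without touching the control flow.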
<?php

namespace OldSound\RabbitMqBundle\Command;

use OldSound\RabbitMqBundle\RabbitMq\BatchConsumer;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

final class BatchConsumerCommand extends BaseRabbitMqCommand
{
    /**
     * @var BatchConsumer
     */
    protected $consumer;

    public function stopConsumer()
    {
        if ($this->consumer instanceof BatchConsumer) {
$this->consumer is always a sub-type of OldSound\RabbitMqBundle\RabbitMq\BatchConsumer.
            // Process current message, then halt consumer
            $this->consumer->forceStopConsumer();

            // Halt consumer if waiting for a new message from the queue
            try {
                $this->consumer->stopConsuming();
            } catch (AMQPTimeoutException $e) {}
Coding Style, Comprehensibility: Consider adding a comment why this CATCH block is empty.
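
One way to address the issue above is to keep the catch block free of statements but document the intent in a comment. The sketch below is self-contained, so `TimeoutException` and `StubConsumer` are hypothetical stand-ins for `AMQPTimeoutException` and `BatchConsumer`, and the rationale given in the comment is an assumption about why the timeout is ignored, not something the source states.

```php
<?php

// Hypothetical stand-in for PhpAmqpLib\Exception\AMQPTimeoutException,
// so the sketch runs without php-amqplib installed.
class TimeoutException extends \RuntimeException
{
}

// Hypothetical stub whose stopConsuming() always times out.
class StubConsumer
{
    public function stopConsuming(): void
    {
        throw new TimeoutException('Timed out while waiting for the broker.');
    }
}

function stopQuietly(StubConsumer $consumer): void
{
    try {
        $consumer->stopConsuming();
    } catch (TimeoutException $e) {
        // Intentionally ignored (assumed rationale): the process is shutting
        // down anyway, so a timeout while cancelling is not actionable.
    }
}

stopQuietly(new StubConsumer());
echo 'stopped without propagating the timeout', PHP_EOL;
```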
        }
    }

    protected function configure()
    {
        parent::configure();

        $this
            ->setName('rabbitmq:batch:consumer')
            ->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
            ->addOption('batches', 'b', InputOption::VALUE_OPTIONAL, 'Number of batches to consume', 0)
            ->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
            ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null)
            ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
            ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
            ->setDescription('Executes a Batch Consumer');
        ;
    }

    /**
     * Executes the current command.
     *
     * @param   InputInterface      $input      An InputInterface instance
     * @param   OutputInterface     $output     An OutputInterface instance
     *
     * @return  integer                         0 if everything went fine, or an error code
     *
     * @throws  \InvalidArgumentException       When the number of batches to consume is less than 0
     * @throws  \BadFunctionCallException       When the pcntl is not installed and option -s is true
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        if (defined('AMQP_WITHOUT_SIGNALS') === false) {
            define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals'));
        }

        if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) {
            if (!function_exists('pcntl_signal')) {
                throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called.");
            }

            pcntl_signal(SIGTERM, array(&$this, 'stopConsumer'));
            pcntl_signal(SIGINT, array(&$this, 'stopConsumer'));
        }

        if (defined('AMQP_DEBUG') === false) {
            define('AMQP_DEBUG', (bool) $input->getOption('debug'));
        }

        $batchAmountTarget = (int) $input->getOption('batches');

        if (0 > $batchAmountTarget) {
            throw new \InvalidArgumentException("The -b option should be greater than 0");
        }

        $this->initConsumer($input);

        return $this->consumer->consume($batchAmountTarget);
Bug: The method consume() does not exist on null. (Ignorable by annotation)

If this is a false positive, you can also ignore this issue in your code via the ignore-call annotation:

        return $this->consumer->/** @scrutinizer ignore-call */ consume($batchAmountTarget);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.
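
As an alternative to suppressing the issue above with an annotation, the call can be guarded so that an uninitialised consumer fails loudly. The sketch below uses hypothetical stand-ins (`StubBatchConsumer`, `StubCommand`, `setConsumer()`, `run()`) so that it runs without the bundle; it illustrates the guard only and is not the bundle's implementation.

```php
<?php

// Hypothetical stand-in for the bundle's BatchConsumer.
class StubBatchConsumer
{
    public function consume(int $batchAmountTarget): int
    {
        return 0; // pretend every batch was consumed successfully
    }
}

// Hypothetical stand-in for the command; only the guard matters here.
class StubCommand
{
    /** @var StubBatchConsumer|null */
    private $consumer;

    public function setConsumer(StubBatchConsumer $consumer): void
    {
        $this->consumer = $consumer;
    }

    public function run(int $batchAmountTarget): int
    {
        // Narrow the type before the call so consume() is never invoked on null.
        if (!$this->consumer instanceof StubBatchConsumer) {
            throw new \LogicException('The consumer has not been initialised.');
        }

        return $this->consumer->consume($batchAmountTarget);
    }
}

$command = new StubCommand();
$command->setConsumer(new StubBatchConsumer());
echo $command->run(3), PHP_EOL; // 0
```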
    }

    /**
     * @param   InputInterface  $input
     */
    protected function initConsumer(InputInterface $input)
    {
        $this->consumer = $this->getContainer()
            ->get(sprintf($this->getConsumerService(), $input->getArgument('name')));
Bug: It seems like $input->getArgument('name') can also be of type string[]; however, parameter $values of sprintf() only seems to accept double|integer|string. Consider adding a type check. (Ignorable by annotation)

If this is a false positive, you can also ignore this issue in your code via the ignore-type annotation:

            ->get(sprintf($this->getConsumerService(), /** @scrutinizer ignore-type */ $input->getArgument('name')));
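
The type check suggested in the issue above could look roughly like the following sketch. `consumerServiceId()` is a hypothetical helper and `'upload'` is a made-up consumer name; only the pattern `old_sound_rabbit_mq.%s_batch` comes from the listing.

```php
<?php

/**
 * Hypothetical helper: builds the service id from a raw argument value.
 *
 * @param string $pattern sprintf() pattern, e.g. 'old_sound_rabbit_mq.%s_batch'
 * @param mixed  $name    Raw value as returned by the input (may be an array or null)
 */
function consumerServiceId(string $pattern, $name): string
{
    // Reject arrays, null and empty strings before they reach sprintf().
    if (!is_string($name) || $name === '') {
        throw new \InvalidArgumentException('The "name" argument must be a non-empty string.');
    }

    return sprintf($pattern, $name);
}

echo consumerServiceId('old_sound_rabbit_mq.%s_batch', 'upload'), PHP_EOL;
// old_sound_rabbit_mq.upload_batch
```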

        if (null !== $input->getOption('memory-limit') &&
            ctype_digit((string) $input->getOption('memory-limit')) &&
            (int) $input->getOption('memory-limit') > 0
        ) {
            $this->consumer->setMemoryLimit($input->getOption('memory-limit'));
        }
        $this->consumer->setRoutingKey($input->getOption('route'));
    }

    /**
     * @return  string
     */
    protected function getConsumerService()
    {
        return 'old_sound_rabbit_mq.%s_batch';
    }
}