Passed
Pull Request — master (#38)
by Aleksandr
07:19 queued 02:12
created

GroupConsumerCommand::configure()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 17
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
eloc 12
c 1
b 0
f 1
nc 1
nop 0
dl 0
loc 17
ccs 0
cts 14
cp 0
crap 2
rs 9.8666
1
<?php
2
3
namespace OldSound\RabbitMqBundle\Command;
4
5
use OldSound\RabbitMqBundle\RabbitMq\BaseConsumer;
6
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
7
use PhpAmqpLib\Connection\AbstractConnection;
8
use PhpAmqpLib\Exception\AMQPTimeoutException;
9
use Symfony\Component\Console\Command\Command;
10
use Symfony\Component\Console\Exception\InvalidArgumentException;
11
use Symfony\Component\Console\Exception\InvalidOptionException;
12
use Symfony\Component\Console\Input\InputArgument;
13
use Symfony\Component\Console\Input\InputInterface;
14
use Symfony\Component\Console\Input\InputOption;
15
use Symfony\Component\Console\Output\OutputInterface;
16
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
17
18
/**
 * Console command that runs every consumer belonging to a named group on a
 * single shared AMQP channel, dispatching their messages synchronously.
 */
class GroupConsumerCommand extends BaseRabbitMqCommand
{
    /**
     * Consumer collections, one iterable per consumer type, in the order they
     * were passed to the constructor. Each item is expected to be a
     * BaseConsumer (see execute()).
     *
     * @var iterable[]
     */
    protected $consumers;

    /**
     * @param iterable $consumers        Regular consumers
     * @param iterable $multiConsumers   Multiple-queue consumers
     * @param iterable $dynamicConsumers Dynamic consumers
     * @param iterable $batchConsumers   Batch consumers
     * @param iterable $anonConsumers    Anonymous consumers
     */
    public function __construct(
        iterable $consumers,
        iterable $multiConsumers,
        iterable $dynamicConsumers,
        iterable $batchConsumers,
        iterable $anonConsumers
    ) {
        parent::__construct();

        $this->consumers = [
            $consumers,
            $multiConsumers,
            $dynamicConsumers,
            $batchConsumers,
            $anonConsumers,
        ];
    }

    /**
     * Declares the command name, its single optional argument (the group
     * name) and the runtime/connection/QoS options.
     */
    protected function configure()
    {
        $this
            ->setName('rabbitmq:group:consumer')
            ->setDescription('Synchronous execute combined multiple consumers')
            ->addArgument('name', InputArgument::OPTIONAL, 'Consumer group name', 'default')
            // TODO: decide whether the "messages" (-m) and "route" (-r) options
            // of the single-consumer command should be supported here as well.
            ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process (MB)', null)
            ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
            ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
            ->addOption('connection', 'c', InputOption::VALUE_OPTIONAL, 'Rabbitmq connection name', 'default')
            ->addOption('timeout', 't', InputOption::VALUE_OPTIONAL, 'Timeout', 500)
            ->addOption('qos-prefetch-size', null, InputOption::VALUE_OPTIONAL, 'Qos prefetch size', 0)
            ->addOption('qos-prefetch-count', null, InputOption::VALUE_OPTIONAL, 'Qos prefetch count', 10)
            ->addOption('qos-global', null, InputOption::VALUE_OPTIONAL, 'Qos global', true)
        ;
    }

    /**
     * Collects the consumers of the requested group, attaches them all to one
     * channel of the selected connection and waits on that channel until no
     * consumer callbacks remain registered.
     *
     * @todo remove the code duplicated from BaseConsumerCommand: extend
     *       BaseConsumerCommand and move the grouping logic into a separate
     *       GroupConsumer class
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return int Always 0 when the wait loop ends normally
     * @throws InvalidArgumentException When the group name is not a string or no consumer belongs to the group
     * @throws InvalidOptionException When the requested connection service is not defined
     * @throws \BadFunctionCallException When pcntl is loaded but pcntl_signal is disabled
     * @throws \ErrorException
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $group = $input->getArgument('name');
        // getArgument() is typed string|string[]|null; "name" is a scalar
        // argument, so anything but a string means a misconfigured definition.
        if (!is_string($group)) {
            throw new InvalidArgumentException('Consumer group name must be a string');
        }

        $consumers = $this->findConsumersByGroup($group);
        if ([] === $consumers) {
            throw new InvalidArgumentException(sprintf('No consumers with %s group', $group));
        }

        $connectionName = $input->getOption('connection');
        $connectionServiceId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);
        if (false === $this->container->has($connectionServiceId)) {
            throw new InvalidOptionException(sprintf('Rabbitmq connection %s is not defined', $connectionName));
        }

        if (defined('AMQP_WITHOUT_SIGNALS') === false) {
            define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals'));
        }

        if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) {
            if (!function_exists('pcntl_signal')) {
                throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called.");
            }

            $stopConsumer = static function () use ($consumers) {
                foreach ($consumers as $consumer) {
                    $consumer->forceStopConsumer();

                    // Halt consumer if waiting for a new message from the queue
                    try {
                        $consumer->stopConsuming();
                    } catch (AMQPTimeoutException $e) {
                        // Deliberately ignored: a timeout while stopping one
                        // consumer must not prevent the remaining consumers
                        // of the group from being stopped.
                    }
                }
            };

            pcntl_signal(SIGTERM, $stopConsumer);
            pcntl_signal(SIGINT, $stopConsumer);
            pcntl_signal(SIGHUP, $stopConsumer);
        }

        if (defined('AMQP_DEBUG') === false) {
            define('AMQP_DEBUG', (bool) $input->getOption('debug'));
        }

        /** @var AbstractConnection $connection */
        $connection = $this->container->get($connectionServiceId);
        $channel = $connection->channel();

        // Option values given on the command line arrive as strings, so they
        // are normalised here. FILTER_VALIDATE_BOOLEAN makes
        // "--qos-global=false" actually disable the flag (the bare string
        // 'false' is truthy in PHP).
        $channel->basic_qos(
            (int) $input->getOption('qos-prefetch-size'),
            (int) $input->getOption('qos-prefetch-count'),
            filter_var($input->getOption('qos-global'), FILTER_VALIDATE_BOOLEAN)
        );

        $memoryLimit = $input->getOption('memory-limit');
        // Only a strictly positive whole number of megabytes is applied;
        // anything else leaves the consumers' memory limit untouched.
        if (null !== $memoryLimit && ctype_digit((string) $memoryLimit) && $memoryLimit > 0) {
            foreach ($consumers as $consumer) {
                $consumer->setMemoryLimit((int) $memoryLimit);
            }
        }

        foreach ($consumers as $i => $consumer) {
            $consumer->setChannel($channel);
            // The list index keeps consumer tags unique within this process.
            $consumer->setConsumerTag(sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $i));
            $consumer->setupConsumer();
        }

        $timeout = $input->getOption('timeout');
        while (count($channel->callbacks)) {
            $channel->wait(null, false, $timeout);
        }

        return 0;
    }

    /**
     * Returns every registered consumer, across all consumer types, whose
     * groups contain the given group name.
     *
     * @param string $group Group name to match (strict comparison)
     * @return BaseConsumer[] Matching consumers as a zero-indexed list
     */
    private function findConsumersByGroup(string $group): array
    {
        $matched = [];

        foreach ($this->consumers as $consumersByType) {
            foreach ($consumersByType as $consumer) {
                if (in_array($group, $consumer->getGroups(), true)) {
                    $matched[] = $consumer;
                }
            }
        }

        return $matched;
    }
}