Test Failed
Pull Request — master (#38)
by Aleksandr
05:24 queued 02:25
created

GroupConsumerCommand   A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 86
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
eloc 43
c 1
b 0
f 1
dl 0
loc 86
rs 10
wmc 10

3 Methods

Rating   Name   Duplication   Size   Complexity  
B execute() 0 47 8
A __construct() 0 14 1
A configure() 0 15 1
1
<?php
2
3
namespace OldSound\RabbitMqBundle\Command;
4
5
use OldSound\RabbitMqBundle\RabbitMq\BaseConsumer;
6
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
7
use PhpAmqpLib\Connection\AbstractConnection;
8
use Symfony\Component\Console\Command\Command;
9
use Symfony\Component\Console\Exception\InvalidArgumentException;
10
use Symfony\Component\Console\Exception\InvalidOptionException;
11
use Symfony\Component\Console\Input\InputArgument;
12
use Symfony\Component\Console\Input\InputInterface;
13
use Symfony\Component\Console\Input\InputOption;
14
use Symfony\Component\Console\Output\OutputInterface;
15
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
16
17
/**
 * Console command that runs every consumer belonging to a named group
 * on a single shared AMQP channel, inside one process.
 *
 * Usage: rabbitmq:group:consumer [name] [--connection=...] [--timeout=...]
 *        [--qos-prefetch-size=...] [--qos-prefetch-count=...] [--qos-global=...]
 */
class GroupConsumerCommand extends BaseRabbitMqCommand
{
    /**
     * Consumer collections, one entry per consumer type (regular, multi,
     * dynamic, batch, anonymous). Each entry is iterated in execute() and is
     * expected to yield BaseConsumer instances.
     *
     * @var iterable[]
     */
    protected $consumers;

    /**
     * @param iterable $consumers        Regular consumers
     * @param iterable $multiConsumers   Multiple-queue consumers
     * @param iterable $dynamicConsumers Dynamic consumers
     * @param iterable $batchConsumers   Batch consumers
     * @param iterable $anonConsumers    Anonymous consumers
     */
    public function __construct(
        iterable $consumers,
        iterable $multiConsumers,
        iterable $dynamicConsumers,
        iterable $batchConsumers,
        iterable $anonConsumers
    ) {
        parent::__construct();

        $this->consumers = [
            $consumers,
            $multiConsumers,
            $dynamicConsumers,
            $batchConsumers,
            $anonConsumers,
        ];
    }

    /**
     * Declares the command name, its single optional argument and its options.
     */
    protected function configure()
    {
        $this
            ->setName('rabbitmq:group:consumer')
            ->addArgument('name', InputArgument::OPTIONAL, 'Consumer group name', 'default')
            ->addOption('connection', 'c', InputOption::VALUE_OPTIONAL, 'Rabbitmq connection name', 'default')
            ->addOption('timeout', 't', InputOption::VALUE_OPTIONAL, 'Timeout', 500)
            ->addOption('qos-prefetch-size', null, InputOption::VALUE_OPTIONAL, 'Qos prefetch size', 0)
            ->addOption('qos-prefetch-count', null, InputOption::VALUE_OPTIONAL, 'Qos prefetch count', 10)
            ->addOption('qos-global', null, InputOption::VALUE_OPTIONAL, 'Qos global', true)
            // TODO ?! ->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0)
            // ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process (MB)', null)
            // ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
            // ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
            ->setDescription('Synchronous execute combined multiple consumers');
    }

    /**
     * Binds every consumer of the requested group to one channel and then
     * blocks, dispatching messages, until the channel has no callbacks left.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     *
     * @return int Process exit code (always 0 on normal termination)
     *
     * @throws InvalidArgumentException When the group name is not a string or no consumer belongs to it
     * @throws InvalidOptionException   When the connection is not defined or the timeout is not numeric
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $group = $input->getArgument('name');
        // getArgument() is declared as string|string[]|null; guard before using it as a string.
        if (!is_string($group)) {
            throw new InvalidArgumentException('Consumer group name must be a string');
        }

        $consumers = $this->findConsumersByGroup($group);
        if ([] === $consumers) {
            throw new InvalidArgumentException(sprintf('No consumers with %s group', $group));
        }

        $connectionName = $input->getOption('connection');
        $connectionServiceId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);
        if (false === $this->container->has($connectionServiceId)) {
            throw new InvalidOptionException(sprintf('Rabbitmq connection %s is not defined', $connectionName));
        }

        // Option values supplied on the command line arrive as strings, so
        // validate/convert the timeout once, before opening the channel.
        $timeout = $input->getOption('timeout');
        if (null === $timeout) {
            $timeout = 0;
        } elseif (is_numeric($timeout)) {
            $timeout += 0; // numeric string -> int or float
        } else {
            throw new InvalidOptionException(sprintf('Timeout must be numeric, "%s" given', $timeout));
        }

        /** @var AbstractConnection $connection */
        $connection = $this->container->get($connectionServiceId);
        $channel = $connection->channel();

        // Convert explicitly: a raw "false" string would otherwise be truthy
        // and silently enable the global flag.
        $channel->basic_qos(
            (int) $input->getOption('qos-prefetch-size'),
            (int) $input->getOption('qos-prefetch-count'),
            filter_var($input->getOption('qos-global'), FILTER_VALIDATE_BOOLEAN)
        );

        foreach ($consumers as $i => $consumer) {
            $consumer->setChannel($channel);
            // The index suffix keeps the tag unique per consumer within this process.
            $consumer->setConsumerTag(sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $i));
            $consumer->setupConsumer();
        }

        while (count($channel->callbacks) > 0) {
            // TODO emit events
            $channel->wait(null, false, $timeout);
        }

        return 0;
    }

    /**
     * Collects, across all consumer types, the consumers whose groups contain $group.
     *
     * @param string $group
     *
     * @return BaseConsumer[] Zero-indexed list, in registration order
     */
    private function findConsumersByGroup(string $group): array
    {
        $matched = [];

        foreach ($this->consumers as $consumersByType) {
            foreach ($consumersByType as $consumer) {
                if (in_array($group, $consumer->getGroups(), true)) {
                    $matched[] = $consumer;
                }
            }
        }

        return $matched;
    }
}