Test Failed
Pull Request — master (#38)
by Aleksandr
14:31 queued 04:35
created

GroupConsumerCommand::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 14
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
eloc 7
c 1
b 0
f 1
nc 1
nop 5
dl 0
loc 14
rs 10
1
<?php
2
3
namespace OldSound\RabbitMqBundle\Command;
4
5
use OldSound\RabbitMqBundle\RabbitMq\BaseConsumer;
6
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
7
use PhpAmqpLib\Connection\AbstractConnection;
8
use PhpAmqpLib\Exception\AMQPTimeoutException;
9
use Symfony\Component\Console\Command\Command;
10
use Symfony\Component\Console\Exception\InvalidArgumentException;
11
use Symfony\Component\Console\Exception\InvalidOptionException;
12
use Symfony\Component\Console\Input\InputArgument;
13
use Symfony\Component\Console\Input\InputInterface;
14
use Symfony\Component\Console\Input\InputOption;
15
use Symfony\Component\Console\Output\OutputInterface;
16
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
17
18
/**
 * Console command that runs every consumer belonging to a named group on a
 * single shared AMQP channel.
 */
class GroupConsumerCommand extends BaseRabbitMqCommand
{
    /**
     * Consumer collections, one entry per constructor argument and in the
     * same order (regular, multi, dynamic, batch, anonymous).
     *
     * NOTE(review): execute() treats every element of these collections as an
     * object exposing getGroups() and the BaseConsumer API - confirm that all
     * injected consumer types satisfy this.
     *
     * @var iterable[]
     */
    protected $consumers;

    /**
     * @param iterable $consumers        regular consumers
     * @param iterable $multiConsumers   multiple-queue consumers
     * @param iterable $dynamicConsumers dynamic consumers
     * @param iterable $batchConsumers   batch consumers
     * @param iterable $anonConsumers    anonymous consumers
     */
    public function __construct(
        iterable $consumers,
        iterable $multiConsumers,
        iterable $dynamicConsumers,
        iterable $batchConsumers,
        iterable $anonConsumers
    ) {
        parent::__construct();

        $this->consumers = [
            $consumers,
            $multiConsumers,
            $dynamicConsumers,
            $batchConsumers,
            $anonConsumers,
        ];
    }

    /**
     * Declares the command name, its single optional argument (the consumer
     * group) and the connection / QoS / runtime options.
     */
    protected function configure()
    {
        $this
            ->setName('rabbitmq:group:consumer')
            ->addArgument('name', InputArgument::OPTIONAL, 'Consumer group name', 'default')
            ->addOption('connection', 'c', InputOption::VALUE_OPTIONAL, 'Rabbitmq connection name', 'default')
            ->addOption('timeout', 't', InputOption::VALUE_OPTIONAL, 'Timeout', 500)
            ->addOption('qos-prefetch-size', null, InputOption::VALUE_OPTIONAL, 'Qos prefetch size', 0)
            ->addOption('qos-prefetch-count', null, InputOption::VALUE_OPTIONAL, 'Qos prefetch count', 10)
            ->addOption('qos-global', null, InputOption::VALUE_OPTIONAL, 'Qos global', true)
            // TODO ?! ->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0)
            ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process (MB)', null)
            ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
            ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
            ->setDescription('Synchronous execute combined multiple consumers');
    }

    /**
     * Sets up all consumers of the requested group on one channel and waits
     * for messages until the channel has no callbacks left.
     *
     * @todo remove duplicate code from BaseConsumerCommand:
     * extend BaseConsumerCommand, split code into a separate GroupConsumer class
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return int
     * @throws InvalidArgumentException when no consumer belongs to the requested group
     * @throws InvalidOptionException when the requested connection service is not defined
     * @throws \BadFunctionCallException when pcntl is loaded but pcntl_signal() is disabled
     * @throws \ErrorException
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        // The argument is not declared as an array, so a string is expected here;
        // the cast keeps the strict group comparison and sprintf() well-typed.
        $group = (string) $input->getArgument('name');

        $consumers = $this->findConsumersByGroup($group);
        if ([] === $consumers) {
            throw new InvalidArgumentException(sprintf('No consumers with %s group', $group));
        }

        $connectionName = $input->getOption('connection');
        $connectionServiceId = sprintf('old_sound_rabbit_mq.connection.%s', $connectionName);
        if (false === $this->container->has($connectionServiceId)) {
            throw new InvalidOptionException(sprintf('Rabbitmq connection %s is not defined', $connectionName));
        }

        if (defined('AMQP_WITHOUT_SIGNALS') === false) {
            define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals'));
        }

        if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) {
            $this->registerStopSignals($consumers);
        }

        if (defined('AMQP_DEBUG') === false) {
            define('AMQP_DEBUG', (bool) $input->getOption('debug'));
        }

        /** @var AbstractConnection $connection */
        $connection = $this->container->get($connectionServiceId);
        $channel = $connection->channel();

        // Options given on the command line arrive as strings. Normalise them so
        // that e.g. "--qos-global=false" is not treated as a truthy string.
        $qosGlobalRaw = $input->getOption('qos-global');
        $qosGlobal = filter_var($qosGlobalRaw, FILTER_VALIDATE_BOOLEAN, FILTER_NULL_ON_FAILURE);
        if (null === $qosGlobal) {
            // Unrecognised value: keep the previous plain truthiness semantics.
            $qosGlobal = (bool) $qosGlobalRaw;
        }

        $channel->basic_qos(
            (int) $input->getOption('qos-prefetch-size'),
            (int) $input->getOption('qos-prefetch-count'),
            $qosGlobal
        );

        // Only a strictly positive whole number is accepted as a memory limit.
        $memoryLimit = $input->getOption('memory-limit');
        if (null !== $memoryLimit && ctype_digit((string) $memoryLimit) && $memoryLimit > 0) {
            foreach ($consumers as $consumer) {
                $consumer->setMemoryLimit((int) $memoryLimit);
            }
        }

        foreach ($consumers as $i => $consumer) {
            $consumer->setChannel($channel);
            $consumer->setConsumerTag(sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $i));
            $consumer->setupConsumer();
        }

        // Read once; the value is passed through unchanged to every wait() call.
        $timeout = $input->getOption('timeout');

        // Keep waiting for messages for as long as the channel has callbacks registered.
        while (count($channel->callbacks)) {
            $channel->wait(null, false, $timeout);
        }

        return 0;
    }

    /**
     * Collects, across all injected collections, the consumers whose groups
     * contain the given group name (strict comparison).
     *
     * @param string $group
     * @return BaseConsumer[]
     */
    private function findConsumersByGroup(string $group): array
    {
        $matched = [];

        foreach ($this->consumers as $consumersByType) {
            foreach ($consumersByType as $consumer) {
                if (in_array($group, $consumer->getGroups(), true)) {
                    $matched[] = $consumer;
                }
            }
        }

        return $matched;
    }

    /**
     * Installs SIGTERM / SIGINT / SIGHUP handlers that stop every given consumer.
     *
     * @param BaseConsumer[] $consumers
     * @throws \BadFunctionCallException when pcntl_signal() is not callable
     */
    private function registerStopSignals(array $consumers): void
    {
        if (!function_exists('pcntl_signal')) {
            throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called.");
        }

        $stopConsumer = function () use ($consumers) {
            foreach ($consumers as $consumer) {
                $consumer->forceStopConsumer();

                // Halt consumer if waiting for a new message from the queue
                try {
                    $consumer->stopConsuming();
                } catch (AMQPTimeoutException $e) {
                    // Deliberately ignored: a timeout while stopping one consumer
                    // must not prevent the remaining consumers from being stopped.
                }
            }
        };

        pcntl_signal(SIGTERM, $stopConsumer);
        pcntl_signal(SIGINT, $stopConsumer);
        pcntl_signal(SIGHUP, $stopConsumer);
    }
}