ConsumeCommand::execute()   B
last analyzed

Complexity

Conditions 4
Paths 4

Size

Total Lines 28
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 4

Importance

Changes 1
Bugs 0 Features 1
Metric Value
dl 0
loc 28
ccs 18
cts 18
cp 1
rs 8.5806
c 1
b 0
f 1
cc 4
eloc 19
nc 4
nop 2
crap 4
1
<?php
2
declare(strict_types = 1);
3
4
namespace Innmind\AMQPBundle\Command;
5
6
use Innmind\AMQP\Model\Basic\{
7
    Consume,
8
    Qos
9
};
10
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
11
use Symfony\Component\Console\{
12
    Input\InputInterface,
13
    Input\InputArgument,
14
    Output\OutputInterface
15
};
16
17
/**
 * Console command `innmind:amqp:consume`.
 *
 * Reads messages from the queue named on the command line and passes them to
 * the consumer registered under that same name in the
 * `innmind.amqp.consumers` service.
 */
final class ConsumeCommand extends ContainerAwareCommand
{
    /**
     * Declares the command name, its description and its three arguments:
     * a required `queue`, plus the optional `number` and `prefetch`.
     *
     * {@inheritdoc}
     */
    protected function configure()
    {
        $this->setName('innmind:amqp:consume');
        $this->setDescription('Will process messages from the given queue');

        $this->addArgument('queue', InputArgument::REQUIRED);
        $this->addArgument(
            'number',
            InputArgument::OPTIONAL,
            'The number of messages to process'
        );
        $this->addArgument(
            'prefetch',
            InputArgument::OPTIONAL,
            'The number of messages to prefetch'
        );
    }

    /**
     * Consumes the requested queue with the consumer registered for it.
     *
     * When `number` and/or `prefetch` is supplied, a QoS is declared on the
     * channel before consuming; its count is `prefetch` when given, otherwise
     * `number`. When `number` is supplied it is also passed to `take()` on the
     * consumer (presumably capping how many messages are processed, per the
     * argument description; confirm against the AMQP library).
     *
     * {@inheritdoc}
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $queueName = $input->getArgument('queue');
        $limit = $input->getArgument('number');
        $prefetchCount = $input->getArgument('prefetch');

        // The consumer is resolved first so an unknown queue name fails
        // before the AMQP client is asked for a channel.
        $handler = $this->getContainer()->get('innmind.amqp.consumers')->get($queueName);
        $basic = $this->getContainer()->get('innmind.amqp.client')->channel()->basic();

        $hasLimit = null !== $limit;

        if ($hasLimit || null !== $prefetchCount) {
            // An explicit prefetch wins; otherwise the message limit is
            // reused as the prefetch count.
            $qosCount = (int) ($prefetchCount ?? $limit);
            $basic->qos(new Qos(0, $qosCount));
        }

        $consumer = $basic->consume(new Consume($queueName));

        if ($hasLimit) {
            $consumer->take((int) $limit);
        }

        $consumer->foreach($handler);
    }
}
64