Passed
Pull Request — master (#167)
by Viktor
03:49 queued 38s
created

ListenAllCommand::execute()   A

Complexity

Conditions 6
Paths 14

Size

Total Lines 20
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 42

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 6
eloc 10
c 2
b 0
f 0
nc 14
nop 2
dl 0
loc 20
ccs 0
cts 11
cp 0
crap 42
rs 9.2222
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Yii\Queue\Command;
6
7
use Symfony\Component\Console\Command\Command;
8
use Symfony\Component\Console\Input\InputArgument;
9
use Symfony\Component\Console\Input\InputInterface;
10
use Symfony\Component\Console\Input\InputOption;
11
use Symfony\Component\Console\Output\OutputInterface;
12
use Yiisoft\Queue\Cli\LoopInterface;
13
use Yiisoft\Queue\QueueFactoryInterface;
14
15
/**
 * Console command `queue:listen-all`.
 *
 * Resolves a queue for every requested channel through the queue factory and then
 * keeps running those queues in a round-robin fashion for as long as the injected
 * loop allows it. When a full round over all queues handled nothing, the command
 * sleeps for the configured pause before starting the next round.
 */
final class ListenAllCommand extends Command
{
    /**
     * Pause (in seconds) between rounds. Used as the `--pause` default and as the
     * fallback when a negative value is passed on the command line.
     */
    private const DEFAULT_PAUSE = 1;

    protected static $defaultName = 'queue:listen-all';
    protected static $defaultDescription = 'Listens the all the given queues and executes messages as they come. ' .
        'Meant to be used in development environment only. ' .
        'Listens all configured queues by default in case you\'re using yiisoft/config. ' .
        'Needs to be stopped manually.';

    /**
     * @param QueueFactoryInterface $queueFactory Factory used to obtain a queue for each channel name.
     * @param LoopInterface $loop Decides whether the listening loop may go on for another round.
     * @param array $channels Channel names used as the default value of the `channel` argument.
     */
    public function __construct(
        private QueueFactoryInterface $queueFactory,
        private LoopInterface $loop,
        private array $channels,
    ) {
        parent::__construct();
    }

    /**
     * Declares the `channel` array argument and the `--pause` / `--maximum` options.
     */
    public function configure(): void
    {
        $this->addArgument(
            'channel',
            InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
            'Queue channel name list to connect to',
            $this->channels,
        )
            ->addOption(
                'pause',
                'p',
                InputOption::VALUE_REQUIRED,
                'Pause between queue channel iterations in seconds. May save some CPU. Default: 1',
                self::DEFAULT_PAUSE,
            )
            ->addOption(
                'maximum',
                'm',
                InputOption::VALUE_REQUIRED,
                'Maximum number of messages to process in each channel before switching to another channel. ' .
                   'Default is 0 (no limits).',
                0,
            );

        // The usage line must name the options that are actually registered above
        // (`--pause`, not the non-existent `--timeout`).
        $this->addUsage('[channel1 [channel2 [...]]] [--pause=<pause>] [--maximum=<maximum>]');
    }

    /**
     * Runs every requested queue in turn until the loop reports it must stop.
     *
     * @return int Always 0 once the loop has ended.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $queues = [];
        /** @var string $channel */
        foreach ($input->getArgument('channel') as $channel) {
            $queues[] = $this->queueFactory->get($channel);
        }

        // Options cannot change while the command is running, so read them once.
        $maximum = (int) $input->getOption('maximum');
        $pause = (int) $input->getOption('pause');
        if ($pause < 0) {
            // sleep() rejects negative values (ValueError on PHP 8), which would
            // abort the listener in the middle of the loop; use the default instead.
            $pause = self::DEFAULT_PAUSE;
        }

        while ($this->loop->canContinue()) {
            $hasMessages = false;
            foreach ($queues as $queue) {
                // run() is the left operand on purpose: every queue must be run
                // each round, even after an earlier queue already reported work.
                $hasMessages = $queue->run($maximum) > 0 || $hasMessages;
            }

            if (!$hasMessages) {
                // Nothing was handled in this round: back off before polling again.
                sleep($pause);
            }
        }

        return 0;
    }
}
77