Passed
Push — master ( 4ace66...b47a41 )
by Alexander
06:35 queued 03:35
created

ListenAllCommand::configure()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 25
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 17
c 0
b 0
f 0
nc 1
nop 0
dl 0
loc 25
ccs 0
cts 23
cp 0
crap 2
rs 9.7
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Yiisoft\Queue\Command;
6
7
use Symfony\Component\Console\Command\Command;
8
use Symfony\Component\Console\Input\InputArgument;
9
use Symfony\Component\Console\Input\InputInterface;
10
use Symfony\Component\Console\Input\InputOption;
11
use Symfony\Component\Console\Output\OutputInterface;
12
use Yiisoft\Queue\Cli\LoopInterface;
13
use Yiisoft\Queue\QueueFactoryInterface;
14
15
final class ListenAllCommand extends Command
16
{
17
    protected static $defaultName = 'queue:listen-all';
18
    protected static $defaultDescription = 'Listens the all the given queues and executes messages as they come. ' .
19
    'Meant to be used in development environment only. ' .
20
    'Listens all configured queues by default in case you\'re using yiisoft/config. ' .
21
    'Needs to be stopped manually.';
22
23
    public function __construct(private QueueFactoryInterface $queueFactory, private LoopInterface $loop, private array $channels)
24
    {
25
        parent::__construct();
26
    }
27
28
    public function configure(): void
29
    {
30
        $this->addArgument(
31
            'channel',
32
            InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
33
            'Queue channel name list to connect to',
34
            $this->channels,
35
        )
36
            ->addOption(
37
                'pause',
38
                'p',
39
                InputOption::VALUE_REQUIRED,
40
                'Pause between queue channel iterations in seconds. May save some CPU. Default: 1',
41
                1,
42
            )
43
            ->addOption(
44
                'maximum',
45
                'm',
46
                InputOption::VALUE_REQUIRED,
47
                'Maximum number of messages to process in each channel before switching to another channel. ' .
48
                   'Default is 0 (no limits).',
49
                0,
50
            );
51
52
        $this->addUsage('[channel1 [channel2 [...]]] [--timeout=<timeout>] [--maximum=<maximum>]');
53
    }
54
55
    protected function execute(InputInterface $input, OutputInterface $output): int
56
    {
57
        $queues = [];
58
        /** @var string $channel */
59
        foreach ($input->getArgument('channel') as $channel) {
60
            $queues[] = $this->queueFactory->get($channel);
61
        }
62
63
        while ($this->loop->canContinue()) {
64
            $hasMessages = false;
65
            foreach ($queues as $queue) {
66
                $hasMessages = $queue->run((int)$input->getOption('maximum')) > 0 || $hasMessages;
67
            }
68
69
            if (!$hasMessages) {
70
                $pauseSeconds = (int)$input->getOption('pause');
71
                if ($pauseSeconds < 0) {
72
                    $pauseSeconds = 1;
73
                }
74
75
                /** @psalm-var 0|positive-int $pauseSeconds */
76
                sleep($pauseSeconds);
77
            }
78
        }
79
80
        return 0;
81
    }
82
}
83