Passed
Push — master ( 407e73...baec40 )
by Koldo
02:35
created

StartQueueConsumer::execute()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 17
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
eloc 12
c 1
b 0
f 0
nc 2
nop 2
dl 0
loc 17
ccs 14
cts 14
cp 1
crap 2
rs 9.8666
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Antidot\Queue\Cli;
6
7
use Antidot\Queue\Event\QueueConsumerStarted;
8
use DateTime;
9
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
10
use Enqueue\Consumption\QueueConsumerInterface;
11
use Interop\Queue\Context;
12
use Interop\Queue\Message;
13
use Interop\Queue\Processor;
14
use InvalidArgumentException;
15
use Psr\EventDispatcher\EventDispatcherInterface;
16
use Symfony\Component\Console\Command\Command;
17
use Symfony\Component\Console\Input\InputArgument;
18
use Symfony\Component\Console\Input\InputInterface;
19
use Symfony\Component\Console\Output\OutputInterface;
20
21
use function dump;
22
23
/**
 * Console command that binds the configured message processor to a queue
 * and starts consuming it.
 *
 * Registered under the name held in {@see StartQueueConsumer::NAME} and takes
 * a single required argument, "queue_name".
 */
class StartQueueConsumer extends Command
{
    public const NAME = 'queue:start';
    public const INVALID_NAME_MESSAGE = 'Argument "queue_name" must be of type string.';

    private QueueConsumerInterface $consumer;
    private Processor $processor;
    private Context $context;
    private EventDispatcherInterface $eventDispatcher;

    /**
     * @param QueueConsumerInterface   $consumer         Consumer the queue callback is bound to.
     * @param Processor                $messageProcessor Processor invoked for every received message.
     * @param Context                  $context          Queue context handed to the processor.
     * @param EventDispatcherInterface $eventDispatcher  Dispatcher used to announce the consumer start.
     */
    public function __construct(
        QueueConsumerInterface $consumer,
        Processor $messageProcessor,
        Context $context,
        EventDispatcherInterface $eventDispatcher
    ) {
        $this->eventDispatcher = $eventDispatcher;
        $this->context = $context;
        $this->processor = $messageProcessor;
        $this->consumer = $consumer;

        // Collaborators are stored before the parent constructor runs, as in the original ordering.
        parent::__construct();
    }

    /**
     * Declares the command name, its description and the required "queue_name" argument.
     */
    protected function configure(): void
    {
        $this->setName(self::NAME);
        $this->setDescription('Start listening to the given queue name.');
        $this->addArgument(
            'queue_name',
            InputArgument::REQUIRED,
            'The queue name we want to consume'
        );
    }

    /**
     * Announces the consumer start, binds the processor to the requested queue
     * and hands control to the consumer.
     *
     * @param InputInterface  $input  Source of the "queue_name" argument.
     * @param OutputInterface $output Not used by this command.
     *
     * @return int Always 0 once the consumer's consume() call returns.
     *
     * @throws InvalidArgumentException When the "queue_name" argument is not a string.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $queueName = $input->getArgument('queue_name');

        // The argument value is not guaranteed to be a string (static analysis reports
        // string[] as a possible type), while bindCallback() expects a queue name, so
        // anything that is not a string is rejected up front.
        if (!is_string($queueName)) {
            throw new InvalidArgumentException(self::INVALID_NAME_MESSAGE);
        }

        $this->eventDispatcher->dispatch(QueueConsumerStarted::occur(['queue' => $queueName]));

        // Copied into locals so the static closure below does not need $this.
        $messageProcessor = $this->processor;
        $queueContext = $this->context;

        $this->consumer->bindCallback(
            $queueName,
            static function (Message $message) use ($messageProcessor, $queueContext) {
                return $messageProcessor->process($message, $queueContext);
            }
        );
        $this->consumer->consume();

        return 0;
    }
}
75