Passed
Push — master ( 04aa1f...4b45e8 )
by Koldo
02:50
created

StartQueueConsumer::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 5
c 1
b 0
f 0
dl 0
loc 11
ccs 6
cts 6
cp 1
rs 10
cc 1
nc 1
nop 4
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Antidot\Queue\Cli;
6
7
use Antidot\Queue\Event\QueueConsumerStarted;
8
use Enqueue\Consumption\QueueConsumerInterface;
9
use Interop\Queue\Context;
10
use Interop\Queue\Message;
11
use Interop\Queue\Processor;
12
use InvalidArgumentException;
13
use Psr\EventDispatcher\EventDispatcherInterface;
14
use Symfony\Component\Console\Command\Command;
15
use Symfony\Component\Console\Input\InputArgument;
16
use Symfony\Component\Console\Input\InputInterface;
17
use Symfony\Component\Console\Output\OutputInterface;
18
19
class StartQueueConsumer extends Command
20
{
21
    public const NAME = 'queue:start';
22
    private QueueConsumerInterface $consumer;
23
    private Processor $processor;
24
    private Context $context;
25
    private EventDispatcherInterface $eventDispatcher;
26
27 2
    public function __construct(
28
        QueueConsumerInterface $consumer,
29
        Processor $messageProcessor,
30
        Context $context,
31
        EventDispatcherInterface $eventDispatcher
32
    ) {
33 2
        $this->consumer = $consumer;
34 2
        $this->processor = $messageProcessor;
35 2
        $this->context = $context;
36 2
        $this->eventDispatcher = $eventDispatcher;
37 2
        parent::__construct();
38 2
    }
39
40 2
    protected function configure(): void
41
    {
42 2
        $this->setName(self::NAME)
43 2
            ->setDescription('Start listening to the given queue name.')
44 2
            ->addArgument(
45 2
                'queue_name',
46 2
                InputArgument::REQUIRED,
47 2
                'The queue name we want to consume'
48
            );
49 2
    }
50
51
    protected function execute(InputInterface $input, OutputInterface $output): int
52
    {
53
        $queue = $input->getArgument('queue_name');
54
        if (false === is_string($queue)) {
55
            throw new InvalidArgumentException('Argument "queue_name" must be of type string.');
56
        }
57
        $processor = $this->processor;
58
        $context = $this->context;
59
        $dispatcher = $this->eventDispatcher;
60
        $dispatcher->dispatch(QueueConsumerStarted::occur(['queue' => $queue]));
61
        $this->consumer->bindCallback(
62
            $queue,
0 ignored issues
show
Bug introduced by
It seems like $queue can also be of type string[]; however, parameter $queueName of Enqueue\Consumption\Queu...terface::bindCallback() does only seem to accept Interop\Queue\Queue|string, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

62
            /** @scrutinizer ignore-type */ $queue,
Loading history...
63
            static fn(Message $message) => $processor->process($message, $context)
64
        );
65
66
        $this->consumer->consume();
67
68
        return 0;
69
    }
70
}
71