Passed
Push — master ( 407e73...baec40 )
by Koldo
02:35
created

StartQueueConsumer   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 50
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 29
c 1
b 0
f 0
dl 0
loc 50
ccs 29
cts 29
cp 1
rs 10
wmc 4

3 Methods

Rating   Name   Duplication   Size   Complexity  
A execute() 0 17 2
A configure() 0 8 1
A __construct() 0 11 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Antidot\Queue\Cli;
6
7
use Antidot\Queue\Event\QueueConsumerStarted;
8
use DateTime;
9
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
10
use Enqueue\Consumption\QueueConsumerInterface;
11
use Interop\Queue\Context;
12
use Interop\Queue\Message;
13
use Interop\Queue\Processor;
14
use InvalidArgumentException;
15
use Psr\EventDispatcher\EventDispatcherInterface;
16
use Symfony\Component\Console\Command\Command;
17
use Symfony\Component\Console\Input\InputArgument;
18
use Symfony\Component\Console\Input\InputInterface;
19
use Symfony\Component\Console\Output\OutputInterface;
20
21
use function dump;
22
23
/**
 * Console command (`queue:start`) that attaches the injected message processor
 * to a named queue on the injected consumer and then starts consuming.
 */
class StartQueueConsumer extends Command
{
    /** Name under which the command is registered. */
    public const NAME = 'queue:start';

    /** Message of the exception thrown when "queue_name" is not a string. */
    public const INVALID_NAME_MESSAGE = 'Argument "queue_name" must be of type string.';

    private QueueConsumerInterface $consumer;
    private Processor $processor;
    private Context $context;
    private EventDispatcherInterface $eventDispatcher;

    /**
     * @param QueueConsumerInterface   $consumer         Consumer the callback is bound to and that is started.
     * @param Processor                $messageProcessor Processor invoked for every message handed to the callback.
     * @param Context                  $context          Queue context forwarded to the processor with each message.
     * @param EventDispatcherInterface $eventDispatcher  Dispatcher used to announce that consumption is starting.
     */
    public function __construct(
        QueueConsumerInterface $consumer,
        Processor $messageProcessor,
        Context $context,
        EventDispatcherInterface $eventDispatcher
    ) {
        $this->consumer = $consumer;
        $this->processor = $messageProcessor;
        $this->context = $context;
        $this->eventDispatcher = $eventDispatcher;

        // No name is passed to the parent; it is assigned in configure().
        parent::__construct();
    }

    /**
     * Declares the command name, its description and the required "queue_name" argument.
     */
    protected function configure(): void
    {
        $this->setName(self::NAME);
        $this->setDescription('Start listening to the given queue name.');
        $this->addArgument(
            'queue_name',
            InputArgument::REQUIRED,
            'The queue name we want to consume'
        );
    }

    /**
     * Dispatches a QueueConsumerStarted event for the requested queue, binds the
     * processor to that queue and starts the consumer.
     *
     * @param InputInterface  $input  Provides the "queue_name" argument.
     * @param OutputInterface $output Not used by this command.
     *
     * @return int Always 0 once consume() has returned.
     *
     * @throws InvalidArgumentException When the "queue_name" argument is not a string.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $queueName = $input->getArgument('queue_name');
        if (!is_string($queueName)) {
            throw new InvalidArgumentException(self::INVALID_NAME_MESSAGE);
        }

        // Copied into locals so the static closure below can capture them without $this.
        $messageProcessor = $this->processor;
        $queueContext = $this->context;

        $this->eventDispatcher->dispatch(QueueConsumerStarted::occur(['queue' => $queueName]));

        $handleMessage = static function (Message $message) use ($messageProcessor, $queueContext) {
            return $messageProcessor->process($message, $queueContext);
        };

        // Static analysis (Scrutinizer) reports that the argument value may also be
        // string[]; the is_string() guard above has already excluded that case here.
        $this->consumer->bindCallback($queueName, $handleMessage);
        $this->consumer->consume();

        return 0;
    }
}
75