Consume   A
last analyzed

Complexity

Total Complexity 7

Size/Duplication

Total Lines 73
Duplicated Lines 0 %

Test Coverage

Coverage 44.44%

Importance

Changes 0
Metric Value
wmc 7
dl 0
loc 73
rs 10
c 0
b 0
f 0
ccs 12
cts 27
cp 0.4444

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
A configure() 0 9 1
B execute() 0 24 5
1
<?php
2
3
namespace JobQueue\Application\Console;
4
5
use JobQueue\Application\Utils\CommandTrait;
6
use JobQueue\Domain\Task\Profile;
7
use JobQueue\Domain\Task\Queue;
8
use JobQueue\Domain\Worker\Worker;
9
use Psr\Log\LoggerAwareInterface;
10
use Psr\Log\LoggerAwareTrait;
11
use Ramsey\Uuid\Uuid;
12
use Symfony\Component\Console\Command\Command;
13
use Symfony\Component\Console\Input\InputArgument;
14
use Symfony\Component\Console\Input\InputInterface;
15
use Symfony\Component\Console\Input\InputOption;
16
use Symfony\Component\Console\Output\OutputInterface;
17
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
18
19
/**
 * Console command that builds a Worker for a single profile and lets it
 * consume tasks from the injected queue.
 */
final class Consume extends Command implements LoggerAwareInterface
{
    use CommandTrait;
    use LoggerAwareTrait;

    /**
     * Queue handed over to the worker created in execute().
     *
     * @var Queue
     */
    private $queue;

    /**
     * Event dispatcher handed over to the worker created in execute().
     *
     * @var EventDispatcherInterface
     */
    private $eventDispatcher;

    /**
     * Stores the collaborators, then delegates to the parent command constructor.
     *
     * @param Queue                    $queue           Queue the worker will read from
     * @param EventDispatcherInterface $eventDispatcher Dispatcher passed to the worker
     * @param string|null              $name            Forwarded as-is to the parent constructor
     */
    public function __construct(Queue $queue, EventDispatcherInterface $eventDispatcher, string $name = null)
    {
        // Collaborators are assigned before the parent constructor runs.
        $this->queue = $queue;
        $this->eventDispatcher = $eventDispatcher;

        parent::__construct($name);
    }

    /**
     * Declares the command name, its texts, the mandatory "profile" argument
     * and the "name" (-w) and "quantity" (-x) options.
     */
    public function configure()
    {
        $this->setName('consume');
        $this->setDescription('Consumes tasks from the queue');
        $this->setHelp("Consumes tasks from a queue depending on one or multiple profiles.  \nEach task is consumed by executing the corresponding job.\n ");

        $this->addArgument('profile', InputArgument::REQUIRED, 'Name of the profile to consume');

        $this->addOption('name', 'w', InputOption::VALUE_OPTIONAL, 'Name of the worker');
        $this->addOption('quantity', 'x', InputOption::VALUE_OPTIONAL, 'Quantity of tasks to consume');
    }

    /**
     * Creates a worker for the requested profile and runs it.
     *
     * The worker name comes from the "name" option; when that option is empty
     * a freshly generated UUID (v4) string is used instead. The "quantity"
     * option is cast to int, and any falsy result (option absent, 0,
     * non-numeric text) is turned into null before being given to the worker.
     *
     * @param InputInterface  $input  Source of the "profile" argument and the "name"/"quantity" options
     * @param OutputInterface $output Passed to the section formatting helpers
     *
     * @return int Always 0, whichever closing message was written
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $workerName = $input->getOption('name') ?: (string) Uuid::uuid4();
        $taskProfile = new Profile($input->getArgument('profile'));

        $worker = new Worker($workerName, $this->queue, $taskProfile, $this->eventDispatcher);

        // The logger is optional: only forward it when one has been set on the command.
        if ($this->logger) {
            $worker->setLogger($this->logger);
        }

        $this->formatInfoSection(sprintf('Worker %s handles "%s" tasks...', $workerName, $taskProfile), $output);

        // NOTE(review): null presumably means "no limit" for Worker::consume() - confirm against the Worker class.
        $limit = ((int) $input->getOption('quantity')) ?: null;
        $worker->consume($limit);

        // A positive limit means the worker was expected to come back after a bounded run.
        if ($limit > 0) {
            $this->formatInfoSection(sprintf('Worker %s is done.', $workerName), $output);

            return 0;
        }

        // Without a positive limit, returning from consume() is reported as an error message.
        $this->formatErrorSection(sprintf('Worker %s has hanged out!', $workerName), $output);

        return 0;
    }
}
94