Completed
Pull Request — master (#11)
by Clément
04:00 queued 33s
created

ProcessEventCommand::execute()   C

Complexity

Conditions 7
Paths 15

Size

Total Lines 47
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 56

Importance

Changes 5
Bugs 1 Features 1
Metric Value
c 5
b 1
f 1
dl 0
loc 47
ccs 0
cts 40
cp 0
rs 6.7272
cc 7
eloc 27
nc 15
nop 2
crap 56
1
<?php
2
3
namespace Itkg\DelayEventBundle\Command;
4
5
use Itkg\DelayEventBundle\DomainManager\EventManager;
6
use Itkg\DelayEventBundle\Exception\LockException;
7
use Itkg\DelayEventBundle\Handler\LockHandlerInterface;
8
use Itkg\DelayEventBundle\Model\Event;
9
use Itkg\DelayEventBundle\Processor\EventProcessor;
10
use Itkg\DelayEventBundle\Repository\EventRepository;
11
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
12
use Symfony\Component\Console\Input\InputArgument;
13
use Symfony\Component\Console\Input\InputInterface;
14
use Symfony\Component\Console\Input\InputOption;
15
use Symfony\Component\Console\Output\OutputInterface;
16
17
/**
 * Console command "itkg_delay_event:process".
 *
 * For every channel requested on the command line, takes the channel lock,
 * processes the pending events returned by the repository one by one, then
 * releases the lock again.
 */
class ProcessEventCommand extends ContainerAwareCommand
{
    /**
     * Source of the events that still have to be processed.
     *
     * @var EventRepository
     */
    private $eventRepository;

    /**
     * Service each fetched event is handed to.
     *
     * @var EventProcessor
     */
    private $eventProcessor;

    /**
     * Per-channel lock used to skip channels another process is working on.
     *
     * @var LockHandlerInterface
     */
    private $lockHandler;

    /**
     * Channel configuration keyed by channel name. Each entry is read through
     * its 'include' and 'exclude' keys when events are fetched.
     *
     * @var array
     */
    private $channels;

    /**
     * @param EventRepository      $eventRepository Repository queried for the next event to process
     * @param EventProcessor       $eventProcessor  Processor invoked for each event
     * @param LockHandlerInterface $lockHandler     Lock handler guarding each channel
     * @param array                $channels        Channel configuration keyed by channel name
     * @param null|string          $name            Command name forwarded to the parent constructor
     */
    public function __construct(
        EventRepository $eventRepository,
        EventProcessor $eventProcessor,
        LockHandlerInterface $lockHandler,
        array $channels = [],
        $name = null
    ) {
        $this->eventRepository = $eventRepository;
        $this->eventProcessor = $eventProcessor;
        $this->lockHandler = $lockHandler;
        $this->channels = $channels;

        parent::__construct($name);
    }

    /**
     * {@inheritDoc}
     *
     * Registers the repeatable "--channel" / "-c" option, which defaults to
     * the single channel "default".
     */
    protected function configure()
    {
        $this
            ->setName('itkg_delay_event:process')
            ->setDescription('Process async events')
            ->addOption(
                'channel',
                'c',
                InputOption::VALUE_IS_ARRAY | InputOption::VALUE_REQUIRED,
                'Specify the channels to process (default: [\'default\'])',
                ['default']
            );
    }

    /**
     * Processes the pending events of every requested channel.
     *
     * Channels that are not configured, or that are currently locked, are
     * reported on the output and skipped. Exceptions raised while processing
     * a channel are reported on the output and do not stop the remaining
     * channels from being handled.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     *
     * @throws LockException
     * @throws \Exception
     *
     * @return int|null|void
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        // Must match the option name registered in configure() ("channel");
        // requesting an unregistered option name is rejected by the input.
        $channels = $input->getOption('channel');

        foreach ($channels as $channel) {
            if (!isset($this->channels[$channel])) {
                $output->writeln(sprintf(
                    '<error>Channel <info>%s</info> is not configured.</error>',
                    $channel
                ));

                // Without a configuration entry there are no include/exclude
                // filters to read, so the channel cannot be processed.
                continue;
            }

            if ($this->lockHandler->isLocked($channel)) {
                $output->writeln(
                    sprintf(
                        'Command is locked by another process for channel <info>%s</info>.',
                        $channel
                    )
                );

                continue;
            }

            $output->writeln(
                sprintf(
                    'Process events for channel <info>%s</info>',
                    $channel
                )
            );

            $this->lockHandler->lock($channel);

            try {
                $this->processChannel($channel);
            } catch (\Exception $e) {
                $output->writeln(sprintf('<error>%s</error>', $e->getMessage()));
            } finally {
                // Always give the lock back, including when something that is
                // not caught above escapes, so the channel is not left locked.
                $this->lockHandler->release($channel);
            }
        }
    }

    /**
     * Fetches and processes events of one configured channel until the
     * repository returns no further event.
     *
     * @param string $channel Name of a channel present in the channel configuration
     */
    private function processChannel($channel)
    {
        $config = $this->channels[$channel];

        // NOTE(review): the meaning of the leading "false" argument is defined
        // by EventRepository::findFirstTodoEvent() - confirm against it.
        while ($event = $this->eventRepository->findFirstTodoEvent(false, $config['include'], $config['exclude'])) {
            $event->setDelayed(false);
            $this->eventProcessor->process($event);
        }
    }
}
139