ProcessEventCommand   A
last analyzed

Complexity

Total Complexity 16

Size/Duplication

Total Lines 181
Duplicated Lines 0 %

Coupling/Cohesion

Components 2
Dependencies 7

Test Coverage

Coverage 0%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 16
c 1
b 0
f 0
lcom 2
cbo 7
dl 0
loc 181
ccs 0
cts 119
cp 0
rs 10

4 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 14 1
A configure() 0 19 1
D execute() 0 96 10
A isLimitReached() 0 5 4
1
<?php
2
3
namespace Itkg\DelayEventBundle\Command;
4
5
use Itkg\DelayEventBundle\Handler\LockHandlerInterface;
6
use Itkg\DelayEventBundle\Model\Event;
7
use Itkg\DelayEventBundle\Processor\EventProcessor;
8
use Itkg\DelayEventBundle\Repository\EventRepository;
9
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
10
use Symfony\Component\Console\Input\InputInterface;
11
use Symfony\Component\Console\Input\InputOption;
12
use Symfony\Component\Console\Output\OutputInterface;
13
14
/**
 * Console command that processes pending events, one configured channel at a time.
 *
 * Each channel run is guarded by a lock taken from the injected lock handler and is
 * bounded by the channel's configured duration and event-count limits.
 */
class ProcessEventCommand extends ContainerAwareCommand
{
    /**
     * @var EventRepository
     */
    private $eventRepository;

    /**
     * @var EventProcessor
     */
    private $eventProcessor;

    /**
     * @var LockHandlerInterface
     */
    private $lockHandler;

    /**
     * Channel configuration indexed by channel name. Each entry is read for the keys
     * "duration_limit_per_run", "events_limit_per_run", "include" and "exclude".
     *
     * @var array
     */
    private $channels;

    /**
     * ProcessEventCommand constructor.
     *
     * @param EventRepository      $eventRepository Source of the events to process
     * @param EventProcessor       $eventProcessor  Processor each fetched event is handed to
     * @param LockHandlerInterface $lockHandler     Lock handler guarding each channel run
     * @param array                $channels        Channel configuration indexed by channel name
     * @param null|string          $name            Command name forwarded to the parent constructor
     */
    public function __construct(
        EventRepository $eventRepository,
        EventProcessor $eventProcessor,
        LockHandlerInterface $lockHandler,
        array $channels = [],
        $name = null
    ) {
        $this->eventRepository = $eventRepository;
        $this->eventProcessor = $eventProcessor;
        $this->lockHandler = $lockHandler;
        $this->channels = $channels;

        parent::__construct($name);
    }

    /**
     * {@inheritdoc}
     */
    protected function configure()
    {
        $this
            ->setName('itkg_delay_event:process')
            ->setDescription('Process async events')
            ->addOption(
                'channel',
                'c',
                InputOption::VALUE_IS_ARRAY | InputOption::VALUE_REQUIRED,
                'Specify the channels to process (default: [\'default\'])',
                ['default']
            )
            ->addOption(
                'group-field-identifier',
                'g',
                InputOption::VALUE_OPTIONAL,
                // The fourth argument is the description, not the default value; the
                // default stays null because the fifth argument is omitted.
                'Group field identifier forwarded to the event lookup and appended to the lock name'
            );
    }

    /**
     * Processes every requested channel in turn.
     *
     * Channels that are not configured, or that are already locked, are reported and
     * skipped. For the others the lock is taken, events are processed until a limit is
     * reached or no event is left, and the lock is always released afterwards.
     *
     * {@inheritdoc}
     *
     * @return int Always 0; failures are reported on the output, not through the exit code
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $groupFieldIdentifier = $input->getOption('group-field-identifier');

        foreach ($input->getOption('channel') as $channel) {
            if (!isset($this->channels[$channel])) {
                $output->writeln(
                    sprintf(
                        '<error>Channel <info>%s</info> is not configured.</error>',
                        $channel
                    )
                );

                continue;
            }

            // The lock is scoped to the channel, and to the group field identifier when given.
            $lockName = null === $groupFieldIdentifier
                ? $channel
                : sprintf('%s_%s', $channel, $groupFieldIdentifier);

            if ($this->lockHandler->isLocked($lockName)) {
                $output->writeln(
                    sprintf(
                        'Command is locked by another process for channel <info>%s</info> and lock name <info>%s</info>',
                        $channel,
                        $lockName
                    )
                );

                continue;
            }

            $output->writeln(
                sprintf(
                    'Process events for channel <info>%s</info>',
                    $channel
                )
            );

            $this->lockHandler->lock($lockName);

            try {
                $this->processChannel($channel, $lockName, $groupFieldIdentifier, $output);
            } finally {
                // Release even when something other than \Exception escapes (e.g. an \Error,
                // or a failure while writing the error report), so the channel is not left
                // locked for the following runs.
                $this->lockHandler->release($lockName);
            }
        }

        return 0;
    }

    /**
     * Processes events of one channel until a limit is reached, the lock disappears or
     * no event is left. Exceptions raised while doing so are reported on the output.
     *
     * @param string          $channel              Name of a configured channel
     * @param string          $lockName             Name of the lock held for this run
     * @param null|string     $groupFieldIdentifier Value of the "group-field-identifier" option
     * @param OutputInterface $output               Console output used for reporting
     */
    private function processChannel($channel, $lockName, $groupFieldIdentifier, OutputInterface $output)
    {
        $config = $this->channels[$channel];
        $processedEventsCount = 0;
        $commandStartTime = time();
        $event = null;

        try {
            while (!$this->isLimitReached(
                $commandStartTime,
                $processedEventsCount,
                $config['duration_limit_per_run'],
                $config['events_limit_per_run']
            )) {
                // Forget the previous event so that a failure before the next one is
                // fetched is not attributed to an event that was processed successfully.
                $event = null;

                if (!$this->lockHandler->isLocked($lockName)) {
                    $output->writeln(
                        sprintf(
                            '<error>Lock for channel <info>%s</info> has been released outside of the process.</error>',
                            $channel
                        )
                    );

                    break;
                }

                $event = $this->eventRepository->findFirstTodoEvent(
                    false,
                    $config['include'],
                    $config['exclude'],
                    $groupFieldIdentifier
                );

                if (!$event instanceof Event) {
                    break;
                }

                $event->setDelayed(false);
                $this->eventProcessor->process($event);
                $processedEventsCount++;
            }
        } catch (\Exception $e) {
            $this->reportFailure($output, $channel, $e, $event);
        }
    }

    /**
     * Writes the details of a processing failure to the output.
     *
     * @param OutputInterface $output  Console output used for reporting
     * @param string          $channel Name of the channel being processed
     * @param \Exception      $e       Exception that interrupted the processing
     * @param mixed           $event   Event being handled when the exception occurred, if any
     */
    private function reportFailure(OutputInterface $output, $channel, \Exception $e, $event = null)
    {
        if ($event instanceof Event) {
            $output->writeln(
                sprintf(
                    '<info>[%s]</info> <error> An error occurred while processing event "%s".</error>',
                    $channel,
                    $event->getOriginalName()
                )
            );
        }

        $output->writeln(sprintf('<info>[%s]</info> <error>%s</error>', $channel, $e->getMessage()));
        $output->writeln($e->getTraceAsString());
    }

    /**
     * Tells whether the current run has to stop because one of its limits is reached.
     *
     * A limit set to null is ignored.
     *
     * @param int      $commandStartTime        Unix timestamp at which the run started
     * @param int      $processedEventsCount    Number of events processed so far
     * @param null|int $maxDurationPerRun       Maximum run duration in seconds, or null for no limit
     * @param null|int $maxProcessedEventPerRun Maximum number of events per run, or null for no limit
     *
     * @return bool True when the event-count limit or the duration limit is reached
     */
    private function isLimitReached($commandStartTime, $processedEventsCount, $maxDurationPerRun, $maxProcessedEventPerRun)
    {
        return (($maxProcessedEventPerRun !== null && $processedEventsCount >= $maxProcessedEventPerRun)
            || ($maxDurationPerRun !== null && (time() - $commandStartTime) >= $maxDurationPerRun));
    }
}
198