Completed
Pull Request — master (#41)
by Pascal
05:26
created

ProcessDynamicChannelCommand::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 12
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 12
ccs 0
cts 11
cp 0
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 9
nc 1
nop 4
crap 2
1
<?php
2
3
namespace Itkg\DelayEventBundle\Command;
4
5
use Itkg\DelayEventBundle\Handler\LockHandlerInterface;
6
use Itkg\DelayEventBundle\Model\Event;
7
use Itkg\DelayEventBundle\Model\Lock;
8
use Itkg\DelayEventBundle\Processor\EventProcessor;
9
use Itkg\DelayEventBundle\Repository\EventRepository;
10
use Itkg\DelayEventBundle\Repository\LockRepository;
11
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
12
use Symfony\Component\Console\Input\InputArgument;
13
use Symfony\Component\Console\Input\InputInterface;
14
use Symfony\Component\Console\Input\InputOption;
15
use Symfony\Component\Console\Output\OutputInterface;
16
17
/**
18
 * Class ProcessDynamicChannelCommand
19
 */
20
class ProcessDynamicChannelCommand extends ContainerAwareCommand
21
{
22
    /**
23
     * @var EventRepository
24
     */
25
    private $eventRepository;
26
27
    /**
28
     * @var EventProcessor
29
     */
30
    private $eventProcessor;
0 ignored issues
show
Unused Code introduced by
The property $eventProcessor is not used and could be removed.

This check marks private properties in classes that are never used. Those properties can be removed.

Loading history...
31
32
    /**
33
     * @var LockHandlerInterface
34
     */
35
    private $lockHandler;
0 ignored issues
show
Unused Code introduced by
The property $lockHandler is not used and could be removed.

This check marks private properties in classes that are never used. Those properties can be removed.

Loading history...
36
37
    /**
38
     * @var array
39
     */
40
    private $channels;
41
42
    /**
43
     * @var LockRepository
44
     */
45
    private $lockRepository;
46
47
    /**
48
     * ProcessEventCommand constructor.
49
     *
50
     * @param EventRepository      $eventRepository
51
     * @param LockRepository       $lockRepository
52
     * @param array                $channels
53
     * @param null|string          $name
54
     */
55
    public function __construct(
56
        EventRepository $eventRepository,
57
        LockRepository $lockRepository,
58
        array $channels = [],
59
        $name = null
60
    ) {
61
        $this->eventRepository = $eventRepository;
62
        $this->lockRepository = $lockRepository;
63
        $this->channels = $channels;
64
65
        parent::__construct($name);
66
    }
67
68
    /**
69
     * {@inheritdoc}
70
     */
71
    protected function configure()
72
    {
73
        $this
74
            ->setName('itkg_delay_event:process_dynamic_channel')
75
            ->setDescription('Process dynamic channel')
76
            ->addArgument(
77
                'channel',
78
                InputArgument::REQUIRED,
79
                'Dynamic channel to process'
80
            )
81
            ->addOption(
82
                'concurrent-jobs-count',
83
                'c',
84
                InputOption::VALUE_OPTIONAL,
85
                'Maximum concurrent jobs count for this channel. Default 5',
86
                5
87
            );
88
    }
89
90
    /**
91
     * {@inheritdoc}
92
     */
93
    protected function execute(InputInterface $input, OutputInterface $output)
94
    {
95
        $channel = $input->getArgument('channel');
96
97
        $commandline = 'php console --env=%s itkg_delay_event:process -c %s -g %s';
98
99
        if (!isset($this->channels[$channel])) {
100
            $output->writeln(
101
                sprintf(
102
                    '<error>Channel <info>%s</info> is not configured.</error>',
103
                    $channel
104
                )
105
            );
106
            return;
107
        }
108
109
        if (!$this->channels[$channel]['dynamic']) {
110
            $output->writeln(
111
                sprintf(
112
                    '<error>Channel <info>%s</info> is not dynamic.</error>',
113
                    $channel
114
                )
115
            );
116
            return;
117
        }
118
119
        $fieldGroupIdentifierList = $this->eventRepository->findDistinctFieldGroupIdentifierByChannel(
0 ignored issues
show
Bug introduced by
The method toArray cannot be called on $this->eventRepository->...s[$channel]['exclude']) (of type array).

Methods can only be called on objects. This check looks for methods being called on variables that have been inferred to never be objects.

Loading history...
120
            $this->channels[$channel]['include'],
121
            $this->channels[$channel]['exclude']
122
        )->toArray();
123
124
        $locks = $this->lockRepository->findAll();
125
        $lockNames = array();
126
        $currentMachineLockNames = array();
127
        /** @var Lock $lock */
128
        foreach ($locks as $lock) {
129
            $lockName = str_replace(sprintf('%s_', $channel), '', $lock->getChannel());
130
            if ($lock->isCommandLocked() && in_array($lockName, $fieldGroupIdentifierList)) {
131
                $lockNames[] = $lockName;
132
                if ($lock->getLockedBy() === $this->getHostName()) {
133
                    $currentMachineLockNames[] = $lockName;
134
                }
135
            }
136
        }
137
138
        $concurrentAvailableSlotsCount = $this->calculateAvailableSlotsCount(
139
            $fieldGroupIdentifierList,
140
            $currentMachineLockNames,
141
            $input->getOption('concurrent-jobs-count')
142
        );
143
144
        if ($concurrentAvailableSlotsCount <= 0) {
145
            $output->writeln(
146
                sprintf(
147
                    '<info>Maximum concurrent jobs limit for %s is reached.</info>',
148
                    $channel
149
                )
150
            );
151
        }
152
153
        $groupFieldIdentifierListToProcess = array_slice(
154
            array_diff(
155
                $fieldGroupIdentifierList,
156
                $lockNames
157
            ),
158
            0,
159
            $input->getOption('concurrent-jobs-count')
160
        );
161
162
        foreach ($groupFieldIdentifierListToProcess as $identifier) {
163
            // Create a new dynamic channel
164
165
            $process = new \Symfony\Component\Process\Process(
166
                sprintf(
167
                    $commandline,
168
                    $this->getEnv($input),
169
                    $channel,
170
                    $identifier
171
                )
172
            );
173
174
            $process->setWorkingDirectory($this->getWorkingDir());
175
            $process->start();
176
        }
177
178
    }
179
180
    /**
181
     * @param array $lockNames
182
     * @param array $fieldGroupIdentifierList
183
     * @param int $maxConcurrentJobsCount
184
     *
185
     * @return bool
186
     */
187
    private function calculateAvailableSlotsCount($lockNames, $fieldGroupIdentifierList, $maxConcurrentJobsCount)
188
    {
189
        $concurrentJobsCount = count(array_intersect($fieldGroupIdentifierList  , $lockNames));
190
191
        return $maxConcurrentJobsCount - $concurrentJobsCount;
192
    }
193
194
    /**
195
     * @return string
196
     */
197
    private function getWorkingDir()
198
    {
199
        return $this->getContainer()->get('kernel')->getRootDir();
200
    }
201
202
    /**
203
     * @return string
204
     */
205
    private function getEnv(InputInterface $input)
206
    {
207
        return $input->getParameterOption(array('--env', '-e'), getenv('SYMFONY_ENV') ?: 'dev');
208
    }
209
210
    /**
211
     * @return string
212
     */
213
    private function getHostName()
214
    {
215
        return php_uname('n');
216
    }
217
}
218