ProcessDynamicChannelCommand   A
last analyzed

Complexity

Total Complexity 14

Size/Duplication

Total Lines 190
Duplicated Lines 0 %

Coupling/Cohesion

Components 2
Dependencies 6

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
wmc 14
lcom 2
cbo 6
dl 0
loc 190
ccs 0
cts 115
cp 0
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A calculateAvailableSlotsCount() 0 6 1
A getWorkingDir() 0 4 1
A __construct() 0 12 1
A configure() 0 18 1
C execute() 0 86 9
A getHostName() 0 4 1
1
<?php
2
3
namespace Itkg\DelayEventBundle\Command;
4
5
use Itkg\DelayEventBundle\Handler\LockHandlerInterface;
6
use Itkg\DelayEventBundle\Model\Lock;
7
use Itkg\DelayEventBundle\Processor\EventProcessor;
8
use Itkg\DelayEventBundle\Repository\EventRepository;
9
use Itkg\DelayEventBundle\Repository\LockRepository;
10
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
11
use Symfony\Component\Console\Input\InputArgument;
12
use Symfony\Component\Console\Input\InputInterface;
13
use Symfony\Component\Console\Input\InputOption;
14
use Symfony\Component\Console\Output\OutputInterface;
15
16
/**
17
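 * Class ProcessDynamicChannelCommand: starts one worker process per unlocked field group of a dynamic channel, within a concurrency limit.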
18
 */
19
class ProcessDynamicChannelCommand extends ContainerAwareCommand
20
{
21
    /**
     * Repository queried for the distinct field group identifiers of a channel.
     *
     * @var EventRepository
     */
    private $eventRepository;

    /**
     * Channel configuration keyed by channel name. The command reads the
     * "dynamic", "include" and "exclude" entries of the requested channel.
     *
     * @var array
     */
    private $channels;

    /**
     * Repository from which the existing locks are loaded.
     *
     * @var LockRepository
     */
    private $lockRepository;
45
46
    /**
     * ProcessDynamicChannelCommand constructor.
     *
     * @param EventRepository $eventRepository Source of the field group identifiers of a channel
     * @param LockRepository  $lockRepository  Source of the existing locks
     * @param array           $channels        Channel configuration keyed by channel name
     * @param null|string     $name            Command name forwarded to the parent constructor
     */
    public function __construct(
        EventRepository $eventRepository,
        LockRepository $lockRepository,
        array $channels = [],
        $name = null
    ) {
        // Store the collaborators before handing over to the parent constructor.
        $this->channels = $channels;
        $this->lockRepository = $lockRepository;
        $this->eventRepository = $eventRepository;

        parent::__construct($name);
    }
66
67
    /**
     * Declares the command name, its description, the required "channel"
     * argument and the optional "concurrent-jobs-count" option (shortcut "c",
     * default 5).
     *
     * {@inheritdoc}
     */
    protected function configure()
    {
        $this->setName('itkg_delay_event:process_dynamic_channel');
        $this->setDescription('Process dynamic channel');

        $this->addArgument('channel', InputArgument::REQUIRED, 'Dynamic channel to process');

        $this->addOption(
            'concurrent-jobs-count',
            'c',
            InputOption::VALUE_OPTIONAL,
            'Maximum concurrent jobs count for this channel. Default 5',
            5
        );
    }
88
89
    /**
     * Starts a worker process for each field group of a dynamic channel that
     * is not command-locked yet.
     *
     * Steps:
     *  - check that the requested channel is configured and flagged as dynamic;
     *  - load the distinct field group identifiers of the channel;
     *  - collect the identifiers that are command-locked, by any host and by
     *    the current host;
     *  - start one process per remaining identifier, limited to the number of
     *    slots still available.
     *
     * Every exit path returns without a value, so the exit status reported by
     * the console is unchanged.
     *
     * {@inheritdoc}
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $channel = $input->getArgument('channel');

        $commandline = $this->getContainer()->getParameter('itkg_delay_event.command_line');

        if (!isset($this->channels[$channel])) {
            $output->writeln(
                sprintf(
                    '<error>Channel <info>%s</info> is not configured.</error>',
                    $channel
                )
            );
            return;
        }

        // empty() gives the same result as a negation, but does not raise a
        // notice when the channel configuration has no "dynamic" key.
        if (empty($this->channels[$channel]['dynamic'])) {
            $output->writeln(
                sprintf(
                    '<error>Channel <info>%s</info> is not dynamic.</error>',
                    $channel
                )
            );
            return;
        }

        $fieldGroupIdentifierList = $this->eventRepository->findDistinctFieldGroupIdentifierByChannel(
            $this->channels[$channel]['include'],
            $this->channels[$channel]['exclude']
        );
        // Static analysis reports this call as returning a plain array, on
        // which toArray() would be a fatal error; convert only when needed.
        if (!is_array($fieldGroupIdentifierList)) {
            $fieldGroupIdentifierList = $fieldGroupIdentifierList->toArray();
        }

        // Loop-invariant values, computed once instead of once per lock.
        $hostName = $this->getHostName();
        $channelPrefix = sprintf('%s_', $channel);

        $lockNames = [];
        $currentMachineLockNames = [];
        /** @var Lock $lock */
        foreach ($this->lockRepository->findAll() as $lock) {
            // NOTE(review): str_replace() removes every occurrence of the
            // prefix, not only a leading one - confirm this is intended.
            $lockName = str_replace($channelPrefix, '', $lock->getChannel());
            if ($lock->isCommandLocked() && in_array($lockName, $fieldGroupIdentifierList)) {
                $lockNames[] = $lockName;
                if ($lock->getLockedBy() === $hostName) {
                    $currentMachineLockNames[] = $lockName;
                }
            }
        }

        // The two lists are passed in the reverse order of the parameter names
        // of calculateAvailableSlotsCount(); kept as is to preserve the count.
        // The option is cast to int so that a non-numeric value cannot reach
        // the subtraction.
        $concurrentAvailableSlotsCount = $this->calculateAvailableSlotsCount(
            $fieldGroupIdentifierList,
            $currentMachineLockNames,
            (int) $input->getOption('concurrent-jobs-count')
        );

        if ($concurrentAvailableSlotsCount <= 0) {
            $output->writeln(
                sprintf(
                    '<info>Maximum concurrent jobs limit for %s is reached.</info>',
                    $channel
                )
            );
            return;
        }

        // Identifiers locked by any host are skipped; the rest is capped to
        // the number of available slots.
        $groupFieldIdentifierListToProcess = array_slice(
            array_diff($fieldGroupIdentifierList, $lockNames),
            0,
            $concurrentAvailableSlotsCount
        );

        foreach ($groupFieldIdentifierListToProcess as $identifier) {
            // Create a new dynamic channel
            // NOTE(review): $identifier is interpolated unescaped into the
            // command line - confirm the configured template quotes its
            // placeholders.
            $process = new \Symfony\Component\Process\Process(
                sprintf(
                    $commandline,
                    $this->getContainer()->getParameter('kernel.environment'),
                    $channel,
                    $identifier
                )
            );

            $process->setWorkingDirectory($this->getWorkingDir());
            // NOTE(review): the process is started and never waited on, and
            // the object goes out of scope right away - confirm the configured
            // command line detaches the job.
            $process->start();
        }
    }
178
179
    /**
     * Computes how many more jobs may be started: the allowed maximum minus
     * the number of entries of $fieldGroupIdentifierList that are also found
     * in $lockNames.
     *
     * @param array $lockNames                Values the second list is matched against
     * @param array $fieldGroupIdentifierList Values that are counted when found in $lockNames
     * @param int   $maxConcurrentJobsCount   Maximum number of concurrent jobs
     *
     * @return int Remaining slots; zero or negative once the maximum is reached
     */
    private function calculateAvailableSlotsCount($lockNames, $fieldGroupIdentifierList, $maxConcurrentJobsCount)
    {
        $occupiedSlots = array_intersect($fieldGroupIdentifierList, $lockNames);

        $availableSlots = $maxConcurrentJobsCount;
        $availableSlots -= count($occupiedSlots);

        return $availableSlots;
    }
192
193
    /**
     * Returns the root directory reported by the "kernel" service; execute()
     * uses it as the working directory of the processes it starts.
     *
     * @return string
     */
    private function getWorkingDir()
    {
        $kernel = $this->getContainer()->get('kernel');

        return $kernel->getRootDir();
    }
200
201
    /**
     * Returns the host name of the current machine, as reported by
     * php_uname('n'); execute() compares it with the owner of each lock.
     *
     * @return string
     */
    private function getHostName()
    {
        $hostName = php_uname('n');

        return $hostName;
    }
208
}
209