Completed
Pull Request — master (#41)
by Pascal
03:35 queued 01:00
created

ProcessDynamicChannelCommand::getEnv()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 4
cp 0
rs 10
c 0
b 0
f 0
cc 2
eloc 2
nc 1
nop 1
crap 6
1
<?php
2
3
namespace Itkg\DelayEventBundle\Command;
4
5
use Itkg\DelayEventBundle\Handler\LockHandlerInterface;
6
use Itkg\DelayEventBundle\Model\Event;
7
use Itkg\DelayEventBundle\Model\Lock;
8
use Itkg\DelayEventBundle\Processor\EventProcessor;
9
use Itkg\DelayEventBundle\Repository\EventRepository;
10
use Itkg\DelayEventBundle\Repository\LockRepository;
11
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
12
use Symfony\Component\Console\Input\InputArgument;
13
use Symfony\Component\Console\Input\InputInterface;
14
use Symfony\Component\Console\Input\InputOption;
15
use Symfony\Component\Console\Output\OutputInterface;
16
17
/**
18
 * Class ProcessDynamicChannelCommand
19
 */
20
class ProcessDynamicChannelCommand extends ContainerAwareCommand
21
{
22
    /**
23
     * @var EventRepository
24
     */
25
    private $eventRepository;
26
27
    /**
28
     * @var EventProcessor
29
     */
30
    private $eventProcessor;
0 ignored issues
show
Unused Code introduced by
The property $eventProcessor is not used and could be removed.

This check marks private properties in classes that are never used. Those properties can be removed.

Loading history...
31
32
    /**
33
     * @var LockHandlerInterface
34
     */
35
    private $lockHandler;
0 ignored issues
show
Unused Code introduced by
The property $lockHandler is not used and could be removed.

This check marks private properties in classes that are never used. Those properties can be removed.

Loading history...
36
37
    /**
38
     * @var array
39
     */
40
    private $channels;
41
42
    /**
43
     * @var LockRepository
44
     */
45
    private $lockRepository;
46
47
    /**
     * Builds the command with the repositories and channel configuration it works on.
     *
     * @param EventRepository $eventRepository Repository kept on the command for event lookups
     * @param LockRepository  $lockRepository  Repository kept on the command for lock lookups
     * @param array           $channels        Channel configuration, indexed by channel name
     * @param null|string     $name            Command name, forwarded to the parent constructor
     */
    public function __construct(
        EventRepository $eventRepository,
        LockRepository $lockRepository,
        array $channels = [],
        $name = null
    ) {
        // All collaborators are stored before the parent constructor runs.
        $this->channels = $channels;
        $this->lockRepository = $lockRepository;
        $this->eventRepository = $eventRepository;

        parent::__construct($name);
    }
67
68
    /**
     * Declares the command name, its description, the mandatory `channel`
     * argument and the optional `concurrent-jobs-count` (`-c`) option.
     *
     * {@inheritdoc}
     */
    protected function configure()
    {
        $this->setName('itkg_delay_event:process_dynamic_channel');
        $this->setDescription('Process dynamic channel');

        $this->addArgument('channel', InputArgument::REQUIRED, 'Dynamic channel to process');

        $this->addOption(
            'concurrent-jobs-count',
            'c',
            InputOption::VALUE_OPTIONAL,
            'Maximum concurrent jobs count for this channel. Default 5',
            5
        );
    }
89
90
    /**
     * Starts one `itkg_delay_event:process` sub-process for each group identifier
     * of the requested dynamic channel that is not currently command-locked,
     * without exceeding the `concurrent-jobs-count` option.
     *
     * Prints an error and stops when the channel is not configured or is not
     * flagged as dynamic; prints a notice and stops when no slot is available.
     *
     * {@inheritdoc}
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $channel = $input->getArgument('channel');

        if (!isset($this->channels[$channel])) {
            $output->writeln(
                sprintf(
                    '<error>Channel <info>%s</info> is not configured.</error>',
                    $channel
                )
            );

            return;
        }

        // empty() also covers a configuration entry that has no 'dynamic' key.
        if (empty($this->channels[$channel]['dynamic'])) {
            $output->writeln(
                sprintf(
                    '<error>Channel <info>%s</info> is not dynamic.</error>',
                    $channel
                )
            );

            return;
        }

        $fieldGroupIdentifierList = $this->eventRepository->findDistinctFieldGroupIdentifierByChannel(
            $this->channels[$channel]['include'],
            $this->channels[$channel]['exclude']
        );
        // The static analyser reports this repository call as returning a plain
        // array, on which toArray() would be a fatal error; unwrap objects only.
        if (is_object($fieldGroupIdentifierList) && method_exists($fieldGroupIdentifierList, 'toArray')) {
            $fieldGroupIdentifierList = $fieldGroupIdentifierList->toArray();
        }

        // Lock channels of this dynamic channel are named "<channel>_<identifier>".
        // Only the leading prefix is stripped, so an identifier that itself
        // contains "<channel>_" is left intact and locks of other channels are ignored.
        $lockPrefix = sprintf('%s_', $channel);
        $lockNames = [];
        /** @var Lock $lock */
        foreach ($this->lockRepository->findAll() as $lock) {
            if (!$lock->isCommandLocked()) {
                continue;
            }

            $lockChannel = (string) $lock->getChannel();
            if (strpos($lockChannel, $lockPrefix) === 0) {
                $lockNames[] = substr($lockChannel, strlen($lockPrefix));
            }
        }

        $concurrentAvailableSlotsCount = $this->calculateAvailableSlotsCount(
            $lockNames,
            $fieldGroupIdentifierList,
            $input->getOption('concurrent-jobs-count')
        );

        if ($concurrentAvailableSlotsCount <= 0) {
            $output->writeln(
                sprintf(
                    '<info>Maximum concurrent jobs limit for %s is reached.</info>',
                    $channel
                )
            );

            // Nothing may be started once the limit is reached.
            return;
        }

        // Start at most as many jobs as there are free slots.
        $groupFieldIdentifierListToProcess = array_slice(
            array_diff($fieldGroupIdentifierList, $lockNames),
            0,
            (int) $concurrentAvailableSlotsCount
        );

        $commandline = 'php console --env=%s itkg_delay_event:process -c %s -g %s';
        $env = $this->getEnv($input);
        $workingDir = $this->getWorkingDir();

        foreach ($groupFieldIdentifierListToProcess as $identifier) {
            // The command line is interpreted by a shell, so every interpolated
            // value is escaped; identifiers come from stored events.
            $process = new \Symfony\Component\Process\Process(
                sprintf(
                    $commandline,
                    escapeshellarg((string) $env),
                    escapeshellarg((string) $channel),
                    escapeshellarg((string) $identifier)
                )
            );

            $process->setWorkingDirectory($workingDir);
            // NOTE(review): the Process object is released right after start();
            // confirm the installed symfony/process version does not stop the
            // child when the object is destroyed.
            $process->start();
        }
    }
167
168
    /**
     * Computes how many more jobs may be started for a channel.
     *
     * @param array $lockNames                Identifiers that are currently locked
     * @param array $fieldGroupIdentifierList Identifiers that have events to process
     * @param int   $maxConcurrentJobsCount   Upper bound of simultaneous jobs
     *
     * @return int The maximum minus the number of identifiers present in both
     *             lists; zero or negative when no slot is left
     */
    private function calculateAvailableSlotsCount($lockNames, $fieldGroupIdentifierList, $maxConcurrentJobsCount)
    {
        $runningIdentifiers = array_intersect($fieldGroupIdentifierList, $lockNames);

        return $maxConcurrentJobsCount - count($runningIdentifiers);
    }
181
182
    /**
     * Returns the kernel root directory, used as the working directory of the
     * spawned sub-processes.
     *
     * @return string
     */
    private function getWorkingDir()
    {
        $kernel = $this->getContainer()->get('kernel');

        return $kernel->getRootDir();
    }
189
190
    /**
     * Determines the environment name to forward to the spawned commands.
     *
     * The `--env` / `-e` parameter of the current invocation takes precedence;
     * otherwise the SYMFONY_ENV environment variable is used, and `dev` when
     * that variable is unset or empty.
     *
     * @param InputInterface $input
     *
     * @return string
     */
    private function getEnv(InputInterface $input)
    {
        $fallback = getenv('SYMFONY_ENV');
        if (!$fallback) {
            $fallback = 'dev';
        }

        return $input->getParameterOption(['--env', '-e'], $fallback);
    }
197
}
198