GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( d89857...62c949 )
by Anton
08:07 queued 05:04
created

ParallelExecutor   A

Complexity

Total Complexity 40

Size/Duplication

Total Lines 278
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 16

Test Coverage

Coverage 90.83%

Importance

Changes 0
Metric Value
dl 0
loc 278
ccs 99
cts 109
cp 0.9083
rs 9.2
c 0
b 0
f 0
wmc 40
lcom 1
cbo 16

10 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 7 1
B run() 0 54 9
B runTask() 0 30 6
A getProcess() 0 24 4
A startProcesses() 0 6 2
A areRunning() 0 9 3
A gatherOutput() 0 15 4
A gatherExitCodes() 0 10 3
B generateOptions() 0 29 6
A generateArguments() 0 11 2

How to fix   Complexity   

Complex Class

Complex classes like ParallelExecutor often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use ParallelExecutor, and based on these observations, apply Extract Interface, too.

1
<?php
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Console\Application;
11
use Deployer\Console\Output\Informer;
12
use Deployer\Console\Output\VerbosityString;
13
use Deployer\Exception\Exception;
14
use Deployer\Exception\GracefulShutdownException;
15
use Deployer\Host\Host;
16
use Deployer\Host\Localhost;
17
use Deployer\Host\Storage;
18
use Deployer\Task\Context;
19
use Deployer\Task\Task;
20
use Symfony\Component\Console\Input\InputInterface;
21
use Symfony\Component\Console\Output\OutputInterface;
22
use Symfony\Component\Process\Process;
23
24
class ParallelExecutor implements ExecutorInterface
25
{
26
    /**
27
     * @var InputInterface
28
     */
29
    private $input;
30
31
    /**
32
     * @var OutputInterface
33
     */
34
    private $output;
35
36
    /**
37
     * @var Informer
38
     */
39
    private $informer;
40
41
    /**
42
     * @var Application
43
     */
44
    private $console;
45
46
    /**
47
     * @param InputInterface $input
48
     * @param OutputInterface $output
49
     * @param Informer $informer
50
     * @param Application $console
51
     */
52 4
    public function __construct(InputInterface $input, OutputInterface $output, Informer $informer, Application $console)
53
    {
54 4
        $this->input = $input;
55 4
        $this->output = $output;
56 4
        $this->informer = $informer;
57 4
        $this->console = $console;
58 4
    }
59
60
    /**
61
     * {@inheritdoc}
62
     */
63 4
    public function run($tasks, $hosts)
64
    {
65 4
        $localhost = new Localhost();
66 4
        $limit = (int)$this->input->getOption('limit') ?: count($hosts);
67
68
        // We need contexts here for usage inside `on` function. Pass input/output to callback of it.
69
        // This allows to use code like this in parallel mode:
70
        //
71
        //     host('prod')
72
        //         ->set('branch', function () {
73
        //             return input()->getOption('branch') ?: 'production';
74
        //     })
75
        //
76
        // Otherwise `input()` wont be accessible (i.e. null)
77
        //
78 4
        Context::push(new Context($localhost, $this->input, $this->output));
79
        {
80 4
            Storage::persist($hosts);
81
        }
82 4
        Context::pop();
83
84 4
        foreach ($tasks as $task) {
85 4
            $success = true;
86 4
            $this->informer->startTask($task);
87
88 4
            if ($task->isLocal()) {
89 2
                Storage::load($hosts);
90
                {
91 2
                    $task->run(new Context($localhost, $this->input, $this->output));
92
                }
93 2
                Storage::flush($hosts);
94
            } else {
95 4
                foreach (array_chunk($hosts, $limit) as $chunk) {
96 4
                    $exitCode = $this->runTask($chunk, $task);
97
98
                    switch ($exitCode) {
99 4
                        case 1:
100
                            throw new GracefulShutdownException();
101 4
                        case 2:
102
                            $success = false;
103
                            break;
104 4
                        case 255:
105 4
                            throw new Exception();
106
                    }
107
                }
108
            }
109
110 4
            if ($success) {
111 4
                $this->informer->endTask($task);
112
            } else {
113 4
                $this->informer->taskError();
114
            }
115
        }
116 4
    }
117
118
    /**
119
     * Run task on hosts.
120
     *
121
     * @param Host[] $hosts
122
     * @param Task $task
123
     * @return int
124
     */
125 4
    private function runTask(array $hosts, Task $task)
126
    {
127 4
        $processes = [];
128
129 4
        foreach ($hosts as $host) {
130 4
            if ($task->shouldBePerformed($host)) {
131 4
                $processes[$host->getHostname()] = $this->getProcess($host, $task);
132 4
                if ($task->isOnce()) {
133 4
                    break;
134
                }
135
            }
136
        }
137
138 4
        $callback = function ($type, $host, $output) {
139 4
            $output = rtrim($output);
140 4
            if (!empty($output)) {
141 4
                $this->output->writeln($output);
142
            }
143 4
        };
144
145 4
        $this->startProcesses($processes);
146
147 4
        while ($this->areRunning($processes)) {
148 4
            $this->gatherOutput($processes, $callback);
149 4
            usleep(1000);
150
        }
151 4
        $this->gatherOutput($processes, $callback);
152
153 4
        return $this->gatherExitCodes($processes);
154
    }
155
156
    /**
157
     * Get process for task on host.
158
     *
159
     * @param Host $host
160
     * @param Task $task
161
     * @return Process
162
     */
163 4
    protected function getProcess($host, Task $task)
164
    {
165 4
        $dep = PHP_BINARY . ' ' . DEPLOYER_BIN;
166 4
        $options = $this->generateOptions();
167 4
        $arguments = $this->generateArguments();
168 4
        $hostname = $host->getHostname();
169 4
        $taskName = $task->getName();
170 4
        $configFile = $host->get('host_config_storage');
171 4
        $value = $this->input->getOption('file');
172 4
        $file = $value ? "--file='$value'" : '';
173
174 4
        if ($this->output->isDecorated()) {
175
            $options .= ' --ansi';
176
        }
177
178 4
        $command = "$dep $file worker $arguments $options --hostname $hostname --task $taskName --config-file $configFile";
179 4
        $process = new Process($command);
180
181 4
        if (!defined('DEPLOYER_PARALLEL_PTY')) {
182
            $process->setPty(true);
183
        }
184
185 4
        return $process;
186
    }
187
188
    /**
189
     * Start all of the processes.
190
     *
191
     * @param Process[] $processes
192
     * @return void
193
     */
194 4
    protected function startProcesses(array $processes)
195
    {
196 4
        foreach ($processes as $process) {
197 4
            $process->start();
198
        }
199 4
    }
200
201
    /**
202
     * Determine if any of the processes are running.
203
     *
204
     * @param Process[] $processes
205
     * @return bool
206
     */
207 4
    protected function areRunning(array $processes)
208
    {
209 4
        foreach ($processes as $process) {
210 4
            if ($process->isRunning()) {
211 4
                return true;
212
            }
213
        }
214 4
        return false;
215
    }
216
217
    /**
218
     * Gather the output from all of the processes.
219
     *
220
     * @param Process[] $processes
221
     * @param callable $callback
222
     */
223 4
    protected function gatherOutput(array $processes, callable $callback)
224
    {
225 4
        foreach ($processes as $host => $process) {
226
            $methods = [
227 4
                Process::OUT => 'getIncrementalOutput',
228
                Process::ERR => 'getIncrementalErrorOutput',
229
            ];
230 4
            foreach ($methods as $type => $method) {
231 4
                $output = $process->{$method}();
232 4
                if (!empty($output)) {
233 4
                    $callback($type, $host, $output);
234
                }
235
            }
236
        }
237 4
    }
238
239
    /**
240
     * Gather the cumulative exit code for the processes.
241
     *
242
     * @param Process[] $processes
243
     * @return int
244
     */
245 4
    protected function gatherExitCodes(array $processes)
246
    {
247 4
        $code = 0;
248 4
        foreach ($processes as $process) {
249 4
            if ($process->getExitCode() > 0) {
250 4
                $code = $process->getExitCode();
251
            }
252
        }
253 4
        return $code;
254
    }
255
256
    /**
257
     * Generate options and arguments string.
258
     * @return string
259
     */
260 4
    private function generateOptions()
261
    {
262 4
        $verbosity = new VerbosityString($this->output);
263 4
        $input = $verbosity;
264
265
        // Get user arguments
266 4
        foreach ($this->console->getUserDefinition()->getArguments() as $argument) {
267
            $value = $this->input->getArgument($argument->getName());
268
            if ($value) {
269
                $input .= " $value";
270
            }
271
        }
272
273
        // Get user options
274 4
        foreach ($this->console->getUserDefinition()->getOptions() as $option) {
275 3
            $name = $option->getName();
276 3
            $value = $this->input->getOption($name);
277
278 3
            if ($value) {
279
                $input .= " --{$name}";
280
281
                if ($option->acceptValue()) {
282 3
                    $input .= " {$value}";
283
                }
284
            }
285
        }
286
287 4
        return $input;
288
    }
289
290 4
    private function generateArguments(): string
291
    {
292 4
        $arguments = '';
293
294 4
        if ($this->input->hasArgument('stage')) {
295
            // Some people rely on stage argument, so pass it to worker too.
296 4
            $arguments .= $this->input->getArgument('stage');
297
        }
298
299 4
        return $arguments;
300
    }
301
}
302