GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Test Failed
Push — master ( 6c9176...e80da1 )
by Anton
03:08
created

ParallelExecutor::run()   F

Complexity

Conditions 18
Paths 452

Size

Total Lines 64

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 30.9719

Importance

Changes 0
Metric Value
cc 18
nc 452
nop 3
dl 0
loc 64
ccs 25
cts 38
cp 0.6579
crap 30.9719
rs 1.4611
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php declare(strict_types=1);
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Component\Ssh\Client;
11
use Deployer\Configuration\Configuration;
12
use Deployer\Exception\Exception;
13
use Deployer\Exception\GracefulShutdownException;
14
use Deployer\Host\Host;
15
use Deployer\Host\Localhost;
16
use Deployer\Selector\Selector;
17
use Deployer\Task\Context;
18
use Deployer\Task\Task;
19
use Symfony\Component\Console\Input\InputInterface;
20
use Symfony\Component\Console\Output\OutputInterface;
21
use Symfony\Component\Process\Process;
22
23 1
/**
 * Animation frames of the progress spinner; spinner() picks one per call.
 */
const FRAMES = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];

/**
 * Builds one frame of a single-line progress spinner.
 *
 * The frame is chosen from the current time in tenths of a second, so calling
 * this repeatedly and writing the result animates the spinner. The returned
 * string ends with a carriage return (no newline), so the next write starts
 * again at the beginning of the same terminal line.
 *
 * @param string $message Text shown to the right of the spinner frame.
 * @return string Two spaces, the frame, a space, the message and "\r".
 */
function spinner($message = ''): string
{
    // The (int) cast binds tighter than %, so this is: tenths-of-a-second modulo frame count.
    $frame = FRAMES[(int)(microtime(true) * 10) % count(FRAMES)];
    return "  $frame $message\r";
}
30
31
/**
 * Runs tasks on a set of hosts.
 *
 * A task is either executed directly in the current PHP process, one host
 * after another, or handed to `worker` sub-processes (one per host) that are
 * started together and polled until all of them have finished.
 */
class ParallelExecutor
{
    /** @var InputInterface */
    private $input;
    /** @var OutputInterface */
    private $output;
    /** @var Messenger */
    private $messenger;
    /** @var Client Stored by the constructor; not used by the methods of this class. */
    private $client;
    /** @var Configuration Stored by the constructor; not used by the methods of this class. */
    private $config;

    public function __construct(
        InputInterface $input,
        OutputInterface $output,
        Messenger $messenger,
        Client $client,
        Configuration $config
    ) {
        $this->input = $input;
        $this->output = $output;
        $this->messenger = $messenger;
        $this->client = $client;
        $this->config = $config;
    }

    /**
     * Runs the `connect` task in a worker process for every non-local host,
     * showing a spinner while each process is running.
     *
     * NOTE(review): the exit code of the connect process is not inspected here.
     *
     * @param Host[] $hosts
     */
    private function connect(array $hosts): void
    {
        $callback = $this->outputWriter();

        // COLUMNS is frequently not exported: getenv() then returns false and the
        // width would become -1, which str_repeat() rejects. Clamp to zero.
        $width = max(0, intval(getenv('COLUMNS')) - 1);

        // Connect to each host sequentially, to prevent getting locked.
        foreach ($hosts as $host) {
            if ($host instanceof Localhost) {
                continue;
            }

            $process = $this->getProcess($host, new Task('connect'));
            $process->start();

            $label = str_pad("connect {$host->tag()}", $width);
            while ($process->isRunning()) {
                $this->gatherOutput([$process], $callback);
                $this->output->write(spinner($label));
                usleep(1000);
            }

            // Drain whatever was written between the last poll and process exit.
            $this->gatherOutput([$process], $callback);
        }

        // Clear spinner.
        $this->output->write(str_repeat(' ', $width) . "\r");
    }

    /**
     * Runs every task on the given hosts, or only records the execution plan.
     *
     * Without a plan the hosts are connected first. Each task is then run
     * either directly in this process, host by host (when the effective limit
     * is 1 or only one host is planned), or in worker processes in chunks of
     * at most "limit" hosts. The first non-successful result aborts the run.
     *
     * With a plan nothing is executed: the host selection of every task is
     * passed to $plan->commit() instead.
     *
     * NOTE(review): endTask() is also called in plan mode although startTask()
     * is skipped there - confirm this is intended.
     *
     * @param Task[] $tasks
     * @param Host[] $hosts
     * @param Planner|null $plan
     * @return int 0 on success, otherwise the exit code of the first failure.
     */
    public function run(array $tasks, array $hosts, $plan = null): int
    {
        if (!$plan) {
            $this->connect($hosts);
        }

        // The --limit option wins when it is a non-zero number; otherwise all hosts at once.
        $globalLimit = (int)$this->input->getOption('limit') ?: count($hosts);

        foreach ($tasks as $task) {
            if (!$plan) {
                $this->messenger->startTask($task);
            }

            // Never let the limit drop below 1: array_chunk() rejects a chunk
            // size < 1, which would otherwise happen for an empty host list,
            // a negative --limit or a task limit of 0.
            $limit = max(1, min($globalLimit, $task->getLimit() ?? $globalLimit));
            $plannedHosts = $task->isOnce() ? $this->firstMatchingHost($task, $hosts) : $hosts;

            if ($limit === 1 || count($plannedHosts) === 1) {
                foreach ($plannedHosts as $host) {
                    if (!Selector::apply($task->getSelector(), $host)) {
                        if ($plan) {
                            $plan->commit([], $task);
                        }
                        continue;
                    }

                    if ($plan) {
                        $plan->commit([$host], $task);
                        continue;
                    }

                    $exitCode = $this->runInProcess($task, $host);
                    if ($exitCode !== null) {
                        return $exitCode;
                    }
                }
            } else {
                // runTask() applies the task selector itself, so the full host
                // list is chunked here.
                foreach (array_chunk($hosts, $limit) as $chunk) {
                    $exitCode = $this->runTask($chunk, $task, $plan);
                    if ($exitCode !== 0) {
                        return $exitCode;
                    }
                }
            }

            $this->messenger->endTask($task);
        }

        return 0;
    }

    /**
     * Returns the first host accepted by the task's selector.
     *
     * @param Host[] $hosts
     * @return Host[] A list with that single host, or an empty list when no host matches.
     */
    private function firstMatchingHost(Task $task, array $hosts): array
    {
        foreach ($hosts as $host) {
            if (Selector::apply($task->getSelector(), $host)) {
                return [$host];
            }
        }

        return [];
    }

    /**
     * Runs a task for one host directly in the current process.
     *
     * The host configuration is loaded before and saved after the task. Any
     * throwable is rendered through the messenger and turned into an exit code.
     *
     * @return int|null Null when the task finished without throwing, otherwise
     *                  the exit code run() has to return.
     */
    private function runInProcess(Task $task, Host $host): ?int
    {
        try {
            $host->getConfig()->load();
            Exception::setTaskSourceLocation($task->getSourceLocation());

            $task->run(new Context($host, $this->input, $this->output));

            $host->getConfig()->save();
        } catch (GracefulShutdownException $exception) {
            $this->messenger->renderException($exception, $host);
            return GracefulShutdownException::EXIT_CODE;
        } catch (\Throwable $exception) {
            $this->messenger->renderException($exception, $host);
            return 1;
        }

        return null;
    }

    /**
     * Runs a task on a chunk of hosts using one worker process per selected host.
     *
     * With a plan, the selected hosts are only committed to it and nothing is started.
     *
     * @param Host[] $hosts
     * @return int 0 when every process succeeded, otherwise the first positive exit code.
     */
    private function runTask(array $hosts, Task $task, ?Planner $plan = null): int
    {
        $processes = [];
        $selectedHosts = [];
        $selector = $task->getSelector();
        foreach ($hosts as $host) {
            // A task without a selector runs on every host.
            if ($selector === null || Selector::apply($selector, $host)) {
                $selectedHosts[] = $host;
                if (!$plan) {
                    $processes[] = $this->getProcess($host, $task);
                }
            }
        }

        if ($plan) {
            $plan->commit($selectedHosts, $task);
            return 0;
        }

        $callback = $this->outputWriter();

        $this->startProcesses($processes);

        while ($this->areRunning($processes)) {
            $this->gatherOutput($processes, $callback);
            $this->output->write(spinner());
            usleep(1000);
        }

        // Clear spinner.
        $this->output->write("    \r");

        // Drain whatever was written between the last poll and process exit.
        $this->gatherOutput($processes, $callback);

        return $this->cumulativeExitCode($processes);
    }

    /**
     * Returns the closure used to forward worker output to the console.
     *
     * The closure strips one trailing newline (writeln() adds its own) and
     * skips output that is empty afterwards.
     */
    private function outputWriter(): callable
    {
        return function (string $output) {
            $output = preg_replace('/\n$/', '', $output);
            if (strlen($output) !== 0) {
                $this->output->writeln($output);
            }
        };
    }

    /**
     * Builds (without starting) the `worker` sub-process for a task on a host.
     *
     * The command line re-invokes the current PHP binary with DEPLOYER_BIN and
     * passes the task, the host alias, the host's `config_directory`, the
     * string form of the current input and, for decorated output, `--decorated`.
     * In debug verbosity the command is printed first.
     *
     * NOTE(review): the values are interpolated into a shell command line
     * without escaping; consider escapeshellarg() if they can contain spaces
     * or shell metacharacters.
     */
    protected function getProcess(Host $host, Task $task): Process
    {
        $dep = PHP_BINARY . ' ' . DEPLOYER_BIN;
        $configDirectory = $host->get('config_directory');
        $decorated = $this->output->isDecorated() ? '--decorated' : '';
        $command = "$dep worker $task {$host->alias()} $configDirectory {$this->input} $decorated";

        if ($this->output->isDebug()) {
            $this->output->writeln("[{$host->tag()}] $command");
        }

        return Process::fromShellCommandline($command);
    }

    /**
     * Starts every process without waiting for it to finish.
     *
     * @param Process[] $processes
     */
    protected function startProcesses(array $processes)
    {
        foreach ($processes as $process) {
            $process->start();
        }
    }

    /**
     * Tells whether at least one of the processes is still running.
     *
     * @param Process[] $processes
     */
    protected function areRunning(array $processes): bool
    {
        foreach ($processes as $process) {
            if ($process->isRunning()) {
                return true;
            }
        }

        return false;
    }

    /**
     * Passes the not yet consumed stdout and stderr of every process to the
     * callback; empty chunks are skipped.
     *
     * @param Process[] $processes
     * @param callable $callback Called with one string argument per non-empty chunk.
     */
    protected function gatherOutput(array $processes, callable $callback)
    {
        foreach ($processes as $process) {
            $output = $process->getIncrementalOutput();
            if (strlen($output) !== 0) {
                $callback($output);
            }

            $errorOutput = $process->getIncrementalErrorOutput();
            if (strlen($errorOutput) !== 0) {
                $callback($errorOutput);
            }
        }
    }

    /**
     * Gather the cumulative exit code for the processes.
     *
     * @param Process[] $processes
     * @return int The first exit code greater than zero, or 0 when there is none.
     */
    protected function cumulativeExitCode(array $processes): int
    {
        foreach ($processes as $process) {
            $exitCode = $process->getExitCode();
            if ($exitCode > 0) {
                return $exitCode;
            }
        }

        return 0;
    }
}
266