GitHub Access Token became invalid

It seems that the GitHub access token used to retrieve details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

ParallelExecutor::run()   F
last analyzed

Complexity

Conditions 19
Paths 1028

Size

Total Lines 71

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 32
CRAP Score 22.8176

Importance

Changes 0
Metric Value
cc 19
nc 1028
nop 3
dl 0
loc 71
ccs 32
cts 41
cp 0.7805
crap 22.8176
rs 0.3499
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php declare(strict_types=1);
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Component\Ssh\Client;
11
use Deployer\Configuration\Configuration;
12
use Deployer\Exception\Exception;
13
use Deployer\Exception\GracefulShutdownException;
14
use Deployer\Host\Host;
15
use Deployer\Host\Localhost;
16
use Deployer\Selector\Selector;
17
use Deployer\Task\Context;
18
use Deployer\Task\Task;
19
use Symfony\Component\Console\Input\InputInterface;
20
use Symfony\Component\Console\Output\OutputInterface;
21
use Symfony\Component\Process\Process;
22
23 1
const FRAMES = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];

/**
 * Renders a single frame of a text spinner followed by a message.
 *
 * The glyph is taken from FRAMES, indexed by the current time expressed in
 * tenths of a second, so successive calls step through the frames. The
 * returned line ends with a carriage return instead of a newline, which lets
 * the next write start over at the beginning of the same line.
 *
 * @param string $message Text shown after the spinner glyph.
 * @return string Two spaces, the glyph, a space, the message and "\r".
 */
function spinner($message = '')
{
    // Tenths of a second since the epoch, truncated to an integer.
    $tick = (int)(microtime(true) * 10);
    $glyph = FRAMES[$tick % count(FRAMES)];

    return '  ' . $glyph . ' ' . $message . "\r";
}
30
31
/**
 * Runs tasks against a list of hosts.
 *
 * A task is executed inside the current process when its effective limit is
 * one or when only a single host is planned for it. Otherwise the hosts are
 * split into chunks and one `worker` sub-process is spawned per selected host,
 * with the sub-processes' output relayed to the console while a spinner is
 * shown.
 */
class ParallelExecutor
{
    /** @var InputInterface */
    private $input;

    /** @var OutputInterface */
    private $output;

    /** @var Messenger */
    private $messenger;

    /** @var Client */
    private $client;

    /** @var Configuration */
    private $config;

    public function __construct(
        InputInterface $input,
        OutputInterface $output,
        Messenger $messenger,
        Client $client,
        Configuration $config
    )
    {
        $this->input = $input;
        $this->output = $output;
        $this->messenger = $messenger;
        $this->client = $client;
        $this->config = $config;
    }

    /**
     * Starts a `connect` worker for every non-local host, one host at a time,
     * showing a spinner and relaying the worker's output until it exits.
     *
     * @param Host[] $hosts
     */
    private function connect(array $hosts)
    {
        $callback = $this->outputPrinter();
        $width = $this->spinnerWidth();

        // Connect to each host sequentially, to prevent getting locked.
        foreach ($hosts as $host) {
            if ($host instanceof Localhost) {
                continue;
            }

            $process = $this->getProcess($host, new Task('connect'));
            $process->start();

            while ($process->isRunning()) {
                $this->gatherOutput([$process], $callback);
                $this->output->write(spinner(str_pad("connect {$host->getTag()}", $width)));
                usleep(1000);
            }

            // Drain whatever the worker wrote between the last poll and its
            // exit, mirroring what runTask() does after its polling loop.
            $this->gatherOutput([$process], $callback);
        }

        // Clear spinner.
        $this->output->write(str_repeat(' ', $width) . "\r");
    }

    /**
     * Runs every task on the given hosts, or records what would run when a
     * planner is supplied.
     *
     * With a planner, no connection is made, no task is executed and the
     * selected hosts are passed to `$plan->commit()` instead.
     *
     * @param Task[] $tasks
     * @param Host[] $hosts
     * @param Planner|null $plan
     * @return int 0 when every task completed, otherwise the exit code of the
     *             first failure.
     */
    public function run(array $tasks, array $hosts, $plan = null): int
    {
        if (!$plan) {
            $this->connect($hosts);
        }

        // A missing or zero `limit` option means "all hosts at once".
        $globalLimit = (int)$this->input->getOption('limit') ?: count($hosts);

        foreach ($tasks as $task) {
            if (!$plan) {
                $this->messenger->startTask($task);
            }

            $limit = min($globalLimit, $task->getLimit() ?? $globalLimit);
            $plannedHosts = $this->plannedHosts($task, $hosts);

            if ($limit === 1 || count($plannedHosts) === 1) {
                $exitCode = $this->runSequentially($task, $plannedHosts, $plan);
                if ($exitCode !== null) {
                    return $exitCode;
                }
            } else {
                // array_chunk() requires a length of at least 1; the limit can
                // be 0 for an empty host list or negative from the option.
                foreach (array_chunk($hosts, max(1, $limit)) as $chunk) {
                    $exitCode = $this->runTask($chunk, $task, $plan);
                    if ($exitCode !== 0) {
                        return $exitCode;
                    }
                }
            }

            // NOTE(review): endTask() is called in planning mode too, although
            // startTask() is not - confirm this is intended.
            $this->messenger->endTask($task);
        }

        return 0;
    }

    /**
     * Narrows the host list for a task: a "once" task keeps only the first
     * host matching its selector, and a local task always gets a single
     * Localhost regardless of the hosts passed in.
     *
     * @param Host[] $hosts
     * @return Host[]
     */
    private function plannedHosts(Task $task, array $hosts): array
    {
        $plannedHosts = $hosts;

        if ($task->isOnce()) {
            $plannedHosts = [];
            foreach ($hosts as $host) {
                if (Selector::apply($task->getSelector(), $host)) {
                    $plannedHosts[] = $host;
                    break;
                }
            }
        }

        if ($task->isLocal()) {
            $plannedHosts = [new Localhost('localhost')];
        }

        return $plannedHosts;
    }

    /**
     * Runs (or plans) a task on each planned host in turn, inside the current
     * process.
     *
     * @param Host[] $plannedHosts
     * @param Planner|null $plan
     * @return int|null Exit code to abort the whole run with, or null when
     *                  every host was handled without an exception.
     */
    private function runSequentially(Task $task, array $plannedHosts, $plan): ?int
    {
        foreach ($plannedHosts as $host) {
            if (!Selector::apply($task->getSelector(), $host)) {
                if ($plan) {
                    $plan->commit([], $task);
                }
                continue;
            }

            if ($plan) {
                $plan->commit([$host], $task);
                continue;
            }

            try {
                $host->getConfig()->load();
                Exception::setTaskSourceLocation($task->getSourceLocation());

                $task->run(new Context($host, $this->input, $this->output));

                $this->messenger->endOnHost($host);
                $host->getConfig()->save();
            } catch (GracefulShutdownException $exception) {
                $this->messenger->renderException($exception, $host);
                return GracefulShutdownException::EXIT_CODE;
            } catch (\Throwable $exception) {
                $this->messenger->renderException($exception, $host);
                return 1;
            }
        }

        return null;
    }

    /**
     * Runs (or plans) one task on a chunk of hosts using one worker
     * sub-process per selected host.
     *
     * @param Host[] $hosts
     * @return int 0 in planning mode or when all workers succeeded, otherwise
     *             the first failing worker's exit code.
     */
    private function runTask(array $hosts, Task $task, ?Planner $plan = null): int
    {
        $processes = [];
        $selectedHosts = [];
        foreach ($hosts as $host) {
            $selector = $task->getSelector();
            if ($selector !== null && !Selector::apply($selector, $host)) {
                continue;
            }

            $selectedHosts[] = $host;
            if (!$plan) {
                $processes[] = $this->getProcess($host, $task);
            }
        }

        if ($plan) {
            $plan->commit($selectedHosts, $task);
            return 0;
        }

        $callback = $this->outputPrinter();

        $this->startProcesses($processes);

        while ($this->areRunning($processes)) {
            $this->gatherOutput($processes, $callback);
            $this->output->write(spinner());
            usleep(1000);
        }

        // Clear spinner.
        $this->output->write("    \r");

        // Pick up output produced after the last poll inside the loop.
        $this->gatherOutput($processes, $callback);

        return $this->cumulativeExitCode($processes);
    }

    /**
     * Builds the `worker` sub-process that runs one task for one host.
     *
     * The command line is the PHP binary, the Deployer binary, `worker`, the
     * task, the host alias, the host's `config_directory`, the current input
     * and, when the output is decorated, `--decorated`.
     *
     * NOTE(review): task, alias and config directory are interpolated into the
     * shell command without escaping - presumably they never contain shell
     * metacharacters; verify against the callers.
     */
    protected function getProcess(Host $host, Task $task): Process
    {
        $dep = PHP_BINARY . ' ' . DEPLOYER_BIN;
        $configDirectory = $host->get('config_directory');
        $decorated = $this->output->isDecorated() ? '--decorated' : '';
        $command = "$dep worker $task {$host->getAlias()} $configDirectory {$this->input} $decorated";

        if ($this->output->isDebug()) {
            $this->output->writeln("[{$host->getTag()}] $command");
        }

        return Process::fromShellCommandline($command);
    }

    /**
     * Starts every process without waiting for any of them to finish.
     *
     * @param Process[] $processes
     */
    protected function startProcesses(array $processes)
    {
        foreach ($processes as $process) {
            $process->start();
        }
    }

    /**
     * Tells whether at least one of the processes is still running.
     *
     * @param Process[] $processes
     */
    protected function areRunning(array $processes): bool
    {
        foreach ($processes as $process) {
            if ($process->isRunning()) {
                return true;
            }
        }

        return false;
    }

    /**
     * Passes each process's new standard output, then its new error output,
     * to the callback; empty chunks are skipped.
     *
     * @param Process[] $processes
     */
    protected function gatherOutput(array $processes, callable $callback)
    {
        foreach ($processes as $process) {
            $output = $process->getIncrementalOutput();
            if (strlen($output) !== 0) {
                $callback($output);
            }

            $errorOutput = $process->getIncrementalErrorOutput();
            if (strlen($errorOutput) !== 0) {
                $callback($errorOutput);
            }
        }
    }

    /**
     * Gather the cumulative exit code for the processes.
     *
     * Returns the exit code of the first process that reports a non-zero
     * code, including negative ones, so that a worker whose status is not a
     * clean 0 is never counted as a success. Processes without an exit code
     * (null) are skipped.
     *
     * @param Process[] $processes
     * @return int 0 when no process reported a failure.
     */
    protected function cumulativeExitCode(array $processes): int
    {
        foreach ($processes as $process) {
            $exitCode = $process->getExitCode();
            if ($exitCode !== null && $exitCode !== 0) {
                return $exitCode;
            }
        }

        return 0;
    }

    /**
     * Builds the callback used to print worker output: the newline matched by
     * `/\n$/` is removed and the remainder is written as a line, unless
     * nothing is left.
     */
    private function outputPrinter(): callable
    {
        return function (string $output) {
            $output = preg_replace('/\n$/', '', $output);
            if (strlen($output) !== 0) {
                $this->output->writeln($output);
            }
        };
    }

    /**
     * Width used to pad and clear the spinner line: COLUMNS minus one.
     *
     * Clamped at zero because str_repeat() rejects a negative count, which is
     * what an unset or non-numeric COLUMNS would otherwise yield.
     */
    private function spinnerWidth(): int
    {
        return max(0, intval(getenv('COLUMNS')) - 1);
    }
}
273