Test Failed
Push — master ( 6c9176...e80da1 )
by Anton
03:08
created

ParallelExecutor::run()   F

Complexity

Conditions 18
Paths 452

Size

Total Lines 64

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 30.9719

Importance

Changes 0
Metric Value
cc 18
nc 452
nop 3
dl 0
loc 64
ccs 25
cts 38
cp 0.6579
crap 30.9719
rs 1.4611
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

1
<?php declare(strict_types=1);
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Component\Ssh\Client;
11
use Deployer\Configuration\Configuration;
12
use Deployer\Exception\Exception;
13
use Deployer\Exception\GracefulShutdownException;
14
use Deployer\Host\Host;
15
use Deployer\Host\Localhost;
16
use Deployer\Selector\Selector;
17
use Deployer\Task\Context;
18
use Deployer\Task\Task;
19
use Symfony\Component\Console\Input\InputInterface;
20
use Symfony\Component\Console\Output\OutputInterface;
21
use Symfony\Component\Process\Process;
22
23 1
const FRAMES = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];

/**
 * Build one line of an animated terminal spinner.
 *
 * The frame is chosen from FRAMES using the current time in tenths of a
 * second, so repeated calls cycle through the frames. The returned string
 * ends with a carriage return and no newline.
 *
 * @param string $message Text placed after the spinner frame.
 * @return string Two spaces, the frame, a space, the message and "\r".
 */
function spinner($message = '')
{
    $tenths = (int)(microtime(true) * 10);
    $glyph = FRAMES[$tenths % count(FRAMES)];

    return '  ' . $glyph . ' ' . $message . "\r";
}
30
31
/**
 * Runs tasks against a list of hosts.
 *
 * A task is either executed in the current process, one host after another,
 * or handed to "worker" subprocesses that are started in chunks and polled
 * until they have all finished.
 */
class ParallelExecutor
{
    /** @var InputInterface */
    private $input;
    /** @var OutputInterface */
    private $output;
    /** @var Messenger */
    private $messenger;
    /** @var Client */
    private $client;
    /** @var Configuration */
    private $config;

    public function __construct(
        InputInterface $input,
        OutputInterface $output,
        Messenger $messenger,
        Client $client,
        Configuration $config
    ) {
        $this->input = $input;
        $this->output = $output;
        $this->messenger = $messenger;
        $this->client = $client;
        $this->config = $config;
    }

    /**
     * Run a "connect" worker for every non-local host, one at a time.
     *
     * @param Host[] $hosts
     */
    private function connect(array $hosts)
    {
        $callback = $this->outputWriter();

        // COLUMNS may be unset (getenv() then yields false, i.e. 0), so clamp
        // the width: str_repeat() rejects a negative repeat count.
        $width = max(0, intval(getenv('COLUMNS')) - 1);

        // Connect to each host sequentially, to prevent getting locked.
        foreach ($hosts as $host) {
            if ($host instanceof Localhost) {
                continue;
            }

            $process = $this->getProcess($host, new Task('connect'));
            $process->start();

            while ($process->isRunning()) {
                $this->gatherOutput([$process], $callback);
                $this->output->write(spinner(str_pad("connect {$host->tag()}", $width)));
                usleep(1000);
            }

            // Drain what the process wrote between the last poll and its exit.
            $this->gatherOutput([$process], $callback);
        }

        // Clear spinner.
        $this->output->write(str_repeat(' ', $width) . "\r");
    }

    /**
     * Run (or, when a plan is given, only record) every task on the hosts.
     *
     * Without a plan the hosts are connected first and each task is wrapped
     * in the messenger's startTask()/endTask() calls. With a plan nothing is
     * executed; the selected hosts are committed to the plan instead.
     *
     * @param Task[] $tasks
     * @param Host[] $hosts
     * @param Planner|null $plan
     * @return int 0 on success, otherwise the exit code of the first failure.
     */
    public function run(array $tasks, array $hosts, $plan = null): int
    {
        if (!$plan) {
            $this->connect($hosts);
        }

        $globalLimit = (int)$this->input->getOption('limit') ?: count($hosts);

        foreach ($tasks as $task) {
            if (!$plan) {
                $this->messenger->startTask($task);
            }

            // array_chunk() rejects a size below 1, which would otherwise
            // happen with no hosts or a non-positive limit.
            $limit = max(1, min($globalLimit, $task->getLimit() ?? $globalLimit));
            $plannedHosts = $task->isOnce() ? $this->firstMatchingHost($task, $hosts) : $hosts;

            if ($limit === 1 || count($plannedHosts) === 1) {
                foreach ($plannedHosts as $host) {
                    $selected = Selector::apply($task->getSelector(), $host);

                    if ($plan) {
                        $plan->commit($selected ? [$host] : [], $task);
                        continue;
                    }
                    if (!$selected) {
                        continue;
                    }

                    $exitCode = $this->runOnHost($host, $task);
                    if ($exitCode !== null) {
                        return $exitCode;
                    }
                }
            } else {
                foreach (array_chunk($hosts, $limit) as $chunk) {
                    $exitCode = $this->runTask($chunk, $task, $plan);
                    if ($exitCode !== 0) {
                        return $exitCode;
                    }
                }
            }

            // Mirror the startTask() guard: no task was started in plan mode.
            if (!$plan) {
                $this->messenger->endTask($task);
            }
        }

        return 0;
    }

    /**
     * Pick the first host accepted by the task's selector.
     *
     * @param Host[] $hosts
     * @return Host[] A list with that single host, or an empty list.
     */
    private function firstMatchingHost(Task $task, array $hosts): array
    {
        foreach ($hosts as $host) {
            if (Selector::apply($task->getSelector(), $host)) {
                return [$host];
            }
        }

        return [];
    }

    /**
     * Run a task for one host inside the current process.
     *
     * Any throwable is rendered through the messenger and turned into an
     * exit code instead of being rethrown.
     *
     * @return int|null Null on success, otherwise the exit code to return.
     */
    private function runOnHost(Host $host, Task $task): ?int
    {
        try {
            $host->getConfig()->load();
            Exception::setTaskSourceLocation($task->getSourceLocation());

            $task->run(new Context($host, $this->input, $this->output));

            $host->getConfig()->save();
        } catch (GracefulShutdownException $exception) {
            $this->messenger->renderException($exception, $host);
            return GracefulShutdownException::EXIT_CODE;
        } catch (\Throwable $exception) {
            $this->messenger->renderException($exception, $host);
            return 1;
        }

        return null;
    }

    /**
     * Run a task on a chunk of hosts using one worker process per host.
     *
     * Hosts are filtered by the task's selector (a null selector accepts
     * every host). With a plan, the selected hosts are committed and no
     * process is created.
     *
     * @param Host[] $hosts
     * @return int 0, or the first positive exit code among the processes.
     */
    private function runTask(array $hosts, Task $task, ?Planner $plan = null): int
    {
        $selector = $task->getSelector();
        $selectedHosts = [];
        foreach ($hosts as $host) {
            if ($selector === null || Selector::apply($selector, $host)) {
                $selectedHosts[] = $host;
            }
        }

        if ($plan) {
            $plan->commit($selectedHosts, $task);
            return 0;
        }

        $processes = [];
        foreach ($selectedHosts as $host) {
            $processes[] = $this->getProcess($host, $task);
        }

        $callback = $this->outputWriter();

        $this->startProcesses($processes);

        while ($this->areRunning($processes)) {
            $this->gatherOutput($processes, $callback);
            $this->output->write(spinner());
            usleep(1000);
        }

        // Clear spinner.
        $this->output->write("    \r");

        // Drain what the processes wrote between the last poll and their exit.
        $this->gatherOutput($processes, $callback);

        return $this->cumulativeExitCode($processes);
    }

    /**
     * Build the callback that forwards process output to the console.
     *
     * The callback strips the trailing newline and skips output that is
     * empty afterwards, because writeln() appends its own newline.
     */
    private function outputWriter(): callable
    {
        return function (string $output) {
            $output = preg_replace('/\n$/', '', $output);
            if (strlen($output) !== 0) {
                $this->output->writeln($output);
            }
        };
    }

    /**
     * Create (without starting) the worker process for a task on a host.
     *
     * Every value interpolated into the command line is shell-escaped so
     * that paths or names containing spaces or shell metacharacters stay a
     * single argument. The string form of the input is appended unchanged.
     */
    protected function getProcess(Host $host, Task $task): Process
    {
        $parts = [
            escapeshellarg(PHP_BINARY),
            escapeshellarg(DEPLOYER_BIN),
            'worker',
            escapeshellarg((string)$task),
            escapeshellarg((string)$host->alias()),
            escapeshellarg((string)$host->get('config_directory')),
            (string)$this->input,
        ];
        if ($this->output->isDecorated()) {
            $parts[] = '--decorated';
        }
        $command = implode(' ', $parts);

        if ($this->output->isDebug()) {
            $this->output->writeln("[{$host->tag()}] $command");
        }

        return Process::fromShellCommandline($command);
    }

    /**
     * Start every process without waiting for any of them.
     *
     * @param Process[] $processes
     */
    protected function startProcesses(array $processes)
    {
        foreach ($processes as $process) {
            $process->start();
        }
    }

    /**
     * Tell whether at least one process is still running.
     *
     * @param Process[] $processes
     */
    protected function areRunning(array $processes): bool
    {
        foreach ($processes as $process) {
            if ($process->isRunning()) {
                return true;
            }
        }

        return false;
    }

    /**
     * Pass new standard and error output of each process to the callback.
     *
     * The callback is only invoked for non-empty output.
     *
     * @param Process[] $processes
     */
    protected function gatherOutput(array $processes, callable $callback)
    {
        foreach ($processes as $process) {
            $output = $process->getIncrementalOutput();
            if (strlen($output) !== 0) {
                $callback($output);
            }

            $errorOutput = $process->getIncrementalErrorOutput();
            if (strlen($errorOutput) !== 0) {
                $callback($errorOutput);
            }
        }
    }

    /**
     * Gather the cumulative exit code for the processes.
     *
     * @param Process[] $processes
     * @return int The first exit code greater than zero, or 0 if there is none.
     */
    protected function cumulativeExitCode(array $processes): int
    {
        foreach ($processes as $process) {
            $exitCode = $process->getExitCode();
            if ($exitCode > 0) {
                return $exitCode;
            }
        }

        return 0;
    }
}
266