Master::run()   F
last analyzed

Complexity

Conditions 23
Paths 1028

Size

Total Lines 76

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 33
CRAP Score 28.2062

Importance

Changes 0
Metric Value
cc 23
nc 1028
nop 3
dl 0
loc 76
ccs 33
cts 42
cp 0.7856
crap 28.2062
rs 0
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the method body into its own, well-named method.

1
<?php declare(strict_types=1);
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Component\Ssh\Client;
11
use Deployer\Configuration\Configuration;
12
use Deployer\Deployer;
13
use Deployer\Host\Host;
14
use Deployer\Host\Localhost;
15
use Deployer\Selector\Selector;
16
use Deployer\Task\Task;
17
use Psr\Http\Message\ServerRequestInterface;
18
use Symfony\Component\Console\Input\InputInterface;
19
use Symfony\Component\Console\Output\OutputInterface;
20
use Symfony\Component\Process\Process;
21
use React;
22
23 1
/**
 * Braille glyphs that are cycled through to animate the console spinner.
 */
const FRAMES = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];

/**
 * Renders a single spinner frame followed by an optional message.
 *
 * The frame is chosen from the wall clock (the index advances every tenth of
 * a second), so repeated calls animate without the caller keeping any state.
 * The string ends with a carriage return and no line feed, so a following
 * write to the same terminal line overprints it.
 *
 * @param string $message Text shown after the spinner glyph.
 * @return string Two spaces, the glyph, a space, the message and "\r".
 */
function spinner($message = ''): string
{
    $frame = FRAMES[(int)(microtime(true) * 10) % count(FRAMES)];
    return "  $frame $message\r";
}
30
31
/**
 * Runs (or plans) a list of tasks against a list of hosts.
 *
 * Tasks are processed one after another. For every task the hosts are split
 * into batches according to the effective limit, and each batch is either
 * recorded on the given plan or executed: in-process through a Worker when
 * the DEPLOYER_LOCAL_WORKER environment variable equals 'true', otherwise
 * through one worker sub-process per host.
 */
class Master
{
    /**
     * @var InputInterface
     */
    private $input;
    /**
     * @var OutputInterface
     */
    private $output;
    /**
     * @var Messenger
     */
    private $messenger;
    /**
     * Stored by the constructor; not used by any method of this class.
     *
     * @var Client
     */
    private $client;
    /**
     * Stored by the constructor; not used by any method of this class.
     *
     * @var Configuration
     */
    private $config;
    /**
     * Port component of the address of the socket opened by createServer().
     *
     * @var int|null
     */
    private $port;
    /**
     * Event loop created by createServer() and driven by runTask().
     *
     * @var React\EventLoop\LoopInterface
     */
    private $loop;

    public function __construct(
        InputInterface $input,
        OutputInterface $output,
        Messenger $messenger,
        Client $client,
        Configuration $config
    ) {
        $this->input = $input;
        $this->output = $output;
        $this->messenger = $messenger;
        $this->client = $client;
        $this->config = $config;
    }

    /**
     * Runs every task on the hosts it applies to, or records the batches on
     * $plan instead of running them when a plan is given.
     *
     * Without a plan, hosts are connected first and the messenger is notified
     * at the start and at the successful end of each task. Execution stops at
     * the first batch that yields a non-zero exit code.
     *
     * @param Task[] $tasks
     * @param Host[] $hosts
     * @param Planner|null $plan
     * @return int 0 when everything succeeded (or was only planned), otherwise
     *             the first non-zero exit code.
     */
    public function run(array $tasks, array $hosts, $plan = null): int
    {
        if (!$plan) {
            $this->connect($hosts);
        }

        // The --limit option wins when it is set to a non-zero value;
        // otherwise all hosts may run together.
        $globalLimit = (int)$this->input->getOption('limit') ?: count($hosts);

        foreach ($tasks as $task) {
            if (!$plan) {
                $this->messenger->startTask($task);
            }

            // Clamp to at least 1: array_chunk() rejects sizes below 1, which
            // would otherwise happen with an empty host list, a negative
            // --limit or a task limit of 0.
            $limit = max(1, min($globalLimit, $task->getLimit() ?? $globalLimit));

            foreach ($this->batches($task, $hosts, $limit) as $batch) {
                if ($plan) {
                    // Empty batches are committed too, exactly as produced.
                    $plan->commit($batch, $task);
                    continue;
                }

                if ($batch === []) {
                    // Nothing to execute; avoids starting a server and an
                    // event loop for zero worker processes.
                    continue;
                }

                $exitCode = $this->runTask($task, $batch);
                if ($exitCode !== 0) {
                    return $exitCode;
                }
            }

            if (!$plan) {
                $this->messenger->endTask($task);
            }
        }

        return 0;
    }

    /**
     * Lazily yields the batches of hosts a task has to be run on.
     *
     * A batch may be empty, meaning that none of the hosts considered for it
     * matched the task selector. Batches are produced on demand so that host
     * selection stays interleaved with execution.
     *
     * @param Task $task
     * @param Host[] $hosts
     * @param int $limit Maximum number of hosts per batch, at least 1.
     * @return \Generator|Host[][]
     */
    private function batches(Task $task, array $hosts, $limit): \Generator
    {
        $plannedHosts = $hosts;

        if ($task->isOnce()) {
            // A "once" task only runs on the first host matching its selector.
            $plannedHosts = [];
            foreach ($hosts as $currentHost) {
                if ($this->isSelected($task, $currentHost)) {
                    $plannedHosts[] = $currentHost;
                    break;
                }
            }
        }

        if ($task->isLocal()) {
            $plannedHosts = [new Localhost('localhost')];
        }

        if ($limit === 1 || count($plannedHosts) === 1) {
            // Sequential mode: one batch per planned host.
            foreach ($plannedHosts as $currentHost) {
                yield $this->isSelected($task, $currentHost) ? [$currentHost] : [];
            }
            return;
        }

        // Chunked mode. $plannedHosts can differ from $hosts here only when a
        // "once" task matched no host at all, in which case every chunk below
        // ends up empty as well.
        foreach (array_chunk($hosts, $limit) as $chunk) {
            $selectedHosts = [];
            foreach ($chunk as $currentHost) {
                if ($this->isSelected($task, $currentHost)) {
                    $selectedHosts[] = $currentHost;
                }
            }
            yield $selectedHosts;
        }
    }

    /**
     * Tells whether the task applies to the host. A task without a selector
     * applies to every host.
     *
     * @param Task $task
     * @param Host $host
     * @return bool
     */
    private function isSelected(Task $task, $host): bool
    {
        $selector = $task->getSelector();

        return $selector === null || Selector::apply($selector, $host);
    }

    /**
     * Builds the callback used to print worker output: a single trailing
     * newline is removed and whatever remains, if anything, is written as one
     * line to the console output.
     *
     * @return \Closure
     */
    private function outputWriter(): \Closure
    {
        return function (string $output) {
            $output = preg_replace('/\n$/', '', $output);
            if (strlen($output) !== 0) {
                $this->output->writeln($output);
            }
        };
    }

    /**
     * Runs the "connect" task in a worker process for every host that is not
     * a Localhost, showing a spinner while each process is running.
     *
     * @param Host[] $hosts
     */
    private function connect(array $hosts)
    {
        $callback = $this->outputWriter();

        // COLUMNS may be unset, which would give a width of -1; str_repeat()
        // does not accept a negative count.
        $width = max(0, (int)getenv('COLUMNS') - 1);

        // Connect to each host sequentially, to prevent getting locked.
        foreach ($hosts as $host) {
            if ($host instanceof Localhost) {
                continue;
            }
            $process = $this->getProcess($host, new Task('connect'));
            $process->start();

            while ($process->isRunning()) {
                $this->gatherOutput([$process], $callback);
                $this->output->write(spinner(str_pad("connect {$host->getTag()}", $width)));
                usleep(1000);
            }

            // Flush what the process wrote between the last poll and its exit.
            $this->gatherOutput([$process], $callback);
        }

        // Clear spinner.
        $this->output->write(str_repeat(' ', $width) . "\r");
    }

    /**
     * Executes one task on a batch of hosts.
     *
     * @param Task $task
     * @param Host[] $hosts
     * @return int 0 on success, otherwise a non-zero exit code of a failed host.
     */
    private function runTask(Task $task, array $hosts): int
    {
        if (getenv('DEPLOYER_LOCAL_WORKER') === 'true') {
            // This allows to code coverage all recipe,
            // as well as speedup tests by not spawning
            // lots of processes. Also there is a few tests
            // what runs with workers for tests subprocess
            // communications.
            foreach ($hosts as $host) {
                $worker = new Worker(Deployer::get());
                $exitCode = $worker->execute($task, $host);
                if ($exitCode !== 0) {
                    return $exitCode;
                }
            }
            return 0;
        }

        $processes = [];
        foreach ($hosts as $host) {
            $processes[] = $this->getProcess($host, $task);
        }

        // Start all workers of the batch before polling any of them.
        foreach ($processes as $process) {
            $process->start();
        }

        $this->createServer();

        $callback = $this->outputWriter();

        // Poll the workers until none of them is running any more.
        $this->loop->addPeriodicTimer(0.03, function () use ($processes, $callback) {
            $this->gatherOutput($processes, $callback);
            $this->output->write(spinner());
            if (!$this->areRunning($processes)) {
                $this->loop->stop();
            }
        });

        $this->loop->run();
        $this->output->write("    \r"); // clear spinner
        // Flush what the workers wrote between the last poll and their exit.
        $this->gatherOutput($processes, $callback);
        return $this->cumulativeExitCode($processes);
    }

    /**
     * Creates a fresh event loop with an HTTP server listening on it and
     * remembers the port of the listening socket.
     *
     * The server answers every request with a plain-text 200 response.
     */
    protected function createServer()
    {
        $this->loop = React\EventLoop\Factory::create();
        // $request is part of the handler signature but is not inspected:
        // every request gets the same response.
        $server = new React\Http\Server($this->loop, function (ServerRequestInterface $request) {
            return new React\Http\Message\Response(
                200,
                [
                    'Content-Type' => 'text/plain'
                ],
                "Hello World!\n"
            );
        });
        // NOTE(review): 0 presumably lets the OS pick a free port - confirm.
        $socket = new React\Socket\Server(0, $this->loop);
        $server->listen($socket);
        $address = $socket->getAddress();
        $this->port = parse_url($address, PHP_URL_PORT);
    }

    /**
     * Builds (without starting) the worker sub-process that runs a task for a host.
     *
     * The command line is echoed to the output when it is in debug verbosity.
     *
     * @param Host $host
     * @param Task $task
     * @return Process
     */
    protected function getProcess(Host $host, Task $task): Process
    {
        $dep = PHP_BINARY . ' ' . DEPLOYER_BIN;
        $configDirectory = $host->get('config_directory');
        $decorated = $this->output->isDecorated() ? '--decorated' : '';
        $command = "$dep worker $task {$host->getAlias()} $configDirectory {$this->input} $decorated";

        if ($this->output->isDebug()) {
            $this->output->writeln("[{$host->getTag()}] $command");
        }

        return Process::fromShellCommandline($command);
    }

    /**
     * Tells whether at least one of the processes is still running.
     *
     * @param Process[] $processes
     * @return bool
     */
    protected function areRunning(array $processes): bool
    {
        foreach ($processes as $process) {
            if ($process->isRunning()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Passes the not yet consumed standard and error output of each process
     * to the callback; empty output is not passed on.
     *
     * @param Process[] $processes
     * @param callable $callback Receives one string per non-empty chunk.
     */
    protected function gatherOutput(array $processes, callable $callback)
    {
        foreach ($processes as $process) {
            $output = $process->getIncrementalOutput();
            if (strlen($output) !== 0) {
                $callback($output);
            }

            $errorOutput = $process->getIncrementalErrorOutput();
            if (strlen($errorOutput) !== 0) {
                $callback($errorOutput);
            }
        }
    }

    /**
     * Returns the exit code of the first process that exited with a code
     * greater than zero, or 0 when there is none.
     *
     * @param Process[] $processes
     * @return int
     */
    protected function cumulativeExitCode(array $processes): int
    {
        foreach ($processes as $process) {
            $exitCode = $process->getExitCode();
            if ($exitCode > 0) {
                return $exitCode;
            }
        }
        return 0;
    }
}
307