Passed
Push — master ( 1dba41...0db4d1 )
by Anton
02:05
created

ParallelExecutor::generateOptions()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3.0175

Importance

Changes 0
Metric Value
cc 3
nc 4
nop 0
dl 0
loc 20
ccs 7
cts 8
cp 0.875
crap 3.0175
rs 9.6
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Console\Application;
11
use Deployer\Console\Input\Argument;
12
use Deployer\Console\Input\Option;
13
use Deployer\Console\Output\Informer;
14
use Deployer\Console\Output\VerbosityString;
15
use Deployer\Exception\Exception;
16
use Deployer\Exception\GracefulShutdownException;
17
use Deployer\Host\Host;
18
use Deployer\Host\Localhost;
19
use Deployer\Host\Storage;
20
use Deployer\Task\Context;
21
use Deployer\Task\Task;
22
use Symfony\Component\Console\Input\InputInterface;
23
use Symfony\Component\Console\Input\InputOption;
24
use Symfony\Component\Console\Output\OutputInterface;
25
use Symfony\Component\Process\Process;
26
27
class ParallelExecutor implements ExecutorInterface
28
{
29
    /**
30
     * @var InputInterface
31
     */
32
    private $input;
33
34
    /**
35
     * @var OutputInterface
36
     */
37
    private $output;
38
39
    /**
40
     * @var Informer
41
     */
42
    private $informer;
43
44
    /**
45
     * @var Application
46
     */
47
    private $console;
48
49 5
    /**
     * Store the console collaborators used while running tasks.
     *
     * @param InputInterface  $input    Source of options such as `limit` and `file`.
     * @param OutputInterface $output   Destination for output gathered from workers.
     * @param Informer        $informer Reports task start, end and error.
     * @param Application     $console  Supplies the user-defined input definition.
     */
    public function __construct(
        InputInterface $input,
        OutputInterface $output,
        Informer $informer,
        Application $console
    ) {
        $this->console = $console;
        $this->informer = $informer;
        $this->output = $output;
        $this->input = $input;
    }
60
61
    /**
     * {@inheritdoc}
     *
     * Local tasks run once inside this process; every other task is dispatched
     * to worker processes in chunks of at most `limit` hosts.
     *
     * @throws GracefulShutdownException When a chunk reports exit code 1.
     * @throws Exception                 When a chunk reports exit code 255.
     */
    public function run(array $tasks, array $hosts)
    {
        $localhost = new Localhost();

        // Without a `limit` option all hosts go into a single chunk. The size is
        // clamped to at least 1 because array_chunk() rejects sizes below 1, which
        // would otherwise happen for an empty host list or a negative limit.
        $limit = max(1, (int) $this->input->getOption('limit') ?: count($hosts));

        // We need contexts here for usage inside `on` function. Pass input/output to callback of it.
        // This allows to use code like this in parallel mode:
        //
        //     host('prod')
        //         ->set('branch', function () {
        //             return input()->getOption('branch') ?: 'production';
        //     })
        //
        // Otherwise `input()` won't be accessible (i.e. null)
        //
        Context::push(new Context($localhost, $this->input, $this->output));
        Storage::persist($hosts);
        Context::pop();

        foreach ($tasks as $task) {
            $success = true;
            $this->informer->startTask($task);

            if ($task->isLocal()) {
                // Local tasks run in-process, wrapped in a storage load/flush pair.
                Storage::load($hosts);
                $task->run(new Context($localhost, $this->input, $this->output));
                Storage::flush($hosts);
            } else {
                foreach (array_chunk($hosts, $limit) as $chunk) {
                    $exitCode = $this->runTask($chunk, $task);

                    // NOTE(review): any exit code other than 1, 2 or 255 leaves the
                    // task marked as successful — confirm this is intended.
                    switch ($exitCode) {
                        case 1:
                            throw new GracefulShutdownException();
                        case 2:
                            // Remember the failure but keep processing remaining chunks.
                            $success = false;
                            break;
                        case 255:
                            throw new Exception();
                    }
                }
            }

            if ($success) {
                $this->informer->endTask($task);
            } else {
                $this->informer->taskError();
            }
        }
    }
118
119
    /**
     * Run a task on a chunk of hosts, using one worker process per host.
     *
     * @param Host[] $hosts
     *
     * @return int Cumulative exit code as computed by gatherExitCodes().
     */
    private function runTask(array $hosts, Task $task): int
    {
        $workers = [];

        foreach ($hosts as $host) {
            if (!$task->shouldBePerformed($host)) {
                continue;
            }

            $workers[$host->getHostname()] = $this->getProcess($host, $task);

            if ($task->isOnce()) {
                $task->setHasRun();
            }
        }

        // Forward every non-empty piece of worker output, minus trailing whitespace.
        $forward = function (string $type, string $host, string $output) {
            $trimmed = rtrim($output);
            if ($trimmed !== '') {
                $this->output->writeln($trimmed);
            }
        };

        $this->startProcesses($workers);

        // Poll the workers, draining their output while any of them is still running.
        while ($this->areRunning($workers)) {
            $this->gatherOutput($workers, $forward);
            usleep(1000);
        }

        // One last pass for output produced after the final poll.
        $this->gatherOutput($workers, $forward);

        return $this->gatherExitCodes($workers);
    }
154
155
    /**
     * Get process for task on host.
     *
     * Builds the command line of the `worker` sub-command. The recipe file,
     * hostname, task name and config file path are shell-quoted so that spaces,
     * quotes or shell metacharacters in them cannot break or alter the command.
     *
     * @param Host $host Host the worker will run the task on.
     * @param Task $task Task the worker will run.
     *
     * @return Process Configured, not yet started process.
     */
    protected function getProcess(Host $host, Task $task): Process
    {
        $dep = PHP_BINARY.' '.DEPLOYER_BIN;

        // Both helpers return pre-formatted fragments, so they are inserted as is.
        $options = $this->generateOptions();
        $arguments = $this->generateArguments();

        $hostname = escapeshellarg((string) $host->getHostname());
        $taskName = escapeshellarg((string) $task->getName());
        $configFile = escapeshellarg((string) $host->get('host_config_storage'));

        $value = $this->input->getOption('file');
        $file = $value ? '--file='.escapeshellarg((string) $value) : '';

        // Keep colored output in the worker when the parent output is decorated.
        if ($this->output->isDecorated()) {
            $options .= ' --ansi';
        }

        $command = "$dep $file worker $arguments $options --hostname $hostname --task $taskName --config-file $configFile";
        $process = new Process($command);

        // PTY mode is enabled unless the DEPLOYER_PARALLEL_PTY constant is defined.
        if (!defined('DEPLOYER_PARALLEL_PTY')) {
            $process->setPty(true);
        }

        return $process;
    }
182
183
    /**
     * Start every given process without waiting for any of them.
     *
     * @param Process[] $processes
     *
     * @return void
     */
    protected function startProcesses(array $processes)
    {
        foreach ($processes as $worker) {
            $worker->start();
        }
    }
196
197
    /**
     * Tell whether at least one of the processes is still running.
     *
     * @param Process[] $processes
     *
     * @return bool True as soon as one running process is found, false otherwise.
     */
    protected function areRunning(array $processes): bool
    {
        foreach ($processes as $worker) {
            if (!$worker->isRunning()) {
                continue;
            }

            return true;
        }

        return false;
    }
212
213
    /**
     * Gather the output from all of the processes.
     *
     * For each process the new standard output and the new error output are
     * passed to the callback as `(type, host, output)`; empty output is skipped.
     *
     * @param Process[] $processes Processes indexed by host name.
     * @param callable  $callback  Receives Process::OUT or Process::ERR, the host and the output.
     *
     * @return void
     */
    protected function gatherOutput(array $processes, callable $callback)
    {
        foreach ($processes as $host => $process) {
            // PHP turns numeric-string array keys into integers; cast back so a
            // callback declaring `string $host` also works under strict_types.
            $host = (string) $host;

            $output = $process->getIncrementalOutput();
            if (strlen($output) !== 0) {
                $callback(Process::OUT, $host, $output);
            }

            $errorOutput = $process->getIncrementalErrorOutput();
            if (strlen($errorOutput) !== 0) {
                $callback(Process::ERR, $host, $errorOutput);
            }
        }
    }
234
235
    /**
     * Reduce the exit codes of the processes to a single code.
     *
     * @param Process[] $processes
     *
     * @return int Exit code of the first process whose code is greater than 0, or 0 if none is.
     */
    protected function gatherExitCodes(array $processes): int
    {
        foreach ($processes as $worker) {
            $code = $worker->getExitCode();

            if ($code > 0) {
                return $code;
            }
        }

        return 0;
    }
250
251
    /**
     * Build the space-separated string of inputs forwarded to a worker.
     *
     * The string starts with the verbosity flag, followed by the user-defined
     * arguments and then the user-defined options.
     */
    private function generateOptions(): string
    {
        /** @var string[] $parts */
        $parts = [(string) (new VerbosityString($this->output))];

        $definition = $this->console->getUserDefinition();

        // User-defined arguments come first ...
        foreach ($definition->getArguments() as $userArgument) {
            $parts[] = Argument::toString($this->input, $userArgument);
        }

        // ... followed by user-defined options.
        foreach ($definition->getOptions() as $userOption) {
            $parts[] = Option::toString($this->input, $userOption);
        }

        return implode(' ', $parts);
    }
274
275 5
    /**
     * Build the arguments string forwarded to a worker.
     *
     * @return string Value of the `stage` argument, or an empty string when the
     *                input has no such argument.
     */
    private function generateArguments(): string
    {
        if (!$this->input->hasArgument('stage')) {
            return '';
        }

        // Some people rely on stage argument, so pass it to worker too.
        return (string) $this->input->getArgument('stage');
    }
286
}
287