ParallelExecutor::startProcesses()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
nc 2
nop 1
dl 0
loc 6
ccs 4
cts 4
cp 1
crap 2
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
/* (c) Anton Medvedev <anton@medv.io>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Console\Application;
11
use Deployer\Console\Input\Argument;
12
use Deployer\Console\Input\Option;
13
use Deployer\Console\Output\Informer;
14
use Deployer\Console\Output\VerbosityString;
15
use Deployer\Exception\Exception;
16
use Deployer\Exception\GracefulShutdownException;
17
use Deployer\Host\Host;
18
use Deployer\Host\Localhost;
19
use Deployer\Host\Storage;
20
use Deployer\Task\Context;
21
use Deployer\Task\Task;
22
use Symfony\Component\Console\Input\InputInterface;
23
use Symfony\Component\Console\Output\OutputInterface;
24
use Symfony\Component\Process\Process;
25
26
class ParallelExecutor implements ExecutorInterface
27
{
28
    /**
     * Console input; read for the `limit`, `file` and `stage` values and
     * handed to every task context created by this executor.
     *
     * @var InputInterface
     */
    private $input;

    /**
     * Console output; receives the text printed by worker processes and is
     * handed to every task context created by this executor.
     *
     * @var OutputInterface
     */
    private $output;

    /**
     * Reporter notified when a task starts, ends, or fails.
     *
     * @var Informer
     */
    private $informer;

    /**
     * Console application whose user definition supplies the arguments and
     * options that are forwarded to worker processes.
     *
     * @var Application
     */
    private $console;

    /**
     * Store the collaborators used while running tasks in parallel.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     * @param Informer        $informer
     * @param Application     $console
     */
    public function __construct(
        InputInterface $input,
        OutputInterface $output,
        Informer $informer,
        Application $console
    ) {
        // Console I/O first, then the reporting/definition helpers.
        $this->console = $console;
        $this->informer = $informer;
        $this->output = $output;
        $this->input = $input;
    }
59
60
    /**
     * {@inheritdoc}
     *
     * Tasks are executed in the given order. A local task runs inside this
     * process against a `Localhost` context; any other task is dispatched
     * through `runTask()` to the hosts in chunks of at most `--limit` hosts.
     *
     * Exit codes reported for a chunk are interpreted as follows:
     *   - 1   => a GracefulShutdownException is thrown,
     *   - 2   => the task is reported as failed, remaining chunks still run,
     *   - 255 => an Exception is thrown.
     *
     * @param Task[] $tasks
     * @param Host[] $hosts
     *
     * @throws GracefulShutdownException When a chunk reports exit code 1.
     * @throws Exception                 When a chunk reports exit code 255.
     */
    public function run(array $tasks, array $hosts)
    {
        $localhost = new Localhost();

        // A missing, zero or non-numeric `--limit` means "all hosts at once".
        // array_chunk() rejects a size below 1, so a negative limit or an
        // empty host list must never reach it as-is.
        $limit = (int)$this->input->getOption('limit');
        if ($limit < 1) {
            $limit = max(1, count($hosts));
        }

        // We need a context here for usage inside the `on` function: it passes
        // input/output to its callback. This allows code like the following
        // to work in parallel mode:
        //
        //     host('prod')
        //         ->set('branch', function () {
        //             return input()->getOption('branch') ?: 'production';
        //     })
        //
        // Otherwise `input()` won't be accessible (i.e. null).
        //
        Context::push(new Context($localhost, $this->input, $this->output));
        Storage::persist($hosts);
        Context::pop();

        foreach ($tasks as $task) {
            $success = true;
            $this->informer->startTask($task);

            if ($task->isLocal()) {
                // Local tasks run in-process, wrapped in a storage load/flush.
                Storage::load($hosts);
                $task->run(new Context($localhost, $this->input, $this->output));
                Storage::flush($hosts);
            } else {
                foreach (array_chunk($hosts, $limit) as $chunk) {
                    $exitCode = $this->runTask($chunk, $task);

                    // NOTE(review): any exit code other than 1, 2 and 255 is
                    // treated like 0 and leaves the task marked successful;
                    // confirm this is intended.
                    switch ($exitCode) {
                        case 1:
                            throw new GracefulShutdownException();
                        case 2:
                            $success = false;
                            break;
                        case 255:
                            throw new Exception();
                    }
                }
            }

            if ($success) {
                $this->informer->endTask($task);
            } else {
                $this->informer->taskError();
            }
        }
    }
117
118
    /**
     * Run a task on the given hosts, one worker process per host.
     *
     * A process is created only for hosts the task should be performed on.
     * All processes are started together; their output is relayed to the
     * console output until none of them is running any more.
     *
     * @param Host[] $hosts
     * @param Task   $task
     * @return int Cumulative exit code as computed by `gatherExitCodes()`.
     */
    private function runTask(array $hosts, Task $task): int
    {
        $workers = [];

        foreach ($hosts as $host) {
            if (!$task->shouldBePerformed($host)) {
                continue;
            }

            // Workers are keyed by hostname.
            $workers[$host->getHostname()] = $this->getProcess($host, $task);

            if ($task->isOnce()) {
                $task->setHasRun();
            }
        }

        // Prints a chunk of worker output, dropping trailing whitespace and
        // skipping chunks that end up empty. Stream type and host are unused.
        $printer = function (string $type, string $host, string $output) {
            $text = rtrim($output);
            if ($text !== '') {
                $this->output->writeln($text);
            }
        };

        $this->startProcesses($workers);

        while (true) {
            if (!$this->areRunning($workers)) {
                break;
            }

            $this->gatherOutput($workers, $printer);
            usleep(1000);
        }

        // One last pass for whatever was written right before the workers exited.
        $this->gatherOutput($workers, $printer);

        return $this->gatherExitCodes($workers);
    }
154
155
    /**
     * Build the worker process that runs one task on one host.
     *
     * The process re-invokes the deployer binary with the `worker` command,
     * forwarding the user's arguments/options plus the hostname, task name
     * and the host's config storage file. Values that are interpolated into
     * the shell command are quoted with escapeshellarg() so that spaces,
     * quotes or shell metacharacters in them cannot break or alter the command.
     *
     * @param Host $host Host the worker is responsible for.
     * @param Task $task Task the worker has to run.
     * @return Process The configured process; it is not started here.
     */
    protected function getProcess(Host $host, Task $task): Process
    {
        $dep = PHP_BINARY . ' ' . DEPLOYER_BIN;
        $options = $this->generateOptions();
        $arguments = $this->generateArguments();
        $hostname = escapeshellarg((string)$host->getHostname());
        $taskName = escapeshellarg((string)$task->getName());
        $configFile = escapeshellarg((string)$host->get('host_config_storage'));

        // Only forward `--file` when the user supplied one.
        $value = $this->input->getOption('file');
        $file = $value ? '--file=' . escapeshellarg((string)$value) : '';

        // Keep colours in the worker output when the parent output has them.
        if ($this->output->isDecorated()) {
            $options .= ' --ansi';
        }

        $command = "$dep $file worker $arguments $options --hostname $hostname --task $taskName --config-file $configFile";
        $process = new Process($command);

        // PTY mode is switched on unless the DEPLOYER_PARALLEL_PTY constant is defined.
        // NOTE(review): the constant's name reads like an opt-in, yet defining
        // it is what disables the PTY; confirm intent.
        if (!defined('DEPLOYER_PARALLEL_PTY')) {
            $process->setPty(true);
        }

        return $process;
    }
182
183
    /**
     * Start every process in the list, in iteration order.
     *
     * @param Process[] $processes
     * @return void
     */
    protected function startProcesses(array $processes)
    {
        array_walk($processes, static function ($process): void {
            $process->start();
        });
    }
195
196
    /**
     * Tell whether at least one of the processes is still running.
     *
     * Checking stops at the first running process.
     *
     * @param Process[] $processes
     * @return bool True when any process is running, false otherwise
     *              (including for an empty list).
     */
    protected function areRunning(array $processes): bool
    {
        $anyRunning = false;

        foreach ($processes as $candidate) {
            if ($candidate->isRunning()) {
                $anyRunning = true;
                break;
            }
        }

        return $anyRunning;
    }
212
213
    /**
     * Relay the output produced since the previous call by each process.
     *
     * For every process the new standard output is delivered first, then the
     * new error output. Empty chunks are not delivered.
     *
     * @param Process[] $processes Processes keyed by host.
     * @param callable  $callback  Invoked as ($type, $host, $output), where
     *                             $type is Process::OUT or Process::ERR.
     * @return void
     */
    protected function gatherOutput(array $processes, callable $callback)
    {
        foreach ($processes as $host => $process) {
            $stdout = $process->getIncrementalOutput();
            if ($stdout !== '') {
                $callback(Process::OUT, $host, $stdout);
            }

            $stderr = $process->getIncrementalErrorOutput();
            if ($stderr !== '') {
                $callback(Process::ERR, $host, $stderr);
            }
        }
    }
234
235
    /**
     * Reduce the exit codes of the processes to a single one.
     *
     * @param Process[] $processes
     * @return int The exit code of the first process (in iteration order)
     *             whose exit code is greater than zero, or 0 when there is none.
     */
    protected function gatherExitCodes(array $processes): int
    {
        foreach ($processes as $process) {
            if (!($process->getExitCode() > 0)) {
                continue;
            }

            return $process->getExitCode();
        }

        return 0;
    }
248
249
    /**
     * Build the string of user-defined arguments and options for a worker.
     *
     * The verbosity flag comes first, followed by the string form of each
     * user-defined argument and then each user-defined option. Empty pieces
     * are dropped and the rest is joined with single spaces.
     */
    private function generateOptions(): string
    {
        /** @var string[] $pieces */
        $pieces = [];
        $pieces[] = (string)(new VerbosityString($this->output));

        $definition = $this->console->getUserDefinition();

        // User arguments.
        foreach ($definition->getArguments() as $userArgument) {
            $pieces[] = Argument::toString($this->input, $userArgument);
        }

        // User options.
        foreach ($definition->getOptions() as $userOption) {
            $pieces[] = Option::toString($this->input, $userOption);
        }

        $nonEmpty = array_filter($pieces, static function (string $piece): bool {
            return '' !== $piece;
        });

        return implode(' ', $nonEmpty);
    }
274
275 5
    /**
     * Build the positional arguments passed on to a worker.
     *
     * Some people rely on the `stage` argument, so it is forwarded to the
     * worker too. The value is quoted with escapeshellarg() because the
     * result is interpolated into a shell command line.
     *
     * @return string The shell-quoted stage, or an empty string when the input
     *                has no `stage` argument or its value is empty.
     */
    private function generateArguments(): string
    {
        if (!$this->input->hasArgument('stage')) {
            return '';
        }

        $stage = (string)$this->input->getArgument('stage');

        // Quoting an empty value would hand the worker a literal empty
        // argument, so an absent stage must stay absent.
        return $stage === '' ? '' : escapeshellarg($stage);
    }
286
}
287