ParallelExecutor   A
last analyzed

Complexity

Total Complexity 37

Size/Duplication

Total Lines 262
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 16

Test Coverage

Coverage 94.23%

Importance

Changes 0
Metric Value
dl 0
loc 262
ccs 98
cts 104
cp 0.9423
rs 9.44
c 0
b 0
f 0
wmc 37
lcom 1
cbo 16

10 Methods

Rating   Name   Duplication   Size   Complexity  
B run() 0 54 9
A __construct() 0 11 1
B runTask() 0 30 6
A getProcess() 0 24 4
A startProcesses() 0 6 2
A areRunning() 0 10 3
A gatherOutput() 0 14 4
A gatherExitCodes() 0 10 3
A generateOptions() 0 22 3
A generateArguments() 0 11 2
1
<?php declare(strict_types=1);
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Console\Application;
11
use Deployer\Console\Input\Argument;
12
use Deployer\Console\Input\Option;
13
use Deployer\Console\Output\Informer;
14
use Deployer\Console\Output\VerbosityString;
15
use Deployer\Exception\Exception;
16
use Deployer\Exception\GracefulShutdownException;
17
use Deployer\Host\Host;
18
use Deployer\Host\Localhost;
19
use Deployer\Host\Storage;
20
use Deployer\Task\Context;
21
use Deployer\Task\Task;
22
use Symfony\Component\Console\Input\InputInterface;
23
use Symfony\Component\Console\Output\OutputInterface;
24
use Symfony\Component\Process\Process;
25
26
/**
 * Executes tasks by spawning one `worker` sub-process per host and running
 * the hosts of each batch concurrently.
 *
 * Tasks flagged as local are executed in-process against a Localhost instead.
 */
class ParallelExecutor implements ExecutorInterface
{
    /**
     * Input of the master process; its options/arguments are forwarded to workers.
     *
     * @var InputInterface
     */
    private $input;

    /**
     * Output every worker's stdout/stderr is relayed to.
     *
     * @var OutputInterface
     */
    private $output;

    /**
     * Reports task start / end / error to the user.
     *
     * @var Informer
     */
    private $informer;

    /**
     * Console application, used to read the user-defined arguments and options.
     *
     * @var Application
     */
    private $console;

    public function __construct(
        InputInterface $input,
        OutputInterface $output,
        Informer $informer,
        Application $console
    ) {
        $this->input = $input;
        $this->output = $output;
        $this->informer = $informer;
        $this->console = $console;
    }

    /**
     * {@inheritdoc}
     *
     * Hosts are processed in batches of at most `--limit` hosts; without a
     * usable limit all hosts form a single batch.
     *
     * @throws GracefulShutdownException when a batch reports exit code 1
     * @throws Exception                 when a batch reports exit code 255
     */
    public function run(array $tasks, array $hosts)
    {
        $localhost = new Localhost();

        // A missing, zero or negative limit means "no limit". array_chunk()
        // rejects a size below 1, so never let the batch size drop under 1
        // (this also covers an empty host list).
        $limit = (int) $this->input->getOption('limit');
        if ($limit < 1) {
            $limit = max(1, count($hosts));
        }

        // We need a context here for usage inside the `on` function: it passes
        // input/output to its callback. This allows code like the following
        // in parallel mode:
        //
        //     host('prod')
        //         ->set('branch', function () {
        //             return input()->getOption('branch') ?: 'production';
        //     })
        //
        // Otherwise `input()` won't be accessible (i.e. null).
        Context::push(new Context($localhost, $this->input, $this->output));
        try {
            Storage::persist($hosts);
        } finally {
            // Always unwind the temporary context, even if persisting fails.
            Context::pop();
        }

        foreach ($tasks as $task) {
            $success = true;
            $this->informer->startTask($task);

            if ($task->isLocal()) {
                // Local tasks run inside this process, wrapped by a
                // Storage::load()/Storage::flush() pair on the hosts.
                Storage::load($hosts);
                $task->run(new Context($localhost, $this->input, $this->output));
                Storage::flush($hosts);
            } else {
                foreach (array_chunk($hosts, $limit) as $chunk) {
                    $exitCode = $this->runTask($chunk, $task);

                    // Only these three codes are acted upon; every other value
                    // (including 0) lets the run continue as successful.
                    switch ($exitCode) {
                        case 1:
                            throw new GracefulShutdownException();
                        case 2:
                            // Mark the task as failed but keep processing the
                            // remaining batches and tasks.
                            $success = false;
                            break;
                        case 255:
                            throw new Exception();
                    }
                }
            }

            if ($success) {
                $this->informer->endTask($task);
            } else {
                $this->informer->taskError();
            }
        }
    }

    /**
     * Run task on hosts.
     *
     * Starts one worker process per host the task should be performed on,
     * relays their output while they run, and returns the cumulative exit
     * code as computed by gatherExitCodes().
     *
     * @param Host[] $hosts
     */
    private function runTask(array $hosts, Task $task): int
    {
        $processes = [];

        foreach ($hosts as $host) {
            if ($task->shouldBePerformed($host)) {
                $processes[$host->getHostname()] = $this->getProcess($host, $task);
                if ($task->isOnce()) {
                    $task->setHasRun();
                }
            }
        }

        // Output is relayed as-is (minus trailing whitespace); the stream type
        // and host name are part of the callback contract but not used here.
        $callback = function (string $type, string $host, string $output) {
            $output = rtrim($output);
            if (strlen($output) !== 0) {
                $this->output->writeln($output);
            }
        };

        $this->startProcesses($processes);

        while ($this->areRunning($processes)) {
            $this->gatherOutput($processes, $callback);
            usleep(1000);
        }
        // Pick up whatever was written between the last poll and termination.
        $this->gatherOutput($processes, $callback);

        return $this->gatherExitCodes($processes);
    }

    /**
     * Get process for task on host.
     *
     * Builds the `worker` command line for the given host/task pair. Values
     * taken from the input and the host configuration are shell-escaped, as
     * the command is handed to Process as a single command-line string.
     */
    protected function getProcess(Host $host, Task $task): Process
    {
        $dep = PHP_BINARY.' '.DEPLOYER_BIN;
        $options = $this->generateOptions();
        $arguments = $this->generateArguments();
        $hostname = escapeshellarg((string) $host->getHostname());
        $taskName = escapeshellarg((string) $task->getName());
        $configFile = escapeshellarg((string) $host->get('host_config_storage'));
        $value = $this->input->getOption('file');
        $file = $value ? '--file='.escapeshellarg((string) $value) : '';

        // Keep colored output in the workers when the master output is decorated.
        if ($this->output->isDecorated()) {
            $options .= ' --ansi';
        }

        $command = "$dep $file worker $arguments $options --hostname $hostname --task $taskName --config-file $configFile";
        $process = new Process($command);

        // PTY mode is enabled unless the DEPLOYER_PARALLEL_PTY constant is defined.
        if (!defined('DEPLOYER_PARALLEL_PTY')) {
            $process->setPty(true);
        }

        return $process;
    }

    /**
     * Start all of the processes.
     *
     * @param Process[] $processes
     *
     * @return void
     */
    protected function startProcesses(array $processes)
    {
        foreach ($processes as $process) {
            $process->start();
        }
    }

    /**
     * Determine if any of the processes are running.
     *
     * @param Process[] $processes
     */
    protected function areRunning(array $processes): bool
    {
        foreach ($processes as $process) {
            if ($process->isRunning()) {
                return true;
            }
        }

        return false;
    }

    /**
     * Gather the output from all of the processes.
     *
     * For every process the not-yet-seen stdout and stderr chunks are passed
     * to $callback as (stream type, array key of the process, chunk); empty
     * chunks are skipped.
     *
     * @param Process[] $processes
     *
     * @return void
     */
    protected function gatherOutput(array $processes, callable $callback)
    {
        foreach ($processes as $host => $process) {
            $output = $process->getIncrementalOutput();
            if (strlen($output) !== 0) {
                $callback(Process::OUT, $host, $output);
            }

            $errorOutput = $process->getIncrementalErrorOutput();
            if (strlen($errorOutput) !== 0) {
                $callback(Process::ERR, $host, $errorOutput);
            }
        }
    }

    /**
     * Gather the cumulative exit code for the processes.
     *
     * Returns the exit code of the first process (in array order) whose exit
     * code is greater than zero, or 0 when there is none.
     *
     * @param Process[] $processes
     */
    protected function gatherExitCodes(array $processes): int
    {
        foreach ($processes as $process) {
            $exitCode = $process->getExitCode();
            if ($exitCode > 0) {
                return $exitCode;
            }
        }

        return 0;
    }

    /**
     * Generate options and arguments string.
     *
     * Combines the verbosity flag with the string form of every user-defined
     * argument and option, dropping empty entries.
     */
    private function generateOptions(): string
    {
        /** @var string[] $inputs */
        $inputs = [
            (string) (new VerbosityString($this->output)),
        ];

        $userDefinition = $this->console->getUserDefinition();
        // Get user arguments
        foreach ($userDefinition->getArguments() as $argument) {
            $inputs[] = Argument::toString($this->input, $argument);
        }

        // Get user options
        foreach ($userDefinition->getOptions() as $option) {
            $inputs[] = Option::toString($this->input, $option);
        }

        return implode(' ', array_filter($inputs, static function (string $item): bool {
            return $item !== '';
        }));
    }

    /**
     * Generate the positional arguments forwarded to the worker.
     *
     * Returns the shell-escaped `stage` argument, or an empty string when the
     * argument is not defined or has no value.
     */
    private function generateArguments(): string
    {
        if (!$this->input->hasArgument('stage')) {
            return '';
        }

        // Some people rely on stage argument, so pass it to worker too.
        $stage = (string) $this->input->getArgument('stage');

        // An empty stage must stay empty: escaping it would inject a literal
        // '' argument into the worker command line.
        return $stage === '' ? '' : escapeshellarg($stage);
    }
}
288