GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( d89857...62c949 )
by Anton
08:07 queued 05:04
created

ParallelExecutor::gatherExitCodes()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
nc 3
nop 1
dl 0
loc 10
ccs 6
cts 6
cp 1
crap 3
rs 9.9332
c 0
b 0
f 0
1
<?php
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Console\Application;
11
use Deployer\Console\Output\Informer;
12
use Deployer\Console\Output\VerbosityString;
13
use Deployer\Exception\Exception;
14
use Deployer\Exception\GracefulShutdownException;
15
use Deployer\Host\Host;
16
use Deployer\Host\Localhost;
17
use Deployer\Host\Storage;
18
use Deployer\Task\Context;
19
use Deployer\Task\Task;
20
use Symfony\Component\Console\Input\InputInterface;
21
use Symfony\Component\Console\Output\OutputInterface;
22
use Symfony\Component\Process\Process;
23
24
class ParallelExecutor implements ExecutorInterface
25
{
26
    /**
27
     * @var InputInterface
28
     */
29
    private $input;
30
31
    /**
32
     * @var OutputInterface
33
     */
34
    private $output;
35
36
    /**
37
     * @var Informer
38
     */
39
    private $informer;
40
41
    /**
42
     * @var Application
43
     */
44
    private $console;
45
46
    /**
47
     * @param InputInterface $input
48
     * @param OutputInterface $output
49
     * @param Informer $informer
50
     * @param Application $console
51
     */
52 4
    public function __construct(InputInterface $input, OutputInterface $output, Informer $informer, Application $console)
53
    {
54 4
        $this->input = $input;
55 4
        $this->output = $output;
56 4
        $this->informer = $informer;
57 4
        $this->console = $console;
58 4
    }
59
60
    /**
61
     * {@inheritdoc}
62
     */
63 4
    public function run($tasks, $hosts)
64
    {
65 4
        $localhost = new Localhost();
66 4
        $limit = (int)$this->input->getOption('limit') ?: count($hosts);
67
68
        // We need contexts here for usage inside `on` function. Pass input/output to callback of it.
69
        // This allows to use code like this in parallel mode:
70
        //
71
        //     host('prod')
72
        //         ->set('branch', function () {
73
        //             return input()->getOption('branch') ?: 'production';
74
        //     })
75
        //
76
        // Otherwise `input()` wont be accessible (i.e. null)
77
        //
78 4
        Context::push(new Context($localhost, $this->input, $this->output));
79
        {
80 4
            Storage::persist($hosts);
81
        }
82 4
        Context::pop();
83
84 4
        foreach ($tasks as $task) {
85 4
            $success = true;
86 4
            $this->informer->startTask($task);
87
88 4
            if ($task->isLocal()) {
89 2
                Storage::load($hosts);
90
                {
91 2
                    $task->run(new Context($localhost, $this->input, $this->output));
92
                }
93 2
                Storage::flush($hosts);
94
            } else {
95 4
                foreach (array_chunk($hosts, $limit) as $chunk) {
96 4
                    $exitCode = $this->runTask($chunk, $task);
97
98
                    switch ($exitCode) {
99 4
                        case 1:
100
                            throw new GracefulShutdownException();
101 4
                        case 2:
102
                            $success = false;
103
                            break;
104 4
                        case 255:
105 4
                            throw new Exception();
106
                    }
107
                }
108
            }
109
110 4
            if ($success) {
111 4
                $this->informer->endTask($task);
112
            } else {
113 4
                $this->informer->taskError();
114
            }
115
        }
116 4
    }
117
118
    /**
119
     * Run task on hosts.
120
     *
121
     * @param Host[] $hosts
122
     * @param Task $task
123
     * @return int
124
     */
125 4
    private function runTask(array $hosts, Task $task)
126
    {
127 4
        $processes = [];
128
129 4
        foreach ($hosts as $host) {
130 4
            if ($task->shouldBePerformed($host)) {
131 4
                $processes[$host->getHostname()] = $this->getProcess($host, $task);
132 4
                if ($task->isOnce()) {
133 4
                    break;
134
                }
135
            }
136
        }
137
138 4
        $callback = function ($type, $host, $output) {
139 4
            $output = rtrim($output);
140 4
            if (!empty($output)) {
141 4
                $this->output->writeln($output);
142
            }
143 4
        };
144
145 4
        $this->startProcesses($processes);
146
147 4
        while ($this->areRunning($processes)) {
148 4
            $this->gatherOutput($processes, $callback);
149 4
            usleep(1000);
150
        }
151 4
        $this->gatherOutput($processes, $callback);
152
153 4
        return $this->gatherExitCodes($processes);
154
    }
155
156
    /**
157
     * Get process for task on host.
158
     *
159
     * @param Host $host
160
     * @param Task $task
161
     * @return Process
162
     */
163 4
    protected function getProcess($host, Task $task)
164
    {
165 4
        $dep = PHP_BINARY . ' ' . DEPLOYER_BIN;
166 4
        $options = $this->generateOptions();
167 4
        $arguments = $this->generateArguments();
168 4
        $hostname = $host->getHostname();
169 4
        $taskName = $task->getName();
170 4
        $configFile = $host->get('host_config_storage');
171 4
        $value = $this->input->getOption('file');
172 4
        $file = $value ? "--file='$value'" : '';
173
174 4
        if ($this->output->isDecorated()) {
175
            $options .= ' --ansi';
176
        }
177
178 4
        $command = "$dep $file worker $arguments $options --hostname $hostname --task $taskName --config-file $configFile";
179 4
        $process = new Process($command);
180
181 4
        if (!defined('DEPLOYER_PARALLEL_PTY')) {
182
            $process->setPty(true);
183
        }
184
185 4
        return $process;
186
    }
187
188
    /**
189
     * Start all of the processes.
190
     *
191
     * @param Process[] $processes
192
     * @return void
193
     */
194 4
    protected function startProcesses(array $processes)
195
    {
196 4
        foreach ($processes as $process) {
197 4
            $process->start();
198
        }
199 4
    }
200
201
    /**
202
     * Determine if any of the processes are running.
203
     *
204
     * @param Process[] $processes
205
     * @return bool
206
     */
207 4
    protected function areRunning(array $processes)
208
    {
209 4
        foreach ($processes as $process) {
210 4
            if ($process->isRunning()) {
211 4
                return true;
212
            }
213
        }
214 4
        return false;
215
    }
216
217
    /**
218
     * Gather the output from all of the processes.
219
     *
220
     * @param Process[] $processes
221
     * @param callable $callback
222
     */
223 4
    protected function gatherOutput(array $processes, callable $callback)
224
    {
225 4
        foreach ($processes as $host => $process) {
226
            $methods = [
227 4
                Process::OUT => 'getIncrementalOutput',
228
                Process::ERR => 'getIncrementalErrorOutput',
229
            ];
230 4
            foreach ($methods as $type => $method) {
231 4
                $output = $process->{$method}();
232 4
                if (!empty($output)) {
233 4
                    $callback($type, $host, $output);
234
                }
235
            }
236
        }
237 4
    }
238
239
    /**
240
     * Gather the cumulative exit code for the processes.
241
     *
242
     * @param Process[] $processes
243
     * @return int
244
     */
245 4
    protected function gatherExitCodes(array $processes)
246
    {
247 4
        $code = 0;
248 4
        foreach ($processes as $process) {
249 4
            if ($process->getExitCode() > 0) {
250 4
                $code = $process->getExitCode();
251
            }
252
        }
253 4
        return $code;
254
    }
255
256
    /**
257
     * Generate options and arguments string.
258
     * @return string
259
     */
260 4
    private function generateOptions()
261
    {
262 4
        $verbosity = new VerbosityString($this->output);
263 4
        $input = $verbosity;
264
265
        // Get user arguments
266 4
        foreach ($this->console->getUserDefinition()->getArguments() as $argument) {
267
            $value = $this->input->getArgument($argument->getName());
268
            if ($value) {
269
                $input .= " $value";
270
            }
271
        }
272
273
        // Get user options
274 4
        foreach ($this->console->getUserDefinition()->getOptions() as $option) {
275 3
            $name = $option->getName();
276 3
            $value = $this->input->getOption($name);
277
278 3
            if ($value) {
279
                $input .= " --{$name}";
280
281
                if ($option->acceptValue()) {
282 3
                    $input .= " {$value}";
283
                }
284
            }
285
        }
286
287 4
        return $input;
288
    }
289
290 4
    private function generateArguments(): string
291
    {
292 4
        $arguments = '';
293
294 4
        if ($this->input->hasArgument('stage')) {
295
            // Some people rely on stage argument, so pass it to worker too.
296 4
            $arguments .= $this->input->getArgument('stage');
297
        }
298
299 4
        return $arguments;
300
    }
301
}
302