GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — master ( 1dba41...0db4d1 )
by Anton
02:05
created

ParallelExecutor::runTask()   B

Complexity

Conditions 6
Paths 8

Size

Total Lines 30

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 6

Importance

Changes 0
Metric Value
cc 6
nc 8
nop 2
dl 0
loc 30
ccs 18
cts 18
cp 1
crap 6
rs 8.8177
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
/* (c) Anton Medvedev <anton@medv.io>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Console\Application;
11
use Deployer\Console\Input\Argument;
12
use Deployer\Console\Input\Option;
13
use Deployer\Console\Output\Informer;
14
use Deployer\Console\Output\VerbosityString;
15
use Deployer\Exception\Exception;
16
use Deployer\Exception\GracefulShutdownException;
17
use Deployer\Host\Host;
18
use Deployer\Host\Localhost;
19
use Deployer\Host\Storage;
20
use Deployer\Task\Context;
21
use Deployer\Task\Task;
22
use Symfony\Component\Console\Input\InputInterface;
23
use Symfony\Component\Console\Input\InputOption;
24
use Symfony\Component\Console\Output\OutputInterface;
25
use Symfony\Component\Process\Process;
26
27
/**
 * Executes tasks on several hosts at once by spawning one `worker` sub-process per host.
 *
 * Hosts are handled in chunks of at most `--limit` hosts. The exit codes reported by the
 * workers decide whether a task is reported as finished, reported as failed, or aborts
 * the whole run with an exception.
 */
class ParallelExecutor implements ExecutorInterface
{
    /**
     * @var InputInterface
     */
    private $input;

    /**
     * @var OutputInterface
     */
    private $output;

    /**
     * @var Informer
     */
    private $informer;

    /**
     * @var Application
     */
    private $console;

    /**
     * @param InputInterface  $input    Console input; read for `limit`, `file`, `stage` and the user-defined options.
     * @param OutputInterface $output   Console output that receives everything the workers print.
     * @param Informer        $informer Notified when a task starts, ends or fails.
     * @param Application     $console  Source of the user input definition forwarded to the workers.
     */
    public function __construct(
        InputInterface $input,
        OutputInterface $output,
        Informer $informer,
        Application $console
    ) {
        $this->input = $input;
        $this->output = $output;
        $this->informer = $informer;
        $this->console = $console;
    }

    /**
     * {@inheritdoc}
     *
     * Local tasks run once, in this process, against a Localhost context. Every other
     * task is run through worker processes, one chunk of hosts after another.
     *
     * @throws GracefulShutdownException When a chunk reports exit code 1.
     * @throws Exception                 When a chunk reports any non-zero exit code other than 1 or 2.
     */
    public function run(array $tasks, array $hosts)
    {
        $localhost = new Localhost();

        // A missing, zero or negative `--limit` means "all hosts at once". The chunk size is
        // additionally clamped to 1 because array_chunk() rejects a size below 1, which is
        // what an empty host list would otherwise produce.
        $limit = (int) $this->input->getOption('limit');
        if ($limit < 1) {
            $limit = max(1, count($hosts));
        }

        // We need a context here for usage inside the `on` function: input/output are passed
        // to its callback. This allows code like the following to work in parallel mode:
        //
        //     host('prod')
        //         ->set('branch', function () {
        //             return input()->getOption('branch') ?: 'production';
        //     })
        //
        // Otherwise `input()` won't be accessible (i.e. null).
        Context::push(new Context($localhost, $this->input, $this->output));
        try {
            Storage::persist($hosts);
        } finally {
            // Always restore the context stack, even when persisting the hosts fails.
            Context::pop();
        }

        foreach ($tasks as $task) {
            $success = true;
            $this->informer->startTask($task);

            if ($task->isLocal()) {
                Storage::load($hosts);
                $task->run(new Context($localhost, $this->input, $this->output));
                Storage::flush($hosts);
            } else {
                foreach (array_chunk($hosts, $limit) as $chunk) {
                    switch ($this->runTask($chunk, $task)) {
                        case 0:
                            break;
                        case 1:
                            throw new GracefulShutdownException();
                        case 2:
                            // Failure of this chunk only: remember it and keep going
                            // with the remaining chunks.
                            $success = false;
                            break;
                        default:
                            // 255 and every other unexpected non-zero code (for example a
                            // worker that was killed) must not be reported as a success.
                            throw new Exception();
                    }
                }
            }

            if ($success) {
                $this->informer->endTask($task);
            } else {
                $this->informer->taskError();
            }
        }
    }

    /**
     * Run task on hosts.
     *
     * Starts one worker process for every host the task should be performed on, relays
     * their output while they are running and returns their cumulative exit code.
     *
     * @param Host[] $hosts
     *
     * @return int 0 when no worker failed, otherwise the exit code picked by gatherExitCodes().
     */
    private function runTask(array $hosts, Task $task): int
    {
        $processes = [];

        foreach ($hosts as $host) {
            if (!$task->shouldBePerformed($host)) {
                continue;
            }

            $processes[$host->getHostname()] = $this->getProcess($host, $task);

            if ($task->isOnce()) {
                $task->setHasRun();
            }
        }

        // Both stdout and stderr chunks of the workers are written to our own output;
        // chunks that are empty after trimming trailing whitespace are skipped.
        $callback = function (string $type, string $host, string $output) {
            $output = rtrim($output);
            if (strlen($output) !== 0) {
                $this->output->writeln($output);
            }
        };

        $this->startProcesses($processes);

        while ($this->areRunning($processes)) {
            $this->gatherOutput($processes, $callback);
            usleep(1000);
        }

        // Pick up whatever was written between the last poll and process termination.
        $this->gatherOutput($processes, $callback);

        return $this->gatherExitCodes($processes);
    }

    /**
     * Get process for task on host.
     *
     * Builds the `worker` command line for the given host/task pair. Every dynamic value
     * placed on the command line by this method is shell-escaped, so file paths, host
     * names and task names containing spaces or shell metacharacters are passed through
     * verbatim.
     */
    protected function getProcess(Host $host, Task $task): Process
    {
        $dep = PHP_BINARY.' '.DEPLOYER_BIN;
        $options = $this->generateOptions();
        $arguments = $this->generateArguments();
        $hostname = escapeshellarg((string) $host->getHostname());
        $taskName = escapeshellarg((string) $task->getName());
        $configFile = escapeshellarg((string) $host->get('host_config_storage'));

        $value = $this->input->getOption('file');
        $file = $value ? '--file='.escapeshellarg((string) $value) : '';

        // Forward decoration so the worker output is decorated like ours.
        if ($this->output->isDecorated()) {
            $options .= ' --ansi';
        }

        $command = "$dep $file worker $arguments $options --hostname $hostname --task $taskName --config-file $configFile";
        $process = new Process($command);

        // NOTE(review): PTY mode is switched on unless the DEPLOYER_PARALLEL_PTY constant
        // is defined; only its existence is checked, not its value. Confirm that this is
        // the intended meaning of the constant.
        if (!defined('DEPLOYER_PARALLEL_PTY')) {
            $process->setPty(true);
        }

        return $process;
    }

    /**
     * Start all of the processes.
     *
     * @param Process[] $processes
     *
     * @return void
     */
    protected function startProcesses(array $processes)
    {
        foreach ($processes as $process) {
            $process->start();
        }
    }

    /**
     * Determine if any of the processes are running.
     *
     * @param Process[] $processes
     *
     * @return bool True as soon as one process reports that it is still running.
     */
    protected function areRunning(array $processes): bool
    {
        foreach ($processes as $process) {
            if ($process->isRunning()) {
                return true;
            }
        }

        return false;
    }

    /**
     * Gather the output from all of the processes.
     *
     * Passes the output produced since the previous call to the callback as
     * `$callback($type, $host, $output)`, where `$type` is Process::OUT or Process::ERR.
     *
     * @param Process[] $processes Processes keyed by host name.
     *
     * @return void
     */
    protected function gatherOutput(array $processes, callable $callback)
    {
        foreach ($processes as $host => $process) {
            // PHP turns integer-like string keys into ints; cast back so that callbacks
            // declaring `string $host` keep working under strict_types.
            $host = (string) $host;

            $output = $process->getIncrementalOutput();
            if (strlen($output) !== 0) {
                $callback(Process::OUT, $host, $output);
            }

            $errorOutput = $process->getIncrementalErrorOutput();
            if (strlen($errorOutput) !== 0) {
                $callback(Process::ERR, $host, $errorOutput);
            }
        }
    }

    /**
     * Gather the cumulative exit code for the processes.
     *
     * @param Process[] $processes
     *
     * @return int The first exit code greater than zero, in iteration order, or 0 when
     *             there is none.
     */
    protected function gatherExitCodes(array $processes): int
    {
        foreach ($processes as $process) {
            $exitCode = $process->getExitCode();
            if ($exitCode > 0) {
                return $exitCode;
            }
        }

        return 0;
    }

    /**
     * Generate options and arguments string.
     *
     * The string starts with the verbosity flag for the current output, followed by the
     * user-defined arguments and options rendered by Argument::toString() and
     * Option::toString(), all joined with single spaces.
     */
    private function generateOptions(): string
    {
        /** @var string[] $inputs */
        $inputs = [
            (string) (new VerbosityString($this->output)),
        ];

        $userDefinition = $this->console->getUserDefinition();

        // Get user arguments
        foreach ($userDefinition->getArguments() as $argument) {
            $inputs[] = Argument::toString($this->input, $argument);
        }

        // Get user options
        foreach ($userDefinition->getOptions() as $option) {
            $inputs[] = Option::toString($this->input, $option);
        }

        return implode(' ', $inputs);
    }

    /**
     * Generate the positional arguments forwarded to the worker.
     *
     * @return string The shell-escaped `stage` argument, or an empty string when the
     *                argument is not defined or has no value.
     */
    private function generateArguments(): string
    {
        if (!$this->input->hasArgument('stage')) {
            return '';
        }

        // Some people rely on stage argument, so pass it to worker too.
        $stage = (string) $this->input->getArgument('stage');

        // An empty stage must stay absent from the command line; escaping it would hand
        // the worker an explicit empty argument instead.
        return $stage === '' ? '' : escapeshellarg($stage);
    }
}
287