GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Test Failed
Push — master ( 44f5f8...8d9d8e )
by Anton
02:15
created

src/Executor/Master.php (1 issue)

Severity

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php declare(strict_types=1);
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Component\Ssh\Client;
11
use Deployer\Configuration\Configuration;
12
use Deployer\Deployer;
13
use Deployer\Host\Host;
14
use Deployer\Host\Localhost;
15
use Deployer\Selector\Selector;
16
use Deployer\Task\Task;
17
use Psr\Http\Message\ServerRequestInterface;
18
use Symfony\Component\Console\Input\InputInterface;
19
use Symfony\Component\Console\Output\OutputInterface;
20
use Symfony\Component\Process\Process;
21
use React;
22
23 1
const FRAMES = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];
24
25
function spinner($message = '')
26
{
27 1
    $frame = FRAMES[(int)(microtime(true) * 10) % count(FRAMES)];
28 1
    return "  $frame $message\r";
29
}
30
31
class Master
32
{
33
    private $input;
34
    private $output;
35
    private $messenger;
36
    private $client;
37
    private $config;
38
    private $port;
39
    /**
40
     * @var React\EventLoop\LoopInterface
41
     */
42
    private $loop;
43
44 10
    public function __construct(
45
        InputInterface $input,
46
        OutputInterface $output,
47
        Messenger $messenger,
48
        Client $client,
49
        Configuration $config
50
    )
51
    {
52 10
        $this->input = $input;
53 10
        $this->output = $output;
54 10
        $this->messenger = $messenger;
55 10
        $this->client = $client;
56 10
        $this->config = $config;
57 10
    }
58
59
    /**
60
     * @param Task[] $tasks
61
     * @param Host[] $hosts
62
     * @param Planner|null $plan
63
     * @return int
64
     */
65 10
    public function run(array $tasks, array $hosts, $plan = null): int
66
    {
67 10
        $plan || $this->connect($hosts);
68
69 10
        $globalLimit = (int)$this->input->getOption('limit') ?: count($hosts);
70
71 10
        foreach ($tasks as $task) {
72 10
            $plan || $this->messenger->startTask($task);
73
74 10
            $plannedHosts = $hosts;
75
76 10
            $limit = min($globalLimit, $task->getLimit() ?? $globalLimit);
77
78 10
            if ($task->isOnce()) {
79 4
                $plannedHosts = [];
80 4
                foreach ($hosts as $currentHost) {
81 4
                    if (Selector::apply($task->getSelector(), $currentHost)) {
82 4
                        $plannedHosts[] = $currentHost;
83 4
                        break;
84
                    }
85
                }
86
            }
87
88 10
            if ($task->isLocal()) {
89
                $plannedHosts = [new Localhost('localhost')];
90
            }
91
92 10
            if ($limit === 1 || count($plannedHosts) === 1) {
93 10
                foreach ($plannedHosts as $currentHost) {
94 10
                    if (!Selector::apply($task->getSelector(), $currentHost)) {
95
                        if ($plan) {
96
                            $plan->commit([], $task);
97
                        }
98
                        continue;
99
                    }
100
101 10
                    if ($plan) {
102
                        $plan->commit([$currentHost], $task);
103
                        continue;
104
                    }
105
106 10
                    $exitCode = $this->runTask($task, [$currentHost]);
107 10
                    if ($exitCode !== 0) {
108 2
                        return $exitCode;
109
                    }
110
                }
111
            } else {
112 2
                foreach (array_chunk($hosts, $limit) as $chunk) {
113 2
                    $selector = $task->getSelector();
114 2
                    $selectedHosts = [];
115 2
                    foreach ($chunk as $currentHost) {
116 2
                        if ($selector === null || Selector::apply($selector, $currentHost)) {
117 2
                            $selectedHosts[] = $currentHost;
118
                        }
119
                    }
120
121
122 2
                    if ($plan) {
123
                        $plan->commit($selectedHosts, $task);
124
                        continue;
125
                    }
126
127 2
                    $exitCode = $this->runTask($task, $selectedHosts);
128 2
                    if ($exitCode !== 0) {
129
                        return $exitCode;
130
                    }
131
                }
132
            }
133
134 10
            if (!$plan) {
135 10
                $this->messenger->endTask($task);
136
            }
137
        }
138
139 10
        return 0;
140
    }
141
142
    /**
143
     * @param Host[] $hosts
144
     */
145 10
    private function connect(array $hosts)
146
    {
147 View Code Duplication
        $callback = function (string $output) {
148
            $output = preg_replace('/\n$/', '', $output);
149
            if (strlen($output) !== 0) {
150
                $this->output->writeln($output);
151
            }
152 10
        };
153
154
        // Connect to each host sequentially, to prevent getting locked.
155 10
        foreach ($hosts as $host) {
156 10
            if ($host instanceof Localhost) {
157 10
                continue;
158
            }
159
            $process = $this->getProcess($host, new Task('connect'));
160
            $process->start();
161
162
            while ($process->isRunning()) {
163
                $this->gatherOutput([$process], $callback);
164
                $this->output->write(spinner(str_pad("connect {$host->getTag()}", intval(getenv('COLUMNS')) - 1)));
165
                usleep(1000);
166
            }
167
        }
168
169
        // Clear spinner.
170 10
        $this->output->write(str_repeat(' ', intval(getenv('COLUMNS')) - 1) . "\r");
171 10
    }
172
173
    /**
174
     * @param Task $task
175
     * @param Host[] $hosts
176
     * @return int
177
     */
178 10
    private function runTask(Task $task, array $hosts): int
179
    {
180 10
        if (getenv('DEPLOYER_LOCAL_WORKER') === 'true') {
181
            // This allows to code coverage all recipe,
182
            // as well as speedup tests by not spawning
183
            // lots of processes. Also there is a few tests
184
            // what runs with workers for tests subprocess
185
            // communications.
186 9
            foreach ($hosts as $host) {
187 9
                $worker = new Worker(Deployer::get());
188 9
                $exitCode = $worker->execute($task, $host);
189 9
                if ($exitCode !== 0) {
190 2
                    return $exitCode;
191
                }
192
            }
193 9
            return 0;
194
        }
195
196 1
        $processes = [];
197 1
        foreach ($hosts as $host) {
198 1
            $processes[] = $this->getProcess($host, $task);
199
        }
200
201 1
        foreach ($processes as $process) {
202 1
            $process->start();
203
        }
204
205 1
        $this->createServer();
206
207 View Code Duplication
        $callback = function (string $output) {
208 1
            $output = preg_replace('/\n$/', '', $output);
209 1
            if (strlen($output) !== 0) {
210 1
                $this->output->writeln($output);
211
            }
212 1
        };
213
214
        $this->loop->addPeriodicTimer(0.03, function () use ($processes, $callback) {
215 1
            $this->gatherOutput($processes, $callback);
216 1
            $this->output->write(spinner());
217 1
            if (!$this->areRunning($processes)) {
218 1
                $this->loop->stop();
219
            }
220 1
        });
221
222 1
        $this->loop->run();
223 1
        $this->output->write("    \r"); // clear spinner
224 1
        $this->gatherOutput($processes, $callback);
225 1
        return $this->cumulativeExitCode($processes);
226
    }
227
228 1
    protected function createServer()
229
    {
230 1
        $this->loop = React\EventLoop\Factory::create();
231
        $server = new React\Http\Server($this->loop, function (ServerRequestInterface $request) {
0 ignored issues
show
The parameter $request is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
232
            return new React\Http\Message\Response(
233
                200,
234
                array(
235
                    'Content-Type' => 'text/plain'
236
                ),
237
                "Hello World!\n"
238
            );
239 1
        });
240 1
        $socket = new React\Socket\Server(0, $this->loop);
241 1
        $server->listen($socket);
242 1
        $address = $socket->getAddress();
243 1
        $this->port = parse_url($address, PHP_URL_PORT);
244 1
    }
245
246 1
    protected function getProcess(Host $host, Task $task): Process
247
    {
248 1
        $dep = PHP_BINARY . ' ' . DEPLOYER_BIN;
249 1
        $configDirectory = $host->get('config_directory');
250 1
        $decorated = $this->output->isDecorated() ? '--decorated' : '';
251 1
        $command = "$dep worker $task {$host->getAlias()} $configDirectory {$this->input} $decorated";
252
253 1
        if ($this->output->isDebug()) {
254
            $this->output->writeln("[{$host->getTag()}] $command");
255
        }
256
257 1
        return Process::fromShellCommandline($command);
258
    }
259
260
    /**
261
     * @param Process[] $processes
262
     * @return bool
263
     */
264 1
    protected function areRunning(array $processes): bool
265
    {
266 1
        foreach ($processes as $process) {
267 1
            if ($process->isRunning()) {
268 1
                return true;
269
            }
270
        }
271 1
        return false;
272
    }
273
274
    /**
275
     * @param Process[] $processes
276
     * @param callable $callback
277
     */
278 1
    protected function gatherOutput(array $processes, callable $callback)
279
    {
280 1
        foreach ($processes as $process) {
281 1
            $output = $process->getIncrementalOutput();
282 1
            if (strlen($output) !== 0) {
283 1
                $callback($output);
284
            }
285
286 1
            $errorOutput = $process->getIncrementalErrorOutput();
287 1
            if (strlen($errorOutput) !== 0) {
288
                $callback($errorOutput);
289
            }
290
        }
291 1
    }
292
293
    /**
294
     * @param Process[] $processes
295
     * @return int
296
     */
297 1
    protected function cumulativeExitCode(array $processes): int
298
    {
299 1
        foreach ($processes as $process) {
300 1
            if ($process->getExitCode() > 0) {
301
                return $process->getExitCode();
302
            }
303
        }
304 1
        return 0;
305
    }
306
}
307