GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Pull Request — master (#1061)
by Maxim
04:23 queued 01:35
created

ParallelExecutor::startWorkers()   C

Complexity

Conditions 7
Paths 18

Size

Total Lines 44
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 26
CRAP Score 7.116

Importance

Changes 0
Metric Value
cc 7
eloc 24
nc 18
nop 0
dl 0
loc 44
ccs 26
cts 30
cp 0.8667
crap 7.116
rs 6.7272
c 0
b 0
f 0
1
<?php
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Console\Output\OutputWatcher;
11
use Deployer\Console\Output\VerbosityString;
12
use Deployer\Exception\Exception;
13
use Deployer\Exception\ForwardException;
14
use Deployer\Server\Environment;
15
use Deployer\Server\Local;
16
use Deployer\Task\Context;
17
use Pure\Server;
18
use Pure\Storage\ArrayStorage;
19
use Pure\Storage\QueueStorage;
20
use React\Socket\ConnectionException;
21
use Symfony\Component\Console\Input\InputDefinition;
22
use Symfony\Component\Process\Process;
23
24
class ParallelExecutor implements ExecutorInterface
25
{
26
    /**
27
     * Try to start server on this port.
28
     */
29
    const START_PORT = 3333;
30
31
    /**
32
     * If fails on start port, try until stop port.
33
     */
34
    const STOP_PORT = 3340;
35
36
    /**
37
     * @var InputDefinition
38
     */
39
    private $userDefinition;
40
41
    /**
42
     * @var \Deployer\Task\Task[]
43
     */
44
    private $tasks;
45
46
    /**
47
     * @var \Deployer\Server\ServerInterface[]
48
     */
49
    private $servers;
50
51
    /**
52
     * @var \Deployer\Server\Environment[]
53
     */
54
    private $environments;
55
56
    /**
57
     * @var \Symfony\Component\Console\Input\InputInterface
58
     */
59
    private $input;
60
61
    /**
62
     * @var \Symfony\Component\Console\Output\OutputInterface
63
     */
64
    private $output;
65
66
    /**
67
     * @var Informer
68
     */
69
    private $informer;
70
71
    /**
72
     * @var int
73
     */
74
    private $port;
75
76
    /**
77
     * @var Server
78
     */
79
    private $pure;
80
81
    /**
82
     * @var \React\EventLoop\LoopInterface
83
     */
84
    private $loop;
85
86
    /**
87
     * True while the master waits for all workers to finish their current task; new tasks are sent only when it is false.
88
     *
89
     * @var bool
90
     */
91
    private $wait = false;
92
93
    /**
94
     * @var QueueStorage
95
     */
96
    private $outputStorage;
97
98
    /**
99
     * @var QueueStorage
100
     */
101
    private $exceptionStorage;
102
103
    /**
104
     * Tasks that workers still have to finish before moving on to the next task, keyed by server name.
105
     *
106
     * @var array
107
     */
108
    private $tasksToDo = [];
109
110
    /**
111
     * Check if current task was successfully finished on all servers (no exception was triggered).
112
     *
113
     * @var bool
114
     */
115
    private $isSuccessfullyFinished = true;
116
117
    /**
118
     * Check if current task triggered a non-fatal exception.
119
     *
120
     * @var bool
121
     */
122
    private $hasNonFatalException = false;
123
124
    /**
125
     * @var Exception
126
     */
127
    private $lastException;
128
129
    /**
130
     * @var Local
131
     */
132
    private $localhost;
133
134
    /**
135
     * @var Environment
136
     */
137
    private $localEnv;
138
139
    /**
140
     * @param InputDefinition $userDefinition
141
     */
142 2
    public function __construct(InputDefinition $userDefinition)
143
    {
144 2
        $this->userDefinition = $userDefinition;
145 2
    }
146
147
    /**
     * {@inheritdoc}
     *
     * Creates a Pure server, starting on START_PORT and moving up to STOP_PORT
     * when a port cannot be used, registers the master's timers on the server's
     * event loop and runs it.
     *
     * @throws ConnectionException When none of the ports from START_PORT to STOP_PORT could be used.
     * @throws \Exception          The last exception forwarded by a worker, when the run did not finish successfully.
     */
    public function run($tasks, $servers, $environments, $input, $output)
    {
        $this->tasks = $tasks;
        $this->servers = $servers;
        $this->environments = $environments;
        $this->input = $input;
        $this->output = new OutputWatcher($output);
        $this->informer = new Informer($this->output);
        $this->localhost = new Local();
        $this->localEnv = new Environment();
        $this->port = self::START_PORT;

        // Every attempt needs a fresh server and loop, so the timers are registered again for each port.
        while (true) {
            $this->pure = new Server($this->port);
            $this->loop = $this->pure->getLoop();

            // Start workers for each server.
            $this->loop->addTimer(0, [$this, 'startWorkers']);

            // Wait for output.
            $this->outputStorage = $this->pure['output'] = new QueueStorage();
            $this->loop->addPeriodicTimer(0, [$this, 'catchOutput']);

            // Look for exceptions.
            $this->exceptionStorage = $this->pure['exception'] = new QueueStorage();
            $this->loop->addPeriodicTimer(0, [$this, 'catchExceptions']);

            // Send workers tasks to do.
            $this->loop->addPeriodicTimer(0, [$this, 'sendTasks']);

            // Wait until all workers finish their tasks.
            $this->loop->addPeriodicTimer(0, [$this, 'idle']);

            // Start loop.
            try {
                $this->pure->run();
                break;
            } catch (ConnectionException $exception) {
                // If port is already used, try with another one.
                $output->writeln("<fg=red>✘ " . $exception->getMessage() . "</fg=red>");

                if ($this->port >= self::STOP_PORT) {
                    // Every port in the range failed: surface the error instead of
                    // returning as if the tasks had been executed.
                    throw $exception;
                }

                $this->port++;
            }
        }

        if (!$this->isSuccessfullyFinished) {
            throw $this->lastException;
        }
    }
200
201
    /**
     * Start one background worker process per server.
     *
     * Each worker receives the master address, the name of the server it runs
     * on, the deploy file (when one was given), the user-defined arguments and
     * options, and the verbosity string. Values are shell-escaped so that
     * spaces or shell metacharacters cannot break the command line.
     */
    public function startWorkers()
    {
        // Get verbosity.
        $verbosity = new VerbosityString($this->output);

        // Get current deploy.php file.
        $deployPhpFile = $this->input->getOption('file');

        // User input, rebuilt as a command line fragment.
        $input = '';

        // Get user arguments.
        foreach ($this->userDefinition->getArguments() as $argument) {
            $value = $this->input->getArgument($argument->getName());

            // Casting to array forwards array arguments one value at a time
            // instead of the literal string "Array".
            foreach ((array) $value as $item) {
                if ('' !== (string) $item) {
                    $input .= ' ' . escapeshellarg($item);
                }
            }
        }

        // Get user options.
        foreach ($this->userDefinition->getOptions() as $option) {
            $name = $option->getName();
            $value = $this->input->getOption($name);

            if (!$option->acceptValue()) {
                // Flag option: forward the flag only, never a value after it.
                if ($value) {
                    $input .= " --$name";
                }

                continue;
            }

            // Array options are repeated once per value; null, false and empty
            // strings are skipped, while "0" is a legitimate value and is kept.
            foreach ((array) $value as $item) {
                if ('' !== (string) $item) {
                    $input .= " --$name " . escapeshellarg($item);
                }
            }
        }

        foreach ($this->servers as $serverName => $server) {
            $process = new Process(
                "php " . DEPLOYER_BIN .
                (null === $deployPhpFile ? "" : " --file=" . escapeshellarg($deployPhpFile)) .
                " worker " .
                " --master 127.0.0.1:{$this->port}" .
                " --server " . escapeshellarg($serverName) .
                " $input " .
                " $verbosity" .
                " &"
            );
            $process->disableOutput();
            $process->start();
        }
    }
248
249
    /**
     * Drain the output queue filled by workers and write every message to the output.
     */
    public function catchOutput()
    {
        // Strip trailing newlines from each message before it is passed to writeln().
        // Built once, outside the loop, because it does not depend on the popped entry.
        $format = function ($message) {
            return rtrim($message, "\n");
        };

        while (count($this->outputStorage) > 0) {
            // Only the messages and the output type of the queued entry are used.
            list(, $messages, , $type) = $this->outputStorage->pop();

            $this->output->writeln(array_map($format, (array)$messages), $type);
        }
    }
267
268
    /**
     * Drain the exception queue filled by workers.
     *
     * Every exception is printed, stored as the last exception and marks the
     * current task as not successfully finished. Anything other than a
     * NonFatalException additionally cancels all remaining tasks.
     */
    public function catchExceptions()
    {
        while (count($this->exceptionStorage) > 0) {
            list($serverName, $exceptionClass, $message) = $this->exceptionStorage->pop();

            // Print the exception message and keep the exception for later.
            $this->informer->taskException($serverName, $exceptionClass, $message);
            $this->lastException = new ForwardException($serverName, $exceptionClass, $message);

            // An exception means the task did not finish successfully.
            $this->isSuccessfullyFinished = false;

            if ($exceptionClass == 'Deployer\Exception\NonFatalException') {
                // Non-fatal: other tasks continue.
                $this->hasNonFatalException = true;
                continue;
            }

            // Fatal: do not run any other task.
            $this->tasks = [];

            // The worker will not remove its own server name from `tasks_to_do`,
            // so remove it here to let the current task finish.
            $this->pure->getStorage('tasks_to_do')->delete($serverName);
        }
    }
302
303
    /**
     * Send the next task to the workers, or stop the loop when no task is left.
     *
     * Does nothing while the master is still waiting for workers to finish the
     * previous task. Tasks marked as "once" are run directly with the local
     * server and environment instead of being sent to workers.
     */
    public function sendTasks()
    {
        if ($this->wait) {
            return;
        }

        if (count($this->tasks) === 0) {
            $this->loop->stop();

            return;
        }

        // Take the first task. array_shift() returns it regardless of where the
        // array's internal pointer is, which current() does not guarantee.
        $task = array_shift($this->tasks);
        $taskName = $task->getName();

        $this->informer->startTask($taskName);

        if ($task->isOnce()) {
            $task->run(new Context($this->localhost, $this->localEnv, $this->input, $this->output));
            $this->informer->endTask();

            return;
        }

        $this->tasksToDo = [];

        foreach ($this->servers as $serverName => $server) {
            if (!$task->isOnServer($serverName)) {
                continue;
            }

            if (!isset($this->environments[$serverName])) {
                $this->environments[$serverName] = new Environment();
            }

            // Start task on $serverName.
            $this->tasksToDo[$serverName] = $taskName;
        }

        // Inform all workers what tasks they need to do.
        $taskToDoStorage = new ArrayStorage();
        $taskToDoStorage->push($this->tasksToDo);
        $this->pure->setStorage('tasks_to_do', $taskToDoStorage);

        // Wait for the workers before sending the next task.
        $this->wait = true;
    }
348
349
    /**
     * While the master waits, print information about finished tasks.
     *
     * Reports each server that is no longer listed in the `tasks_to_do`
     * storage, and once that storage is empty reports the task result and lets
     * the next task be sent.
     */
    public function idle()
    {
        if (!$this->wait) {
            return;
        }

        $taskToDoStorage = $this->pure->getStorage('tasks_to_do');

        foreach ($this->tasksToDo as $serverName => $taskName) {
            if (!$taskToDoStorage->has($serverName)) {
                $this->informer->endOnServer($serverName);
                unset($this->tasksToDo[$serverName]);
            }
        }

        if (count($taskToDoStorage) !== 0) {
            // Some servers are still working on the task.
            return;
        }

        if ($this->isSuccessfullyFinished) {
            $this->informer->endTask();
        } else {
            $this->informer->taskError($this->hasNonFatalException);
        }

        // We waited all workers to finish their tasks.
        // Wait no more!
        $this->wait = false;

        // NOTE(review): a fatal exception raised during the same task as a
        // non-fatal one is still treated as non-fatal here; telling them apart
        // needs state that is not available in this method.
        if ($this->isSuccessfullyFinished || $this->hasNonFatalException) {
            // Reset to default for next tasks. The non-fatal flag must be cleared
            // too, otherwise a fatal exception in a later task would be reported
            // as non-fatal and the failure state would be reset.
            $this->isSuccessfullyFinished = true;
            $this->hasNonFatalException = false;
        }
    }
382
}
383