GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 3308b4...466970 )
by Anton
02:28
created

ParallelExecutor   C

Complexity

Total Complexity 32

Size/Duplication

Total Lines 359
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 19

Test Coverage

Coverage 91.06%

Importance

Changes 0
Metric Value
dl 0
loc 359
ccs 112
cts 123
cp 0.9106
rs 6.6
c 0
b 0
f 0
wmc 32
lcom 1
cbo 19

7 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
B run() 0 50 4
C startWorkers() 0 44 7
A catchOutput() 0 15 2
B catchExceptions() 0 31 3
C sendTasks() 0 41 7
C idle() 0 30 8
1
<?php
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Console\Output\OutputWatcher;
11
use Deployer\Console\Output\VerbosityString;
12
use Deployer\Exception\Exception;
13
use Deployer\Exception\ForwardException;
14
use Deployer\Server\Environment;
15
use Deployer\Server\Local;
16
use Deployer\Task\Context;
17
use Pure\Server;
18
use Pure\Storage\ArrayStorage;
19
use Pure\Storage\QueueStorage;
20
use React\Socket\ConnectionException;
21
use Symfony\Component\Console\Input\InputDefinition;
22
use Symfony\Component\Process\Process;
23
24
class ParallelExecutor implements ExecutorInterface
25
{
26
    /**
27
     * Try to start server on this port.
28
     */
29
    const START_PORT = 3333;
30
31
    /**
32
     * If fails on start port, try until stop port.
33
     */
34
    const STOP_PORT = 3340;
35
36
    /**
37
     * @var InputDefinition
38
     */
39
    private $userDefinition;
40
41
    /**
42
     * @var \Deployer\Task\Task[]
43
     */
44
    private $tasks;
45
46
    /**
47
     * @var \Deployer\Server\ServerInterface[]
48
     */
49
    private $servers;
50
51
    /**
52
     * @var \Deployer\Server\Environment[]
53
     */
54
    private $environments;
55
56
    /**
57
     * @var \Symfony\Component\Console\Input\InputInterface
58
     */
59
    private $input;
60
61
    /**
62
     * @var \Symfony\Component\Console\Output\OutputInterface
63
     */
64
    private $output;
65
66
    /**
67
     * @var Informer
68
     */
69
    private $informer;
70
71
    /**
72
     * @var int
73
     */
74
    private $port;
75
76
    /**
77
     * @var Server
78
     */
79
    private $pure;
80
81
    /**
82
     * @var \React\EventLoop\LoopInterface
83
     */
84
    private $loop;
85
86
    /**
87
     * True while the master waits for all workers to finish their current task; no new task is sent until it is reset to false.
88
     *
89
     * @var bool
90
     */
91
    private $wait = false;
92
93
    /**
94
     * @var QueueStorage
95
     */
96
    private $outputStorage;
97
98
    /**
99
     * @var QueueStorage
100
     */
101
    private $exceptionStorage;
102
103
    /**
104
     * Maps each server name to the name of the task its worker has to finish before the master moves on to the next task.
105
     *
106
     * @var array
107
     */
108
    private $tasksToDo = [];
109
110
    /**
111
     * Check if current task was successfully finished on all servers (no exception was triggered).
112
     *
113
     * @var bool
114
     */
115
    private $isSuccessfullyFinished = true;
116
117
    /**
118
     * Check if current task triggered a non-fatal exception.
119
     *
120
     * @var bool
121
     */
122
    private $hasNonFatalException = false;
123
124
    /**
125 2
     * @var Exception
126
     */
127 2
    private $lastException;
128 2
129
    /**
130
     * @var Local
131
     */
132
    private $localhost;
133 2
134
    /**
135 2
     * @var Environment
136 2
     */
137 2
    private $localEnv;
138 2
139 2
    /**
     * Create the executor.
     *
     * @param InputDefinition $userDefinition Definition of the user-supplied arguments and
     *                                        options; startWorkers() reads it to rebuild the
     *                                        command line passed to every worker process.
     */
    public function __construct(InputDefinition $userDefinition)
    {
        $this->userDefinition = $userDefinition;
    }
146 2
147
    /**
     * {@inheritdoc}
     *
     * Starts a Pure server on the first usable port between START_PORT and STOP_PORT,
     * registers the timers that spawn workers, relay their output and exceptions,
     * dispatch tasks and detect task completion, then runs the event loop until
     * sendTasks() stops it.
     *
     * @throws ConnectionException When none of the ports in the START_PORT..STOP_PORT range can be used.
     * @throws Exception           The last exception forwarded by a worker, when the run did not finish successfully.
     */
    public function run($tasks, $servers, $environments, $input, $output)
    {
        $this->tasks = $tasks;
        $this->servers = $servers;
        $this->environments = $environments;
        $this->input = $input;
        $this->output = new OutputWatcher($output);
        $this->informer = new Informer($this->output);
        $this->localhost = new Local();
        $this->localEnv = new Environment();
        $this->port = self::START_PORT;

        // Retry with the next port until the server runs or the port range is exhausted.
        while (true) {
            $this->pure = new Server($this->port);
            $this->loop = $this->pure->getLoop();

            // Start workers for each server.
            $this->loop->addTimer(0, [$this, 'startWorkers']);

            // Wait for output.
            $this->outputStorage = $this->pure['output'] = new QueueStorage();
            $this->loop->addPeriodicTimer(0, [$this, 'catchOutput']);

            // Look for exceptions.
            $this->exceptionStorage = $this->pure['exception'] = new QueueStorage();
            $this->loop->addPeriodicTimer(0, [$this, 'catchExceptions']);

            // Send workers tasks to do.
            $this->loop->addPeriodicTimer(0, [$this, 'sendTasks']);

            // Wait until all workers finish their tasks.
            $this->loop->addPeriodicTimer(0, [$this, 'idle']);

            // Start loop.
            try {
                $this->pure->run();
                break;
            } catch (ConnectionException $exception) {
                // If port is already used, try with another one.
                $output->writeln("<fg=red>✘ " . $exception->getMessage() . "</fg=red>");

                if (++$this->port > self::STOP_PORT) {
                    // Every port failed: no task was executed, so do not return as if
                    // the deployment had succeeded.
                    throw $exception;
                }
            }
        }

        if (!$this->isSuccessfullyFinished) {
            throw $this->lastException;
        }
    }
200 2
201 2
    /**
     * Start one background worker process per server.
     *
     * Every worker is launched through the shell as
     * `php DEPLOYER_BIN [--file=<file>] worker --master 127.0.0.1:<port> --server <name> <user input> <verbosity> &`.
     * User-supplied values, the deploy file and the server name are passed through
     * escapeshellarg() so that spaces or shell metacharacters cannot break (or inject
     * into) the command line.
     */
    public function startWorkers()
    {
        // Get verbosity.
        $verbosity = new VerbosityString($this->output);

        // Get current deploy.php file.
        $deployPhpFile = $this->input->getOption('file');

        // User input, rebuilt as a command-line fragment.
        $input = '';

        // Get user arguments. Array arguments are expanded into one word per item.
        foreach ($this->userDefinition->getArguments() as $argument) {
            $value = $this->input->getArgument($argument->getName());

            if ($value) {
                foreach ((array) $value as $item) {
                    $input .= ' ' . escapeshellarg($item);
                }
            }
        }

        // Get user options.
        foreach ($this->userDefinition->getOptions() as $option) {
            $name = $option->getName();
            $value = $this->input->getOption($name);

            if (!$value) {
                continue;
            }

            if (!$option->acceptValue()) {
                // Flag option: forward the bare flag, a trailing value would be
                // parsed by the worker as an extra positional argument.
                $input .= " --$name";
                continue;
            }

            // Array options are repeated once per value.
            foreach ((array) $value as $item) {
                $input .= " --$name " . escapeshellarg($item);
            }
        }

        foreach ($this->servers as $serverName => $server) {
            $process = new Process(
                "php " . DEPLOYER_BIN .
                (null === $deployPhpFile ? "" : " --file=" . escapeshellarg($deployPhpFile)) .
                " worker " .
                " --master 127.0.0.1:{$this->port}" .
                " --server " . escapeshellarg((string) $serverName) .
                " $input " .
                " $verbosity" .
                " &"
            );
            $process->disableOutput();
            $process->start();
        }
    }
248
249
    /**
     * Drain the output queue filled by the workers and print every queued entry.
     *
     * Trailing newlines are stripped from each message because writeln() appends its own.
     */
    public function catchOutput()
    {
        $stripTrailingNewlines = function ($text) {
            return rtrim($text, "\n");
        };

        while (count($this->outputStorage) > 0) {
            // Only the messages (index 1) and the output type (index 3) of an entry are used.
            list(, $messages, , $type) = $this->outputStorage->pop();

            $lines = array_map($stripTrailingNewlines, (array)$messages);
            $this->output->writeln($lines, $type);
        }
    }
267
268
    /**
     * Drain the exception queue filled by the workers.
     *
     * Every exception is printed, remembered as the last exception and marks the
     * current task as not successfully finished. Anything other than a
     * NonFatalException additionally cancels all remaining tasks.
     */
    public function catchExceptions()
    {
        while (count($this->exceptionStorage) > 0) {
            list($serverName, $exceptionClass, $message) = $this->exceptionStorage->pop();

            // Report the failure and keep it so run() can rethrow it.
            $this->informer->taskException($serverName, $exceptionClass, $message);
            $this->lastException = new ForwardException($serverName, $exceptionClass, $message);
            $this->isSuccessfullyFinished = false;

            if ($exceptionClass == 'Deployer\Exception\NonFatalException') {
                // Non-fatal: remaining tasks keep running.
                $this->hasNonFatalException = true;
                continue;
            }

            // Fatal: drop all pending tasks so the loop stops once current work is done.
            $this->tasks = [];

            // The failed worker will not remove its own server name from `tasks_to_do`,
            // so remove it here to let the current task finish.
            $this->pure->getStorage('tasks_to_do')->delete($serverName);
        }
    }
302 1
303 1
    /**
     * Dispatch the next task, or stop the loop when no task is left.
     *
     * Does nothing while the master is waiting for workers. A task flagged as "once"
     * is executed directly against the local server; any other task is published to
     * the workers through the `tasks_to_do` storage, after which the master waits.
     */
    public function sendTasks()
    {
        if ($this->wait) {
            return;
        }

        if (count($this->tasks) === 0) {
            // Nothing left to do: stop the server loop.
            $this->loop->stop();
            return;
        }

        // Take the next task off the list.
        $task = current($this->tasks);
        $taskName = $task->getName();
        array_shift($this->tasks);

        $this->informer->startTask($taskName);

        if ($task->isOnce()) {
            $context = new Context($this->localhost, $this->localEnv, $this->input, $this->output);
            $task->run($context);
            $this->informer->endTask();
            return;
        }

        $this->tasksToDo = [];

        foreach ($this->servers as $serverName => $server) {
            if (!$task->isOnServer($serverName)) {
                continue;
            }

            if (!isset($this->environments[$serverName])) {
                $this->environments[$serverName] = new Environment();
            }

            // Start task on $serverName.
            $this->tasksToDo[$serverName] = $taskName;
        }

        // Inform all workers what tasks they need to do.
        $taskToDoStorage = new ArrayStorage();
        $taskToDoStorage->push($this->tasksToDo);
        $this->pure->setStorage('tasks_to_do', $taskToDoStorage);

        $this->wait = true;
    }
348 2
349 2
    /**
     * While the master is waiting, report servers that finished the current task and
     * close the task once the `tasks_to_do` storage is empty.
     *
     * When the task ends, the per-task state is reset unless the task failed fatally:
     * in that case isSuccessfullyFinished stays false so run() rethrows the last exception.
     */
    public function idle()
    {
        if (!$this->wait) {
            return;
        }

        $taskToDoStorage = $this->pure->getStorage('tasks_to_do');

        // A server that is no longer listed in the storage is done with the task.
        foreach ($this->tasksToDo as $serverName => $taskName) {
            if (!$taskToDoStorage->has($serverName)) {
                $this->informer->endOnServer($serverName);
                unset($this->tasksToDo[$serverName]);
            }
        }

        if (count($taskToDoStorage) !== 0) {
            // Some workers are still busy.
            return;
        }

        if ($this->isSuccessfullyFinished) {
            $this->informer->endTask();
        } else {
            $this->informer->taskError($this->hasNonFatalException);
        }

        // We waited all workers to finish their tasks.
        // Wait no more!
        $this->wait = false;

        if ($this->isSuccessfullyFinished || $this->hasNonFatalException) {
            // Reset to default for next tasks. The non-fatal flag must be cleared too,
            // otherwise a fatal failure in a later task would be reported as non-fatal
            // and silently reset here instead of being rethrown by run().
            $this->isSuccessfullyFinished = true;
            $this->hasNonFatalException = false;
        }
    }
382
}
383