GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 07f145...3e40fc )
by Anton
02:18
created

ParallelExecutor   B

Complexity

Total Complexity 32

Size/Duplication

Total Lines 359
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 17

Test Coverage

Coverage 91.06%

Importance

Changes 0
Metric Value
dl 0
loc 359
ccs 112
cts 123
cp 0.9106
rs 7.5428
c 0
b 0
f 0
wmc 32
lcom 1
cbo 17

7 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
B run() 0 50 4
C startWorkers() 0 44 7
A catchOutput() 0 15 2
B catchExceptions() 0 31 3
C sendTasks() 0 41 7
C idle() 0 30 8
1
<?php
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Console\Output\OutputWatcher;
11
use Deployer\Console\Output\VerbosityString;
12
use Deployer\Server\Environment;
13
use Deployer\Server\Local;
14
use Deployer\Task\Context;
15
use Pure\Server;
16
use Pure\Storage\ArrayStorage;
17
use Pure\Storage\QueueStorage;
18
use React\Socket\ConnectionException;
19
use Symfony\Component\Console\Input\InputDefinition;
20
use Symfony\Component\Process\Process;
21
22
class ParallelExecutor implements ExecutorInterface
23
{
24
    /**
25
     * Try to start server on this port.
26
     */
27
    const START_PORT = 3333;
28
29
    /**
30
     * If fails on start port, try until stop port.
31
     */
32
    const STOP_PORT = 3340;
33
34
    /**
35
     * @var InputDefinition
36
     */
37
    private $userDefinition;
38
39
    /**
40
     * @var \Deployer\Task\Task[]
41
     */
42
    private $tasks;
43
44
    /**
45
     * @var \Deployer\Server\ServerInterface[]
46
     */
47
    private $servers;
48
49
    /**
50
     * @var \Deployer\Server\Environment[]
51
     */
52
    private $environments;
53
54
    /**
55
     * @var \Symfony\Component\Console\Input\InputInterface
56
     */
57
    private $input;
58
59
    /**
60
     * @var \Symfony\Component\Console\Output\OutputInterface
61
     */
62
    private $output;
63
64
    /**
65
     * @var Informer
66
     */
67
    private $informer;
68
69
    /**
70
     * @var int
71
     */
72
    private $port;
73
74
    /**
75
     * @var Server
76
     */
77
    private $pure;
78
79
    /**
80
     * @var \React\EventLoop\LoopInterface
81
     */
82
    private $loop;
83
84
    /**
85
     * Whether the master is waiting for all workers to finish their tasks. It is set to true after tasks are sent to workers; no new tasks are sent until it is false again.
86
     *
87
     * @var bool
88
     */
89
    private $wait = false;
90
91
    /**
92
     * @var QueueStorage
93
     */
94
    private $outputStorage;
95
96
    /**
97
     * @var QueueStorage
98
     */
99
    private $exceptionStorage;
100
101
    /**
102
     * List of tasks (task name keyed by server name) that workers have to finish before moving on to the next task.
103
     *
104
     * @var array
105
     */
106
    private $tasksToDo = [];
107
108
    /**
109
     * Whether the current task finished successfully on all servers (no exception was triggered).
110
     *
111
     * @var bool
112
     */
113
    private $isSuccessfullyFinished = true;
114
115
    /**
116
     * Check if current task triggered a non-fatal exception.
117
     *
118
     * @var bool
119
     */
120
    private $hasNonFatalException = false;
121
122
    /**
123
     * @var string|false Message of the last exception received from a worker, or false if there was none.
124
     */
125 2
    private $lastExceptionMessage = false;
126
127 2
    /**
128 2
     * @var Local
129
     */
130
    private $localhost;
131
132
    /**
133 2
     * @var Environment
134
     */
135 2
    private $localEnv;
136 2
137 2
    /**
     * Create the executor and remember the definition of user-supplied arguments and options.
     *
     * The definition is read again when worker processes are started, so that the user
     * input can be forwarded to them.
     *
     * @param InputDefinition $userDefinition Definition of the user arguments and options.
     */
    public function __construct(InputDefinition $userDefinition)
    {
        $this->userDefinition = $userDefinition;
    }
144
    
145 2
    /**
     * {@inheritdoc}
     *
     * Creates a master server on the first usable port between START_PORT and STOP_PORT,
     * registers the timers that start workers, collect their output and exceptions, send
     * tasks and watch for completion, and then runs the server.
     *
     * @throws \RuntimeException If the server could not be started on any port of the range,
     *                           or if the last task did not finish successfully.
     */
    public function run($tasks, $servers, $environments, $input, $output)
    {
        $this->tasks = $tasks;
        $this->servers = $servers;
        $this->environments = $environments;
        $this->input = $input;
        $this->output = new OutputWatcher($output);
        $this->informer = new Informer($this->output);
        $this->localhost = new Local();
        $this->localEnv = new Environment();

        $started = false;
        $connectionException = null;

        // Try every port of the range in turn; a fresh server (and loop) is built per attempt.
        for ($this->port = self::START_PORT; $this->port <= self::STOP_PORT; $this->port++) {
            $this->pure = new Server($this->port);
            $this->loop = $this->pure->getLoop();

            // Start workers for each server.
            $this->loop->addTimer(0, [$this, 'startWorkers']);

            // Wait for output.
            $this->outputStorage = $this->pure['output'] = new QueueStorage();
            $this->loop->addPeriodicTimer(0, [$this, 'catchOutput']);

            // Look out for exceptions.
            $this->exceptionStorage = $this->pure['exception'] = new QueueStorage();
            $this->loop->addPeriodicTimer(0, [$this, 'catchExceptions']);

            // Send workers tasks to do.
            $this->loop->addPeriodicTimer(0, [$this, 'sendTasks']);

            // Wait for all workers to finish their tasks.
            $this->loop->addPeriodicTimer(0, [$this, 'idle']);

            // Start loop.
            try {
                $this->pure->run();
                $started = true;

                // Leave without incrementing, so $this->port stays the port actually used.
                break;
            } catch (ConnectionException $exception) {
                // If port is already used, try with another one.
                $output->writeln("<fg=red>✘ " . $exception->getMessage() . "</fg=red>");
                $connectionException = $exception;
            }
        }

        if (!$started) {
            // Without this, exhausting the port range would return silently as if every
            // task had succeeded, although nothing was executed.
            throw new \RuntimeException(
                sprintf(
                    'Unable to start the master server on any port from %d to %d.',
                    self::START_PORT,
                    self::STOP_PORT
                ),
                0,
                $connectionException
            );
        }

        if (!$this->isSuccessfullyFinished) {
            throw new \RuntimeException($this->lastExceptionMessage);
        }
    }
198
199
    /**
     * Start one background worker process per server.
     *
     * Every worker command receives the deploy file (when the `file` option is set), the
     * master address with the current port, the server name, the user-supplied arguments
     * and options, and the verbosity string.
     *
     * Values are passed through escapeshellarg() because the command is a single shell
     * string: unescaped values containing spaces or shell metacharacters would be split
     * or interpreted by the shell.
     */
    public function startWorkers()
    {
        // Get verbosity.
        $verbosity = new VerbosityString($this->output);

        // Get current deploy.php file.
        $deployPhpFile = $this->input->getOption('file');

        // User input.
        $input = '';

        // Get user arguments.
        foreach ($this->userDefinition->getArguments() as $argument) {
            $value = $this->input->getArgument($argument->getName());

            if ($value) {
                // An array argument contributes one shell word per item.
                foreach ((array) $value as $item) {
                    $input .= ' ' . escapeshellarg($item);
                }
            }
        }

        // Get user options.
        foreach ($this->userDefinition->getOptions() as $option) {
            $name = $option->getName();
            $value = $this->input->getOption($name);

            if (!$value) {
                continue;
            }

            if (!$option->acceptValue()) {
                // Flag option: forwarding its boolean value ("--flag 1") would hand the
                // worker a stray positional argument, so only the flag itself is passed.
                $input .= " --{$name}";
                continue;
            }

            // An array option is repeated once per item.
            foreach ((array) $value as $item) {
                $input .= " --{$name} " . escapeshellarg($item);
            }
        }

        foreach ($this->servers as $serverName => $server) {
            $process = new Process(
                "php " . DEPLOYER_BIN .
                (null === $deployPhpFile ? "" : " --file=" . escapeshellarg($deployPhpFile)) .
                " worker " .
                " --master 127.0.0.1:{$this->port}" .
                " --server " . escapeshellarg((string) $serverName) .
                " $input " .
                " $verbosity" .
                " &"
            );
            $process->disableOutput();
            $process->start();
        }
    }
246
247
    /**
     * Drain the output queue and print every message received from workers.
     *
     * Trailing newlines are stripped from each message before it is written with the
     * type that came with it.
     */
    public function catchOutput()
    {
        while (count($this->outputStorage) > 0) {
            list(, $messages, , $type) = $this->outputStorage->pop();

            $lines = [];

            foreach ((array)$messages as $key => $message) {
                $lines[$key] = rtrim($message, "\n");
            }

            $this->output->writeln($lines, $type);
        }
    }
265
266
    /**
     * Drain the exception queue and react to every exception reported by workers.
     *
     * Each exception is printed, its message is remembered and the current task is marked
     * as failed. A NonFatalException only raises the non-fatal flag; any other exception
     * clears the remaining tasks and removes the server from the `tasks_to_do` storage.
     */
    public function catchExceptions()
    {
        while (count($this->exceptionStorage) > 0) {
            list($serverName, $exceptionClass, $message) = $this->exceptionStorage->pop();

            // Report the exception, then record it as the latest failure.
            $this->informer->taskException($serverName, $exceptionClass, $message);
            $this->lastExceptionMessage = $message;
            $this->isSuccessfullyFinished = false;

            if ($exceptionClass == 'Deployer\Exception\NonFatalException') {
                // Non-fatal: remaining tasks will still be executed.
                $this->hasNonFatalException = true;
                continue;
            }

            // Fatal: drop all tasks that were not started yet.
            $this->tasks = [];

            // The worker will not remove its own server name from `tasks_to_do`,
            // so it is removed here to let the current task come to an end.
            $pendingStorage = $this->pure->getStorage('tasks_to_do');
            $pendingStorage->delete($serverName);
        }
    }
300
301 1
    /**
     * Hand the next task over to the workers, or stop the loop when no task is left.
     *
     * Does nothing while the master is waiting for workers. A task marked as "once" is
     * executed right here with the local server and environment; any other task is
     * published in the `tasks_to_do` storage for every server it applies to, after which
     * the master starts waiting.
     */
    public function sendTasks()
    {
        if ($this->wait) {
            return;
        }

        if (count($this->tasks) < 1) {
            // Nothing left to do.
            $this->loop->stop();
            return;
        }

        // Take the next task off the list.
        $task = current($this->tasks);
        $taskName = $task->getName();
        array_shift($this->tasks);

        $this->informer->startTask($taskName);

        if ($task->isOnce()) {
            $context = new Context($this->localhost, $this->localEnv, $this->input, $this->output);
            $task->run($context);
            $this->informer->endTask();
            return;
        }

        $this->tasksToDo = [];

        foreach ($this->servers as $serverName => $server) {
            if (!$task->isOnServer($serverName)) {
                continue;
            }

            if (!isset($this->environments[$serverName])) {
                $this->environments[$serverName] = new Environment();
            }

            // Schedule the task on $serverName.
            $this->tasksToDo[$serverName] = $taskName;
        }

        // Publish the list so that workers can find out what they have to do.
        $pendingStorage = new ArrayStorage();
        $pendingStorage->push($this->tasksToDo);
        $this->pure->setStorage('tasks_to_do', $pendingStorage);

        $this->wait = true;
    }
346 2
347 2
    /**
     * While the master is waiting, report servers that finished the current task and
     * detect when the task is done everywhere.
     *
     * A server counts as finished once its name is no longer present in the `tasks_to_do`
     * storage. When the storage is empty the task result is printed, the master stops
     * waiting, and the per-task status flags are reset unless a fatal exception occurred.
     */
    public function idle()
    {
        if (!$this->wait) {
            return;
        }

        $taskToDoStorage = $this->pure->getStorage('tasks_to_do');

        foreach ($this->tasksToDo as $serverName => $taskName) {
            if (!$taskToDoStorage->has($serverName)) {
                $this->informer->endOnServer($serverName);
                unset($this->tasksToDo[$serverName]);
            }
        }

        if (count($taskToDoStorage) !== 0) {
            // Some workers are still busy.
            return;
        }

        if ($this->isSuccessfullyFinished) {
            $this->informer->endTask();
        } else {
            $this->informer->taskError($this->hasNonFatalException);
        }

        // All workers finished their tasks, so stop waiting.
        $this->wait = false;

        if ($this->isSuccessfullyFinished || $this->hasNonFatalException) {
            // Reset to default for next tasks.
            $this->isSuccessfullyFinished = true;

            // The non-fatal flag must be cleared as well: left set, it would make a fatal
            // exception in a later task look non-fatal here, the failure state would be
            // reset and run() would finish without reporting the error.
            $this->hasNonFatalException = false;
        }
    }
380
}
381