GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Pull Request — master (#505)
by Martin
05:52
created

ParallelExecutor   B

Complexity

Total Complexity 26

Size/Duplication

Total Lines 318
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 16

Test Coverage

Coverage 84.03%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 26
lcom 1
cbo 16
dl 0
loc 318
ccs 100
cts 119
cp 0.8403
rs 8.4615
c 1
b 0
f 0

7 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
B run() 0 43 3
B startWorkers() 0 38 5
A catchOutput() 0 16 2
B catchExceptions() 0 28 3
B sendTasks() 0 37 6
B idle() 0 28 6
1
<?php
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Console\Output\OutputWatcher;
11
use Deployer\Console\Output\VerbosityString;
12
use Deployer\Task\Context;
13
use Pure\Server;
14
use Pure\Storage\ArrayStorage;
15
use Pure\Storage\QueueStorage;
16
use React\Socket\ConnectionException;
17
use Symfony\Component\Console\Input\ArrayInput;
18
use Symfony\Component\Console\Input\InputDefinition;
19
use Symfony\Component\Process\Process;
20
21
class ParallelExecutor implements ExecutorInterface
22
{
23
    /**
24
     * Try to start server on this port.
25
     */
26
    const START_PORT = 3333;
27
28
    /**
29
     * If fails on start port, try until stop port.
30
     */
31
    const STOP_PORT = 3340;
32
33
    /**
34
     * @var InputDefinition
35
     */
36
    private $userDefinition;
37
38
    /**
39
     * @var \Deployer\Task\Task[]
40
     */
41
    private $tasks;
42
43
    /**
44
     * @var \Deployer\Server\ServerInterface[]
45
     */
46
    private $servers;
47
48
    /**
49
     * @var \Symfony\Component\Console\Input\InputInterface
50
     */
51
    private $input;
52
53
    /**
54
     * @var \Symfony\Component\Console\Output\OutputInterface
55
     */
56
    private $output;
57
58
    /**
59
     * @var Informer
60
     */
61
    private $informer;
62
63
    /**
64
     * @var int
65
     */
66
    private $port;
67
68
    /**
69
     * @var Server
70
     */
71
    private $pure;
72
73
    /**
74
     * @var \React\EventLoop\LoopInterface
75
     */
76
    private $loop;
77
78
    /**
79
     * Wait until all workers finish they tasks. When set this variable to true and send new tasks to workers.
80
     *
81
     * @var bool
82
     */
83
    private $wait = false;
84
85
    /**
86
     * @var QueueStorage
87
     */
88
    private $outputStorage;
89
90
    /**
91
     * @var QueueStorage
92
     */
93
    private $exceptionStorage;
94
95
    /**
96
     * Array will contain tasks list what workers has to before moving to next task.
97
     *
98
     * @var array
99
     */
100
    private $tasksToDo = [];
101
102
    /**
103
     * Check if current task was successfully finished on all server (no exception was triggered).
104
     *
105
     * @var bool
106
     */
107
    private $isSuccessfullyFinished = true;
108
109
    /**
110
     * Check if current task triggered a non-fatal exception.
111
     *
112
     * @var bool
113
     */
114
    private $hasNonFatalException = false;
115
116
    /**
117
     * @param InputDefinition $userDefinition
118
     */
119 1
    public function __construct(InputDefinition $userDefinition)
120
    {
121 1
        $this->userDefinition = $userDefinition;
122 1
    }
123
    
124
    /**
125
     * {@inheritdoc}
126
     */
127 1
    public function run($tasks, $servers, $environments, $input, $output)
128
    {
129 1
        $this->tasks = $tasks;
130 1
        $this->servers = $servers;
131 1
        $this->input = $input;
132 1
        $this->output = new OutputWatcher($output);
133 1
        $this->informer = new Informer($this->output);
134 1
        $this->port = self::START_PORT;
135
136
        connect:
137
138 1
        $this->pure = new Server($this->port);
139 1
        $this->loop = $this->pure->getLoop();
140
141
        // Start workers for each server.
142 1
        $this->loop->addTimer(0, [$this, 'startWorkers']);
143
144
        // Wait for output
145 1
        $this->outputStorage = $this->pure['output'] = new QueueStorage();
146 1
        $this->loop->addPeriodicTimer(0, [$this, 'catchOutput']);
147
148
        // Lookup for exception
149 1
        $this->exceptionStorage = $this->pure['exception'] = new QueueStorage();
150 1
        $this->loop->addPeriodicTimer(0, [$this, 'catchExceptions']);
151
152
        // Send workers tasks to do.
153 1
        $this->loop->addPeriodicTimer(0, [$this, 'sendTasks']);
154
155
        // Wait all workers finish they tasks.
156 1
        $this->loop->addPeriodicTimer(0, [$this, 'idle']);
157
158
        // Start loop
159
        try {
160 1
            $this->pure->run();
161 1
        } catch (ConnectionException $exception) {
162
            // If port is already used, try with another one.
163
            $output->writeln("<error>" . $exception->getMessage() . "</error>");
164
165
            if (++$this->port <= self::STOP_PORT) {
166
                goto connect;
167
            }
168
        }
169 1
    }
170
171
    /**
172
     * Start workers, put master port, server name to run on, and options stuff.
173
     */
174 1
    public function startWorkers()
175
    {
176
        $input = [
177 1
            '--master' => '127.0.0.1:' . $this->port,
178 1
            '--server' => '',
179 1
        ];
180
        
181
        // Get verbosity.
182 1
        $verbosity = new VerbosityString($this->output);
183
184
        // Get current deploy.php file.
185 1
        $deployPhpFile = $this->input->getOption('file');
186
187
        // Get user arguments.
188 1
        foreach ($this->userDefinition->getArguments() as $argument) {
189
            $input[$argument->getName()] = $this->input->getArgument($argument->getName());
190 1
        }
191
192
        // Get user options.
193 1
        foreach ($this->userDefinition->getOptions() as $option) {
194
            $input["--" . $option->getName()] = $this->input->getOption($option->getName());
195 1
        }
196
        
197 1
        foreach ($this->servers as $serverName => $server) {
198 1
            $input['--server'] = $serverName;
199
            
200 1
            $process = new Process(
201 1
                "php " . DEPLOYER_BIN .
202 1
                (null === $deployPhpFile ? "" : " --file=$deployPhpFile") .
203 1
                " worker " .
204 1
                new ArrayInput($input) .
205 1
                " $verbosity" .
206
                " &"
207 1
            );
208 1
            $process->disableOutput();
209 1
            $process->run();
210 1
        }
211 1
    }
212
213
    /**
214
     * Wait for output from workers.
215
     */
216 1
    public function catchOutput()
217
    {
218 1
        while (count($this->outputStorage) > 0) {
219 1
            list($server, $messages, , $type) = $this->outputStorage->pop();
220
221
            $format = function ($message) use ($server) {
222 1
                $message = rtrim($message, "\n");
223 1
                return implode("\n", array_map(function ($text) use ($server) {
224 1
                    return "[$server] $text";
225 1
                }, explode("\n", $message)));
226
227 1
            };
228
229 1
            $this->output->writeln(array_map($format, (array)$messages), $type);
230 1
        }
231 1
    }
232
233
    /**
234
     * Wait for exceptions from workers.
235
     */
236 1
    public function catchExceptions()
237
    {
238 1
        while (count($this->exceptionStorage) > 0) {
239
            list($serverName, $exceptionClass, $message) = $this->exceptionStorage->pop();
240
241
            // Print exception message.
242
            $this->informer->taskException($serverName, $exceptionClass, $message);
243
244
            // We got some exception, so not.
245
            $this->isSuccessfullyFinished = false;
246
            
247
            if ($exceptionClass == 'Deployer\Task\NonFatalException') {
248
249
                // If we got NonFatalException, continue other tasks.
250
                $this->hasNonFatalException = true;
251
            } else {
252
253
                // Do not run other task.
254
                // Finish all current worker tasks and stop loop.
255
                $this->tasks = [];
256
257
                // Worker will not mark this task as done (remove self server name from `tasks_to_do` list),
258
                // so to finish current task execution we need to manually remove it from that list.
259
                $taskToDoStorage = $this->pure->getStorage('tasks_to_do');
260
                $taskToDoStorage->delete($serverName);
261
            }
262
        }
263 1
    }
264
265
    /**
266
     * Action time for master! Send tasks `to-do` for workers and go to sleep.
267
     * Also decide when to stop server/loop.
268
     */
269 1
    public function sendTasks()
270
    {
271 1
        if (!$this->wait) {
272 1
            if (count($this->tasks) > 0) {
273
274
                // Get task name to do.
275 1
                $task = current($this->tasks);
276 1
                $taskName = $task->getName();
277 1
                array_shift($this->tasks);
278
279 1
                $this->informer->startTask($taskName);
280
281 1
                if ($task->isOnce()) {
282
                    $task->run(new Context(null, null, $this->input, $this->output));
283
                    $this->informer->endTask();
284
                } else {
285 1
                    $this->tasksToDo = [];
286
287 1
                    foreach ($this->servers as $serverName => $server) {
288 1
                        if ($task->runOnServer($serverName)) {
289 1
                            $this->informer->onServer($serverName);
290 1
                            $this->tasksToDo[$serverName] = $taskName;
291 1
                        }
292 1
                    }
293
294
                    // Inform all workers what tasks they need to do.
295 1
                    $taskToDoStorage = new ArrayStorage();
296 1
                    $taskToDoStorage->push($this->tasksToDo);
297 1
                    $this->pure->setStorage('tasks_to_do', $taskToDoStorage);
298
299 1
                    $this->wait = true;
300
                }
301 1
            } else {
302 1
                $this->loop->stop();
303
            }
304 1
        }
305 1
    }
306
307
    /**
308
     * While idle master, print information about finished tasks.
309
     */
310 1
    public function idle()
311
    {
312 1
        if ($this->wait) {
313 1
            $taskToDoStorage = $this->pure->getStorage('tasks_to_do');
314
315 1
            foreach ($this->tasksToDo as $serverName => $taskName) {
316 1
                if (!$taskToDoStorage->has($serverName)) {
317 1
                    $this->informer->endOnServer($serverName);
318 1
                    unset($this->tasksToDo[$serverName]);
319 1
                }
320 1
            }
321
322 1
            if (count($taskToDoStorage) === 0) {
323 1
                if ($this->isSuccessfullyFinished) {
324 1
                    $this->informer->endTask();
325 1
                } else {
326
                    $this->informer->taskError($this->hasNonFatalException);
327
                }
328
                
329
                // We waited all workers to finish their tasks.
330
                // Wait no more!
331 1
                $this->wait = false;
332
333
                // Reset to default for next tasks.
334 1
                $this->isSuccessfullyFinished = true;
335 1
            }
336 1
        }
337 1
    }
338
}
339