GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Push — master ( 67031f...a1fe6c )
by Anton
03:12
created

ParallelExecutor::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 1
dl 0
loc 4
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Console\Output\OutputWatcher;
11
use Deployer\Console\Output\VerbosityString;
12
use Deployer\Server\Environment;
13
use Deployer\Task\Context;
14
use Pure\Server;
15
use Pure\Storage\ArrayStorage;
16
use Pure\Storage\QueueStorage;
17
use React\Socket\ConnectionException;
18
use Symfony\Component\Console\Input\ArrayInput;
19
use Symfony\Component\Console\Input\InputDefinition;
20
use Symfony\Component\Process\Process;
21
22
class ParallelExecutor implements ExecutorInterface
23
{
24
    /**
25
     * Try to start server on this port.
26
     */
27
    const START_PORT = 3333;
28
29
    /**
30
     * If fails on start port, try until stop port.
31
     */
32
    const STOP_PORT = 3340;
33
34
    /**
35
     * @var InputDefinition
36
     */
37
    private $userDefinition;
38
39
    /**
40
     * @var \Deployer\Task\Task[]
41
     */
42
    private $tasks;
43
44
    /**
45
     * @var \Deployer\Server\ServerInterface[]
46
     */
47
    private $servers;
48
49
    /**
50
     * @var \Deployer\Server\Environment[]
51
     */
52
    private $environments;
53
54
    /**
55
     * @var \Symfony\Component\Console\Input\InputInterface
56
     */
57
    private $input;
58
59
    /**
60
     * @var \Symfony\Component\Console\Output\OutputInterface
61
     */
62
    private $output;
63
64
    /**
65
     * @var Informer
66
     */
67
    private $informer;
68
69
    /**
70
     * @var int
71
     */
72
    private $port;
73
74
    /**
75
     * @var Server
76
     */
77
    private $pure;
78
79
    /**
80
     * @var \React\EventLoop\LoopInterface
81
     */
82
    private $loop;
83
84
    /**
85
     * Wait until all workers finish they tasks. When set this variable to true and send new tasks to workers.
86
     *
87
     * @var bool
88
     */
89
    private $wait = false;
90
91
    /**
92
     * @var QueueStorage
93
     */
94
    private $outputStorage;
95
96
    /**
97
     * @var QueueStorage
98
     */
99
    private $exceptionStorage;
100
101
    /**
102
     * Array will contain tasks list what workers has to before moving to next task.
103
     *
104
     * @var array
105
     */
106
    private $tasksToDo = [];
107
108
    /**
109
     * Check if current task was successfully finished on all server (no exception was triggered).
110
     *
111
     * @var bool
112
     */
113
    private $isSuccessfullyFinished = true;
114
115
    /**
116
     * Check if current task triggered a non-fatal exception.
117
     *
118
     * @var bool
119
     */
120
    private $hasNonFatalException = false;
121
122
    /**
123
     * @param InputDefinition $userDefinition
124
     */
125 2
    public function __construct(InputDefinition $userDefinition)
126
    {
127 2
        $this->userDefinition = $userDefinition;
128 2
    }
129
    
130
    /**
131
     * {@inheritdoc}
132
     */
133 2
    public function run($tasks, $servers, $environments, $input, $output)
134
    {
135 2
        $this->tasks = $tasks;
136 2
        $this->servers = $servers;
137 2
        $this->environments = $environments;
138 2
        $this->input = $input;
139 2
        $this->output = new OutputWatcher($output);
140 2
        $this->informer = new Informer($this->output);
141 2
        $this->port = self::START_PORT;
142
143
        connect:
144
145 2
        $this->pure = new Server($this->port);
146 2
        $this->loop = $this->pure->getLoop();
147
148
        // Start workers for each server.
149 2
        $this->loop->addTimer(0, [$this, 'startWorkers']);
150
151
        // Wait for output
152 2
        $this->outputStorage = $this->pure['output'] = new QueueStorage();
153 2
        $this->loop->addPeriodicTimer(0, [$this, 'catchOutput']);
154
155
        // Lookup for exception
156 2
        $this->exceptionStorage = $this->pure['exception'] = new QueueStorage();
157 2
        $this->loop->addPeriodicTimer(0, [$this, 'catchExceptions']);
158
159
        // Send workers tasks to do.
160 2
        $this->loop->addPeriodicTimer(0, [$this, 'sendTasks']);
161
162
        // Wait all workers finish they tasks.
163 2
        $this->loop->addPeriodicTimer(0, [$this, 'idle']);
164
165
        // Start loop
166
        try {
167 2
            $this->pure->run();
168 2
        } catch (ConnectionException $exception) {
169
            // If port is already used, try with another one.
170 1
            $output->writeln("<error>" . $exception->getMessage() . "</error>");
171
172 1
            if (++$this->port <= self::STOP_PORT) {
173 1
                goto connect;
174
            }
175
        }
176 2
    }
177
178
    /**
179
     * Start workers, put master port, server name to run on, and options stuff.
180
     */
181 2
    public function startWorkers()
182
    {
183
        // Get verbosity.
184 2
        $verbosity = new VerbosityString($this->output);
185 2
186 2
        // Get current deploy.php file.
187
        $deployPhpFile = $this->input->getOption('file');
188
189 2
        // User input.
190
        $input = '';
191
192 2
        // Get user arguments.
193
        foreach ($this->userDefinition->getArguments() as $argument) {
194
            $value = $this->input->getArgument($argument->getName());
195 2
196 2
            if ($value) {
197 2
                $input .= " $value";
198
            }
199
        }
200 2
201 2
        // Get user options.
202 2
        foreach ($this->userDefinition->getOptions() as $option) {
203
            $value = $this->input->getOption($option->getName());
204 2
205 2
            if ($value) {
206
                $input .= " --{$option->getName()} $value";
207 2
            }
208 2
        }
209 2
        
210 2
        foreach ($this->servers as $serverName => $server) {
211 2
            $process = new Process(
212 2
                "php " . DEPLOYER_BIN .
213
                (null === $deployPhpFile ? "" : " --file=$deployPhpFile") .
214 2
                " worker " .
215 2
                " --master 127.0.0.1:{$this->port}" .
216 2
                " --server $serverName" .
217 2
                " $input " .
218 2
                " $verbosity" .
219
                " &"
220
            );
221
            $process->disableOutput();
222
            $process->start();
223 2
        }
224
    }
225 2
226 1
    /**
227
     * Wait for output from workers.
228
     */
229 1
    public function catchOutput()
230 1
    {
231 1
        while (count($this->outputStorage) > 0) {
232 1
            list($server, $messages, , $type) = $this->outputStorage->pop();
233 1
234
            $format = function ($message) use ($server) {
235 1
                $message = rtrim($message, "\n");
236 1
                return implode("\n", array_map(function ($text) use ($server) {
237 2
                    return "[$server] $text";
238
                }, explode("\n", $message)));
239
            };
240
241
            $this->output->writeln(array_map($format, (array)$messages), $type);
242 2
        }
243
    }
244 2
245
    /**
246
     * Wait for exceptions from workers.
247
     */
248
    public function catchExceptions()
249
    {
250
        while (count($this->exceptionStorage) > 0) {
251
            list($serverName, $exceptionClass, $message) = $this->exceptionStorage->pop();
252
253
            // Print exception message.
254
            $this->informer->taskException($serverName, $exceptionClass, $message);
255
256
            // We got some exception, so not.
257
            $this->isSuccessfullyFinished = false;
258
            
259
            if ($exceptionClass == 'Deployer\Task\NonFatalException') {
260
261
                // If we got NonFatalException, continue other tasks.
262
                $this->hasNonFatalException = true;
263
            } else {
264
265
                // Do not run other task.
266
                // Finish all current worker tasks and stop loop.
267
                $this->tasks = [];
268
269 2
                // Worker will not mark this task as done (remove self server name from `tasks_to_do` list),
270
                // so to finish current task execution we need to manually remove it from that list.
271
                $taskToDoStorage = $this->pure->getStorage('tasks_to_do');
272
                $taskToDoStorage->delete($serverName);
273
            }
274
        }
275 2
    }
276
277 2
    /**
278 2
     * Action time for master! Send tasks `to-do` for workers and go to sleep.
279
     * Also decide when to stop server/loop.
280
     */
281 2
    public function sendTasks()
282 2
    {
283 2
        if (!$this->wait) {
284
            if (count($this->tasks) > 0) {
285 2
286
                // Get task name to do.
287 2
                $task = current($this->tasks);
288 2
                $taskName = $task->getName();
289 2
                array_shift($this->tasks);
290 2
291 2
                $this->informer->startTask($taskName);
292
293 2
                if ($task->isOnce()) {
294 2
                    $task->run(new Context(null, new Environment(), $this->input, $this->output));
295 2
                    $this->informer->endTask();
296
                } else {
297 2
                    $this->tasksToDo = [];
298 1
299
                    foreach ($this->servers as $serverName => $server) {
300
                        if ($task->runOnServer($serverName)) {
301 1
                            $env = isset($this->environments[$serverName]) ? $this->environments[$serverName] : $this->environments[$serverName] = new Environment();
302 1
303 1 View Code Duplication
                            if (count($task->getOnlyForStage()) > 0 && (!$env->has('stages') || !$task->runForStages($env->get('stages')))) {
1 ignored issue
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
304 2
                                continue;
305
                            }
306
307 2
                            $this->informer->onServer($serverName);
308 2
                            $this->tasksToDo[$serverName] = $taskName;
309 2
                        }
310
                    }
311 2
312
                    // Inform all workers what tasks they need to do.
313 2
                    $taskToDoStorage = new ArrayStorage();
314 2
                    $taskToDoStorage->push($this->tasksToDo);
315
                    $this->pure->setStorage('tasks_to_do', $taskToDoStorage);
316 2
317 2
                    $this->wait = true;
318
                }
319
            } else {
320
                $this->loop->stop();
321
            }
322 2
        }
323
    }
324 2
325 2
    /**
326
     * While idle master, print information about finished tasks.
327 2
     */
328 1
    public function idle()
329 1
    {
330 1
        if ($this->wait) {
331 1
            $taskToDoStorage = $this->pure->getStorage('tasks_to_do');
332 2
333
            foreach ($this->tasksToDo as $serverName => $taskName) {
334 2
                if (!$taskToDoStorage->has($serverName)) {
335 2
                    $this->informer->endOnServer($serverName);
336 2
                    unset($this->tasksToDo[$serverName]);
337 2
                }
338
            }
339
340
            if (count($taskToDoStorage) === 0) {
341
                if ($this->isSuccessfullyFinished) {
342
                    $this->informer->endTask();
343 2
                } else {
344
                    $this->informer->taskError($this->hasNonFatalException);
345
                    // TODO: Get rid of hard dependency. Use DI container.
346 2
                    \Deployer\dispatcher()->dispatch('error');
347 2
                }
348 2
                
349 2
                // We waited all workers to finish their tasks.
350
                // Wait no more!
351
                $this->wait = false;
352
353
                // Reset to default for next tasks.
354
                $this->isSuccessfullyFinished = true;
355
            }
356
        }
357
    }
358
}
359