GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 99078a...9605c1 )
by Oanh
02:49
created

ParallelExecutor::sendTasks()   D

Complexity

Conditions 10
Paths 9

Size

Total Lines 43
Code Lines 25

Duplication

Lines 3
Ratio 6.98 %

Code Coverage

Tests 26
CRAP Score 10.1105

Importance

Changes 2
Bugs 0 Features 1
Metric Value
c 2
b 0
f 1
dl 3
loc 43
ccs 26
cts 29
cp 0.8966
rs 4.8196
cc 10
eloc 25
nc 9
nop 0
crap 10.1105

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Console\Output\OutputWatcher;
11
use Deployer\Console\Output\VerbosityString;
12
use Deployer\Server\Environment;
13
use Deployer\Task\Context;
14
use Pure\Server;
15
use Pure\Storage\ArrayStorage;
16
use Pure\Storage\QueueStorage;
17
use React\Socket\ConnectionException;
18
use Symfony\Component\Console\Input\ArrayInput;
19
use Symfony\Component\Console\Input\InputDefinition;
20
use Symfony\Component\Process\Process;
21
22
class ParallelExecutor implements ExecutorInterface
23
{
24
    /**
25
     * Try to start server on this port.
26
     */
27
    const START_PORT = 3333;
28
29
    /**
30
     * If fails on start port, try until stop port.
31
     */
32
    const STOP_PORT = 3340;
33
34
    /**
35
     * @var InputDefinition
36
     */
37
    private $userDefinition;
38
39
    /**
40
     * @var \Deployer\Task\Task[]
41
     */
42
    private $tasks;
43
44
    /**
45
     * @var \Deployer\Server\ServerInterface[]
46
     */
47
    private $servers;
48
49
    /**
50
     * @var \Deployer\Server\Environment[]
51
     */
52
    private $environments;
53
54
    /**
55
     * @var \Symfony\Component\Console\Input\InputInterface
56
     */
57
    private $input;
58
59
    /**
60
     * @var \Symfony\Component\Console\Output\OutputInterface
61
     */
62
    private $output;
63
64
    /**
65
     * @var Informer
66
     */
67
    private $informer;
68
69
    /**
70
     * @var int
71
     */
72
    private $port;
73
74
    /**
75
     * @var Server
76
     */
77
    private $pure;
78
79
    /**
80
     * @var \React\EventLoop\LoopInterface
81
     */
82
    private $loop;
83
84
    /**
85
     * Whether the master is waiting for all workers to finish their current task. While true, no new task is sent to the workers.
86
     *
87
     * @var bool
88
     */
89
    private $wait = false;
90
91
    /**
92
     * @var QueueStorage
93
     */
94
    private $outputStorage;
95
96
    /**
97
     * @var QueueStorage
98
     */
99
    private $exceptionStorage;
100
101
    /**
102
     * Tasks the workers have to finish before the master moves on to the next task (server name => task name).
103
     *
104
     * @var array
105
     */
106
    private $tasksToDo = [];
107
108
    /**
109
     * Check if current task was successfully finished on all servers (no exception was triggered).
110
     *
111
     * @var bool
112
     */
113
    private $isSuccessfullyFinished = true;
114
115
    /**
116
     * Check if current task triggered a non-fatal exception.
117
     *
118
     * @var bool
119
     */
120
    private $hasNonFatalException = false;
121
122
    /**
123
     * @param InputDefinition $userDefinition
124
     */
125 2
    public function __construct(InputDefinition $userDefinition)
126
    {
127 2
        $this->userDefinition = $userDefinition;
128 2
    }
129
    
130
    /**
     * {@inheritdoc}
     *
     * Starts a Pure server on the first usable port between START_PORT and
     * STOP_PORT (inclusive) and lets its event loop drive the whole run
     * through the timers registered below.
     *
     * @throws ConnectionException When the server could not be started on any port of the range.
     */
    public function run($tasks, $servers, $environments, $input, $output)
    {
        $this->tasks = $tasks;
        $this->servers = $servers;
        $this->environments = $environments;
        $this->input = $input;
        $this->output = new OutputWatcher($output);
        $this->informer = new Informer($this->output);

        for ($this->port = self::START_PORT; $this->port <= self::STOP_PORT; $this->port++) {
            // A fresh server (and therefore a fresh loop) is needed for every attempt,
            // so all timers are registered again on each iteration.
            $this->pure = new Server($this->port);
            $this->loop = $this->pure->getLoop();

            // Start workers for each server.
            $this->loop->addTimer(0, [$this, 'startWorkers']);

            // Wait for output.
            $this->outputStorage = $this->pure['output'] = new QueueStorage();
            $this->loop->addPeriodicTimer(0, [$this, 'catchOutput']);

            // Look for exceptions.
            $this->exceptionStorage = $this->pure['exception'] = new QueueStorage();
            $this->loop->addPeriodicTimer(0, [$this, 'catchExceptions']);

            // Send workers tasks to do.
            $this->loop->addPeriodicTimer(0, [$this, 'sendTasks']);

            // Wait until all workers finish their tasks.
            $this->loop->addPeriodicTimer(0, [$this, 'idle']);

            // Start loop.
            try {
                $this->pure->run();

                return;
            } catch (ConnectionException $exception) {
                // If port is already used, try with another one.
                $output->writeln("<error>" . $exception->getMessage() . "</error>");

                // No port left to try: report the failure to the caller instead of
                // returning as if the tasks had been executed.
                if ($this->port >= self::STOP_PORT) {
                    throw $exception;
                }
            }
        }
    }
177
178
    /**
     * Start one worker process per server.
     *
     * Every worker is given the master address, the name of the server it is
     * responsible for, the current values of the user-defined arguments and
     * options, the deploy file (when one was given) and the verbosity.
     */
    public function startWorkers()
    {
        $input = [
            '--master' => '127.0.0.1:' . $this->port,
            '--server' => '',
        ];

        // Get verbosity.
        $verbosity = new VerbosityString($this->output);

        // Get current deploy.php file. The value ends up in a shell command line,
        // so it is escaped to keep paths with spaces or shell metacharacters intact.
        $deployPhpFile = $this->input->getOption('file');
        $fileOption = null === $deployPhpFile ? "" : " --file=" . escapeshellarg($deployPhpFile);

        // Get user arguments.
        foreach ($this->userDefinition->getArguments() as $argument) {
            $input[$argument->getName()] = $this->input->getArgument($argument->getName());
        }

        // Get user options.
        foreach ($this->userDefinition->getOptions() as $option) {
            $input["--" . $option->getName()] = $this->input->getOption($option->getName());
        }

        foreach ($this->servers as $serverName => $server) {
            $input['--server'] = $serverName;

            // The trailing "&" is part of the command line handed to the shell;
            // presumably it detaches the worker so that run() returns right away.
            $process = new Process(
                "php " . DEPLOYER_BIN .
                $fileOption .
                " worker " .
                new ArrayInput($input) .
                " $verbosity" .
                " &"
            );
            $process->disableOutput();
            $process->run();
        }
    }
219
220
    /**
     * Drain the output queue and print what the workers wrote.
     *
     * Every line of every message is prefixed with the name of the server it
     * came from.
     */
    public function catchOutput()
    {
        while (count($this->outputStorage) > 0) {
            list($server, $messages, , $type) = $this->outputStorage->pop();

            $prefixedMessages = [];

            foreach ((array)$messages as $key => $message) {
                // A trailing newline would otherwise produce an empty prefixed line.
                $lines = explode("\n", rtrim($message, "\n"));

                foreach ($lines as $index => $text) {
                    $lines[$index] = "[$server] $text";
                }

                $prefixedMessages[$key] = implode("\n", $lines);
            }

            $this->output->writeln($prefixedMessages, $type);
        }
    }
239
240
    /**
     * Drain the exception queue filled by the workers.
     *
     * Every exception is printed and marks the current task as not
     * successfully finished. A NonFatalException lets the remaining tasks
     * continue; any other exception drops all remaining tasks.
     */
    public function catchExceptions()
    {
        while (count($this->exceptionStorage) > 0) {
            list($serverName, $exceptionClass, $message) = $this->exceptionStorage->pop();

            // Print exception message.
            $this->informer->taskException($serverName, $exceptionClass, $message);

            // We got some exception, so the task did not finish successfully.
            $this->isSuccessfullyFinished = false;

            // Strict comparison: only the exact class name string counts as non-fatal.
            if ($exceptionClass === 'Deployer\Task\NonFatalException') {
                // If we got NonFatalException, continue other tasks.
                $this->hasNonFatalException = true;

                continue;
            }

            // Do not run other tasks.
            // Finish all current worker tasks and stop loop.
            $this->tasks = [];

            // Worker will not mark this task as done (remove self server name from `tasks_to_do` list),
            // so to finish current task execution we need to manually remove it from that list.
            $taskToDoStorage = $this->pure->getStorage('tasks_to_do');
            $taskToDoStorage->delete($serverName);
        }
    }
271
272
    /**
     * Action time for master! Send the next task to the workers and start waiting.
     * Also decides when to stop the server/loop.
     *
     * Does nothing while the master is still waiting for the previous task.
     * When no task is left, the loop is stopped. A task marked as "once" is
     * run directly in the master; any other task is published to the workers
     * through the `tasks_to_do` storage.
     */
    public function sendTasks()
    {
        if ($this->wait) {
            return;
        }

        if (count($this->tasks) === 0) {
            $this->loop->stop();

            return;
        }

        // Take the next task. array_shift() returns the first element regardless of
        // where the array's internal pointer currently is.
        $task = array_shift($this->tasks);
        $taskName = $task->getName();

        $this->informer->startTask($taskName);

        if ($task->isOnce()) {
            $task->run(new Context(null, null, $this->input, $this->output));
            $this->informer->endTask();

            return;
        }

        $this->tasksToDo = [];

        foreach ($this->servers as $serverName => $server) {
            if (!$this->taskAppliesToServer($task, $serverName)) {
                continue;
            }

            $this->informer->onServer($serverName);
            $this->tasksToDo[$serverName] = $taskName;
        }

        // Inform all workers what tasks they need to do.
        $taskToDoStorage = new ArrayStorage();
        $taskToDoStorage->push($this->tasksToDo);
        $this->pure->setStorage('tasks_to_do', $taskToDoStorage);

        $this->wait = true;
    }

    /**
     * Tell whether the task has to be executed on the given server.
     *
     * Creates an empty environment for the server when it has none yet.
     *
     * @param \Deployer\Task\Task $task
     * @param string $serverName
     * @return bool
     */
    private function taskAppliesToServer($task, $serverName)
    {
        if (!$task->runOnServer($serverName)) {
            return false;
        }

        if (!isset($this->environments[$serverName])) {
            $this->environments[$serverName] = new Environment();
        }
        $env = $this->environments[$serverName];

        // A task without stage restriction applies to every server it runs on.
        if (count($task->getOnlyForStage()) === 0) {
            return true;
        }

        return $env->has('stages') && $task->runForStages($env->get('stages'));
    }
319
320
    /**
     * While the master is waiting, print information about finished tasks.
     *
     * A server counts as done once its name is no longer present in the
     * `tasks_to_do` storage. When the storage is empty the task result is
     * reported, waiting ends and the per-task state is reset.
     */
    public function idle()
    {
        if (!$this->wait) {
            return;
        }

        $taskToDoStorage = $this->pure->getStorage('tasks_to_do');

        foreach (array_keys($this->tasksToDo) as $serverName) {
            if ($taskToDoStorage->has($serverName)) {
                continue;
            }

            $this->informer->endOnServer($serverName);
            unset($this->tasksToDo[$serverName]);
        }

        if (count($taskToDoStorage) !== 0) {
            return;
        }

        if ($this->isSuccessfullyFinished) {
            $this->informer->endTask();
        } else {
            $this->informer->taskError($this->hasNonFatalException);
        }

        // We waited for all workers to finish their tasks.
        // Wait no more!
        $this->wait = false;

        // Reset to default for next tasks. Both flags describe the current task only;
        // without resetting the non-fatal flag, one non-fatal exception would affect
        // how every later failing task is reported.
        $this->isSuccessfullyFinished = true;
        $this->hasNonFatalException = false;
    }
351
}
352