GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Passed
Push — master ( 07fd04...6d3d42 )
by Anton
01:59
created

ParallelExecutor   A

Complexity

Total Complexity 34

Size/Duplication

Total Lines 225
Duplicated Lines 5.33 %

Coupling/Cohesion

Components 1
Dependencies 10

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
dl 12
loc 225
ccs 0
cts 142
cp 0
rs 9.68
c 0
b 0
f 0
wmc 34
lcom 1
cbo 10

10 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 16 1
A gatherExitCodes() 0 10 3
A connect() 6 27 5
B runTask() 6 31 6
A getProcess() 0 13 3
A startProcesses() 0 6 2
A areRunning() 0 10 3
A gatherOutput() 0 14 4
A persistHosts() 0 16 2
A run() 0 31 5

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
<?php declare(strict_types=1);
2
/* (c) Anton Medvedev <[email protected]>
3
 *
4
 * For the full copyright and license information, please view the LICENSE
5
 * file that was distributed with this source code.
6
 */
7
8
namespace Deployer\Executor;
9
10
use Deployer\Collection\PersistentCollection;
11
use Deployer\Configuration\Configuration;
12
use Deployer\Console\Application;
13
use Deployer\Deployer;
14
use Deployer\Exception\Exception;
15
use Deployer\Exception\GracefulShutdownException;
16
use Deployer\Host\Host;
17
use Deployer\Host\Localhost;
18
use Deployer\Host\Storage;
19
use Deployer\Component\Ssh\Client;
20
use Deployer\Task\Context;
21
use Deployer\Task\Task;
22
use Symfony\Component\Console\Input\InputInterface;
23
use Symfony\Component\Console\Output\OutputInterface;
24
use Symfony\Component\Process\Process;
25
26
const FRAMES = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];
27
28
function spinner($message = '')
29
{
30
    $frame = FRAMES[(int)(microtime(true) * 10) % count(FRAMES)];
31
    return "  $frame $message\r";
32
}
33
34
class ParallelExecutor
35
{
36
    private $input;
37
    private $output;
38
    private $informer;
39
    private $console;
40
    private $client;
41
    private $config;
42
43
    public function __construct(
44
        InputInterface $input,
45
        OutputInterface $output,
46
        Status $informer,
47
        Application $console,
48
        Client $client,
49
        Configuration $config
50
    )
51
    {
52
        $this->input = $input;
53
        $this->output = $output;
54
        $this->informer = $informer;
55
        $this->console = $console;
56
        $this->client = $client;
57
        $this->config = $config;
58
    }
59
60
    /**
61
     * @param Host[] $hosts
62
     */
63
    private function connect(array $hosts)
64
    {
65 View Code Duplication
        $callback = function (string $output) {
66
            $output = preg_replace('/\n$/', '', $output);
67
            if (strlen($output) !== 0) {
68
                $this->output->writeln($output);
69
            }
70
        };
71
72
        // Connect to each host sequentially, to prevent getting locked.
73
        foreach ($hosts as $host) {
74
            if ($host instanceof Localhost) {
75
                continue;
76
            }
77
            $process = $this->getProcess($host, new Task('connect'));
78
            $process->start();
79
80
            while ($process->isRunning()) {
81
                $this->gatherOutput([$process], $callback);
82
                $this->output->write(spinner(str_pad("connect {$host->tag()}", intval(getenv('COLUMNS')) - 1)));
83
                usleep(1000);
84
            }
85
        }
86
87
        // Clear spinner.
88
        $this->output->write(str_repeat(' ', intval(getenv('COLUMNS')) - 1) . "\r");
89
    }
90
91
    /**
92
     * @param Task[] $tasks
93
     * @param Host[] $hosts
94
     */
95
    public function run(array $tasks, array $hosts): int
96
    {
97
        $this->persistHosts($hosts);
98
        $this->connect($hosts);
99
100
        $localhost = new Localhost();
0 ignored issues
show
Unused Code introduced by
$localhost is not used, you could remove the assignment.

This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.

$myVar = 'Value';
$higher = false;

if (rand(1, 6) > 3) {
    $higher = true;
} else {
    $higher = false;
}

Both the $myVar assignment in line 1 and the $higher assignment in line 2 are dead. The first because $myVar is never used and the second because $higher is always overwritten for every possible time line.

Loading history...
101
        $limit = (int)$this->input->getOption('limit') ?: count($hosts);
102
103
        foreach ($tasks as $task) {
104
            $this->informer->startTask($task);
105
106
//            if ($task->isLocal()) {
107
//                Storage::load(...$hosts);
108
//                {
109
//                    $task->run(new Context($localhost, $this->input, $this->output));
110
//                }
111
//                Storage::flush(...$hosts);
112
//            } else {
113
                foreach (array_chunk($hosts, $limit) as $chunk) {
114
                    $exitCode = $this->runTask($chunk, $task);
115
                    if ($exitCode !== 0) {
116
                        return $exitCode;
117
                    }
118
                }
119
//            }
120
121
            $this->informer->endTask($task);
122
        }
123
124
        return 0;
125
    }
126
127
    private function runTask(array $hosts, Task $task): int
128
    {
129
        $processes = [];
130
        foreach ($hosts as $host) {
131
            if ($task->shouldBePerformed($host)) {
132
                $processes[] = $this->getProcess($host, $task);
133
                if ($task->isOnce()) {
134
                    $task->setHasRun();
135
                }
136
            }
137
        }
138
139 View Code Duplication
        $callback = function (string $output) {
140
            $output = preg_replace('/\n$/', '', $output);
141
            if (strlen($output) !== 0) {
142
                $this->output->writeln($output);
143
            }
144
        };
145
146
        $this->startProcesses($processes);
147
148
        while ($this->areRunning($processes)) {
149
            $this->gatherOutput($processes, $callback);
150
            $this->output->write(spinner());
151
            usleep(1000);
152
        }
153
        $this->output->write("\r");
154
        $this->gatherOutput($processes, $callback);
155
156
        return $this->gatherExitCodes($processes);
157
    }
158
159
    protected function getProcess(Host $host, Task $task): Process
160
    {
161
        $dep = PHP_BINARY . ' ' . DEPLOYER_BIN;
162
        $configFile = $host->get('worker-config');
163
        $decorated = $this->output->isDecorated() ? '--decorated' : '';
164
        $command = "$dep worker $task {$host->alias()} $configFile {$this->input} $decorated";
165
166
        if ($this->output->isDebug()) {
167
            $this->output->writeln("[{$host->tag()}] $command");
168
        }
169
170
        return Process::fromShellCommandline($command);
171
    }
172
173
    /**
174
     * Start all of the processes.
175
     *
176
     * @param Process[] $processes
177
     * @return void
178
     */
179
    protected function startProcesses(array $processes)
180
    {
181
        foreach ($processes as $process) {
182
            $process->start();
183
        }
184
    }
185
186
    /**
187
     * Determine if any of the processes are running.
188
     *
189
     * @param Process[] $processes
190
     * @return bool
191
     */
192
    protected function areRunning(array $processes): bool
193
    {
194
        foreach ($processes as $process) {
195
            if ($process->isRunning()) {
196
                return true;
197
            }
198
        }
199
200
        return false;
201
    }
202
203
    /**
204
     * Gather the output from all of the processes.
205
     *
206
     * @param Process[] $processes
207
     * @param callable $callback
208
     * @return void
209
     */
210
    protected function gatherOutput(array $processes, callable $callback)
211
    {
212
        foreach ($processes as $process) {
213
            $output = $process->getIncrementalOutput();
214
            if (strlen($output) !== 0) {
215
                $callback($output);
216
            }
217
218
            $errorOutput = $process->getIncrementalErrorOutput();
219
            if (strlen($errorOutput) !== 0) {
220
                $callback($errorOutput);
221
            }
222
        }
223
    }
224
225
    /**
226
     * Gather the cumulative exit code for the processes.
227
     */
228
    protected function gatherExitCodes(array $processes): int
229
    {
230
        foreach ($processes as $process) {
231
            if ($process->getExitCode() > 0) {
232
                return $process->getExitCode();
233
            }
234
        }
235
236
        return 0;
237
    }
238
239
    /**
240
     * @param Host[] $hosts
241
     */
242
    private function persistHosts(array $hosts)
243
    {
244
        foreach ($hosts as $host) {
245
            Context::push(new Context($host, $this->input, $this->output));
246
247
            $values = $host->getConfig()->persist();
248
            $workerConfig = sys_get_temp_dir() . '/' . uniqid('deployer-') . '-' . $host->alias() . '.dep';
249
            $values['worker-config'] = $workerConfig;
250
251
            $persistentCollection = new PersistentCollection($workerConfig, $values);
252
            $persistentCollection->flush();
253
            $host->getConfig()->setCollection($persistentCollection);
254
255
            Context::pop();
256
        }
257
    }
258
}
259