1
|
|
|
<?php |
2
|
|
|
/* (c) Anton Medvedev <[email protected]> |
3
|
|
|
* |
4
|
|
|
* For the full copyright and license information, please view the LICENSE |
5
|
|
|
* file that was distributed with this source code. |
6
|
|
|
*/ |
7
|
|
|
|
8
|
|
|
namespace Deployer\Executor; |
9
|
|
|
|
10
|
|
|
use Deployer\Console\Output\OutputWatcher; |
11
|
|
|
use Deployer\Console\Output\VerbosityString; |
12
|
|
|
use Deployer\Server\Environment; |
13
|
|
|
use Deployer\Task\Context; |
14
|
|
|
use Pure\Server; |
15
|
|
|
use Pure\Storage\ArrayStorage; |
16
|
|
|
use Pure\Storage\QueueStorage; |
17
|
|
|
use React\Socket\ConnectionException; |
18
|
|
|
use Symfony\Component\Console\Input\ArrayInput; |
19
|
|
|
use Symfony\Component\Console\Input\InputDefinition; |
20
|
|
|
use Symfony\Component\Process\Process; |
21
|
|
|
|
22
|
|
|
class ParallelExecutor implements ExecutorInterface |
23
|
|
|
{ |
24
|
|
|
/** |
25
|
|
|
* Try to start server on this port. |
26
|
|
|
*/ |
27
|
|
|
const START_PORT = 3333; |
28
|
|
|
|
29
|
|
|
/** |
30
|
|
|
* If fails on start port, try until stop port. |
31
|
|
|
*/ |
32
|
|
|
const STOP_PORT = 3340; |
33
|
|
|
|
34
|
|
|
/** |
35
|
|
|
* @var InputDefinition |
36
|
|
|
*/ |
37
|
|
|
private $userDefinition; |
38
|
|
|
|
39
|
|
|
/** |
40
|
|
|
* @var \Deployer\Task\Task[] |
41
|
|
|
*/ |
42
|
|
|
private $tasks; |
43
|
|
|
|
44
|
|
|
/** |
45
|
|
|
* @var \Deployer\Server\ServerInterface[] |
46
|
|
|
*/ |
47
|
|
|
private $servers; |
48
|
|
|
|
49
|
|
|
/** |
50
|
|
|
* @var \Deployer\Server\Environment[] |
51
|
|
|
*/ |
52
|
|
|
private $environments; |
53
|
|
|
|
54
|
|
|
/** |
55
|
|
|
* @var \Symfony\Component\Console\Input\InputInterface |
56
|
|
|
*/ |
57
|
|
|
private $input; |
58
|
|
|
|
59
|
|
|
/** |
60
|
|
|
* @var \Symfony\Component\Console\Output\OutputInterface |
61
|
|
|
*/ |
62
|
|
|
private $output; |
63
|
|
|
|
64
|
|
|
/** |
65
|
|
|
* @var Informer |
66
|
|
|
*/ |
67
|
|
|
private $informer; |
68
|
|
|
|
69
|
|
|
/** |
70
|
|
|
* @var int |
71
|
|
|
*/ |
72
|
|
|
private $port; |
73
|
|
|
|
74
|
|
|
/** |
75
|
|
|
* @var Server |
76
|
|
|
*/ |
77
|
|
|
private $pure; |
78
|
|
|
|
79
|
|
|
/** |
80
|
|
|
* @var \React\EventLoop\LoopInterface |
81
|
|
|
*/ |
82
|
|
|
private $loop; |
83
|
|
|
|
84
|
|
|
/** |
85
|
|
|
* Wait until all workers finish they tasks. When set this variable to true and send new tasks to workers. |
86
|
|
|
* |
87
|
|
|
* @var bool |
88
|
|
|
*/ |
89
|
|
|
private $wait = false; |
90
|
|
|
|
91
|
|
|
/** |
92
|
|
|
* @var QueueStorage |
93
|
|
|
*/ |
94
|
|
|
private $outputStorage; |
95
|
|
|
|
96
|
|
|
/** |
97
|
|
|
* @var QueueStorage |
98
|
|
|
*/ |
99
|
|
|
private $exceptionStorage; |
100
|
|
|
|
101
|
|
|
/** |
102
|
|
|
* Array will contain tasks list what workers has to before moving to next task. |
103
|
|
|
* |
104
|
|
|
* @var array |
105
|
|
|
*/ |
106
|
|
|
private $tasksToDo = []; |
107
|
|
|
|
108
|
|
|
/** |
109
|
|
|
* Check if current task was successfully finished on all server (no exception was triggered). |
110
|
|
|
* |
111
|
|
|
* @var bool |
112
|
|
|
*/ |
113
|
|
|
private $isSuccessfullyFinished = true; |
114
|
|
|
|
115
|
|
|
/** |
116
|
|
|
* Check if current task triggered a non-fatal exception. |
117
|
|
|
* |
118
|
|
|
* @var bool |
119
|
|
|
*/ |
120
|
|
|
private $hasNonFatalException = false; |
121
|
|
|
|
122
|
|
|
/** |
123
|
|
|
* @param InputDefinition $userDefinition |
124
|
|
|
*/ |
125
|
2 |
|
public function __construct(InputDefinition $userDefinition) |
126
|
|
|
{ |
127
|
2 |
|
$this->userDefinition = $userDefinition; |
128
|
2 |
|
} |
129
|
|
|
|
130
|
|
|
/** |
131
|
|
|
* {@inheritdoc} |
132
|
|
|
*/ |
133
|
2 |
|
public function run($tasks, $servers, $environments, $input, $output) |
134
|
|
|
{ |
135
|
2 |
|
$this->tasks = $tasks; |
136
|
2 |
|
$this->servers = $servers; |
137
|
2 |
|
$this->environments = $environments; |
138
|
2 |
|
$this->input = $input; |
139
|
2 |
|
$this->output = new OutputWatcher($output); |
140
|
2 |
|
$this->informer = new Informer($this->output); |
141
|
2 |
|
$this->port = self::START_PORT; |
142
|
|
|
|
143
|
|
|
connect: |
144
|
|
|
|
145
|
2 |
|
$this->pure = new Server($this->port); |
146
|
2 |
|
$this->loop = $this->pure->getLoop(); |
147
|
|
|
|
148
|
|
|
// Start workers for each server. |
149
|
2 |
|
$this->loop->addTimer(0, [$this, 'startWorkers']); |
150
|
|
|
|
151
|
|
|
// Wait for output |
152
|
2 |
|
$this->outputStorage = $this->pure['output'] = new QueueStorage(); |
153
|
2 |
|
$this->loop->addPeriodicTimer(0, [$this, 'catchOutput']); |
154
|
|
|
|
155
|
|
|
// Lookup for exception |
156
|
2 |
|
$this->exceptionStorage = $this->pure['exception'] = new QueueStorage(); |
157
|
2 |
|
$this->loop->addPeriodicTimer(0, [$this, 'catchExceptions']); |
158
|
|
|
|
159
|
|
|
// Send workers tasks to do. |
160
|
2 |
|
$this->loop->addPeriodicTimer(0, [$this, 'sendTasks']); |
161
|
|
|
|
162
|
|
|
// Wait all workers finish they tasks. |
163
|
2 |
|
$this->loop->addPeriodicTimer(0, [$this, 'idle']); |
164
|
|
|
|
165
|
|
|
// Start loop |
166
|
|
|
try { |
167
|
2 |
|
$this->pure->run(); |
168
|
2 |
|
} catch (ConnectionException $exception) { |
169
|
|
|
// If port is already used, try with another one. |
170
|
1 |
|
$output->writeln("<error>" . $exception->getMessage() . "</error>"); |
171
|
|
|
|
172
|
1 |
|
if (++$this->port <= self::STOP_PORT) { |
173
|
1 |
|
goto connect; |
174
|
|
|
} |
175
|
|
|
} |
176
|
2 |
|
} |
177
|
|
|
|
178
|
|
|
/** |
179
|
|
|
* Start workers, put master port, server name to run on, and options stuff. |
180
|
|
|
*/ |
181
|
2 |
|
public function startWorkers() |
182
|
|
|
{ |
183
|
|
|
// Get verbosity. |
184
|
2 |
|
$verbosity = new VerbosityString($this->output); |
185
|
2 |
|
|
186
|
2 |
|
// Get current deploy.php file. |
187
|
|
|
$deployPhpFile = $this->input->getOption('file'); |
188
|
|
|
|
189
|
2 |
|
// User input. |
190
|
|
|
$input = ''; |
191
|
|
|
|
192
|
2 |
|
// Get user arguments. |
193
|
|
|
foreach ($this->userDefinition->getArguments() as $argument) { |
194
|
|
|
$value = $this->input->getArgument($argument->getName()); |
195
|
2 |
|
|
196
|
2 |
|
if ($value) { |
197
|
2 |
|
$input .= " $value"; |
198
|
|
|
} |
199
|
|
|
} |
200
|
2 |
|
|
201
|
2 |
|
// Get user options. |
202
|
2 |
|
foreach ($this->userDefinition->getOptions() as $option) { |
203
|
|
|
$value = $this->input->getOption($option->getName()); |
204
|
2 |
|
|
205
|
2 |
|
if ($value) { |
206
|
|
|
$input .= " --{$option->getName()} $value"; |
207
|
2 |
|
} |
208
|
2 |
|
} |
209
|
2 |
|
|
210
|
2 |
|
foreach ($this->servers as $serverName => $server) { |
211
|
2 |
|
$process = new Process( |
212
|
2 |
|
"php " . DEPLOYER_BIN . |
213
|
|
|
(null === $deployPhpFile ? "" : " --file=$deployPhpFile") . |
214
|
2 |
|
" worker " . |
215
|
2 |
|
" --master 127.0.0.1:{$this->port}" . |
216
|
2 |
|
" --server $serverName" . |
217
|
2 |
|
" $input " . |
218
|
2 |
|
" $verbosity" . |
219
|
|
|
" &" |
220
|
|
|
); |
221
|
|
|
$process->disableOutput(); |
222
|
|
|
$process->start(); |
223
|
2 |
|
} |
224
|
|
|
} |
225
|
2 |
|
|
226
|
1 |
|
/** |
227
|
|
|
* Wait for output from workers. |
228
|
|
|
*/ |
229
|
1 |
|
public function catchOutput() |
230
|
1 |
|
{ |
231
|
1 |
|
while (count($this->outputStorage) > 0) { |
232
|
1 |
|
list($server, $messages, , $type) = $this->outputStorage->pop(); |
233
|
1 |
|
|
234
|
|
|
$format = function ($message) use ($server) { |
235
|
1 |
|
$message = rtrim($message, "\n"); |
236
|
1 |
|
return implode("\n", array_map(function ($text) use ($server) { |
237
|
2 |
|
return "[$server] $text"; |
238
|
|
|
}, explode("\n", $message))); |
239
|
|
|
}; |
240
|
|
|
|
241
|
|
|
$this->output->writeln(array_map($format, (array)$messages), $type); |
242
|
2 |
|
} |
243
|
|
|
} |
244
|
2 |
|
|
245
|
|
|
/** |
246
|
|
|
* Wait for exceptions from workers. |
247
|
|
|
*/ |
248
|
|
|
public function catchExceptions() |
249
|
|
|
{ |
250
|
|
|
while (count($this->exceptionStorage) > 0) { |
251
|
|
|
list($serverName, $exceptionClass, $message) = $this->exceptionStorage->pop(); |
252
|
|
|
|
253
|
|
|
// Print exception message. |
254
|
|
|
$this->informer->taskException($serverName, $exceptionClass, $message); |
255
|
|
|
|
256
|
|
|
// We got some exception, so not. |
257
|
|
|
$this->isSuccessfullyFinished = false; |
258
|
|
|
|
259
|
|
|
if ($exceptionClass == 'Deployer\Task\NonFatalException') { |
260
|
|
|
|
261
|
|
|
// If we got NonFatalException, continue other tasks. |
262
|
|
|
$this->hasNonFatalException = true; |
263
|
|
|
} else { |
264
|
|
|
|
265
|
|
|
// Do not run other task. |
266
|
|
|
// Finish all current worker tasks and stop loop. |
267
|
|
|
$this->tasks = []; |
268
|
|
|
|
269
|
2 |
|
// Worker will not mark this task as done (remove self server name from `tasks_to_do` list), |
270
|
|
|
// so to finish current task execution we need to manually remove it from that list. |
271
|
|
|
$taskToDoStorage = $this->pure->getStorage('tasks_to_do'); |
272
|
|
|
$taskToDoStorage->delete($serverName); |
273
|
|
|
} |
274
|
|
|
} |
275
|
2 |
|
} |
276
|
|
|
|
277
|
2 |
|
/** |
278
|
2 |
|
* Action time for master! Send tasks `to-do` for workers and go to sleep. |
279
|
|
|
* Also decide when to stop server/loop. |
280
|
|
|
*/ |
281
|
2 |
|
public function sendTasks() |
282
|
2 |
|
{ |
283
|
2 |
|
if (!$this->wait) { |
284
|
|
|
if (count($this->tasks) > 0) { |
285
|
2 |
|
|
286
|
|
|
// Get task name to do. |
287
|
2 |
|
$task = current($this->tasks); |
288
|
2 |
|
$taskName = $task->getName(); |
289
|
2 |
|
array_shift($this->tasks); |
290
|
2 |
|
|
291
|
2 |
|
$this->informer->startTask($taskName); |
292
|
|
|
|
293
|
2 |
|
if ($task->isOnce()) { |
294
|
2 |
|
$task->run(new Context(null, new Environment(), $this->input, $this->output)); |
295
|
2 |
|
$this->informer->endTask(); |
296
|
|
|
} else { |
297
|
2 |
|
$this->tasksToDo = []; |
298
|
1 |
|
|
299
|
|
|
foreach ($this->servers as $serverName => $server) { |
300
|
|
|
if ($task->runOnServer($serverName)) { |
301
|
1 |
|
$env = isset($this->environments[$serverName]) ? $this->environments[$serverName] : $this->environments[$serverName] = new Environment(); |
302
|
1 |
|
|
303
|
1 |
View Code Duplication |
if (count($task->getOnlyForStage()) > 0 && (!$env->has('stages') || !$task->runForStages($env->get('stages')))) { |
|
|
|
|
304
|
2 |
|
continue; |
305
|
|
|
} |
306
|
|
|
|
307
|
2 |
|
$this->informer->onServer($serverName); |
308
|
2 |
|
$this->tasksToDo[$serverName] = $taskName; |
309
|
2 |
|
} |
310
|
|
|
} |
311
|
2 |
|
|
312
|
|
|
// Inform all workers what tasks they need to do. |
313
|
2 |
|
$taskToDoStorage = new ArrayStorage(); |
314
|
2 |
|
$taskToDoStorage->push($this->tasksToDo); |
315
|
|
|
$this->pure->setStorage('tasks_to_do', $taskToDoStorage); |
316
|
2 |
|
|
317
|
2 |
|
$this->wait = true; |
318
|
|
|
} |
319
|
|
|
} else { |
320
|
|
|
$this->loop->stop(); |
321
|
|
|
} |
322
|
2 |
|
} |
323
|
|
|
} |
324
|
2 |
|
|
325
|
2 |
|
/** |
326
|
|
|
* While idle master, print information about finished tasks. |
327
|
2 |
|
*/ |
328
|
1 |
|
public function idle() |
329
|
1 |
|
{ |
330
|
1 |
|
if ($this->wait) { |
331
|
1 |
|
$taskToDoStorage = $this->pure->getStorage('tasks_to_do'); |
332
|
2 |
|
|
333
|
|
|
foreach ($this->tasksToDo as $serverName => $taskName) { |
334
|
2 |
|
if (!$taskToDoStorage->has($serverName)) { |
335
|
2 |
|
$this->informer->endOnServer($serverName); |
336
|
2 |
|
unset($this->tasksToDo[$serverName]); |
337
|
2 |
|
} |
338
|
|
|
} |
339
|
|
|
|
340
|
|
|
if (count($taskToDoStorage) === 0) { |
341
|
|
|
if ($this->isSuccessfullyFinished) { |
342
|
|
|
$this->informer->endTask(); |
343
|
2 |
|
} else { |
344
|
|
|
$this->informer->taskError($this->hasNonFatalException); |
345
|
|
|
// TODO: Get rid of hard dependency. Use DI container. |
346
|
2 |
|
\Deployer\dispatcher()->dispatch('error'); |
347
|
2 |
|
} |
348
|
2 |
|
|
349
|
2 |
|
// We waited all workers to finish their tasks. |
350
|
|
|
// Wait no more! |
351
|
|
|
$this->wait = false; |
352
|
|
|
|
353
|
|
|
// Reset to default for next tasks. |
354
|
|
|
$this->isSuccessfullyFinished = true; |
355
|
|
|
} |
356
|
|
|
} |
357
|
|
|
} |
358
|
|
|
} |
359
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.