1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Saxulum\ProcessesExecutor; |
4
|
|
|
|
5
|
|
|
use Psr\Log\LoggerInterface; |
6
|
|
|
use Psr\Log\NullLogger; |
7
|
|
|
use Symfony\Component\Process\Process; |
8
|
|
|
|
9
|
|
|
final class ProcessesExecutor implements ProcessesExecutorInterface |
10
|
|
|
{ |
11
|
|
|
/** |
12
|
|
|
* @var LoggerInterface |
13
|
|
|
*/ |
14
|
|
|
private $logger; |
15
|
|
|
|
16
|
|
|
/** |
17
|
|
|
* @param LoggerInterface|null $logger |
18
|
|
|
*/ |
19
|
7 |
|
public function __construct(LoggerInterface $logger = null) |
20
|
|
|
{ |
21
|
7 |
|
$this->logger = $logger ?? new NullLogger(); |
22
|
7 |
|
} |
23
|
|
|
|
24
|
|
|
/** |
25
|
|
|
* @param Process[]|array $processes |
26
|
|
|
* @param callable|null $startCallback |
27
|
|
|
* @param callable|null $iterationCallback |
28
|
|
|
* @param callable|null $finishCallback |
29
|
|
|
* @param int $parallelProcessCount |
30
|
|
|
* @param int $iterationSleepInMicroseconds |
31
|
|
|
*/ |
32
|
7 |
|
public function execute( |
33
|
|
|
array $processes, |
34
|
|
|
callable $startCallback = null, |
35
|
|
|
callable $iterationCallback = null, |
36
|
|
|
callable $finishCallback = null, |
37
|
|
|
int $parallelProcessCount = 8, |
38
|
|
|
int $iterationSleepInMicroseconds = 0 |
39
|
|
|
) { |
40
|
7 |
|
$this->logger->info(self::LOG_START); |
41
|
|
|
|
42
|
|
|
/** @var Process[]|array $parallelProcesses */ |
43
|
7 |
|
$parallelProcesses = []; |
44
|
|
|
|
45
|
|
|
do { |
46
|
7 |
|
foreach ($parallelProcesses as $key => $process) { |
47
|
6 |
|
if (false === $process->isRunning()) { |
48
|
6 |
|
$this->finishCallback($process, $key, $finishCallback); |
49
|
6 |
|
unset($parallelProcesses[$key]); |
50
|
|
|
} |
51
|
|
|
} |
52
|
|
|
|
53
|
7 |
|
while (count($parallelProcesses) < $parallelProcessCount) { |
54
|
7 |
|
if (null !== $key = key($processes)) { |
55
|
6 |
|
$process = current($processes); |
56
|
6 |
|
$process->start(); |
57
|
6 |
|
$this->startCallback($process, $key, $startCallback); |
58
|
6 |
|
$parallelProcesses[$key] = $process; |
59
|
6 |
|
next($processes); |
60
|
|
|
} else { |
61
|
7 |
|
break; |
62
|
|
|
} |
63
|
|
|
} |
64
|
|
|
|
65
|
7 |
|
usleep($iterationSleepInMicroseconds); |
66
|
|
|
|
67
|
7 |
|
$this->callIterationCallback($parallelProcesses, $iterationCallback); |
68
|
7 |
|
} while ([] !== $parallelProcesses); |
69
|
|
|
|
70
|
7 |
|
$this->logger->info(self::LOG_FINISHED); |
71
|
7 |
|
} |
72
|
|
|
|
73
|
|
|
/** |
74
|
|
|
* @param Process $process |
75
|
|
|
* @param mixed $key |
76
|
|
|
* @param callable|null $startCallback |
77
|
|
|
*/ |
78
|
6 |
View Code Duplication |
private function startCallback(Process $process, $key, callable $startCallback = null) |
|
|
|
|
79
|
|
|
{ |
80
|
6 |
|
if (null === $startCallback) { |
81
|
4 |
|
return; |
82
|
|
|
} |
83
|
|
|
|
84
|
2 |
|
$this->logger->debug(self::LOG_START_START_CALLBACK, ['process' => $process, 'key' => $key]); |
85
|
2 |
|
$startCallback($process, $key); |
86
|
2 |
|
$this->logger->debug(self::LOG_STOP_START_CALLBACK, ['process' => $process, 'key' => $key]); |
87
|
2 |
|
} |
88
|
|
|
|
89
|
|
|
/** |
90
|
|
|
* @param Process[]|array $processes |
91
|
|
|
* @param callable|null $iterationCallback |
92
|
|
|
*/ |
93
|
7 |
|
private function callIterationCallback(array $processes, callable $iterationCallback = null) |
94
|
|
|
{ |
95
|
7 |
|
if (null === $iterationCallback) { |
96
|
5 |
|
return; |
97
|
|
|
} |
98
|
|
|
|
99
|
2 |
|
$this->logger->debug(self::LOG_START_ITERATION_CALLBACK, ['processes' => $processes]); |
100
|
2 |
|
$iterationCallback($processes); |
101
|
2 |
|
$this->logger->debug(self::LOG_STOP_ITERATION_CALLBACK, ['processes' => $processes]); |
102
|
2 |
|
} |
103
|
|
|
|
104
|
|
|
/** |
105
|
|
|
* @param Process $process |
106
|
|
|
* @param mixed $key |
107
|
|
|
* @param callable|null $finishCallback |
108
|
|
|
*/ |
109
|
6 |
View Code Duplication |
private function finishCallback(Process $process, $key, callable $finishCallback = null) |
|
|
|
|
110
|
|
|
{ |
111
|
6 |
|
if (null === $finishCallback) { |
112
|
4 |
|
return; |
113
|
|
|
} |
114
|
|
|
|
115
|
2 |
|
$this->logger->debug(self::LOG_START_FINISH_CALLBACK, ['process' => $process, 'key' => $key]); |
116
|
2 |
|
$finishCallback($process, $key); |
117
|
2 |
|
$this->logger->debug(self::LOG_STOP_FINISH_CALLBACK, ['process' => $process, 'key' => $key]); |
118
|
2 |
|
} |
119
|
|
|
} |
120
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.