1
|
|
|
<?php |
2
|
|
|
/** |
3
|
|
|
* This file is part of graze/parallel-process. |
4
|
|
|
* |
5
|
|
|
* Copyright (c) 2017 Nature Delivered Ltd. <https://www.graze.com> |
6
|
|
|
* |
7
|
|
|
* For the full copyright and license information, please view the LICENSE |
8
|
|
|
* file that was distributed with this source code. |
9
|
|
|
* |
10
|
|
|
* @license https://github.com/graze/parallel-process/blob/master/LICENSE.md |
11
|
|
|
* @link https://github.com/graze/parallel-process |
12
|
|
|
*/ |
13
|
|
|
|
14
|
|
|
namespace Graze\ParallelProcess; |
15
|
|
|
|
16
|
|
|
use Graze\DataStructure\Collection\Collection; |
17
|
|
|
use Graze\ParallelProcess\Exceptions\NotRunningException; |
18
|
|
|
use InvalidArgumentException; |
19
|
|
|
use Symfony\Component\Process\Process; |
20
|
|
|
|
21
|
|
|
/**
 * A collection of runs that are started and polled together.
 *
 * The pool can cap how many runs are active at once (see `setMaxSimultaneous()`); runs over the cap are held in a
 * waiting queue and are started from `poll()` as soon as a slot frees up.
 */
class Pool extends Collection implements RunInterface
{
    const CHECK_INTERVAL = 0.1;
    const NO_MAX = -1;

    /** @var RunInterface[] every run that belongs to this pool */
    protected $items = [];
    /** @var RunInterface[] runs that have been started by the pool and have not yet been polled as finished */
    protected $running = [];
    /** @var RunInterface[] runs queued because the simultaneous limit had been reached */
    protected $waiting = [];
    /** @var callable|null */
    protected $onSuccess;
    /** @var callable|null */
    protected $onFailure;
    /** @var callable|null */
    protected $onProgress;
    /** @var int maximum number of runs started at once, or self::NO_MAX for no limit */
    private $maxSimultaneous = self::NO_MAX;

    /**
     * Pool constructor.
     *
     * Set the default callbacks here. Any `Process` supplied in `$items` is wrapped in a `Run` using these
     * callbacks, so that every stored item is a `RunInterface`.
     *
     * @param RunInterface[]|Process[] $items
     * @param callable|null            $onSuccess  function (Process $process, float $duration, string $last) : void
     * @param callable|null            $onFailure  function (Process $process, float $duration, string $last) : void
     * @param callable|null            $onProgress function (Process $process, float $duration, string $last) : void
     * @param int                      $maxSimultaneous
     *
     * @throws InvalidArgumentException if an item is neither a `Process` nor a `RunInterface`
     */
    public function __construct(
        array $items = [],
        callable $onSuccess = null,
        callable $onFailure = null,
        callable $onProgress = null,
        $maxSimultaneous = self::NO_MAX
    ) {
        // The callbacks must be in place before any Process is wrapped, as they are handed to the new Run
        $this->onSuccess = $onSuccess;
        $this->onFailure = $onFailure;
        $this->onProgress = $onProgress;
        $this->maxSimultaneous = $maxSimultaneous;

        $runs = [];
        foreach ($items as $key => $item) {
            if ($item instanceof Process) {
                $item = new Run($item, $this->onSuccess, $this->onFailure, $this->onProgress);
            }

            if (!$item instanceof RunInterface) {
                throw new InvalidArgumentException(
                    "__construct: Can only add `RunInterface` or `Process` to this collection"
                );
            }

            $runs[$key] = $item;
        }

        parent::__construct($runs);
    }

    /**
     * Set the default success callback given to processes added after this call
     *
     * @param callable|null $onSuccess function (Process $process, float $duration, string $last) : void
     *
     * @return $this
     */
    public function setOnSuccess($onSuccess)
    {
        $this->onSuccess = $onSuccess;
        return $this;
    }

    /**
     * Set the default failure callback given to processes added after this call
     *
     * @param callable|null $onFailure function (Process $process, float $duration, string $last) : void
     *
     * @return $this
     */
    public function setOnFailure($onFailure)
    {
        $this->onFailure = $onFailure;
        return $this;
    }

    /**
     * Set the default progress callback given to processes added after this call
     *
     * @param callable|null $onProgress function (Process $process, float $duration, string $last) : void
     *
     * @return $this
     */
    public function setOnProgress($onProgress)
    {
        $this->onProgress = $onProgress;
        return $this;
    }

    /**
     * Add a new process to the pool
     *
     * A `Process` is wrapped in a `Run` with the pool's default callbacks. If the pool is already running, the new
     * item is started straight away (or queued when the simultaneous limit has been reached).
     *
     * @param RunInterface|Process $item
     *
     * @return $this
     *
     * @throws InvalidArgumentException if the item is neither a `Process` nor a `RunInterface`
     * @throws NotRunningException if the item is already running but the pool is not
     */
    public function add($item)
    {
        if ($item instanceof Process) {
            return $this->addProcess($item);
        }

        if (!$item instanceof RunInterface) {
            throw new InvalidArgumentException("add: Can only add `RunInterface` to this collection");
        }

        if (!$this->isRunning() && $item->isRunning()) {
            throw new NotRunningException("add: unable to add a running item when the pool has not started");
        }

        parent::add($item);

        if ($this->isRunning()) {
            $this->startRun($item);
        }

        return $this;
    }

    /**
     * Add a new process to the pool using the default callbacks
     *
     * @param Process $process
     *
     * @return $this
     */
    protected function addProcess(Process $process)
    {
        return $this->add(new Run(
            $process,
            $this->onSuccess,
            $this->onFailure,
            $this->onProgress
        ));
    }

    /**
     * Start all the processes running
     *
     * Runs that the pool is already tracking (either running or waiting) are skipped, so calling this more than
     * once, or calling `run()` after `start()`, does not queue or start the same run twice.
     *
     * @return $this
     */
    public function start()
    {
        foreach ($this->items as $run) {
            // strict comparison: match on object identity, not on equal property values
            if (in_array($run, $this->running, true) || in_array($run, $this->waiting, true)) {
                continue;
            }

            $this->startRun($run);
        }

        return $this;
    }

    /**
     * Start a run (or queue it if we are running the maximum number of processes already)
     *
     * @param RunInterface $run
     */
    private function startRun(RunInterface $run)
    {
        if ($this->maxSimultaneous === static::NO_MAX || count($this->running) < $this->maxSimultaneous) {
            $run->start();
            $this->running[] = $run;
        } else {
            $this->waiting[] = $run;
        }
    }

    /**
     * Blocking call to run processes;
     *
     * Starts the pool, then polls it every `$checkInterval` seconds until nothing is running any more.
     *
     * @param float $checkInterval Seconds between checks
     *
     * @return bool true if all processes were successful
     */
    public function run($checkInterval = self::CHECK_INTERVAL)
    {
        $this->start();

        // usleep() takes a non-negative whole number of microseconds
        $interval = max(0, (int) round($checkInterval * 1000000));

        while ($this->poll()) {
            usleep($interval);
        }

        return $this->isSuccessful();
    }

    /**
     * Check when a run has finished, if there are processes waiting, start them
     */
    private function checkFinished()
    {
        if ($this->maxSimultaneous === static::NO_MAX) {
            // without a limit nothing is ever placed in the waiting queue by startRun()
            return;
        }

        while (count($this->waiting) > 0 && count($this->running) < $this->maxSimultaneous) {
            $run = array_shift($this->waiting);
            $run->start();
            $this->running[] = $run;
        }
    }

    /**
     * Determine if any item has run
     *
     * @return bool true as soon as one item in the pool reports that it has started
     */
    public function hasStarted()
    {
        foreach ($this->items as $run) {
            if ($run->hasStarted()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Are any of the processes running
     *
     * Polls every running run, drops the ones whose `poll()` returns a falsy value, then fills the freed slots
     * from the waiting queue.
     *
     * @return bool true if at least one run is still running after this poll
     */
    public function poll()
    {
        // array_filter() preserves keys; reindex so that the running list stays a plain list
        $this->running = array_values(array_filter($this->running, function (RunInterface $run) {
            return $run->poll();
        }));

        $this->checkFinished();

        return $this->isRunning();
    }

    /**
     * Is the pool currently tracking at least one running run
     *
     * Only the running list is considered; runs that are still in the waiting queue do not count.
     *
     * @return bool
     */
    public function isRunning()
    {
        return count($this->running) > 0;
    }

    /**
     * Return if all runs have started and were successful
     *
     * @return bool false if no run has started yet, or if any run in the pool reports that it was not successful
     */
    public function isSuccessful()
    {
        if (!$this->hasStarted()) {
            return false;
        }

        foreach ($this->items as $run) {
            if (!$run->isSuccessful()) {
                return false;
            }
        }

        return true;
    }

    /**
     * Get a list of all the currently running runs
     *
     * @return RunInterface[]
     */
    public function getRunning()
    {
        return $this->running;
    }

    /**
     * Get a list of all the current waiting runs
     *
     * @return RunInterface[]
     */
    public function getWaiting()
    {
        return $this->waiting;
    }

    /**
     * Get the maximum number of runs started at once
     *
     * @return int the limit, or self::NO_MAX when there is none
     */
    public function getMaxSimultaneous()
    {
        return $this->maxSimultaneous;
    }

    /**
     * Set the maximum number of runs started at once
     *
     * The value is stored as given and is not validated here.
     *
     * @param int $maxSimultaneous the limit, or self::NO_MAX for no limit
     *
     * @return $this
     */
    public function setMaxSimultaneous($maxSimultaneous)
    {
        $this->maxSimultaneous = $maxSimultaneous;
        return $this;
    }
}
310
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.