1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Spatie\Async; |
4
|
|
|
|
5
|
|
|
use ArrayAccess; |
6
|
|
|
use InvalidArgumentException; |
7
|
|
|
use Spatie\Async\Process\ParallelProcess; |
8
|
|
|
use Spatie\Async\Process\Runnable; |
9
|
|
|
use Spatie\Async\Process\SynchronousProcess; |
10
|
|
|
use Spatie\Async\Runtime\ParentRuntime; |
11
|
|
|
|
12
|
|
|
/**
 * Runs a set of Runnable processes with a bounded level of concurrency.
 *
 * Processes move through four internal collections: `queue` (waiting),
 * `inProgress` (started), and then exactly one of `finished`, `failed` or
 * `timeouts`. Whenever a slot frees up, `notify()` promotes the next queued
 * process. When pcntl/posix are available, child exits are picked up through
 * a SIGCHLD handler registered in the constructor.
 */
class Pool implements ArrayAccess
{
    /** @var bool When true, isSupported() reports false regardless of the installed extensions. */
    public static $forceSynchronous = false;

    /** @var int Maximum number of processes allowed in `inProgress` at the same time. */
    protected $concurrency = 20;

    /** @var int Not read anywhere in this class. */
    protected $tasksPerProcess = 1;

    /**
     * @var int Limit compared against Runnable::getCurrentExecutionTime() and handed to the
     *          underlying process of a ParallelProcess (presumably seconds; confirm against Runnable).
     */
    protected $timeout = 300;

    /** @var int Pause between two polling rounds in wait(), in microseconds (passed to usleep()). */
    protected $sleepTime = 50000;

    /** @var \Spatie\Async\Process\Runnable[] Waiting processes, keyed by Runnable::getId(). */
    protected $queue = [];

    /** @var \Spatie\Async\Process\Runnable[] Started processes, keyed by Runnable::getPid(). */
    protected $inProgress = [];

    /** @var \Spatie\Async\Process\Runnable[] Successfully finished processes, keyed by pid. */
    protected $finished = [];

    /** @var \Spatie\Async\Process\Runnable[] Failed processes, keyed by pid. */
    protected $failed = [];

    /** @var \Spatie\Async\Process\Runnable[] Timed-out processes, keyed by pid. */
    protected $timeouts = [];

    /** @var array Values returned by Runnable::triggerSuccess(), in completion order. */
    protected $results = [];

    /** @var PoolStatus */
    protected $status;

    /** @var bool Once true, putInProgress() refuses to start any further process. */
    protected $stopped = false;

    /**
     * Registers the SIGCHLD listener when asynchronous execution is supported
     * and creates the status object for this pool.
     */
    public function __construct()
    {
        if (static::isSupported()) {
            $this->registerListener();
        }

        $this->status = new PoolStatus($this);
    }

    /**
     * @return static
     */
    public static function create()
    {
        return new static();
    }

    /**
     * Whether the functions needed for asynchronous execution exist and
     * synchronous mode has not been forced through $forceSynchronous.
     */
    public static function isSupported(): bool
    {
        return
            function_exists('pcntl_async_signals')
            && function_exists('posix_kill')
            && ! self::$forceSynchronous;
    }

    /**
     * Sets the maximum number of simultaneously running processes.
     */
    public function concurrency(int $concurrency): self
    {
        $this->concurrency = $concurrency;

        return $this;
    }

    /**
     * Sets the execution time limit applied to every process.
     */
    public function timeout(int $timeout): self
    {
        $this->timeout = $timeout;

        return $this;
    }

    /**
     * Initialises the parent runtime with the given autoloader.
     */
    public function autoload(string $autoloader): self
    {
        ParentRuntime::init($autoloader);

        return $this;
    }

    /**
     * Sets the pause, in microseconds, between two polling rounds of wait().
     */
    public function sleepTime(int $sleepTime): self
    {
        $this->sleepTime = $sleepTime;

        return $this;
    }

    /**
     * Starts the next queued process if a concurrency slot is free.
     *
     * The process is removed from the queue here, before putInProgress() is
     * called, so it leaves the queue even when the pool has been stopped.
     */
    public function notify()
    {
        if (count($this->inProgress) >= $this->concurrency) {
            return;
        }

        if (count($this->queue) === 0) {
            return;
        }

        // Remove the head entry by its own key instead of array_shift():
        // array_shift() renumbers integer-like keys, after which the
        // id-based unset() in putInProgress() could hit an unrelated entry.
        $process = reset($this->queue);
        unset($this->queue[key($this->queue)]);

        if (! $process) {
            return;
        }

        $this->putInProgress($process);
    }

    /**
     * Adds a process to the pool; callables are wrapped by the parent runtime.
     *
     * @param \Spatie\Async\Process\Runnable|callable $process
     * @param int|null $outputLength Forwarded to ParentRuntime::createProcess() for callables.
     *
     * @return \Spatie\Async\Process\Runnable
     *
     * @throws \InvalidArgumentException When $process is neither callable nor a Runnable.
     */
    public function add($process, ?int $outputLength = null): Runnable
    {
        if (! is_callable($process) && ! $process instanceof Runnable) {
            throw new InvalidArgumentException('The process passed to Pool::add should be callable.');
        }

        if (! $process instanceof Runnable) {
            $process = ParentRuntime::createProcess($process, $outputLength);
        }

        $this->putInQueue($process);

        return $process;
    }

    /**
     * Blocks until no process is in progress any more.
     *
     * Each round marks processes that exceeded the timeout, completes
     * synchronous processes, invokes the optional callback with this pool,
     * and then sleeps for `sleepTime` microseconds.
     *
     * @param callable|null $intermediateCallback Called once per round with the pool as only argument.
     *
     * @return array The collected success results.
     */
    public function wait(?callable $intermediateCallback = null): array
    {
        while (count($this->inProgress) > 0) {
            // foreach works on a snapshot, so the markAs*() calls below may
            // safely remove entries from $this->inProgress while iterating.
            foreach ($this->inProgress as $process) {
                if ($process->getCurrentExecutionTime() > $this->timeout) {
                    $this->markAsTimedOut($process);
                }

                // NOTE(review): a synchronous process that exceeded the timeout is
                // marked as timed out above AND as finished here; confirm this is intended.
                if ($process instanceof SynchronousProcess) {
                    $this->markAsFinished($process);
                }
            }

            if (count($this->inProgress) === 0) {
                break;
            }

            if ($intermediateCallback !== null) {
                $intermediateCallback($this);
            }

            usleep($this->sleepTime);
        }

        return $this->results;
    }

    /**
     * Stores the process in the queue under its id and tries to start work.
     */
    public function putInQueue(Runnable $process)
    {
        $this->queue[$process->getId()] = $process;

        $this->notify();
    }

    /**
     * Starts the process and tracks it under its pid; does nothing once the pool is stopped.
     */
    public function putInProgress(Runnable $process)
    {
        if ($this->stopped) {
            return;
        }

        if ($process instanceof ParallelProcess) {
            $process->getProcess()->setTimeout($this->timeout);
        }

        $process->start();

        // Covers direct callers that pass a process which is still queued.
        unset($this->queue[$process->getId()]);

        $this->inProgress[$process->getPid()] = $process;
    }

    /**
     * Moves the process to `finished`, frees its slot and records its success result.
     */
    public function markAsFinished(Runnable $process)
    {
        unset($this->inProgress[$process->getPid()]);

        $this->notify();

        $this->results[] = $process->triggerSuccess();

        $this->finished[$process->getPid()] = $process;
    }

    /**
     * Moves the process to `timeouts`, frees its slot and triggers its timeout handling.
     */
    public function markAsTimedOut(Runnable $process)
    {
        unset($this->inProgress[$process->getPid()]);

        $this->notify();

        $process->triggerTimeout();

        $this->timeouts[$process->getPid()] = $process;
    }

    /**
     * Moves the process to `failed`, frees its slot and triggers its error handling.
     */
    public function markAsFailed(Runnable $process)
    {
        unset($this->inProgress[$process->getPid()]);

        $this->notify();

        $process->triggerError();

        $this->failed[$process->getPid()] = $process;
    }

    /**
     * Not implemented yet: always reports that the offset does not exist.
     */
    public function offsetExists($offset)
    {
        // TODO

        return false;
    }

    /**
     * Not implemented yet: returns nothing.
     */
    public function offsetGet($offset)
    {
        // TODO
    }

    /**
     * Allows `$pool[] = $process;` as a shorthand for add(); the offset is ignored.
     */
    public function offsetSet($offset, $value)
    {
        $this->add($value);
    }

    /**
     * Not implemented yet: does nothing.
     */
    public function offsetUnset($offset)
    {
        // TODO
    }

    /**
     * @return \Spatie\Async\Process\Runnable[]
     */
    public function getQueue(): array
    {
        return $this->queue;
    }

    /**
     * @return \Spatie\Async\Process\Runnable[]
     */
    public function getInProgress(): array
    {
        return $this->inProgress;
    }

    /**
     * @return \Spatie\Async\Process\Runnable[]
     */
    public function getFinished(): array
    {
        return $this->finished;
    }

    /**
     * @return \Spatie\Async\Process\Runnable[]
     */
    public function getFailed(): array
    {
        return $this->failed;
    }

    /**
     * @return \Spatie\Async\Process\Runnable[]
     */
    public function getTimeouts(): array
    {
        return $this->timeouts;
    }

    public function status(): PoolStatus
    {
        return $this->status;
    }

    /**
     * Enables asynchronous signal handling and installs a SIGCHLD handler that
     * reaps exited children and marks the matching process finished or failed.
     */
    protected function registerListener()
    {
        pcntl_async_signals(true);

        pcntl_signal(SIGCHLD, function () {
            // One delivered signal can stand for several exited children,
            // so keep reaping until waitpid has nothing left to report.
            while (true) {
                $pid = pcntl_waitpid(-1, $processState, WNOHANG | WUNTRACED);

                if ($pid <= 0) {
                    break;
                }

                $process = $this->inProgress[$pid] ?? null;

                if (! $process) {
                    continue;
                }

                // Judge each child by its own wait status; the signal info
                // handed to the handler describes only one child.
                if (pcntl_wifexited($processState) && pcntl_wexitstatus($processState) === 0) {
                    $this->markAsFinished($process);

                    continue;
                }

                $this->markAsFailed($process);
            }
        });
    }

    /**
     * Prevents any further process from being started by this pool.
     */
    public function stop()
    {
        $this->stopped = true;
    }
}
316
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `! empty(...)` instead.