1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Spatie\Async; |
4
|
|
|
|
5
|
|
|
use ArrayAccess; |
6
|
|
|
use Spatie\Async\Runtime\ParentRuntime; |
7
|
|
|
|
8
|
|
|
class Pool implements ArrayAccess |
9
|
|
|
{ |
10
|
|
|
protected $concurrency = 20; |
11
|
|
|
protected $tasksPerProcess = 1; |
12
|
|
|
protected $timeout = 300; |
13
|
|
|
protected $sleepTime = 50000; |
14
|
|
|
|
15
|
|
|
/** @var \Spatie\Async\ParallelProcess[] */ |
16
|
|
|
protected $queue = []; |
17
|
|
|
|
18
|
|
|
/** @var \Spatie\Async\ParallelProcess[] */ |
19
|
|
|
protected $inProgress = []; |
20
|
|
|
|
21
|
|
|
/** @var \Spatie\Async\ParallelProcess[] */ |
22
|
|
|
protected $finished = []; |
23
|
|
|
|
24
|
|
|
/** @var \Spatie\Async\ParallelProcess[] */ |
25
|
|
|
protected $failed = []; |
26
|
|
|
|
27
|
|
|
protected $results = []; |
28
|
|
|
|
29
|
|
|
public function __construct() |
30
|
|
|
{ |
31
|
|
|
$this->registerListener(); |
32
|
|
|
} |
33
|
|
|
|
34
|
|
|
/** |
35
|
|
|
* @return static |
36
|
|
|
*/ |
37
|
|
|
public static function create() |
38
|
|
|
{ |
39
|
|
|
return new static(); |
40
|
|
|
} |
41
|
|
|
|
42
|
|
|
public static function isSupported(): bool |
43
|
|
|
{ |
44
|
|
|
return function_exists('pcntl_async_signals') && function_exists('posix_kill'); |
45
|
|
|
} |
46
|
|
|
|
47
|
|
|
public function concurrency(int $concurrency): self |
48
|
|
|
{ |
49
|
|
|
$this->concurrency = $concurrency; |
50
|
|
|
|
51
|
|
|
return $this; |
52
|
|
|
} |
53
|
|
|
|
54
|
|
|
public function timeout(int $timeout): self |
55
|
|
|
{ |
56
|
|
|
$this->timeout = $timeout; |
57
|
|
|
|
58
|
|
|
return $this; |
59
|
|
|
} |
60
|
|
|
|
61
|
|
|
public function autoload(string $autoloader): self |
62
|
|
|
{ |
63
|
|
|
ParentRuntime::init($autoloader); |
64
|
|
|
|
65
|
|
|
return $this; |
66
|
|
|
} |
67
|
|
|
|
68
|
|
|
public function sleepTime(int $sleepTime): self |
69
|
|
|
{ |
70
|
|
|
$this->sleepTime = $sleepTime; |
71
|
|
|
|
72
|
|
|
return $this; |
73
|
|
|
} |
74
|
|
|
|
75
|
|
|
public function notify() |
76
|
|
|
{ |
77
|
|
|
if (count($this->inProgress) >= $this->concurrency) { |
78
|
|
|
return; |
79
|
|
|
} |
80
|
|
|
|
81
|
|
|
$process = array_shift($this->queue); |
82
|
|
|
|
83
|
|
|
if (! $process) { |
84
|
|
|
return; |
85
|
|
|
} |
86
|
|
|
|
87
|
|
|
$this->putInProgress($process); |
88
|
|
|
} |
89
|
|
|
|
90
|
|
|
/** |
91
|
|
|
* @param \Spatie\Async\ParallelProcess|callable $process |
92
|
|
|
* |
93
|
|
|
* @return \Spatie\Async\ParallelProcess |
94
|
|
|
*/ |
95
|
|
|
public function add($process): ParallelProcess |
96
|
|
|
{ |
97
|
|
|
if (! $process instanceof ParallelProcess) { |
98
|
|
|
$process = ParentRuntime::createChildProcess($process); |
99
|
|
|
} |
100
|
|
|
|
101
|
|
|
$this->putInQueue($process); |
102
|
|
|
|
103
|
|
|
return $process; |
104
|
|
|
} |
105
|
|
|
|
106
|
|
|
public function wait(): array |
107
|
|
|
{ |
108
|
|
|
while ($this->inProgress) { |
|
|
|
|
109
|
|
|
foreach ($this->inProgress as $process) { |
110
|
|
|
if ($process->getCurrentExecutionTime() > $this->timeout) { |
111
|
|
|
$this->markAsTimedOut($process); |
112
|
|
|
} |
113
|
|
|
} |
114
|
|
|
|
115
|
|
|
if (! $this->inProgress) { |
|
|
|
|
116
|
|
|
break; |
117
|
|
|
} |
118
|
|
|
|
119
|
|
|
usleep($this->sleepTime); |
120
|
|
|
} |
121
|
|
|
|
122
|
|
|
return $this->results; |
123
|
|
|
} |
124
|
|
|
|
125
|
|
|
public function putInQueue(ParallelProcess $process) |
126
|
|
|
{ |
127
|
|
|
$this->queue[$process->getId()] = $process; |
128
|
|
|
|
129
|
|
|
$this->notify(); |
130
|
|
|
} |
131
|
|
|
|
132
|
|
|
public function putInProgress(ParallelProcess $process) |
133
|
|
|
{ |
134
|
|
|
$process->getProcess()->setTimeout($this->timeout); |
135
|
|
|
|
136
|
|
|
$process->start(); |
137
|
|
|
|
138
|
|
|
unset($this->queue[$process->getId()]); |
139
|
|
|
|
140
|
|
|
$this->inProgress[$process->getPid()] = $process; |
141
|
|
|
} |
142
|
|
|
|
143
|
|
|
public function markAsFinished(ParallelProcess $process) |
144
|
|
|
{ |
145
|
|
|
$this->results[] = $process->triggerSuccess(); |
146
|
|
|
|
147
|
|
|
unset($this->inProgress[$process->getPid()]); |
148
|
|
|
|
149
|
|
|
$this->finished[$process->getPid()] = $process; |
150
|
|
|
|
151
|
|
|
$this->notify(); |
152
|
|
|
} |
153
|
|
|
|
154
|
|
View Code Duplication |
public function markAsTimedOut(ParallelProcess $process) |
|
|
|
|
155
|
|
|
{ |
156
|
|
|
$process->triggerTimeout(); |
157
|
|
|
|
158
|
|
|
unset($this->inProgress[$process->getPid()]); |
159
|
|
|
|
160
|
|
|
$this->failed[$process->getPid()] = $process; |
161
|
|
|
|
162
|
|
|
$this->notify(); |
163
|
|
|
} |
164
|
|
|
|
165
|
|
View Code Duplication |
public function markAsFailed(ParallelProcess $process) |
|
|
|
|
166
|
|
|
{ |
167
|
|
|
$process->triggerError(); |
168
|
|
|
|
169
|
|
|
unset($this->inProgress[$process->getPid()]); |
170
|
|
|
|
171
|
|
|
$this->failed[$process->getPid()] = $process; |
172
|
|
|
|
173
|
|
|
$this->notify(); |
174
|
|
|
} |
175
|
|
|
|
176
|
|
|
public function offsetExists($offset) |
177
|
|
|
{ |
178
|
|
|
// TODO |
179
|
|
|
|
180
|
|
|
return false; |
181
|
|
|
} |
182
|
|
|
|
183
|
|
|
public function offsetGet($offset) |
184
|
|
|
{ |
185
|
|
|
// TODO |
186
|
|
|
} |
187
|
|
|
|
188
|
|
|
public function offsetSet($offset, $value) |
189
|
|
|
{ |
190
|
|
|
$this->add($value); |
|
|
|
|
191
|
|
|
} |
192
|
|
|
|
193
|
|
|
public function offsetUnset($offset) |
194
|
|
|
{ |
195
|
|
|
// TODO |
196
|
|
|
} |
197
|
|
|
|
198
|
|
|
/** |
199
|
|
|
* @return \Spatie\Async\ParallelProcess[] |
200
|
|
|
*/ |
201
|
|
|
public function getFinished(): array |
202
|
|
|
{ |
203
|
|
|
return $this->finished; |
204
|
|
|
} |
205
|
|
|
|
206
|
|
|
/** |
207
|
|
|
* @return \Spatie\Async\ParallelProcess[] |
208
|
|
|
*/ |
209
|
|
|
public function getFailed(): array |
210
|
|
|
{ |
211
|
|
|
return $this->failed; |
212
|
|
|
} |
213
|
|
|
|
214
|
|
|
protected function registerListener() |
215
|
|
|
{ |
216
|
|
|
pcntl_async_signals(true); |
217
|
|
|
|
218
|
|
|
pcntl_signal(SIGCHLD, function ($signo, $status) { |
219
|
|
|
while (true) { |
220
|
|
|
$pid = pcntl_waitpid(-1, $processState, WNOHANG | WUNTRACED); |
221
|
|
|
|
222
|
|
|
if ($pid <= 0) { |
223
|
|
|
break; |
224
|
|
|
} |
225
|
|
|
|
226
|
|
|
$process = $this->inProgress[$pid] ?? null; |
227
|
|
|
|
228
|
|
|
if (! $process) { |
229
|
|
|
continue; |
230
|
|
|
} |
231
|
|
|
|
232
|
|
|
if ($status['status'] === 0) { |
233
|
|
|
$this->markAsFinished($process); |
234
|
|
|
|
235
|
|
|
continue; |
236
|
|
|
} |
237
|
|
|
|
238
|
|
|
$this->markAsFailed($process); |
239
|
|
|
} |
240
|
|
|
}); |
241
|
|
|
} |
242
|
|
|
} |
243
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using
empty(..)
or! empty(...)
instead.