This project does not seem to handle request data directly; as such, no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis; consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace Spatie\Async; |
||
4 | |||
5 | use ArrayAccess; |
||
6 | use InvalidArgumentException; |
||
7 | use Spatie\Async\Process\ParallelProcess; |
||
8 | use Spatie\Async\Process\Runnable; |
||
9 | use Spatie\Async\Process\SynchronousProcess; |
||
10 | use Spatie\Async\Runtime\ParentRuntime; |
||
11 | |||
/**
 * Runs queued processes with a bounded number of them in progress at once.
 *
 * Processes move through the internal lists in this order:
 * queue -> inProgress -> finished | failed | timeouts.
 * When the required pcntl/posix functions are available, a SIGCHLD handler
 * moves exited children out of the in-progress list.
 */
class Pool implements ArrayAccess
{
    /** @var bool When true, isSupported() always reports false. */
    public static $forceSynchronous = false;

    /** @var int Maximum number of processes kept in progress at the same time. */
    protected $concurrency = 20;

    /** @var int Not read anywhere in this class. */
    protected $tasksPerProcess = 1;

    /** @var int|float Compared against each process' current execution time (presumably seconds). */
    protected $timeout = 300;

    /** @var int Pause between two polling rounds in wait(), in microseconds (passed to usleep()). */
    protected $sleepTime = 50000;

    /** @var \Spatie\Async\Process\Runnable[] Processes waiting to be started, keyed by process id. */
    protected $queue = [];

    /** @var \Spatie\Async\Process\Runnable[] Started processes, keyed by PID. */
    protected $inProgress = [];

    /** @var \Spatie\Async\Process\Runnable[] Successfully finished processes, keyed by PID. */
    protected $finished = [];

    /** @var \Spatie\Async\Process\Runnable[] Failed processes, keyed by PID. */
    protected $failed = [];

    /** @var \Spatie\Async\Process\Runnable[] Timed-out processes, keyed by PID. */
    protected $timeouts = [];

    /** @var array Values returned by triggerSuccess() of finished processes. */
    protected $results = [];

    /** @var PoolStatus */
    protected $status;

    /** @var bool Once true, putInProgress() no longer starts processes. */
    protected $stopped = false;

    /** @var string Binary handed to ParentRuntime::createProcess() for callables passed to add(). */
    protected $binary = PHP_BINARY;

    /**
     * Registers the SIGCHLD listener when supported and creates the status object.
     */
    public function __construct()
    {
        if (static::isSupported()) {
            $this->registerListener();
        }

        $this->status = new PoolStatus($this);
    }

    /**
     * @return static
     */
    public static function create()
    {
        return new static();
    }

    /**
     * Whether the signal-based mode can be used: the required pcntl and posix
     * functions exist and synchronous execution is not being forced.
     */
    public static function isSupported(): bool
    {
        return
            function_exists('pcntl_async_signals')
            && function_exists('posix_kill')
            && ! self::$forceSynchronous;
    }

    /**
     * Sets the maximum number of processes that may be in progress at once.
     */
    public function concurrency(int $concurrency): self
    {
        $this->concurrency = $concurrency;

        return $this;
    }

    /**
     * Sets the execution-time limit after which wait() marks a process as timed out.
     */
    public function timeout(float $timeout): self
    {
        $this->timeout = $timeout;

        return $this;
    }

    /**
     * Initialises the parent runtime with the given autoloader.
     */
    public function autoload(string $autoloader): self
    {
        ParentRuntime::init($autoloader);

        return $this;
    }

    /**
     * Sets the pause between polling rounds in wait(), in microseconds.
     */
    public function sleepTime(int $sleepTime): self
    {
        $this->sleepTime = $sleepTime;

        return $this;
    }

    /**
     * Sets the binary passed to ParentRuntime::createProcess() for callables given to add().
     */
    public function withBinary(string $binary): self
    {
        $this->binary = $binary;

        return $this;
    }

    /**
     * Starts the next queued process, provided the concurrency limit leaves room
     * and the queue is not empty.
     */
    public function notify()
    {
        if (count($this->inProgress) >= $this->concurrency) {
            return;
        }

        $process = array_shift($this->queue);

        if (! $process) {
            return;
        }

        $this->putInProgress($process);
    }

    /**
     * Adds a process to the pool; a plain callable is first wrapped via the parent runtime.
     *
     * @param \Spatie\Async\Process\Runnable|callable $process
     * @param int|null $outputLength
     *
     * @return \Spatie\Async\Process\Runnable
     *
     * @throws \InvalidArgumentException When $process is neither callable nor a Runnable.
     */
    public function add($process, ?int $outputLength = null): Runnable
    {
        if (! is_callable($process) && ! $process instanceof Runnable) {
            throw new InvalidArgumentException('The process passed to Pool::add should be callable.');
        }

        if (! $process instanceof Runnable) {
            $process = ParentRuntime::createProcess(
                $process,
                $outputLength,
                $this->binary
            );
        }

        $this->putInQueue($process);

        return $process;
    }

    /**
     * Blocks until no process is in progress any more and returns the collected results.
     *
     * @param callable|null $intermediateCallback Called with this pool after every polling round.
     *
     * @return array
     */
    public function wait(?callable $intermediateCallback = null): array
    {
        while (! empty($this->inProgress)) {
            foreach ($this->inProgress as $process) {
                if ($process->getCurrentExecutionTime() > $this->timeout) {
                    $this->markAsTimedOut($process);
                }

                // NOTE(review): a synchronous process that was just marked as timed out
                // is also marked as finished here; kept as-is to preserve behavior.
                if ($process instanceof SynchronousProcess) {
                    $this->markAsFinished($process);
                }
            }

            if (empty($this->inProgress)) {
                break;
            }

            if ($intermediateCallback) {
                $intermediateCallback($this);
            }

            usleep($this->sleepTime);
        }

        return $this->results;
    }

    /**
     * Puts a process in the queue and immediately tries to start the next queued one.
     */
    public function putInQueue(Runnable $process)
    {
        $this->queue[$process->getId()] = $process;

        $this->notify();
    }

    /**
     * Starts a process and moves it from the queue to the in-progress list.
     * Does nothing once the pool has been stopped.
     */
    public function putInProgress(Runnable $process)
    {
        if ($this->stopped) {
            return;
        }

        if ($process instanceof ParallelProcess) {
            $process->getProcess()->setTimeout($this->timeout);
        }

        $process->start();

        unset($this->queue[$process->getId()]);

        $this->inProgress[$process->getPid()] = $process;
    }

    /**
     * Moves a process to the finished list and stores the value of its success trigger.
     */
    public function markAsFinished(Runnable $process)
    {
        unset($this->inProgress[$process->getPid()]);

        // The next queued process is started before the success trigger runs.
        $this->notify();

        $this->results[] = $process->triggerSuccess();

        $this->finished[$process->getPid()] = $process;
    }

    /**
     * Stops a process, fires its timeout trigger and moves it to the timeouts list.
     */
    public function markAsTimedOut(Runnable $process)
    {
        unset($this->inProgress[$process->getPid()]);

        $process->stop();

        $process->triggerTimeout();
        $this->timeouts[$process->getPid()] = $process;

        $this->notify();
    }

    /**
     * Moves a process to the failed list and fires its error trigger.
     */
    public function markAsFailed(Runnable $process)
    {
        unset($this->inProgress[$process->getPid()]);

        // The next queued process is started before the error trigger runs.
        $this->notify();

        $process->triggerError();

        $this->failed[$process->getPid()] = $process;
    }

    /**
     * Not implemented yet: always reports that the offset does not exist.
     *
     * The attribute below silences the PHP 8.1+ return-type deprecation for
     * ArrayAccess; older PHP versions parse that line as a comment.
     */
    #[\ReturnTypeWillChange]
    public function offsetExists($offset)
    {
        // TODO

        return false;
    }

    /**
     * Not implemented yet: always yields null.
     */
    #[\ReturnTypeWillChange]
    public function offsetGet($offset)
    {
        // TODO
    }

    /**
     * `$pool[] = $process` adds the value to the pool; the offset is ignored.
     */
    #[\ReturnTypeWillChange]
    public function offsetSet($offset, $value)
    {
        $this->add($value);
    }

    /**
     * Not implemented yet: does nothing.
     */
    #[\ReturnTypeWillChange]
    public function offsetUnset($offset)
    {
        // TODO
    }

    /**
     * @return \Spatie\Async\Process\Runnable[]
     */
    public function getQueue(): array
    {
        return $this->queue;
    }

    /**
     * @return \Spatie\Async\Process\Runnable[]
     */
    public function getInProgress(): array
    {
        return $this->inProgress;
    }

    /**
     * @return \Spatie\Async\Process\Runnable[]
     */
    public function getFinished(): array
    {
        return $this->finished;
    }

    /**
     * @return \Spatie\Async\Process\Runnable[]
     */
    public function getFailed(): array
    {
        return $this->failed;
    }

    /**
     * @return \Spatie\Async\Process\Runnable[]
     */
    public function getTimeouts(): array
    {
        return $this->timeouts;
    }

    /**
     * Returns the status object created in the constructor.
     */
    public function status(): PoolStatus
    {
        return $this->status;
    }

    /**
     * Installs a SIGCHLD handler that reaps exited children and marks the matching
     * in-progress process as finished or failed.
     */
    protected function registerListener()
    {
        pcntl_async_signals(true);

        pcntl_signal(SIGCHLD, function ($signo, $signalInfo = null) {
            // One delivered SIGCHLD can stand for several exited children,
            // so keep reaping until no further child is reported.
            while (true) {
                $pid = pcntl_waitpid(-1, $processState, WNOHANG | WUNTRACED);

                if ($pid <= 0) {
                    break;
                }

                $process = $this->inProgress[$pid] ?? null;

                if (! $process) {
                    continue;
                }

                // Judge each child by its own wait status; the signal info only
                // describes the single child that triggered this handler call.
                if (pcntl_wifexited($processState) && pcntl_wexitstatus($processState) === 0) {
                    $this->markAsFinished($process);

                    continue;
                }

                $this->markAsFailed($process);
            }
        });
    }

    /**
     * Prevents any further process from being started by this pool.
     */
    public function stop()
    {
        $this->stopped = true;
    }
}
||
330 |
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using empty(...) or ! empty(...) instead.