1 | <?php |
||
2 | |||
3 | declare(strict_types=1); |
||
4 | |||
5 | /** |
||
6 | * TaskScheduler |
||
7 | * |
||
8 | * @author Raffael Sahli <[email protected]> |
||
9 | * @copyright Copyright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com) |
||
10 | * @license MIT https://opensource.org/licenses/MIT |
||
11 | */ |
||
12 | |||
13 | namespace TaskScheduler; |
||
14 | |||
15 | use MongoDB\BSON\ObjectId; |
||
16 | use MongoDB\Database; |
||
17 | use Psr\Log\LoggerInterface; |
||
18 | use TaskScheduler\Exception\InvalidArgumentException; |
||
19 | use TaskScheduler\Exception\SpawnForkException; |
||
20 | |||
21 | class WorkerManager |
||
22 | { |
||
23 | use InjectTrait; |
||
24 | |||
/**
 * Option keys accepted by setOptions().
 */
public const OPTION_PM = 'pm';
public const OPTION_MAX_CHILDREN = 'max_children';
public const OPTION_MIN_CHILDREN = 'min_children';

/**
 * Values accepted for the OPTION_PM option (process management mode).
 */
public const PM_DYNAMIC = 'dynamic';
public const PM_STATIC = 'static';
public const PM_ONDEMAND = 'ondemand';

/**
 * System V message types dispatched by main():
 * TYPE_JOB is routed to handleJob(), TYPE_EVENT to handleEvent().
 */
public const TYPE_JOB = 1;
public const TYPE_EVENT = 2;

/**
 * Active process management mode, one of the PM_* constants.
 *
 * @var string
 */
protected $pm = self::PM_DYNAMIC;

/**
 * Scheduler (not referenced by the methods declared in this class).
 *
 * @var Scheduler
 */
protected $scheduler;

/**
 * Database (not referenced by the methods declared in this class).
 *
 * @var Database
 */
protected $db;

/**
 * Logger receiving the debug/warning output of the manager.
 *
 * @var LoggerInterface
 */
protected $logger;

/**
 * Upper bound of running workers checked before an additional worker is spawned
 * (jobs flagged with force_spawn bypass this limit).
 *
 * @var int
 */
protected $max_children = 2;

/**
 * Number of workers spawned at startup and restored whenever a worker exits.
 *
 * @var int
 */
protected $min_children = 1;

/**
 * Running workers: worker id (stringified ObjectId) => process id.
 *
 * @var array
 */
protected $forks = [];

/**
 * Worker id => job currently processed by that worker, maintained by handleEvent().
 *
 * @var array
 */
protected $job_map = [];

/**
 * System V message queue handle opened in process()
 * (communication between TaskScheduler\Queue and TaskScheduler\WorkerManager).
 *
 * @var resource
 */
protected $queue;

/**
 * Jobs kept locally until they may be executed: job id => job message.
 *
 * @var array
 */
protected $onhold = [];

/**
 * Factory building the worker instance inside each forked child.
 *
 * @var WorkerFactoryInterface
 */
protected $factory;
||
121 | |||
/**
 * Create a worker manager.
 *
 * The options are validated through setOptions() before the factory is stored,
 * so an invalid configuration aborts construction with an exception.
 *
 * @param WorkerFactoryInterface $factory builds the worker inside each forked child
 * @param LoggerInterface        $logger  log target
 * @param array                  $config  option map, see the OPTION_* constants
 */
public function __construct(WorkerFactoryInterface $factory, LoggerInterface $logger, array $config = [])
{
    $this->logger = $logger;

    $this->setOptions($config);

    $this->factory = $factory;
}
|
131 | |||
/**
 * Set options.
 *
 * Supported keys are OPTION_MAX_CHILDREN and OPTION_MIN_CHILDREN (integers) and
 * OPTION_PM (one of the PM_* values, matched case-insensitively and stored in
 * its canonical lowercase form).
 *
 * @param array $config option name => value
 *
 * @throws InvalidArgumentException on an unknown option, a value of the wrong type,
 *                                  an unknown process handling type or min_children > max_children
 *
 * @return self
 */
public function setOptions(array $config = []): self
{
    foreach ($config as $option => $value) {
        // Compare option names as strings: with a loose switch on the raw key, the
        // integer key 0 matches any non-numeric case label on PHP 7.
        switch ((string) $option) {
            case self::OPTION_MAX_CHILDREN:
            case self::OPTION_MIN_CHILDREN:
                if (!is_int($value)) {
                    throw new InvalidArgumentException($option.' needs to be an integer');
                }

                $this->{$option} = $value;

                break;
            case self::OPTION_PM:
                // Normalise before storing: every later check compares $this->pm strictly
                // against the lowercase PM_* values, so a value such as "Dynamic" must not
                // be stored verbatim. Non-string values are rejected here instead of
                // triggering a TypeError under strict_types.
                $mode = is_string($value) ? strtolower($value) : null;

                if (!in_array($mode, [self::PM_DYNAMIC, self::PM_STATIC, self::PM_ONDEMAND], true)) {
                    $shown = is_scalar($value) ? (string) $value : gettype($value);

                    throw new InvalidArgumentException($shown.' is not a valid process handling type (static, dynamic, ondemand)');
                }

                $this->pm = $mode;

                break;
            default:
                throw new InvalidArgumentException('invalid option '.$option.' given');
        }
    }

    // Checked after all options are applied so that the order of the keys does not matter.
    if ($this->min_children > $this->max_children) {
        throw new InvalidArgumentException('option min_children must not be greater than option max_children');
    }

    return $this;
}
||
167 | |||
/**
 * Startup (blocking process).
 *
 * Opens the System V message queue whose key is derived from Queue.php, installs
 * the signal handlers, spawns the initial workers and then runs the main loop.
 */
public function process(): void
{
    $key = ftok(__DIR__.DIRECTORY_SEPARATOR.'Queue.php', 't');
    $this->queue = msg_get_queue($key);

    $this->catchSignal();
    $this->spawnInitialWorkers();
    $this->main();
}
|
178 | |||
/**
 * Wait for child and terminate.
 *
 * Registered as SIGCHLD handler in catchSignal(). Reaps the reported child and
 * any other child that has already terminated, removes them from the fork and
 * job maps and restores the minimum number of workers.
 *
 * @param int   $sig signal number
 * @param array $pid signal info, the child's process id is read from the 'pid' key
 *
 * @return self
 */
public function exitWorker(int $sig, array $pid): self
{
    $this->logger->debug('worker ['.$pid['pid'].'] exit with ['.$sig.']', [
        'category' => get_class($this),
    ]);

    pcntl_waitpid($pid['pid'], $status, WNOHANG | WUNTRACED);
    $gone = [$pid['pid']];

    // Standard signals are not queued: several children exiting at nearly the same
    // time may be reported by a single SIGCHLD. Reap every other terminated child
    // as well, otherwise it stays a zombie and its stale entry keeps count() too high.
    while (($child = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
        $gone[] = $child;
    }

    foreach ($this->forks as $id => $process) {
        if (in_array($process, $gone, true)) {
            // unset() on a missing job_map key is a no-op, no isset() guard needed
            unset($this->forks[$id], $this->job_map[$id]);
        }
    }

    $this->spawnMinimumWorkers();

    return $this;
}
||
203 | |||
/**
 * Number of workers currently tracked in the fork map.
 *
 * @return int
 */
public function count(): int
{
    $running = $this->forks;

    return count($running);
}
||
211 | |||
/**
 * Cleanup.
 *
 * Registered as SIGTERM/SIGINT handler in catchSignal(): forwards the received
 * signal to every tracked worker and then calls exit().
 *
 * @param int $sig received signal number
 */
public function cleanup(int $sig): void
{
    $context = ['category' => get_class($this)];

    $this->logger->debug('received signal ['.$sig.']', $context);

    foreach ($this->getForks() as $id => $pid) {
        $message = 'forward signal ['.$sig.'] to worker ['.$id.'] running with pid ['.$pid.']';
        $this->logger->debug($message, $context);

        posix_kill($pid, $sig);
    }

    $this->exit();
}
|
231 | |||
/**
 * Start initial workers.
 *
 * Only the dynamic and static modes start workers up front; any other mode
 * returns after logging.
 */
protected function spawnInitialWorkers()
{
    $this->logger->debug('spawn initial ['.$this->min_children.'] workers', [
        'category' => get_class($this),
    ]);

    if (self::PM_DYNAMIC !== $this->pm && self::PM_STATIC !== $this->pm) {
        return;
    }

    // Count once, then top up until min_children is reached.
    $running = $this->count();

    while ($running < $this->min_children) {
        $this->spawnWorker();
        ++$running;
    }
}
|
247 | |||
248 | |||
/**
 * Start minimum number of workers.
 *
 * Spawns workers until min_children are running; called after a worker exited.
 */
protected function spawnMinimumWorkers()
{
    $this->logger->debug('verify that the minimum number ['.$this->min_children.'] of workers are running', [
        'category' => get_class($this),
    ]);

    $running = $this->count();

    while ($running < $this->min_children) {
        $this->spawnWorker();
        ++$running;
    }
}
|
262 | |||
263 | |||
/**
 * Start worker.
 *
 * Forks the current process. The child builds a worker through the factory,
 * processes either the given job or all jobs and then exits; the parent records
 * the child's pid under the new worker id.
 *
 * @see https://github.com/mongodb/mongo-php-driver/issues/828
 * @see https://github.com/mongodb/mongo-php-driver/issues/174
 *
 * @param null|ObjectId $job job to process exclusively, null to process all jobs
 *
 * @throws SpawnForkException if the fork failed
 *
 * @return int pid of the spawned child (parent process only)
 */
protected function spawnWorker(?ObjectId $job = null)
{
    $context = ['category' => get_class($this)];
    $this->logger->debug('spawn new worker', $context);

    $id = new ObjectId();
    $pid = pcntl_fork();

    if (-1 === $pid) {
        throw new SpawnForkException('failed to spawn new worker');
    }

    if (0 === $pid) {
        // Child process: run the worker, never return into the manager loop.
        $worker = $this->factory->buildWorker($id);

        if (null !== $job) {
            $worker->processOne($job);
        } else {
            $worker->processAll();
        }

        exit();
    }

    // Parent process: remember the child.
    $this->forks[(string) $id] = $pid;
    $this->logger->debug('spawned worker ['.$id.'] with pid ['.$pid.']', $context);

    return $pid;
}
||
302 | |||
/**
 * Get the fork map (worker id => pid).
 *
 * @return array
 */
protected function getForks(): array
{
    $forks = $this->forks;

    return $forks;
}
||
310 | |||
/**
 * Main.
 *
 * Runs while loop() returns true. If jobs are on hold (or loop() turned false)
 * the local queue is processed and the message queue is polled without
 * blocking; otherwise the manager blocks until a message arrives. Received
 * messages are dispatched by their type.
 */
protected function main(): void
{
    while ($this->loop()) {
        $poll = count($this->onhold) > 0 || !$this->loop();
        $wait = 0;

        if ($poll) {
            $wait = MSG_IPC_NOWAIT;
            usleep(200);
            $this->processLocalQueue();
        }

        // Any message type (0), at most 16384 bytes, unserialized into $msg.
        if (!msg_receive($this->queue, 0, $type, 16384, $msg, true, $wait)) {
            continue;
        }

        $context = ['category' => get_class($this)];
        $this->logger->debug('received systemv message type ['.$type.']', $context);

        if (self::TYPE_JOB === $type) {
            $this->handleJob($msg);
        } elseif (self::TYPE_EVENT === $type) {
            $this->handleEvent($msg);
        } else {
            $this->logger->warning('received unknown systemv message type ['.$type.']', $context);
        }
    }
}
|
347 | |||
/**
 * Handle events.
 *
 * Keeps the worker/job map in sync with job status events and kills the local
 * worker of a canceled job.
 *
 * @param array $event event message; reads 'status' and 'job', plus 'worker' for
 *                     processing events and '_id' for unknown statuses
 *
 * @return self
 */
protected function handleEvent(array $event): self
{
    $this->logger->debug('handle event ['.$event['status'].'] for job ['.$event['job'].']', [
        'category' => get_class($this),
    ]);

    switch ($event['status']) {
        case JobInterface::STATUS_PROCESSING:
            $this->job_map[(string) $event['worker']] = $event['job'];

            return $this;
        case JobInterface::STATUS_DONE:
        case JobInterface::STATUS_FAILED:
        case JobInterface::STATUS_TIMEOUT:
            $worker = $this->findWorkerByJob($event['job']);

            if (null !== $worker) {
                unset($this->job_map[$worker]);
            }

            return $this;
        case JobInterface::STATUS_CANCELED:
            $worker = $this->findWorkerByJob($event['job']);

            if (null === $worker) {
                return $this;
            }

            $this->logger->debug('received cancel event for job ['.$event['job'].'] running on worker ['.$worker.']', [
                'category' => get_class($this),
            ]);

            // Only a worker forked by this manager can be terminated here.
            if (isset($this->forks[(string) $worker])) {
                $this->logger->debug('found running worker ['.$worker.'] on this queue node, terminate it now', [
                    'category' => get_class($this),
                ]);

                unset($this->job_map[(string) $worker]);
                posix_kill($this->forks[(string) $worker], SIGKILL);
            }

            return $this;
        default:
            $this->logger->warning('received event ['.$event['_id'].'] with unknown status ['.$event['status'].']', [
                'category' => get_class($this),
            ]);

            return $this;
    }
}

/**
 * Find the worker which currently processes the given job.
 *
 * Both sides are compared as strings with strict equality, so the lookup
 * behaves the same for every event status regardless of whether the mapped
 * value is an id object or its string form.
 *
 * @param mixed $job job id (anything castable to string)
 *
 * @return null|int|string key of the worker in the job map or null if the job is not mapped
 */
private function findWorkerByJob($job)
{
    $needle = (string) $job;

    foreach ($this->job_map as $worker => $mapped) {
        if ((string) $mapped === $needle) {
            return $worker;
        }
    }

    return null;
}
||
403 | |||
/**
 * Process onhold (only used if pm === ondemand or for postponed FORCE_SPAWN jobs).
 *
 * Spawns a dedicated worker for every held job that is due, provided a worker
 * slot is free or the job is flagged with force_spawn.
 *
 * @return self
 */
protected function processLocalQueue(): self
{
    foreach ($this->onhold as $id => $job) {
        $due = $job['options']['at'] <= time();

        if (!$due) {
            continue;
        }

        // No free slot and not forced: keep the job on hold.
        if ($this->count() >= $this->max_children && true !== $job['options']['force_spawn']) {
            continue;
        }

        $this->logger->debug('hold ondemand job ['.$id.'] may no be executed', [
            'category' => get_class($this),
        ]);

        unset($this->onhold[$id]);
        $this->spawnWorker($job['_id']);
    }

    return $this;
}
||
422 | |||
/**
 * Handle job.
 *
 * Decides what a job message triggers:
 *  - force_spawn jobs get a dedicated worker, or are held locally while postponed;
 *  - in ondemand mode a job gets a dedicated worker if it is due and a slot is
 *    free, otherwise it is held locally;
 *  - in dynamic mode an additional generic worker is spawned while max_children
 *    is not reached;
 *  - otherwise nothing is spawned.
 *
 * @param array $job job message; reads '_id' and the 'at' and force_spawn entries of 'options'
 *
 * @return self
 */
protected function handleJob(array $job): self
{
    $context = ['category' => get_class($this)];

    if (true === $job['options'][Scheduler::OPTION_FORCE_SPAWN]) {
        $postponed = time() < $job['options']['at'];

        if ($postponed) {
            $this->logger->debug('found postponed job ['.$job['_id'].'] with force_spawn, keep in local queue', $context);
            $this->onhold[(string) $job['_id']] = $job;
        } else {
            $this->logger->debug('job ['.$job['_id'].'] deployed with force_spawn, spawn new worker', $context);
            $this->spawnWorker($job['_id']);
        }

        return $this;
    }

    if (self::PM_ONDEMAND === $this->pm) {
        $postponed = time() < $job['options']['at'];

        if ($postponed) {
            $this->logger->debug('found ondemand postponed job ['.$job['_id'].'], keep in local queue', $context);
        }

        // Hold the job if it is not due yet or if no worker slot is free.
        if ($postponed || $this->count() >= $this->max_children) {
            $this->onhold[(string) $job['_id']] = $job;
        } else {
            $this->spawnWorker($job['_id']);
        }

        return $this;
    }

    $mayGrow = $this->count() < $this->max_children && self::PM_DYNAMIC === $this->pm;

    if (!$mayGrow) {
        $this->logger->debug('max children ['.$this->max_children.'] reached for job ['.$job['_id'].'], do not spawn new worker', $context);

        return $this;
    }

    $this->logger->debug('max_children ['.$this->max_children.'] workers not reached ['.$this->count().'], spawn new worker', $context);
    $this->spawnWorker();

    return $this;
}
||
484 | |||
/**
 * Catch signals and cleanup.
 *
 * Enables asynchronous signal dispatching and registers cleanup() for
 * SIGTERM/SIGINT and exitWorker() for SIGCHLD.
 *
 * @return self
 */
protected function catchSignal(): self
{
    pcntl_async_signals(true);

    foreach ([SIGTERM, SIGINT] as $signal) {
        pcntl_signal($signal, [$this, 'cleanup']);
    }

    pcntl_signal(SIGCHLD, [$this, 'exitWorker']);

    return $this;
}
||
497 | } |
||
498 |
In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.