WorkerManager::exitWorker()   A
last analyzed

Complexity

Conditions 4
Paths 4

Size

Total Lines 20
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 4

Importance

Changes 1
Bugs 1 Features 0
Metric Value
eloc 10
c 1
b 1
f 0
dl 0
loc 20
ccs 11
cts 11
cp 1
rs 9.9332
cc 4
nc 4
nop 2
crap 4
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\Database;
17
use Psr\Log\LoggerInterface;
18
use TaskScheduler\Exception\InvalidArgumentException;
19
use TaskScheduler\Exception\SpawnForkException;
20
21
class WorkerManager
22
{
23
    use InjectTrait;
24
25
    /**
     * Option keys accepted by setOptions().
     */
    public const OPTION_PM = 'pm';
    public const OPTION_MAX_CHILDREN = 'max_children';
    public const OPTION_MIN_CHILDREN = 'min_children';

    /**
     * Process management modes (the values accepted for OPTION_PM).
     */
    public const PM_DYNAMIC = 'dynamic';
    public const PM_STATIC = 'static';
    public const PM_ONDEMAND = 'ondemand';

    /**
     * Message types dispatched by main(): TYPE_JOB goes to handleJob(), TYPE_EVENT to handleEvent().
     */
    public const TYPE_JOB = 1;
    public const TYPE_EVENT = 2;

    /**
     * Active process management mode, one of the PM_* constants.
     *
     * @var string
     */
    protected $pm = self::PM_DYNAMIC;

    /**
     * Scheduler.
     *
     * NOTE(review): not referenced by any method of this class; presumably used by
     * InjectTrait or subclasses - confirm.
     *
     * @var Scheduler
     */
    protected $scheduler;

    /**
     * Database.
     *
     * NOTE(review): not referenced by any method of this class; presumably used by
     * InjectTrait or subclasses - confirm.
     *
     * @var Database
     */
    protected $db;

    /**
     * Logger receiving all messages of this manager.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Upper limit of concurrently running workers.
     *
     * @var int
     */
    protected $max_children = 2;

    /**
     * Number of workers which are kept running.
     *
     * @var int
     */
    protected $min_children = 1;

    /**
     * Running forks, keyed by worker id (string), value is the process id.
     *
     * @var array
     */
    protected $forks = [];

    /**
     * Worker/job mapping, keyed by worker id (string), value is the job the worker reported as processing.
     *
     * @var array
     */
    protected $job_map = [];

    /**
     * System V message queue (communication between TaskScheduler\Queue and TaskScheduler\WorkerManager).
     *
     * @var resource
     */
    protected $queue;

    /**
     * Jobs kept locally until they may be executed, keyed by job id (string).
     *
     * @var array
     */
    protected $onhold = [];

    /**
     * Factory which builds the worker inside a freshly forked child.
     *
     * @var WorkerFactoryInterface
     */
    protected $factory;
121
122
    /**
     * Create a worker manager.
     *
     * @param WorkerFactoryInterface $factory factory used to build the worker inside a forked child
     * @param LoggerInterface        $logger  logger receiving all messages of this manager
     * @param array                  $config  options, forwarded to setOptions()
     */
    public function __construct(WorkerFactoryInterface $factory, LoggerInterface $logger, array $config = [])
    {
        $this->factory = $factory;
        $this->logger = $logger;

        // Validation happens last; an invalid option aborts construction with an exception.
        $this->setOptions($config);
    }
131
132
    /**
     * Set options.
     *
     * Supported keys are OPTION_PM (one of the PM_* values, compared exactly),
     * OPTION_MAX_CHILDREN and OPTION_MIN_CHILDREN (both integers).
     *
     * The call is atomic: all options are validated first and only applied once
     * the whole configuration is known to be valid, so a rejected call leaves the
     * previous configuration untouched.
     *
     * @param array $config option name => value
     *
     * @throws InvalidArgumentException on an unknown option, a value of the wrong type,
     *                                  an unknown process handling type or if min_children
     *                                  would exceed max_children
     *
     * @return self
     */
    public function setOptions(array $config = []): self
    {
        $pm = $this->pm;
        $max_children = $this->max_children;
        $min_children = $this->min_children;

        foreach ($config as $option => $value) {
            // Strict comparison on purpose: with a loose switch an integer key (0) would
            // match the first string case on PHP 7.
            if (self::OPTION_MAX_CHILDREN === $option || self::OPTION_MIN_CHILDREN === $option) {
                if (!is_int($value)) {
                    throw new InvalidArgumentException($option.' needs to be an integer');
                }

                if (self::OPTION_MAX_CHILDREN === $option) {
                    $max_children = $value;
                } else {
                    $min_children = $value;
                }
            } elseif (self::OPTION_PM === $option) {
                // The stored value is later compared with === against the PM_* constants,
                // therefore only the exact constant values are accepted here.
                if (!in_array($value, [self::PM_DYNAMIC, self::PM_STATIC, self::PM_ONDEMAND], true)) {
                    $given = is_scalar($value) ? (string) $value : gettype($value);

                    throw new InvalidArgumentException($given.' is not a valid process handling type (static, dynamic, ondemand)');
                }

                $pm = $value;
            } else {
                throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        if ($min_children > $max_children) {
            throw new InvalidArgumentException('option min_children must not be greater than option max_children');
        }

        $this->pm = $pm;
        $this->max_children = $max_children;
        $this->min_children = $min_children;

        return $this;
    }
167
168
    /**
     * Startup (blocking process).
     *
     * Opens the System V message queue keyed on Queue.php, installs the signal
     * handlers, spawns the initial workers and then enters main().
     *
     * @throws \RuntimeException if the ipc key can not be derived or the message queue can not be opened
     */
    public function process(): void
    {
        $path = __DIR__.DIRECTORY_SEPARATOR.'Queue.php';

        // ftok() reports failure with -1; using that as a key would silently open an unrelated queue.
        $key = ftok($path, 't');
        if (-1 === $key) {
            throw new \RuntimeException('failed to create systemv ipc key from ['.$path.']');
        }

        // msg_get_queue() returns false on failure; fail here instead of inside the receive loop.
        $queue = msg_get_queue($key);
        if (false === $queue) {
            throw new \RuntimeException('failed to open systemv message queue ['.$key.']');
        }

        $this->queue = $queue;
        $this->catchSignal();
        $this->spawnInitialWorkers();
        $this->main();
    }
178
179
    /**
     * Wait for child and terminate (SIGCHLD handler, registered in catchSignal()).
     *
     * The worker named in the signal info is reaped and removed from the fork and
     * job maps. SIGCHLD is a standard signal and is not queued, so one delivery may
     * stand for several exited children; therefore every other known worker is
     * polled (non-blocking) as well and removed if it has terminated. Only pids
     * from the own fork list are polled, children of other components are never reaped.
     *
     * Afterwards the minimum number of workers is restored.
     *
     * @param int   $sig signal number
     * @param array $pid signal info, the key 'pid' holds the process id of the child
     *
     * @return self
     */
    public function exitWorker(int $sig, array $pid): self
    {
        $signaled = $pid['pid'];

        $this->logger->debug('worker ['.$signaled.'] exit with ['.$sig.']', [
            'category' => get_class($this),
        ]);

        pcntl_waitpid($signaled, $status, WNOHANG | WUNTRACED);

        foreach ($this->forks as $id => $process) {
            $exited = $process === $signaled;

            // pcntl_waitpid() returns the pid only if that child changed state; 0 (still running)
            // and -1 (not a child/already reaped) leave the entry untouched.
            if (!$exited && is_int($process) && pcntl_waitpid($process, $status, WNOHANG) === $process) {
                $this->logger->debug('worker ['.$process.'] exited as well, reaped together with ['.$signaled.']', [
                    'category' => get_class($this),
                ]);

                $exited = true;
            }

            if ($exited) {
                unset($this->forks[$id], $this->job_map[$id]);
            }
        }

        $this->spawnMinimumWorkers();

        return $this;
    }
203
204
    /**
     * Count children.
     *
     * @return int number of entries in the fork list
     */
    public function count(): int
    {
        $children = count($this->forks);

        return $children;
    }
211
212
    /**
     * Cleanup (handler for SIGTERM and SIGINT, registered in catchSignal()).
     *
     * Forwards the received signal to every known worker and then calls exit().
     *
     * @param int $sig received signal number
     */
    public function cleanup(int $sig): void
    {
        $context = ['category' => get_class($this)];

        $this->logger->debug('received signal ['.$sig.']', $context);

        foreach ($this->getForks() as $worker => $process) {
            $this->logger->debug(
                'forward signal ['.$sig.'] to worker ['.$worker.'] running with pid ['.$process.']',
                $context
            );

            posix_kill($process, $sig);
        }

        $this->exit();
    }
231
232
    /**
     * Start initial workers.
     *
     * Workers are only pre-spawned for the dynamic and static process handling;
     * the number of spawned workers is min_children minus the workers already counted.
     */
    protected function spawnInitialWorkers()
    {
        $this->logger->debug('spawn initial ['.$this->min_children.'] workers', [
            'category' => get_class($this),
        ]);

        if (!in_array($this->pm, [self::PM_DYNAMIC, self::PM_STATIC], true)) {
            return;
        }

        // The gap is taken once up front, the loop does not re-count the forks.
        $missing = $this->min_children - $this->count();

        while ($missing-- > 0) {
            $this->spawnWorker();
        }
    }
247
248
249
    /**
     * Start minimum number of workers.
     *
     * Spawns as many workers as are missing to reach min_children, regardless of
     * the configured process handling.
     */
    protected function spawnMinimumWorkers()
    {
        $this->logger->debug('verify that the minimum number ['.$this->min_children.'] of workers are running', [
            'category' => get_class($this),
        ]);

        // The current count is read once; each iteration accounts for one spawned worker.
        for ($missing = $this->min_children - $this->count(); $missing > 0; --$missing) {
            $this->spawnWorker();
        }
    }
262
263
264
    /**
     * Start worker.
     *
     * Forks the current process. The child builds a worker through the factory,
     * processes either all jobs or only the given one and then exits. The parent
     * registers the child under a newly generated worker id.
     *
     * @see https://github.com/mongodb/mongo-php-driver/issues/828
     * @see https://github.com/mongodb/mongo-php-driver/issues/174
     *
     * @param null|ObjectId $job job to process exclusively, null to process all jobs
     *
     * @throws SpawnForkException if the process could not be forked
     *
     * @return int process id of the spawned worker (parent process only)
     */
    protected function spawnWorker(?ObjectId $job = null)
    {
        $context = ['category' => get_class($this)];

        $this->logger->debug('spawn new worker', $context);

        $id = new ObjectId();
        $pid = pcntl_fork();

        if (-1 === $pid) {
            throw new SpawnForkException('failed to spawn new worker');
        }

        if ($pid) {
            // Parent: remember the child and hand its pid back to the caller.
            $this->forks[(string) $id] = $pid;
            $this->logger->debug('spawned worker ['.$id.'] with pid ['.$pid.']', $context);

            return $pid;
        }

        // Child: run the worker and terminate, it must never return into the manager.
        $worker = $this->factory->buildWorker($id);

        if (null !== $job) {
            $worker->processOne($job);
        } else {
            $worker->processAll();
        }

        exit();
    }
302
303
    /**
     * Get forks.
     *
     * @return array process ids keyed by worker id
     */
    protected function getForks(): array
    {
        $forks = $this->forks;

        return $forks;
    }
310
311
    /**
     * Main.
     *
     * Runs as long as loop() returns true. While jobs are on hold (or loop() turns
     * false) the queue is polled without blocking, after a 200 microsecond sleep and
     * a pass over the local hold queue; otherwise the receive call blocks until a
     * message arrives. Received messages are dispatched by their type.
     *
     * NOTE(review): loop() is not defined in this class (presumably InjectTrait) - it is
     * deliberately called a second time inside the loop body, keep that order.
     */
    protected function main(): void
    {
        while ($this->loop()) {
            $flags = 0;

            if (0 !== count($this->onhold) || !$this->loop()) {
                $flags = MSG_IPC_NOWAIT;
                usleep(200);
                $this->processLocalQueue();
            }

            if (!msg_receive($this->queue, 0, $type, 16384, $msg, true, $flags)) {
                continue;
            }

            $context = ['category' => get_class($this)];

            $this->logger->debug('received systemv message type ['.$type.']', $context);

            switch ($type) {
                case self::TYPE_JOB:
                    $this->handleJob($msg);

                    break;
                case self::TYPE_EVENT:
                    $this->handleEvent($msg);

                    break;
                default:
                    $this->logger->warning('received unknown systemv message type ['.$type.']', $context);
            }
        }
    }
347
348
    /**
     * Handle events.
     *
     * - processing: remember which worker runs the job
     * - done, failed, timeout: forget the worker/job mapping
     * - canceled: if the job runs on a worker of this manager, forget the mapping and kill that worker
     * - anything else: log a warning
     *
     * @param array $event event message; reads the keys 'status' and 'job', 'worker' for the
     *                     processing status and '_id' for an unknown status
     *
     * @return self
     */
    protected function handleEvent(array $event): self
    {
        $this->logger->debug('handle event ['.$event['status'].'] for job ['.$event['job'].']', [
            'category' => get_class($this),
        ]);

        switch ($event['status']) {
            case JobInterface::STATUS_PROCESSING:
                $this->job_map[(string) $event['worker']] = $event['job'];

                return $this;
            case JobInterface::STATUS_DONE:
            case JobInterface::STATUS_FAILED:
            case JobInterface::STATUS_TIMEOUT:
                $worker = $this->findWorkerByJob($event['job']);

                if (null !== $worker) {
                    unset($this->job_map[$worker]);
                }

                return $this;
            case JobInterface::STATUS_CANCELED:
                $worker = $this->findWorkerByJob($event['job']);

                if (null === $worker) {
                    return $this;
                }

                $this->logger->debug('received cancel event for job ['.$event['job'].'] running on worker ['.$worker.']', [
                    'category' => get_class($this),
                ]);

                if (isset($this->forks[$worker])) {
                    $this->logger->debug('found running worker ['.$worker.'] on this queue node, terminate it now', [
                        'category' => get_class($this),
                    ]);

                    unset($this->job_map[$worker]);
                    posix_kill($this->forks[$worker], SIGKILL);
                }

                return $this;
            default:
                $this->logger->warning('received event ['.$event['_id'].'] with unknown status ['.$event['status'].']', [
                    'category' => get_class($this),
                ]);

                return $this;
        }
    }

    /**
     * Find the worker which is mapped to the given job.
     *
     * Both sides are compared by their string representation with ===, so every
     * status uses the same lookup and no loose (numeric-string or object) comparison
     * rules apply.
     *
     * @param mixed $job job id, anything which can be cast to string
     *
     * @return null|string worker id or null if the job is not mapped to a worker
     */
    private function findWorkerByJob($job): ?string
    {
        $needle = (string) $job;

        foreach ($this->job_map as $worker => $mapped) {
            if ((string) $mapped === $needle) {
                return (string) $worker;
            }
        }

        return null;
    }
403
404
    /**
     * Process onhold (only used if pm === ondemand or for postponed FORCE_SPAWN jobs).
     *
     * A held job is released once its 'at' time has been reached and either a worker
     * slot is free or the job is flagged with force_spawn. Released jobs get a
     * dedicated worker.
     *
     * @return self
     */
    protected function processLocalQueue(): self
    {
        $now = time();

        foreach ($this->onhold as $id => $job) {
            if ($job['options']['at'] > $now) {
                continue;
            }

            // Same option key as used by handleJob() when the job was put on hold.
            $forced = true === $job['options'][Scheduler::OPTION_FORCE_SPAWN];

            // The fork count is re-read for every job since releasing a job occupies a slot.
            if (!$forced && $this->count() >= $this->max_children) {
                continue;
            }

            $this->logger->debug('hold ondemand job ['.$id.'] may now be executed', [
                'category' => get_class($this),
            ]);

            unset($this->onhold[$id]);
            $this->spawnWorker($job['_id']);
        }

        return $this;
    }
422
423
    /**
     * Handle job.
     *
     * - force_spawn jobs get a dedicated worker, or are held locally while their 'at' time lies in the future
     * - with ondemand handling a job gets a dedicated worker if it is due and a slot is free, otherwise it is held
     * - with dynamic handling an additional general worker is spawned while max_children is not reached
     * - in every other case nothing is spawned
     *
     * @param array $job job message; reads '_id' and the options 'at' and Scheduler::OPTION_FORCE_SPAWN
     *
     * @return self
     */
    protected function handleJob(array $job): self
    {
        $context = ['category' => get_class($this)];

        if (true === $job['options'][Scheduler::OPTION_FORCE_SPAWN]) {
            if (!($job['options']['at'] > time())) {
                $this->logger->debug('job ['.$job['_id'].'] deployed with force_spawn, spawn new worker', $context);
                $this->spawnWorker($job['_id']);

                return $this;
            }

            $this->logger->debug('found postponed job ['.$job['_id'].'] with force_spawn, keep in local queue', $context);
            $this->onhold[(string) $job['_id']] = $job;

            return $this;
        }

        if (self::PM_ONDEMAND === $this->pm) {
            $due = !($job['options']['at'] > time());

            if ($due && $this->count() < $this->max_children) {
                $this->spawnWorker($job['_id']);

                return $this;
            }

            // Only a postponed job is announced, a due job without a free slot is held silently.
            if (!$due) {
                $this->logger->debug('found ondemand postponed job ['.$job['_id'].'], keep in local queue', $context);
            }

            $this->onhold[(string) $job['_id']] = $job;

            return $this;
        }

        $grow = $this->count() < $this->max_children && self::PM_DYNAMIC === $this->pm;

        if (!$grow) {
            $this->logger->debug('max children ['.$this->max_children.'] reached for job ['.$job['_id'].'], do not spawn new worker', $context);

            return $this;
        }

        $this->logger->debug('max_children ['.$this->max_children.'] workers not reached ['.$this->count().'], spawn new worker', $context);

        // A general worker (no job bound to it) is spawned here.
        $this->spawnWorker();

        return $this;
    }
484
485
    /**
     * Catch signals and cleanup.
     *
     * Enables asynchronous signal handling and registers cleanup() for SIGTERM and
     * SIGINT as well as exitWorker() for SIGCHLD.
     *
     * @return self
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);

        $handlers = [
            SIGTERM => 'cleanup',
            SIGINT => 'cleanup',
            SIGCHLD => 'exitWorker',
        ];

        foreach ($handlers as $signal => $method) {
            pcntl_signal($signal, [$this, $method]);
        }

        return $this;
    }
497
}
498