Completed
Pull Request — master (#7)
by Raffael
04:14
created

WorkerManager::setOptions()   B

Complexity

Conditions 8
Paths 12

Size

Total Lines 31
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 8

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 31
ccs 17
cts 17
cp 1
rs 8.4444
c 0
b 0
f 0
cc 8
nc 12
nop 1
crap 8
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\Database;
17
use Psr\Log\LoggerInterface;
18
use TaskScheduler\Exception\InvalidArgumentException;
19
use TaskScheduler\Exception\SpawnForkException;
20
21
class WorkerManager
22
{
23
    use InjectTrait;
24
25
    /**
26
     * Queue options.
27
     */
28
    public const OPTION_PM = 'pm';
29
    public const OPTION_MAX_CHILDREN = 'max_children';
30
    public const OPTION_MIN_CHILDREN = 'min_children';
31
32
    /**
33
     * Process handling.
34
     */
35
    public const PM_DYNAMIC = 'dynamic';
36
    public const PM_STATIC = 'static';
37
    public const PM_ONDEMAND = 'ondemand';
38
39
    /**
40
     * Fork handler actions.
41
     */
42
    public const TYPE_JOB = 1;
43
    public const TYPE_EVENT = 2;
44
45
    /**
46
     * Process management.
47
     *
48
     * @var string
49
     */
50
    protected $pm = self::PM_DYNAMIC;
51
52
    /**
53
     * Scheduler.
54
     *
55
     * @var Scheduler
56
     */
57
    protected $scheduler;
58
59
    /**
60
     * Database.
61
     *
62
     * @var Database
63
     */
64
    protected $db;
65
66
    /**
67
     * Logger.
68
     *
69
     * @var LoggerInterface
70
     */
71
    protected $logger;
72
73
    /**
74
     * Max children.
75
     *
76
     * @var int
77
     */
78
    protected $max_children = 2;
79
80
    /**
81
     * Min children.
82
     *
83
     * @var int
84
     */
85
    protected $min_children = 1;
86
87
    /**
88
     * Forks.
89
     *
90
     * @var array
91
     */
92
    protected $forks = [];
93
94
    /**
95
     * Worker/Job mapping.
96
     *
97
     * @var array
98
     */
99
    protected $job_map = [];
100
101
    /**
102
     * Queue (Communication between TaskScheduler\Queue and TaskScheduler\WorkerManager).
103
     *
104
     * @var resource
105
     */
106
    protected $queue;
107
108
    /**
109
     * Hold queue.
110
     *
111
     * @var array
112
     */
113
    protected $onhold = [];
114
115
    /**
     * Worker factory used to build the worker inside a forked child process.
     *
     * @var WorkerFactoryInterface
     */
    protected $factory;

    /**
     * Init queue.
     *
     * Stores the logger, validates and applies the given options and keeps
     * the worker factory for later use when workers get spawned.
     *
     * @param WorkerFactoryInterface $factory Factory which builds workers in forked children
     * @param LoggerInterface        $logger  Logger used for all messages of this manager
     * @param array                  $config  Options, see setOptions()
     *
     * @throws InvalidArgumentException If $config contains an invalid option
     */
    public function __construct(WorkerFactoryInterface $factory, LoggerInterface $logger, array $config = [])
    {
        $this->logger = $logger;
        $this->setOptions($config);
        $this->factory = $factory;
    }
124
125
    /**
     * Set options.
     *
     * Every entry of $config is validated first; the new values are applied
     * only if the whole configuration is valid, so a rejected call leaves the
     * manager unchanged.
     *
     * Supported options:
     *  - self::OPTION_MAX_CHILDREN, self::OPTION_MIN_CHILDREN: integers
     *  - self::OPTION_PM: one of the self::PM_* values. The value is matched
     *    case-insensitively and stored in its canonical lowercase form, so it
     *    always compares equal to one of the self::PM_* constants afterwards.
     *
     * @param array $config Option name => value pairs
     *
     * @throws InvalidArgumentException If an option is unknown, carries an invalid value,
     *                                  or min_children ends up greater than max_children
     *
     * @return self
     */
    public function setOptions(array $config = []): self
    {
        // Work on local copies; the properties are only touched once everything is valid.
        $pm = $this->pm;
        $max_children = $this->max_children;
        $min_children = $this->min_children;

        foreach ($config as $option => $value) {
            // Strict comparisons: integer keys must never match an option name by type juggling.
            if (self::OPTION_MAX_CHILDREN === $option || self::OPTION_MIN_CHILDREN === $option) {
                if (!is_int($value)) {
                    throw new InvalidArgumentException($option.' needs to be an integer');
                }

                if (self::OPTION_MAX_CHILDREN === $option) {
                    $max_children = $value;
                } else {
                    $min_children = $value;
                }
            } elseif (self::OPTION_PM === $option) {
                // Normalize so that e.g. "Dynamic" is stored as self::PM_DYNAMIC.
                $type = is_string($value) ? strtolower($value) : null;

                if (!in_array($type, [self::PM_DYNAMIC, self::PM_STATIC, self::PM_ONDEMAND], true)) {
                    // Non-scalar values can not be concatenated safely, report their type instead.
                    $given = is_scalar($value) ? (string) $value : gettype($value);

                    throw new InvalidArgumentException($given.' is not a valid process handling type (static, dynamic, ondemand)');
                }

                $pm = $type;
            } else {
                throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        if ($min_children > $max_children) {
            throw new InvalidArgumentException('option min_children must not be greater than option max_children');
        }

        $this->pm = $pm;
        $this->max_children = $max_children;
        $this->min_children = $min_children;

        return $this;
    }
160
161
    /**
     * Startup (blocking process).
     *
     * Opens the System V message queue keyed on Queue.php, installs the
     * signal handlers, spawns the initial workers and enters the main loop.
     */
    public function process(): void
    {
        $key = ftok(__DIR__.DIRECTORY_SEPARATOR.'Queue.php', 't');
        $this->queue = msg_get_queue($key);

        $this->catchSignal();
        $this->spawnInitialWorkers();
        $this->main();
    }
171
172
    /**
     * Wait for child and terminate.
     *
     * Registered as SIGCHLD handler in catchSignal(). Reaps the child named
     * in $pid['pid'] without blocking and removes it from the fork list and
     * from the worker/job mapping.
     *
     * @param int   $sig Received signal number
     * @param array $pid Signal info, the 'pid' key holds the pid of the child
     *
     * @return self
     */
    public function exitWorker(int $sig, array $pid): self
    {
        $child = $pid['pid'];

        $this->logger->debug('worker ['.$child.'] exit with ['.$sig.']', [
            'category' => get_class($this),
        ]);

        pcntl_waitpid($child, $status, WNOHANG | WUNTRACED);

        // Strict search, a fork entry only matches the very same pid value.
        foreach (array_keys($this->forks, $child, true) as $worker) {
            unset($this->forks[$worker], $this->job_map[$worker]);
        }

        return $this;
    }
195
196
    /**
     * Count children.
     *
     * @return int Number of entries in the fork list
     */
    public function count(): int
    {
        $running = count($this->forks);

        return $running;
    }
203
204
    /**
     * Cleanup.
     *
     * Registered as SIGTERM/SIGINT handler in catchSignal(). Forwards the
     * received signal to every known fork and then calls exit().
     *
     * @param int $sig Received signal number
     */
    public function cleanup(int $sig): void
    {
        $context = ['category' => get_class($this)];

        $this->logger->debug('received signal ['.$sig.']', $context);

        foreach ($this->getForks() as $worker => $child) {
            $this->logger->debug('forward signal ['.$sig.'] to worker ['.$worker.'] running with pid ['.$child.']', $context);

            posix_kill($child, $sig);
        }

        $this->exit();
    }
223
224
    /**
     * Start initial workers.
     *
     * For the dynamic and static process handling types, workers get spawned
     * until min_children is reached. Nothing is spawned for any other type.
     */
    protected function spawnInitialWorkers()
    {
        $this->logger->debug('spawn initial ['.$this->min_children.'] workers', [
            'category' => get_class($this),
        ]);

        if (self::PM_DYNAMIC !== $this->pm && self::PM_STATIC !== $this->pm) {
            return;
        }

        // The number of missing workers is determined once, up front.
        $missing = $this->min_children - $this->count();

        while ($missing-- > 0) {
            $this->spawnWorker();
        }
    }
239
240
    /**
     * Start worker.
     *
     * Forks the current process. The parent registers the child in the fork
     * list and returns its pid. The child builds a worker through the factory,
     * processes either all jobs or only the given one and exits afterwards.
     *
     * @see https://github.com/mongodb/mongo-php-driver/issues/828
     * @see https://github.com/mongodb/mongo-php-driver/issues/174
     *
     * @param null|ObjectId $job Single job to process, null to process all jobs
     *
     * @throws SpawnForkException If the process could not be forked
     *
     * @return int Pid of the spawned child (parent process only)
     */
    protected function spawnWorker(?ObjectId $job = null)
    {
        $context = ['category' => get_class($this)];

        $this->logger->debug('spawn new worker', $context);

        $id = new ObjectId();
        $pid = pcntl_fork();

        if (-1 === $pid) {
            throw new SpawnForkException('failed to spawn new worker');
        }

        if ($pid) {
            // Parent: remember the child under its worker id.
            $this->forks[(string) $id] = $pid;
            $this->logger->debug('spawned worker ['.$id.'] with pid ['.$pid.']', $context);

            return $pid;
        }

        // Child: do the work and terminate, never return into the manager.
        $worker = $this->factory->buildWorker($id);
        null === $job ? $worker->processAll() : $worker->processOne($job);

        exit();
    }
278
279
    /**
     * Get forks (array of pid's).
     *
     * @return array Fork list, pids indexed by worker id
     */
    protected function getForks(): array
    {
        $forks = $this->forks;

        return $forks;
    }
286
287
    /**
     * Main.
     *
     * Runs as long as loop() returns true. Each round receives at most one
     * message from the System V queue and dispatches it by its type. If jobs
     * are on hold (or loop() turned false) the queue is polled without
     * blocking and the local queue gets processed first; otherwise the
     * receive call is made with no flags.
     */
    protected function main(): void
    {
        while ($this->loop()) {
            // loop() is only consulted here if nothing is on hold (short-circuit).
            $poll = count($this->onhold) > 0 || !$this->loop();
            $wait = $poll ? MSG_IPC_NOWAIT : 0;

            if ($poll) {
                usleep(200);
                $this->processLocalQueue();
            }

            if (!msg_receive($this->queue, 0, $type, 16384, $msg, true, $wait)) {
                continue;
            }

            $context = ['category' => get_class($this)];
            $this->logger->debug('received systemv message type ['.$type.']', $context);

            switch ($type) {
                case self::TYPE_JOB:
                    $this->handleJob($msg);

                    break;
                case self::TYPE_EVENT:
                    $this->handleEvent($msg);

                    break;
                default:
                    $this->logger->warning('received unknown systemv message type ['.$type.']', $context);
            }
        }
    }
323
324
    /**
     * Handle events.
     *
     * Keeps the worker/job mapping in sync with job status events:
     *  - processing: remember which worker runs the job
     *  - done, failed, timeout: drop the mapping of the job
     *  - canceled: drop the mapping and kill the worker if it is one of our forks
     * Events with any other status are logged as a warning.
     *
     * @param array $event Event record, reads the keys 'status', 'job', 'worker' and '_id'
     *
     * @return self
     */
    protected function handleEvent(array $event): self
    {
        $context = ['category' => get_class($this)];

        $this->logger->debug('handle event ['.$event['status'].'] for job ['.$event['job'].']', $context);

        switch ($event['status']) {
            case JobInterface::STATUS_PROCESSING:
                $this->job_map[(string) $event['worker']] = $event['job'];

                break;
            case JobInterface::STATUS_DONE:
            case JobInterface::STATUS_FAILED:
            case JobInterface::STATUS_TIMEOUT:
                $worker = array_search((string) $event['job'], $this->job_map);

                if (false !== $worker) {
                    unset($this->job_map[$worker]);
                }

                break;
            case JobInterface::STATUS_CANCELED:
                $worker = array_search($event['job'], $this->job_map);

                if (false === $worker) {
                    break;
                }

                $this->logger->debug('received cancel event for job ['.$event['job'].'] running on worker ['.$worker.']', $context);

                $key = (string) $worker;

                if (isset($this->forks[$key])) {
                    $this->logger->debug('found running worker ['.$worker.'] on this queue node, terminate it now', $context);

                    unset($this->job_map[$key]);
                    posix_kill($this->forks[$key], SIGKILL);
                }

                break;
            default:
                $this->logger->warning('received event ['.$event['_id'].'] with unknown status ['.$event['status'].']', $context);
        }

        return $this;
    }
379
380
    /**
     * Process onhold (only used if pm === ondemand or for postponed FORCE_SPAWN jobs).
     *
     * A held job is released once its 'at' option is not in the future anymore
     * and either a worker slot is free (count() below max_children) or the
     * job has force_spawn set. A released job is removed from the hold queue
     * and a worker is spawned for exactly that job.
     *
     * @return self
     */
    protected function processLocalQueue(): self
    {
        foreach ($this->onhold as $id => $job) {
            $due = $job['options']['at'] <= time();

            if ($due && ($this->count() < $this->max_children || true === $job['options']['force_spawn'])) {
                $this->logger->debug('hold ondemand job ['.$id.'] may now be executed', [
                    'category' => get_class($this),
                ]);

                unset($this->onhold[$id]);
                $this->spawnWorker($job['_id']);
            }
        }

        return $this;
    }
398
399
    /**
     * Handle job.
     *
     * Decides what happens with a job received from the queue:
     *  - force_spawn jobs get their own worker, or are put on hold if postponed
     *  - with ondemand process handling a job gets its own worker if a slot is
     *    free, otherwise (or if postponed) it is put on hold
     *  - with dynamic process handling an additional general worker is spawned
     *    as long as max_children is not reached
     *  - in any other case nothing is spawned
     *
     * @param array $job Job record, reads the keys '_id' and 'options'
     *
     * @return self
     */
    protected function handleJob(array $job): self
    {
        $context = ['category' => get_class($this)];

        if (true === $job['options'][Scheduler::OPTION_FORCE_SPAWN]) {
            if ($job['options']['at'] > time()) {
                $this->logger->debug('found postponed job ['.$job['_id'].'] with force_spawn, keep in local queue', $context);
                $this->onhold[(string) $job['_id']] = $job;
            } else {
                $this->logger->debug('job ['.$job['_id'].'] deployed with force_spawn, spawn new worker', $context);
                $this->spawnWorker($job['_id']);
            }
        } elseif (self::PM_ONDEMAND === $this->pm) {
            if ($job['options']['at'] > time()) {
                $this->logger->debug('found ondemand postponed job ['.$job['_id'].'], keep in local queue', $context);
                $this->onhold[(string) $job['_id']] = $job;
            } elseif ($this->count() < $this->max_children) {
                $this->spawnWorker($job['_id']);
            } else {
                // No free slot, retry later from the local queue.
                $this->onhold[(string) $job['_id']] = $job;
            }
        } elseif ($this->count() < $this->max_children && self::PM_DYNAMIC === $this->pm) {
            $this->logger->debug('max_children ['.$this->max_children.'] workers not reached ['.$this->count().'], spawn new worker', $context);

            // General worker, not bound to this particular job.
            $this->spawnWorker();
        } else {
            $this->logger->debug('max children ['.$this->max_children.'] reached for job ['.$job['_id'].'], do not spawn new worker', $context);
        }

        return $this;
    }
460
461
    /**
     * Catch signals and cleanup.
     *
     * Enables asynchronous signal handling and registers cleanup() for
     * SIGTERM and SIGINT as well as exitWorker() for SIGCHLD.
     *
     * @return self
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);

        foreach ([SIGTERM, SIGINT] as $signal) {
            pcntl_signal($signal, [$this, 'cleanup']);
        }

        pcntl_signal(SIGCHLD, [$this, 'exitWorker']);

        return $this;
    }
473
}
474