Completed
Branch dev (6d5d8c)
by Raffael
22:30 queued 21:18
created

Queue::catchSignal()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 8
ccs 6
cts 6
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\BSON\UTCDateTime;
17
use MongoDB\Database;
18
use Psr\Log\LoggerInterface;
19
use TaskScheduler\Exception\InvalidArgumentException;
20
use TaskScheduler\Exception\QueueRuntimeException;
21
22
class Queue extends AbstractHandler
23
{
24
    /**
     * Name assigned to the process property by the constructor; it is
     * attached to log entries under the "pm" context key.
     */
    public const MAIN_PROCESS = 'main';

    /**
     * Option keys accepted by setOptions().
     *
     * The max/min keys double as property names because setOptions()
     * assigns them dynamically.
     */
    public const OPTION_PM = 'pm';
    public const OPTION_MAX_CHILDREN = 'max_children';
    public const OPTION_MIN_CHILDREN = 'min_children';

    /**
     * Values accepted for the pm option.
     */
    public const PM_DYNAMIC = 'dynamic';
    public const PM_STATIC = 'static';
    public const PM_ONDEMAND = 'ondemand';

    /**
     * Active process management mode (one of the PM_* values).
     *
     * @var string
     */
    protected $pm = self::PM_DYNAMIC;

    /**
     * Scheduler the queue names and sizes are read from.
     *
     * @var Scheduler
     */
    protected $scheduler;

    /**
     * Database handed to both message queues.
     *
     * @var Database
     */
    protected $db;

    /**
     * Logger.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Upper bound of workers checked before spawning a worker for a job.
     *
     * @var int
     */
    protected $max_children = 2;

    /**
     * Number of workers spawned at startup (dynamic and static mode).
     *
     * @var int
     */
    protected $min_children = 1;

    /**
     * Forked workers, worker id (string) => process id.
     *
     * @var array
     */
    protected $forks = [];

    /**
     * Worker/job mapping, worker id (string) => job id.
     *
     * @var array
     */
    protected $job_map = [];

    /**
     * Factory used to build the worker inside a forked child.
     *
     * @var WorkerFactoryInterface
     */
    protected $factory;

    /**
     * Jobs queue.
     *
     * @var MessageQueue
     */
    protected $jobs;

    /**
     * Events queue.
     *
     * @var MessageQueue
     */
    protected $events;

    /**
     * Process name used in log context.
     *
     * @var string
     */
    protected $process;
126
127
    /**
     * Wire up the queue.
     *
     * Stores the collaborators, applies the given options and opens the job
     * and event message queues using the names and sizes reported by the
     * scheduler.
     *
     * @param array $config options as accepted by setOptions()
     */
    public function __construct(Scheduler $scheduler, Database $db, WorkerFactoryInterface $factory, LoggerInterface $logger, array $config = [])
    {
        $this->scheduler = $scheduler;
        $this->db = $db;
        $this->logger = $logger;

        // Options are validated before anything else is set up; invalid options abort construction.
        $this->setOptions($config);

        $this->process = self::MAIN_PROCESS;
        $this->factory = $factory;

        $job_queue = $scheduler->getJobQueue();
        $job_queue_size = $scheduler->getJobQueueSize();
        $this->jobs = new MessageQueue($this->db, $job_queue, $job_queue_size, $this->logger);

        $event_queue = $scheduler->getEventQueue();
        $event_queue_size = $scheduler->getEventQueueSize();
        $this->events = new MessageQueue($this->db, $event_queue, $event_queue_size, $this->logger);
    }
142
143
    /**
     * Set options.
     *
     * Accepted keys are OPTION_MAX_CHILDREN and OPTION_MIN_CHILDREN (integers)
     * and OPTION_PM (name of one of the PM_* modes, matched case-insensitively
     * and stored as the canonical PM_* value).
     *
     * @param array $config option name => value
     *
     * @throws InvalidArgumentException on an unknown option, a value of the wrong
     *                                  type, an unknown process handling type or
     *                                  when min_children exceeds max_children
     */
    public function setOptions(array $config = []): self
    {
        foreach ($config as $option => $value) {
            // Cast the key: switch compares loosely and on PHP 7 an integer key 0
            // would otherwise match any non-numeric case label.
            switch ((string) $option) {
                case self::OPTION_MAX_CHILDREN:
                case self::OPTION_MIN_CHILDREN:
                    if (!is_int($value)) {
                        throw new InvalidArgumentException($option.' needs to be an integer');
                    }

                    $this->{$option} = $value;

                    break;
                case self::OPTION_PM:
                    // Non-string values are rejected here instead of raising a TypeError in strtoupper().
                    $constant = is_string($value) ? 'self::PM_'.strtoupper($value) : null;

                    if (null === $constant || !defined($constant)) {
                        $given = is_scalar($value) ? $value : gettype($value);

                        throw new InvalidArgumentException($given.' is not a valid process handling type (static, dynamic, ondemand)');
                    }

                    // Store the canonical constant value so that later strict
                    // comparisons against PM_* also hold for input such as "DYNAMIC".
                    $this->{$option} = constant($constant);

                    break;
                default:
                    throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        if ($this->min_children > $this->max_children) {
            throw new InvalidArgumentException('option min_children must not be greater than option max_children');
        }

        return $this;
    }
178
179
    /**
     * Startup (blocking process).
     *
     * Spawns the initial workers and enters the main loop. An exception
     * escaping either step is logged and followed by cleanup() with SIGTERM.
     */
    public function process(): void
    {
        try {
            $this->spawnInitialWorkers();
            $this->main();
        } catch (\Exception $exception) {
            $context = [
                'class' => get_class($this),
                'exception' => $exception,
                'pm' => $this->process,
            ];

            $this->logger->error('main() throw an exception, cleanup and exit', $context);
            $this->cleanup(SIGTERM);
        }
    }
197
198
    /**
     * Wait for child and terminate.
     *
     * Registered as SIGCHLD handler by catchSignal(). Reaps the exited child
     * and removes it from the fork map and the worker/job map.
     *
     * @param int   $sig signal number
     * @param array $pid signal info as passed by pcntl; only the 'pid' key is read
     */
    public function exitChild(int $sig, array $pid): self
    {
        $exited = $pid['pid'];

        $this->logger->debug('worker ['.$exited.'] exit with ['.$sig.']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        // Non-blocking: only collect the exit status if the child has already changed state.
        pcntl_waitpid($exited, $status, WNOHANG | WUNTRACED);

        // The loop variable must not reuse the $pid parameter name: doing so
        // previously shadowed the signal info and made the comparison always fail.
        foreach ($this->forks as $id => $fork_pid) {
            if ($fork_pid === $exited) {
                // unset() is a no-op for a missing job_map entry.
                unset($this->forks[$id], $this->job_map[$id]);
            }
        }

        return $this;
    }
222
223
    /**
     * Number of entries currently held in the fork map.
     */
    public function count(): int
    {
        return \count($this->forks);
    }
230
231
    /**
     * Cleanup.
     *
     * Registered as SIGTERM/SIGINT handler by catchSignal(). Forwards the
     * signal to every known fork and then calls exit() on this handler
     * (defined outside this class body).
     *
     * @param int $sig signal to forward
     */
    public function cleanup(int $sig): void
    {
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        $this->logger->debug('received signal ['.$sig.']', $context);

        foreach ($this->getForks() as $worker => $child_pid) {
            $this->logger->debug('forward signal ['.$sig.'] to worker ['.$worker.'] running with pid ['.$child_pid.']', $context);

            posix_kill($child_pid, $sig);
        }

        $this->exit();
    }
252
253
    /**
     * Start initial workers.
     *
     * In dynamic and static mode workers are spawned until min_children
     * is reached; in any other mode nothing is spawned up front.
     */
    protected function spawnInitialWorkers()
    {
        $this->logger->debug('spawn initial ['.$this->min_children.'] workers', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        if (!in_array($this->pm, [self::PM_DYNAMIC, self::PM_STATIC], true)) {
            return;
        }

        $running = $this->count();

        while ($running < $this->min_children) {
            $this->spawnWorker();
            ++$running;
        }
    }
269
270
    /**
     * Start worker.
     *
     * Forks the process. The child builds a worker through the factory, runs
     * it and exits; the parent registers the child under a new worker id.
     *
     * @see https://github.com/mongodb/mongo-php-driver/issues/828
     * @see https://github.com/mongodb/mongo-php-driver/issues/174
     *
     * @throws QueueRuntimeException if the fork fails
     *
     * @return int process id of the spawned worker (parent process only)
     */
    protected function spawnWorker()
    {
        $id = new ObjectId();
        $pid = pcntl_fork();

        if (-1 === $pid) {
            throw new QueueRuntimeException('failed to spawn new worker');
        }

        if (0 === $pid) {
            // Child process. It is deliberately NOT added to the fork map: the
            // entry would carry pid 0, and signalling pid 0 addresses the whole
            // process group should the inherited map ever be used for signal
            // forwarding inside the child.
            $this->factory->build($id)->start();
            exit();
        }

        // Parent process: remember the worker id => pid relation.
        $this->forks[(string) $id] = $pid;

        $this->logger->debug('spawn worker ['.$id.'] with pid ['.$pid.']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        return $pid;
    }
299
300
    /**
     * Return the fork map (worker id => process id).
     */
    protected function getForks(): array
    {
        return $this->forks;
    }
307
308
    /**
     * Fork handling, blocking process.
     *
     * Opens a cursor on waiting/postponed jobs and one on events newer than
     * now, installs the signal handlers and then, for as long as loop()
     * allows, dispatches one event and one job per iteration. A dead cursor
     * leads to the queue being (re)created and main() being entered again.
     */
    protected function main(): void
    {
        $job_cursor = $this->jobs->getCursor([
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
            ],
        ]);

        $event_cursor = $this->events->getCursor([
            'timestamp' => ['$gte' => new UTCDateTime()],
            'status' => ['$gt' => JobInterface::STATUS_POSTPONED],
        ]);

        $this->catchSignal();

        // Callback handed to MessageQueue::next(); it re-enters main().
        $restart = function () {
            $this->main();
        };

        while ($this->loop()) {
            // The event is read before the cursor is advanced.
            $event = $event_cursor->current();
            $this->events->next($event_cursor, $restart);

            if (null !== $event) {
                $this->handleEvent($event);
            } elseif ($event_cursor->getInnerIterator()->isDead()) {
                $this->logger->error('event queue cursor is dead, is it a capped collection?', [
                    'category' => get_class($this),
                    'pm' => $this->process,
                ]);

                $this->events->create();
                $this->main();

                break;
            }

            $job = $job_cursor->current();

            if (null === $job && $job_cursor->getInnerIterator()->isDead()) {
                $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                    'category' => get_class($this),
                    'pm' => $this->process,
                ]);

                $this->jobs->create();
                $this->main();

                break;
            }

            $this->jobs->next($job_cursor, $restart);

            if (null !== $job) {
                $this->manageChildren($job);
            }
        }
    }
379
380
    /**
     * Handle events.
     *
     * Keeps the worker/job map (worker id => job id) in sync with job status
     * events and kills the local worker of a canceled job.
     *
     * @param array $event event document; the keys 'status' and 'job' are read,
     *                     and 'worker' for processing events
     */
    protected function handleEvent(array $event): self
    {
        $this->logger->debug('handle event ['.$event['status'].'] for job ['.$event['job'].']', [
            'category' => get_class($this),
        ]);

        $job = (string) $event['job'];

        switch ($event['status']) {
            case JobInterface::STATUS_PROCESSING:
                // Stored as string so that the strict lookups below can match it.
                $this->job_map[(string) $event['worker']] = $job;

                return $this;
            case JobInterface::STATUS_DONE:
            case JobInterface::STATUS_FAILED:
            case JobInterface::STATUS_TIMEOUT:
                $worker = array_search($job, $this->job_map, true);

                if (false !== $worker) {
                    unset($this->job_map[$worker]);
                }

                return $this;
            case JobInterface::STATUS_CANCELED:
                $worker = array_search($job, $this->job_map, true);

                if (false === $worker) {
                    return $this;
                }

                $this->logger->debug('received cancel event for job ['.$event['job'].'] running on worker ['.$worker.']', [
                    'category' => get_class($this),
                ]);

                // Only act if the worker was forked by this queue instance.
                if (isset($this->forks[(string) $worker])) {
                    $this->logger->debug('found running worker ['.$worker.'] on this queue node, terminate it now', [
                        'category' => get_class($this),
                    ]);

                    // The map is keyed by worker id, not by job id.
                    unset($this->job_map[$worker]);
                    posix_kill($this->forks[(string) $worker], SIGKILL);
                }

                return $this;
            default:
                return $this;
        }
    }
433
434
    /**
     * Manage children.
     *
     * Decides whether a new worker is spawned for the given job: always while
     * fewer than max_children workers exist (except in static mode), and
     * beyond that only if the job carries the ignore_max_children option.
     *
     * @param array $job job document; '_id' and 'options' are read
     */
    protected function manageChildren(array $job): self
    {
        if ($this->count() < $this->max_children && self::PM_STATIC !== $this->pm) {
            $this->logger->debug('max_children ['.$this->max_children.'] workers not reached ['.$this->count().'], spawn new worker', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $this->spawnWorker();

            return $this;
        }

        // A job without the option is treated like one that does not set it.
        $ignore_max_children = $job['options'][Scheduler::OPTION_IGNORE_MAX_CHILDREN] ?? false;

        if (true === $ignore_max_children) {
            $this->logger->debug('job ['.$job['_id'].'] deployed with ignore_max_children, spawn new worker', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $this->spawnWorker();

            return $this;
        }

        $this->logger->debug('max children ['.$this->max_children.'] reached for job ['.$job['_id'].'], do not spawn new worker', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        return $this;
    }
467
468
    /**
     * Install the signal handlers.
     *
     * Enables asynchronous signal dispatch and routes SIGTERM and SIGINT to
     * cleanup() and SIGCHLD to exitChild().
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);

        $handlers = [
            SIGTERM => 'cleanup',
            SIGINT => 'cleanup',
            SIGCHLD => 'exitChild',
        ];

        foreach ($handlers as $signal => $method) {
            pcntl_signal($signal, [$this, $method]);
        }

        return $this;
    }
480
}
481