Completed
Branch dev (31fb20)
by Raffael
03:39
created

Queue::spawnWorker()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 22
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3.1105

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 22
ccs 10
cts 13
cp 0.7692
rs 9.8666
c 0
b 0
f 0
cc 3
nc 3
nop 0
crap 3.1105
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\BSON\UTCDateTime;
17
use MongoDB\Database;
18
use Psr\Log\LoggerInterface;
19
use TaskScheduler\Exception\InvalidArgumentException;
20
use TaskScheduler\Exception\QueueRuntimeException;
21
22
class Queue extends AbstractHandler
23
{
24
    /**
25
     * Process identifier.
26
     */
27
    public const MAIN_PROCESS = 'main';
28
29
    /**
30
     * Queue options.
31
     */
32
    public const OPTION_PM = 'pm';
33
    public const OPTION_MAX_CHILDREN = 'max_children';
34
    public const OPTION_MIN_CHILDREN = 'min_children';
35
36
    /**
37
     * Process handling.
38
     */
39
    public const PM_DYNAMIC = 'dynamic';
40
    public const PM_STATIC = 'static';
41
    public const PM_ONDEMAND = 'ondemand';
42
43
    /**
44
     * Process management.
45
     *
46
     * @var string
47
     */
48
    protected $pm = self::PM_DYNAMIC;
49
50
    /**
51
     * Scheduler.
52
     *
53
     * @var Scheduler
54
     */
55
    protected $scheduler;
56
57
    /**
58
     * Database.
59
     *
60
     * @var Database
61
     */
62
    protected $db;
63
64
    /**
65
     * Logger.
66
     *
67
     * @var LoggerInterface
68
     */
69
    protected $logger;
70
71
    /**
72
     * Max children.
73
     *
74
     * @var int
75
     */
76
    protected $max_children = 2;
77
78
    /**
79
     * Min children.
80
     *
81
     * @var int
82
     */
83
    protected $min_children = 1;
84
85
    /**
86
     * Forks.
87
     *
88
     * @var array
89
     */
90
    protected $forks = [];
91
92
    /**
93
     * Worker/Job mapping.
94
     *
95
     * @var array
96
     */
97
    protected $job_map = [];
98
99
    /**
100
     * Worker factory.
101
     *
102
     * @var WorkerFactoryInterface
103
     */
104
    protected $factory;
105
106
    /**
107
     * Jobs queue.
108
     *
109
     * @var MessageQueue
110
     */
111
    protected $jobs;
112
113
    /**
114
     * Events queue.
115
     *
116
     * @var MessageQueue
117
     */
118
    protected $events;
119
120
    /**
121
     * Main process name.
122
     *
123
     * @var string
124
     */
125
    protected $process;
126
127
    /**
     * Create the queue handler.
     *
     * Stores the injected collaborators, applies $config through setOptions()
     * and creates one MessageQueue for jobs and one for events, using the queue
     * names and sizes reported by the scheduler.
     *
     * @param Scheduler              $scheduler provides the job/event queue names and sizes
     * @param Database               $db        database handle handed to both message queues
     * @param WorkerFactoryInterface $factory   factory used to build workers in forked children
     * @param LoggerInterface        $logger    logger shared with both message queues
     * @param array                  $config    options as accepted by setOptions()
     */
    public function __construct(
        Scheduler $scheduler,
        Database $db,
        WorkerFactoryInterface $factory,
        LoggerInterface $logger,
        array $config = []
    ) {
        $this->scheduler = $scheduler;
        $this->db = $db;
        $this->logger = $logger;

        // Options are validated before anything else is wired up.
        $this->setOptions($config);

        $this->process = self::MAIN_PROCESS;
        $this->factory = $factory;

        $job_queue = new MessageQueue($db, $scheduler->getJobQueue(), $scheduler->getJobQueueSize(), $logger);
        $this->jobs = $job_queue;

        $event_queue = new MessageQueue($db, $scheduler->getEventQueue(), $scheduler->getEventQueueSize(), $logger);
        $this->events = $event_queue;
    }
142
143
    /**
     * Set options.
     *
     * Recognised keys are OPTION_MAX_CHILDREN and OPTION_MIN_CHILDREN (integers)
     * and OPTION_PM (name of one of the PM_* constants, matched case-insensitively).
     * The process management type is stored as the canonical PM_* constant value,
     * so a value such as "Dynamic" behaves exactly like self::PM_DYNAMIC afterwards.
     *
     * @param array $config option name => value
     *
     * @throws InvalidArgumentException on an unknown option, a value of the wrong type,
     *                                  an unknown process management type, or when
     *                                  min_children ends up greater than max_children
     *
     * @return Queue
     */
    public function setOptions(array $config = []): self
    {
        foreach ($config as $option => $value) {
            switch ($option) {
                case self::OPTION_MAX_CHILDREN:
                case self::OPTION_MIN_CHILDREN:
                    if (!is_int($value)) {
                        throw new InvalidArgumentException($option.' needs to be an integer');
                    }

                    $this->{$option} = $value;

                break;
                case self::OPTION_PM:
                    // Only strings can name a PM_* constant; anything else is rejected with the
                    // same exception type instead of a TypeError raised by strtoupper().
                    $constant = is_string($value) ? 'self::PM_'.strtoupper($value) : null;

                    if (null === $constant || !defined($constant)) {
                        $given = is_scalar($value) ? (string) $value : gettype($value);

                        throw new InvalidArgumentException($given.' is not a valid process handling type (static, dynamic, ondemand)');
                    }

                    // Store the constant's value rather than the raw input so that the strict
                    // comparisons against self::PM_* elsewhere in this class keep working.
                    $this->pm = constant($constant);

                break;
                default:
                    throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        if ($this->min_children > $this->max_children) {
            throw new InvalidArgumentException('option min_children must not be greater than option max_children');
        }

        return $this;
    }
181
182
    /**
     * Startup (blocking process).
     *
     * Spawns the initial workers and then enters main(). Any \Exception raised
     * by either step is logged and answered with cleanup(SIGTERM).
     */
    public function process(): void
    {
        try {
            $this->spawnInitialWorkers();
            $this->main();
        } catch (\Exception $e) {
            $context = [
                'class' => get_class($this),
                'exception' => $e,
                'pm' => $this->process,
            ];

            $this->logger->error('main() throw an exception, cleanup and exit', $context);
            $this->cleanup(SIGTERM);
        }
    }
200
201
    /**
     * Wait for child and terminate.
     *
     * Registered as the SIGCHLD handler in catchSignal(). Reaps the child named
     * in the signal info and removes it from the fork list and from the
     * worker/job mapping, so that count() reflects the workers still running.
     *
     * @param int   $sig signal number that triggered the handler
     * @param array $pid signal info as passed by pcntl_signal(); the 'pid' key is read
     *
     * @return Queue
     */
    public function exitChild(int $sig, array $pid): self
    {
        $child = $pid['pid'];

        $this->logger->debug('worker ['.$child.'] exit with ['.$sig.']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        pcntl_waitpid($child, $status, WNOHANG | WUNTRACED);

        // The loop variable must not reuse the name $pid: doing so overwrote the signal info
        // and made the comparison below impossible to satisfy, so forks were never removed.
        foreach ($this->forks as $id => $fork) {
            if ($fork === $child) {
                // Both maps are keyed by worker id; unset() is a no-op for a missing key.
                unset($this->forks[$id], $this->job_map[$id]);
            }
        }

        return $this;
    }
225
226
    /**
     * Count children.
     *
     * @return int number of entries currently held in the fork list
     */
    public function count(): int
    {
        $running = \count($this->forks);

        return $running;
    }
233
234
    /**
     * Cleanup.
     *
     * Forwards the given signal to every forked worker and then calls exit().
     * Registered as the SIGTERM/SIGINT handler in catchSignal() and also
     * invoked by process() when main() throws.
     *
     * @param int $sig signal to forward to the workers
     */
    public function cleanup(int $sig): void
    {
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        $this->logger->debug('received signal ['.$sig.']', $context);

        foreach ($this->getForks() as $worker => $child) {
            $this->logger->debug('forward signal ['.$sig.'] to worker ['.$worker.'] running with pid ['.$child.']', $context);

            posix_kill($child, $sig);
        }

        $this->exit();
    }
255
256
    /**
     * Start initial workers.
     *
     * For the dynamic and static process management types, workers are spawned
     * until min_children is reached. Any other type spawns nothing here.
     */
    protected function spawnInitialWorkers()
    {
        $this->logger->debug('spawn initial ['.$this->min_children.'] workers', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        $prefork = in_array($this->pm, [self::PM_DYNAMIC, self::PM_STATIC], true);

        if (!$prefork) {
            return;
        }

        for ($running = $this->count(); $running < $this->min_children; ++$running) {
            $this->spawnWorker();
        }
    }
272
273
    /**
     * Start worker.
     *
     * Forks the current process. The child builds a worker for the freshly
     * generated id through the factory, runs it and exits; the parent records
     * the child pid under the worker id and returns that pid.
     *
     * @see https://github.com/mongodb/mongo-php-driver/issues/828
     * @see https://github.com/mongodb/mongo-php-driver/issues/174
     *
     * @throws QueueRuntimeException when the fork fails
     *
     * @return int pid of the forked child (parent process only)
     */
    protected function spawnWorker()
    {
        $worker_id = new ObjectId();
        $child = pcntl_fork();

        if (-1 === $child) {
            throw new QueueRuntimeException('failed to spawn new worker');
        }

        $this->forks[(string) $worker_id] = $child;

        // pcntl_fork() returns 0 inside the child process.
        if (0 === $child) {
            $worker = $this->factory->build($worker_id);
            $worker->start();

            exit;
        }

        $this->logger->debug('spawn worker ['.$worker_id.'] with pid ['.$child.']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        return $child;
    }
302
303
    /**
     * Get forks.
     *
     * @return int[] child pids, keyed by worker id
     */
    protected function getForks(): array
    {
        $forks = $this->forks;

        return $forks;
    }
312
313
    /**
     * Fork handling, blocking process.
     *
     * Opens one cursor on the job queue (waiting or postponed jobs) and one on
     * the event queue (canceled or processing events not older than now),
     * installs the signal handlers and then polls both cursors while loop()
     * returns true. Events are passed to handleEvent(), jobs to manageChildren().
     * When a cursor is reported dead, the corresponding queue is re-created and
     * main() starts over.
     */
    protected function main(): void
    {
        $job_filter = [
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
            ],
        ];
        $job_cursor = $this->jobs->getCursor($job_filter);

        $event_filter = [
            '$or' => [
                ['status' => JobInterface::STATUS_CANCELED],
                ['status' => JobInterface::STATUS_PROCESSING],
            ],
            'timestamp' => ['$gte' => new UTCDateTime()],
        ];
        $event_cursor = $this->events->getCursor($event_filter);

        $this->catchSignal();

        // Callback handed to MessageQueue::next(); it restarts main() from scratch.
        // NOTE(review): presumably invoked when advancing the cursor fails - confirm in MessageQueue.
        $restart = function () {
            $this->main();
        };

        while ($this->loop()) {
            $event = $event_cursor->current();
            $this->events->next($event_cursor, $restart);

            if (null !== $event) {
                $this->handleEvent($event);
            } elseif ($event_cursor->getInnerIterator()->isDead()) {
                $this->logger->error('event queue cursor is dead, is it a capped collection?', [
                    'category' => get_class($this),
                    'pm' => $this->process,
                ]);

                $this->events->create();
                $this->main();

                break;
            }

            $job = $job_cursor->current();

            if (null === $job) {
                if ($job_cursor->getInnerIterator()->isDead()) {
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                        'pm' => $this->process,
                    ]);

                    $this->jobs->create();
                    $this->main();

                    break;
                }

                $this->jobs->next($job_cursor, $restart);

                continue;
            }

            $this->jobs->next($job_cursor, $restart);
            $this->manageChildren($job);
        }
    }
387
388
    /**
     * Handle a queue event.
     *
     * A "processing" event records which job a worker is running in the
     * worker/job mapping (worker id => job id, both as strings). Any other
     * event is treated as a cancel request: if the job is mapped to a worker
     * forked by this queue node, the mapping is dropped and the worker is
     * killed with SIGKILL.
     *
     * @param array $event event record; the keys 'status', 'job' and (for processing events) 'worker' are read
     *
     * @return Queue
     */
    protected function handleEvent(array $event): self
    {
        // Job ids are normalised to strings once, so the strict array_search() below compares
        // like with like; storing the raw id and searching for its string form never matched.
        $job = (string) $event['job'];

        if (JobInterface::STATUS_PROCESSING === $event['status']) {
            $this->job_map[(string) $event['worker']] = $job;

            return $this;
        }

        $worker = array_search($job, $this->job_map, true);

        if (false === $worker) {
            return $this;
        }

        $this->logger->debug('received cancel event for job ['.$event['job'].'] running on worker ['.$worker.']', [
            'category' => get_class($this),
        ]);

        if (isset($this->forks[(string) $worker])) {
            $this->logger->debug('found running worker ['.$worker.'] on this queue node, terminate it now', [
                'category' => get_class($this),
            ]);

            // The mapping is keyed by worker id, not by job id.
            unset($this->job_map[(string) $worker]);
            posix_kill($this->forks[(string) $worker], SIGKILL);
        }

        return $this;
    }
420
421
    /**
     * Manage children.
     *
     * Decides whether a new worker is spawned for the given job: always while
     * fewer than max_children workers exist and the process management type is
     * not static, and additionally when the job carries the ignore_max_children
     * option. Otherwise nothing is spawned.
     *
     * @param array $job job record; the keys '_id' and 'options' are read
     *
     * @return Queue
     */
    protected function manageChildren(array $job): self
    {
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        $has_capacity = $this->count() < $this->max_children;

        if ($has_capacity && self::PM_STATIC !== $this->pm) {
            $this->logger->debug('max_children ['.$this->max_children.'] workers not reached ['.$this->count().'], spawn new worker', $context);
            $this->spawnWorker();

            return $this;
        }

        $force = $job['options'][Scheduler::OPTION_IGNORE_MAX_CHILDREN];

        if (true === $force) {
            $this->logger->debug('job ['.$job['_id'].'] deployed with ignore_max_children, spawn new worker', $context);
            $this->spawnWorker();

            return $this;
        }

        $this->logger->debug('max children ['.$this->max_children.'] reached for job ['.$job['_id'].'], do not spawn new worker', $context);

        return $this;
    }
454
455
    /**
     * Catch signals and cleanup.
     *
     * Enables asynchronous signal dispatch and routes SIGTERM and SIGINT to
     * cleanup() and SIGCHLD to exitChild().
     *
     * @return Queue
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);

        foreach ([SIGTERM, SIGINT] as $signal) {
            pcntl_signal($signal, [$this, 'cleanup']);
        }

        pcntl_signal(SIGCHLD, [$this, 'exitChild']);

        return $this;
    }
467
}
468