Passed
Branch dev (b1b11b)
by Raffael
03:32
created

Queue   B

Complexity

Total Complexity 43

Size/Duplication

Total Lines 456
Duplicated Lines 0 %

Test Coverage

Coverage 71.78%

Importance

Changes 0
Metric Value
wmc 43
eloc 171
dl 0
loc 456
ccs 117
cts 163
cp 0.7178
rs 8.96
c 0
b 0
f 0

15 Methods

Rating   Name   Duplication   Size   Complexity  
A manageChildren() 0 24 4
A process() 0 13 2
A spawnWorker() 0 22 3
A count() 0 3 1
A handleSignal() 0 14 2
A catchSignal() 0 8 1
A cleanup() 0 4 1
A loop() 0 3 1
B setOptions() 0 31 8
A getForks() 0 3 1
A spawnInitialWorkers() 0 10 4
B main() 0 69 6
A __construct() 0 12 1
A exitChild() 0 20 4
A handleEvent() 0 28 4

How to fix   Complexity   

Complex Class

Complex classes like Queue often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Queue, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\BSON\UTCDateTime;
17
use MongoDB\Database;
18
use Psr\Log\LoggerInterface;
19
use TaskScheduler\Exception\InvalidArgumentException;
20
use TaskScheduler\Exception\QueueRuntimeException;
21
22
class Queue
23
{
24
    /**
25
     * Process identifier.
26
     */
27
    public const MAIN_PROCESS = 'main';
28
29
    /**
30
     * Queue options.
31
     */
32
    public const OPTION_PM = 'pm';
33
    public const OPTION_MAX_CHILDREN = 'max_children';
34
    public const OPTION_MIN_CHILDREN = 'min_children';
35
36
    /**
37
     * Process handling.
38
     */
39
    public const PM_DYNAMIC = 'dynamic';
40
    public const PM_STATIC = 'static';
41
    public const PM_ONDEMAND = 'ondemand';
42
43
    /**
44
     * Process management.
45
     *
46
     * @var string
47
     */
48
    protected $pm = self::PM_DYNAMIC;
49
50
    /**
51
     * Scheduler.
52
     *
53
     * @var Scheduler
54
     */
55
    protected $scheduler;
56
57
    /**
58
     * Database.
59
     *
60
     * @var Database
61
     */
62
    protected $db;
63
64
    /**
65
     * Logger.
66
     *
67
     * @var LoggerInterface
68
     */
69
    protected $logger;
70
71
    /**
72
     * Max children.
73
     *
74
     * @var int
75
     */
76
    protected $max_children = 2;
77
78
    /**
79
     * Min children.
80
     *
81
     * @var int
82
     */
83
    protected $min_children = 1;
84
85
    /**
86
     * Forks.
87
     *
88
     * @var array
89
     */
90
    protected $forks = [];
91
92
    /**
93
     * Worker/Job mapping.
94
     *
95
     * @var array
96
     */
97
    protected $job_map = [];
98
99
    /**
100
     * Worker factory.
101
     *
102
     * @var WorkerFactoryInterface
103
     */
104
    protected $factory;
105
106
    /**
107
     * Jobs queue.
108
     *
109
     * @var MessageQueue
110
     */
111
    protected $jobs;
112
113
    /**
114
     * Events queue.
115
     *
116
     * @var MessageQueue
117
     */
118
    protected $events;
119
120
    /**
121
     * Main process name.
122
     *
123
     * @var string
124
     */
125
    protected $process;
126
127
    protected $started;
128
129
    /**
     * Create the queue manager.
     *
     * Keeps references to the collaborators, applies the given options and
     * opens the job and event message queues using the names and sizes
     * reported by the scheduler. The creation time is remembered in
     * $this->started and later used by main() to filter events.
     *
     * @param array $config options as accepted by setOptions()
     *
     * @throws InvalidArgumentException when setOptions() rejects $config
     */
    public function __construct(Scheduler $scheduler, Database $db, WorkerFactoryInterface $factory, LoggerInterface $logger, array $config = [])
    {
        $this->scheduler = $scheduler;
        $this->db = $db;
        $this->logger = $logger;

        // Options are validated before anything else is set up.
        $this->setOptions($config);

        $this->process = self::MAIN_PROCESS;
        $this->factory = $factory;

        $job_queue = $scheduler->getJobQueue();
        $job_queue_size = $scheduler->getJobQueueSize();
        $this->jobs = new MessageQueue($db, $job_queue, $job_queue_size, $logger);

        $event_queue = $scheduler->getEventQueue();
        $event_queue_size = $scheduler->getEventQueueSize();
        $this->events = new MessageQueue($db, $event_queue, $event_queue_size, $logger);

        $this->started = new UTCDateTime();
    }
145
146
    /**
     * Set options.
     *
     * Supported keys are OPTION_MAX_CHILDREN and OPTION_MIN_CHILDREN (both
     * integers) and OPTION_PM (one of the PM_* process handling types).
     * The process handling type is matched case-insensitively and stored in
     * its canonical form (the value of the matching PM_* constant), so that
     * the strict comparisons against PM_DYNAMIC/PM_STATIC elsewhere in this
     * class keep working for input such as 'DYNAMIC'.
     *
     * @param array $config option name => value
     *
     * @throws InvalidArgumentException on an unknown option, a value of the
     *                                  wrong type, an unknown process handling
     *                                  type or min_children > max_children
     *
     * @return Queue
     */
    public function setOptions(array $config = []): self
    {
        foreach ($config as $option => $value) {
            // Cast the key: with loose switch comparison an integer key such as 0
            // would match a string case label on PHP 7.
            switch ((string) $option) {
                case self::OPTION_MAX_CHILDREN:
                case self::OPTION_MIN_CHILDREN:
                    if (!is_int($value)) {
                        throw new InvalidArgumentException($option.' needs to be an integer');
                    }

                    $this->{$option} = $value;

                break;
                case self::OPTION_PM:
                    // Only strings can name a PM_* constant; anything else is rejected
                    // with the documented exception instead of a TypeError.
                    $constant = is_string($value) ? self::class.'::PM_'.strtoupper($value) : null;

                    if (null === $constant || !defined($constant)) {
                        $given = is_scalar($value) ? (string) $value : gettype($value);

                        throw new InvalidArgumentException($given.' is not a valid process handling type (static, dynamic, ondemand)');
                    }

                    // Store the canonical constant value, not the raw input.
                    $this->pm = constant($constant);

                break;
                default:
                    throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        if ($this->min_children > $this->max_children) {
            throw new InvalidArgumentException('option min_children must not be greater than option max_children');
        }

        return $this;
    }
184
185
    /**
     * Start the queue (blocking).
     *
     * Spawns the initial workers and enters main(). Any \Exception raised by
     * either step is logged and followed by cleanup(SIGTERM), which forwards
     * the signal to the workers and terminates this process.
     */
    public function process(): void
    {
        try {
            $this->spawnInitialWorkers();
            $this->main();
        } catch (\Exception $e) {
            $context = [
                'class' => get_class($this),
                'exception' => $e,
                'pm' => $this->process,
            ];

            $this->logger->error('main() throw an exception, cleanup and exit', $context);
            $this->cleanup(SIGTERM);
        }
    }
203
204
    /**
     * Forward a signal to all workers and terminate this process.
     *
     * Registered by catchSignal() as the handler for SIGTERM and SIGINT and
     * called directly by process() after a failure.
     *
     * @param int $sig signal number to forward
     */
    public function cleanup(int $sig)
    {
        // Signal the children first; this process ends right after.
        $this->handleSignal($sig);

        exit;
    }
212
213
    /**
     * Reap an exited worker and forget it.
     *
     * Registered by catchSignal() as the SIGCHLD handler. Waits (non-blocking)
     * for the child named in the signal info and removes the matching entry
     * from the fork list and from the worker/job map.
     *
     * @param int   $sig signal number that was received
     * @param array $pid signal info as passed by pcntl; its 'pid' entry is read
     *                   as the process id of the child
     *
     * @return Queue
     */
    public function exitChild(int $sig, array $pid): self
    {
        $child_pid = $pid['pid'];

        $this->logger->debug('worker ['.$child_pid.'] exit with ['.$sig.']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        pcntl_waitpid($child_pid, $status, WNOHANG | WUNTRACED);

        // Use a dedicated loop variable: reusing $pid would shadow the signal
        // info and the comparison below could never match.
        foreach ($this->forks as $id => $fork_pid) {
            if ($fork_pid !== $child_pid) {
                continue;
            }

            unset($this->forks[$id]);

            // job_map is keyed by worker id, the same key used in $this->forks.
            if (isset($this->job_map[$id])) {
                unset($this->job_map[$id]);
            }
        }

        return $this;
    }
237
238
    /**
     * Number of workers currently tracked in the fork list.
     */
    public function count(): int
    {
        $children = count($this->forks);

        return $children;
    }
245
246
    /**
     * Spawn the initial set of workers.
     *
     * For the dynamic and static process handling types, workers are spawned
     * until the number tracked at entry plus the newly spawned ones reaches
     * min_children. Any other type spawns nothing here.
     */
    protected function spawnInitialWorkers()
    {
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];
        $this->logger->debug('spawn initial ['.$this->min_children.'] workers', $context);

        if (self::PM_DYNAMIC !== $this->pm && self::PM_STATIC !== $this->pm) {
            return;
        }

        // The starting count is taken once; every spawn advances the counter by one.
        $spawned = $this->count();
        while ($spawned < $this->min_children) {
            $this->spawnWorker();
            ++$spawned;
        }
    }
262
263
    /**
     * Fork a new worker process.
     *
     * The fork is registered under a freshly generated worker id. In the
     * child, the worker built by the factory is started and the process exits
     * once start() returns. The parent logs the new worker and returns its pid.
     *
     * @see https://github.com/mongodb/mongo-php-driver/issues/828
     * @see https://github.com/mongodb/mongo-php-driver/issues/174
     *
     * @throws QueueRuntimeException if the process could not be forked
     *
     * @return int pid of the spawned worker (parent process only)
     */
    protected function spawnWorker()
    {
        $worker_id = new ObjectId();
        $pid = pcntl_fork();

        if (-1 === $pid) {
            throw new QueueRuntimeException('failed to spawn new worker');
        }

        $key = (string) $worker_id;
        $this->forks[$key] = $pid;

        // pcntl_fork() hands 0 to the child process.
        if (!$pid) {
            $worker = $this->factory->build($worker_id);
            $worker->start();

            exit;
        }

        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];
        $this->logger->debug('spawn worker ['.$worker_id.'] with pid ['.$pid.']', $context);

        return $pid;
    }
292
293
    /**
     * Get the tracked forks.
     *
     * @return int[] process ids, keyed by worker id
     */
    protected function getForks(): array
    {
        $forks = $this->forks;

        return $forks;
    }
302
303
    /**
     * Loop condition of main().
     *
     * Always true in this class; it exists as a separate method so that the
     * otherwise endless loop can be mocked.
     */
    protected function loop(): bool
    {
        // Never stops on its own; override or mock to end the main loop.
        return true;
    }
310
311
    /**
     * Fork handling, blocking process.
     *
     * Opens a cursor on the job queue (waiting or postponed jobs) and one on
     * the event queue (canceled or processing events since this queue was
     * created), installs the signal handlers and then polls both cursors for
     * as long as loop() returns true. Events go to handleEvent(), jobs to
     * manageChildren(). When a cursor is reported dead, the queue is
     * (re)created and main() starts over.
     */
    protected function main(): void
    {
        $job_filter = [
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
            ],
        ];
        $cursor_jobs = $this->jobs->getCursor($job_filter);

        $event_filter = [
            '$or' => [
                ['status' => JobInterface::STATUS_CANCELED],
                ['status' => JobInterface::STATUS_PROCESSING],
            ],
            'timestamp' => ['$gte' => $this->started],
        ];
        $cursor_events = $this->events->getCursor($event_filter);

        $this->catchSignal();

        // Callback handed to MessageQueue::next(); it starts main() over.
        // NOTE(review): presumably invoked when the cursor cannot be advanced - confirm in MessageQueue.
        $restart = function () {
            $this->main();
        };

        while ($this->loop()) {
            $event = $cursor_events->current();
            $this->events->next($cursor_events, $restart);

            if (null !== $event) {
                $this->handleEvent($event);
            } elseif ($cursor_events->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
                $this->logger->error('event queue cursor is dead, is it a capped collection?', [
                    'category' => get_class($this),
                    'pm' => $this->process,
                ]);

                $this->events->create();
                $this->main();

                break;
            }

            if (null === $cursor_jobs->current()) {
                if ($cursor_jobs->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                        'pm' => $this->process,
                    ]);

                    $this->jobs->create();
                    $this->main();

                    break;
                }

                // Nothing to do yet: advance and poll again.
                $this->jobs->next($cursor_jobs, $restart);

                continue;
            }

            $job = $cursor_jobs->current();
            $this->jobs->next($cursor_jobs, $restart);

            $this->manageChildren($job);
        }
    }
385
386
    /**
     * Handle a job event.
     *
     * A processing event records which worker runs which job. Any other event
     * delivered here is treated as a cancel request: the worker running the
     * job is looked up and, if it is one of this queue's forks, its mapping is
     * dropped and the process is killed with SIGKILL.
     *
     * @param array $event event record; the 'status', 'job' and (for
     *                     processing events) 'worker' entries are read
     *
     * @return Queue
     */
    protected function handleEvent(array $event): self
    {
        // The map stores job ids as strings so that the strict lookup below
        // matches regardless of the type the id arrives in.
        $job_id = (string) $event['job'];

        if (JobInterface::STATUS_PROCESSING === $event['status']) {
            $this->job_map[(string) $event['worker']] = $job_id;

            return $this;
        }

        $worker = array_search($job_id, $this->job_map, true);

        if (false === $worker) {
            return $this;
        }

        $this->logger->debug('received cancel event for job ['.$event['job'].'] running on worker ['.$worker.']', [
            'category' => get_class($this),
        ]);

        if (isset($this->forks[(string) $worker])) {
            $this->logger->debug('found running worker ['.$worker.'] on this queue node, terminate it now', [
                'category' => get_class($this),
            ]);

            // job_map is keyed by worker id, so the entry is removed by worker.
            unset($this->job_map[$worker]);
            posix_kill($this->forks[(string) $worker], SIGKILL);
        }

        return $this;
    }
418
419
    /**
     * Decide whether a new worker is spawned for a job.
     *
     * A worker is spawned when fewer than max_children workers are tracked and
     * the process handling type is not static, or when the job carries the
     * ignore_max_children option set to true. Otherwise nothing is spawned and
     * this is only logged.
     *
     * @param array $job job record; '_id' and 'options' are read
     *
     * @return Queue
     */
    protected function manageChildren(array $job): self
    {
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        if ($this->count() < $this->max_children && self::PM_STATIC !== $this->pm) {
            $this->logger->debug('max_children ['.$this->max_children.'] workers not reached ['.$this->count().'], spawn new worker', $context);

            $this->spawnWorker();
        } elseif (true === ($job['options'][Scheduler::OPTION_IGNORE_MAX_CHILDREN] ?? false)) {
            // A job without this option (or without options) counts as "do not ignore".
            $this->logger->debug('job ['.$job['_id'].'] deployed with ignore_max_children, spawn new worker', $context);

            $this->spawnWorker();
        } else {
            $this->logger->debug('max children ['.$this->max_children.'] reached for job ['.$job['_id'].'], do not spawn new worker', $context);
        }

        return $this;
    }
447
448
    /**
     * Install the signal handlers.
     *
     * Enables asynchronous signal handling, routes SIGTERM and SIGINT to
     * cleanup() and SIGCHLD to exitChild().
     *
     * @return Queue
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);

        $cleanup = [$this, 'cleanup'];
        pcntl_signal(SIGTERM, $cleanup);
        pcntl_signal(SIGINT, $cleanup);

        $reap = [$this, 'exitChild'];
        pcntl_signal(SIGCHLD, $reap);

        return $this;
    }
460
461
    /**
     * Forward a received signal to every tracked worker.
     *
     * @param int $sig signal number to forward
     */
    protected function handleSignal(int $sig): void
    {
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        $this->logger->debug('received signal ['.$sig.']', $context);

        $forks = $this->getForks();
        foreach ($forks as $worker_id => $worker_pid) {
            $this->logger->debug('forward signal ['.$sig.'] to worker ['.$worker_id.'] running with pid ['.$worker_pid.']', $context);

            posix_kill($worker_pid, $sig);
        }
    }
480
}
481