Completed
Branch dev (58fe22)
by Raffael
02:55
created

Queue   A

Complexity

Total Complexity 40

Size/Duplication

Total Lines 429
Duplicated Lines 0 %

Test Coverage

Coverage 74.83%

Importance

Changes 0
Metric Value
wmc 40
eloc 159
dl 0
loc 429
ccs 116
cts 155
cp 0.7483
rs 9.2
c 0
b 0
f 0

15 Methods

Rating   Name   Duplication   Size   Complexity  
A manageChildren() 0 24 4
A process() 0 13 2
A spawnWorker() 0 21 3
A count() 0 3 1
A handleSignal() 0 14 2
A catchSignal() 0 8 1
A handleCancel() 0 19 2
A cleanup() 0 4 1
B setOptions() 0 31 8
A getForks() 0 3 1
A loop() 0 3 1
A spawnInitialWorkers() 0 10 4
B main() 0 66 6
A __construct() 0 11 1
A exitChild() 0 16 3

How to fix   Complexity   

Complex Class

Complex classes like Queue often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Queue, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\BSON\UTCDateTime;
17
use MongoDB\Database;
18
use Psr\Log\LoggerInterface;
19
use TaskScheduler\Exception\InvalidArgumentException;
20
use TaskScheduler\Exception\QueueRuntimeException;
21
22
class Queue
23
{
24
    /**
25
     * Process identifier.
26
     */
27
    public const MAIN_PROCESS = 'main';
28
29
    /**
30
     * Queue options.
31
     */
32
    public const OPTION_PM = 'pm';
33
    public const OPTION_MAX_CHILDREN = 'max_children';
34
    public const OPTION_MIN_CHILDREN = 'min_children';
35
36
    /**
37
     * Process handling.
38
     */
39
    public const PM_DYNAMIC = 'dynamic';
40
    public const PM_STATIC = 'static';
41
    public const PM_ONDEMAND = 'ondemand';
42
43
    /**
44
     * Process management.
45
     *
46
     * @var string
47
     */
48
    protected $pm = self::PM_DYNAMIC;
49
50
    /**
51
     * Scheduler.
52
     *
53
     * @var Scheduler
54
     */
55
    protected $scheduler;
56
57
    /**
58
     * Database.
59
     *
60
     * @var Database
61
     */
62
    protected $db;
63
64
    /**
65
     * Logger.
66
     *
67
     * @var LoggerInterface
68
     */
69
    protected $logger;
70
71
    /**
72
     * Max children.
73
     *
74
     * @var int
75
     */
76
    protected $max_children = 2;
77
78
    /**
79
     * Min children.
80
     *
81
     * @var int
82
     */
83
    protected $min_children = 1;
84
85
    /**
86
     * Forks.
87
     *
88
     * @var array
89
     */
90
    protected $forks = [];
91
92
    /**
93
     * Worker factory.
94
     *
95
     * @var WorkerFactoryInterface
96
     */
97
    protected $factory;
98
99
    /**
100
     * Jobs queue.
101
     *
102
     * @var MessageQueue
103
     */
104
    protected $jobs;
105
106
    /**
107
     * Events queue.
108
     *
109
     * @var MessageQueue
110
     */
111
    protected $events;
112
113
    /**
114
     * Main process name.
115
     *
116
     * @var string
117
     */
118
    protected $process;
119
120
    /**
     * Create the queue and wire up its collaborators.
     *
     * Options are applied via setOptions(), which throws on invalid input.
     * The job and event message queues are built from the queue names and
     * sizes reported by the scheduler.
     */
    public function __construct(Scheduler $scheduler, Database $db, WorkerFactoryInterface $factory, LoggerInterface $logger, array $config = [])
    {
        $this->scheduler = $scheduler;
        $this->db = $db;
        $this->logger = $logger;
        $this->setOptions($config);
        $this->process = self::MAIN_PROCESS;
        $this->factory = $factory;

        $job_queue = $scheduler->getJobQueue();
        $job_queue_size = $scheduler->getJobQueueSize();
        $this->jobs = new MessageQueue($db, $job_queue, $job_queue_size, $logger);

        $event_queue = $scheduler->getEventQueue();
        $event_queue_size = $scheduler->getEventQueueSize();
        $this->events = new MessageQueue($db, $event_queue, $event_queue_size, $logger);
    }
135
136
    /**
     * Set options.
     *
     * Supported keys are OPTION_MAX_CHILDREN and OPTION_MIN_CHILDREN (integers)
     * and OPTION_PM (one of the PM_* values). The process management value is
     * matched case-insensitively and stored in its canonical lowercase form so
     * that later strict comparisons against the PM_* constants succeed.
     *
     * @throws InvalidArgumentException on an unknown option, a value of the
     *                                  wrong type, an unknown process handling
     *                                  type, or min_children > max_children
     *
     * @return Queue
     */
    public function setOptions(array $config = []): self
    {
        foreach ($config as $option => $value) {
            switch ($option) {
                case self::OPTION_MAX_CHILDREN:
                case self::OPTION_MIN_CHILDREN:
                    if (!is_int($value)) {
                        throw new InvalidArgumentException($option.' needs to be an integer');
                    }

                    $this->{$option} = $value;

                    break;
                case self::OPTION_PM:
                    // Guard first: strtolower() would raise a TypeError under strict_types.
                    if (!is_string($value)) {
                        throw new InvalidArgumentException($option.' needs to be a string');
                    }

                    $pm = strtolower($value);
                    if (!in_array($pm, [self::PM_DYNAMIC, self::PM_STATIC, self::PM_ONDEMAND], true)) {
                        throw new InvalidArgumentException($value.' is not a valid process handling type (static, dynamic, ondemand)');
                    }

                    // Store the normalized value; the rest of the class compares with ===.
                    $this->pm = $pm;

                    break;
                default:
                    throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        if ($this->min_children > $this->max_children) {
            throw new InvalidArgumentException('option min_children must not be greater than option max_children');
        }

        return $this;
    }
174
175
    /**
     * Startup (blocking process).
     *
     * Spawns the initial workers and enters the main loop. Any exception
     * escaping from there is logged and followed by cleanup(SIGTERM), which
     * forwards the signal to the workers and terminates this process.
     */
    public function process(): void
    {
        try {
            $this->spawnInitialWorkers();
            $this->main();
        } catch (\Exception $e) {
            // Fully qualified on purpose: an unqualified "Exception" resolves
            // relative to the TaskScheduler namespace and would not match here.
            $this->logger->error('main() throw an exception, cleanup and exit', [
                'category' => get_class($this),
                'exception' => $e,
                'pm' => $this->process,
            ]);

            $this->cleanup(SIGTERM);
        }
    }
193
194
    /**
     * Forward the given signal to all workers and terminate this process.
     *
     * Registered as the SIGTERM/SIGINT handler in catchSignal() and also
     * called from process() when the main loop fails.
     */
    public function cleanup(int $sig)
    {
        // Let the workers know first, then leave.
        $this->handleSignal($sig);

        exit;
    }
202
203
    /**
     * Wait for child and terminate.
     *
     * Registered as the SIGCHLD handler in catchSignal(). Reaps the child
     * named in the signal info and removes it from the list of forks.
     *
     * @param int   $sig signal number
     * @param array $pid signal info; the child's process id is read from key 'pid'
     */
    public function exitChild(int $sig, array $pid): self
    {
        $child = $pid['pid'];

        $this->logger->debug('worker ['.$child.'] exit with ['.$sig.']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        // Non-blocking reap so the handler never stalls the main loop.
        pcntl_waitpid($child, $status, WNOHANG | WUNTRACED);

        // Use a distinct loop variable: the signal info must not be overwritten
        // while it is still needed for the comparison.
        foreach ($this->forks as $id => $fork_pid) {
            if ($fork_pid === $child) {
                unset($this->forks[$id]);
            }
        }

        return $this;
    }
223
224
    /**
     * Number of worker forks currently tracked by this queue.
     */
    public function count(): int
    {
        $tracked = count($this->forks);

        return $tracked;
    }
231
232
    /**
     * Start initial workers.
     *
     * For the dynamic and static process management modes, workers are
     * spawned until min_children is reached. Any other mode spawns nothing
     * up front.
     */
    protected function spawnInitialWorkers()
    {
        $this->logger->debug('spawn initial ['.$this->min_children.'] workers', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        if (!in_array($this->pm, [self::PM_DYNAMIC, self::PM_STATIC], true)) {
            return;
        }

        // The current count is sampled once, then the gap is filled.
        $missing = $this->min_children - $this->count();
        while ($missing > 0) {
            $this->spawnWorker();
            --$missing;
        }
    }
248
249
    /**
     * Start worker.
     *
     * Forks the current process. The child builds a worker through the
     * factory, runs it and exits; the parent records the child's pid under
     * the new worker id and returns that pid.
     *
     * @see https://github.com/mongodb/mongo-php-driver/issues/828
     * @see https://github.com/mongodb/mongo-php-driver/issues/174
     *
     * @throws QueueRuntimeException if the fork fails
     *
     * @return int pid of the spawned child (parent process only)
     */
    protected function spawnWorker()
    {
        $id = new ObjectId();
        $pid = pcntl_fork();

        if (-1 === $pid) {
            throw new QueueRuntimeException('failed to spawn new worker');
        }

        if (0 === $pid) {
            // Child process: run the worker and leave. The fork is deliberately
            // not registered here, so a pid of 0 never ends up in the fork list
            // that the signal handlers iterate over.
            $this->factory->build($id)->start();
            exit();
        }

        // Parent process: remember which pid belongs to which worker id.
        $this->forks[(string) $id] = $pid;

        $this->logger->debug('spawn worker ['.$id.'] with pid ['.$pid.']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        return $pid;
    }
277
278
    /**
     * Return the tracked forks, keyed by worker id.
     *
     * @return int[] pids of the forked workers
     */
    protected function getForks(): array
    {
        $forks = $this->forks;

        return $forks;
    }
287
288
    /**
     * Loop condition of main(); always true.
     *
     * Kept as a separate method so the otherwise endless loop can be mocked.
     */
    protected function loop(): bool
    {
        return true;
    }
295
296
    /**
     * Fork handling, blocking process.
     *
     * Opens a cursor on waiting/postponed jobs and one on cancel events
     * created from now on, installs the signal handlers and then loops:
     * each iteration handles at most one cancel event and at most one job.
     * If a cursor turns out to be dead, its queue is (re)created and main()
     * is entered again.
     */
    protected function main(): void
    {
        $cursor_jobs = $this->jobs->getCursor([
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
            ],
        ]);

        $cursor_events = $this->events->getCursor([
            'status' => JobInterface::STATUS_CANCELED,
            'timestamp' => ['$gte' => new UTCDateTime()],
        ]);

        $this->catchSignal();

        // Callback handed to MessageQueue::next(); it re-enters main().
        $restart = function () {
            $this->main();
        };

        while ($this->loop()) {
            // Read the current event before advancing the cursor.
            $event = $cursor_events->current();
            $this->events->next($cursor_events, $restart);

            if (null !== $event) {
                $this->handleCancel($event);
            } elseif ($cursor_events->getInnerIterator()->isDead()) {
                $this->logger->error('event queue cursor is dead, is it a capped collection?', [
                    'category' => get_class($this),
                    'pm' => $this->process,
                ]);

                $this->events->create();
                $this->main();

                break;
            }

            $job = $cursor_jobs->current();

            if (null === $job) {
                if ($cursor_jobs->getInnerIterator()->isDead()) {
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                        'pm' => $this->process,
                    ]);

                    $this->jobs->create();
                    $this->main();

                    break;
                }

                $this->jobs->next($cursor_jobs, $restart);

                continue;
            }

            $this->jobs->next($cursor_jobs, $restart);
            $this->manageChildren($job);
        }
    }
367
368
    /**
     * Handle cancel event.
     *
     * Looks up the job named in the event and, if the worker it runs on is
     * one of this queue's forks, kills that worker with SIGKILL.
     *
     * @param array $event cancel event; the job id is read from key 'job'
     */
    protected function handleCancel(array $event): self
    {
        $process = $this->scheduler->getJob($event['job']);
        $context = ['category' => get_class($this)];

        $this->logger->debug('received cancel event for job ['.$event['job'].'] running on worker ['.$process->getWorker().']', $context);

        $worker = $process->getWorker();
        $key = (string) $worker;

        // Worker is not running on this queue node: nothing to do.
        if (!isset($this->forks[$key])) {
            return $this;
        }

        $this->logger->debug('found running worker ['.$process->getWorker().'] on this queue node, terminate it now', $context);

        posix_kill($this->forks[$key], SIGKILL);

        return $this;
    }
391
392
    /**
     * Manage children.
     *
     * Decides whether a new worker is spawned for the given job:
     * - below max_children and not in static mode: spawn;
     * - otherwise, if the job carries the ignore_max_children option: spawn;
     * - otherwise: only log that the limit has been reached.
     *
     * @param array $job job document; reads '_id' and 'options'
     */
    protected function manageChildren(array $job): self
    {
        // A job without the option is treated as "do not ignore the limit"
        // instead of triggering an undefined-key warning.
        $ignore_max = $job['options'][Scheduler::OPTION_IGNORE_MAX_CHILDREN] ?? false;

        if ($this->count() < $this->max_children && self::PM_STATIC !== $this->pm) {
            $this->logger->debug('max_children ['.$this->max_children.'] workers not reached ['.$this->count().'], spawn new worker', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $this->spawnWorker();
        } elseif (true === $ignore_max) {
            $this->logger->debug('job ['.$job['_id'].'] deployed with ignore_max_children, spawn new worker', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $this->spawnWorker();
        } else {
            $this->logger->debug('max children ['.$this->max_children.'] reached for job ['.$job['_id'].'], do not spawn new worker', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);
        }

        return $this;
    }
420
421
    /**
     * Install the signal handlers of the main process.
     *
     * SIGTERM and SIGINT are routed to cleanup(), SIGCHLD to exitChild().
     * Asynchronous signal dispatch is enabled first.
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);

        $handlers = [
            SIGTERM => 'cleanup',
            SIGINT => 'cleanup',
            SIGCHLD => 'exitChild',
        ];

        foreach ($handlers as $signal => $method) {
            pcntl_signal($signal, [$this, $method]);
        }

        return $this;
    }
433
434
    /**
     * Forward a received signal to every tracked worker.
     *
     * @param int $sig signal number to log and pass on
     */
    protected function handleSignal(int $sig): void
    {
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        $this->logger->debug('received signal ['.$sig.']', $context);

        foreach ($this->getForks() as $worker => $child) {
            $this->logger->debug('forward signal ['.$sig.'] to worker ['.$worker.'] running with pid ['.$child.']', $context);

            posix_kill($child, $sig);
        }
    }
453
}
454