Passed
Branch master (da2ca3)
by Raffael
06:25 queued 03:52
created

Async   D

Complexity

Total Complexity 58

Size/Duplication

Total Lines 648
Duplicated Lines 0 %

Test Coverage

Coverage 84.38%

Importance

Changes 0
Metric Value
dl 0
loc 648
ccs 189
cts 224
cp 0.8438
rs 4.3741
c 0
b 0
f 0
wmc 58

19 Methods

Rating   Name   Duplication   Size   Complexity  
B startDaemon() 0 24 4
B addJob() 0 36 2
B validateOptions() 0 19 7
A updateJob() 0 13 1
A getJobs() 0 23 2
A cancelJob() 0 3 1
A addJobOnce() 0 22 2
C setOptions() 0 22 8
B startOnce() 0 22 4
A executeJob() 0 20 4
B collectJob() 0 26 2
B processJob() 0 56 5
A queueJob() 0 13 3
B getCursor() 0 26 2
A retrieveNextJob() 0 11 2
A __construct() 0 8 2
A processLocalQueue() 0 19 4
A getJob() 0 17 2
A createQueue() 0 11 1

How to fix   Complexity   

Complex Class

Complex classes like Async often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Async, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use MongoDB\Driver\Exception\RuntimeException;
20
use MongoDB\Operation\Find;
21
use Psr\Container\ContainerInterface;
22
use Psr\Log\LoggerInterface;
23
use Traversable;
24
25
class Async
26
{
27
    /**
28
     * Job status.
29
     */
30
    const STATUS_WAITING = 0;
31
    const STATUS_POSTPONED = 1;
32
    const STATUS_PROCESSING = 2;
33
    const STATUS_DONE = 3;
34
    const STATUS_FAILED = 4;
35
    const STATUS_CANCELED = 5;
36
37
    /**
38
     * Job options.
39
     */
40
    const OPTION_AT = 'at';
41
    const OPTION_INTERVAL = 'interval';
42
    const OPTION_RETRY = 'retry';
43
    const OPTION_RETRY_INTERVAL = 'retry_interval';
44
45
    /**
46
     * Database.
47
     *
48
     * @var Database
49
     */
50
    protected $db;
51
52
    /**
53
     * LoggerInterface.
54
     *
55
     * @var LoggerInterface
56
     */
57
    protected $logger;
58
59
    /**
60
     * Local queue.
61
     *
62
     * @var array
63
     */
64
    protected $queue = [];
65
66
    /**
67
     * Collection name.
68
     *
69
     * @var string
70
     */
71
    protected $collection_name = 'queue';
72
73
    /**
74
     * Container.
75
     *
76
     * @var ContainerInterface
77
     */
78
    protected $container;
79
80
    /**
81
     * Default at (Seconds from now).
82
     *
83
     * @var int
84
     */
85
    protected $default_at = 0;
86
87
    /**
88
     * Default interval (seconds).
89
     *
90
     * @var int
91
     */
92
    protected $default_interval = -1;
93
94
    /**
95
     * Default retry.
96
     *
97
     * @var int
98
     */
99
    protected $default_retry = 0;
100
101
    /**
102
     * Default retry interval (seconds).
103
     *
104
     * @var int
105
     */
106
    protected $default_retry_interval = 300;
107
108
    /**
109
     * Queue size.
110
     *
111
     * @var int
112
     */
113
    protected $queue_size = 100000;
114
115
    /**
     * Create a new queue handler.
     *
     * Stores the given collaborators and, when a configuration is passed,
     * forwards it to setOptions().
     *
     * @param Database                $db        database which holds the queue collection
     * @param LoggerInterface         $logger    logger used for all queue messages
     * @param null|ContainerInterface $container optional container used by executeJob() to resolve job instances
     * @param null|iterable           $config    optional settings, see setOptions() for the accepted keys
     */
    public function __construct(Database $db, LoggerInterface $logger, ?ContainerInterface $container = null, ?Iterable $config = null)
    {
        $this->container = $container;
        $this->logger = $logger;
        $this->db = $db;

        // Nothing to configure, keep the property defaults.
        if (null === $config) {
            return;
        }

        $this->setOptions($config);
    }
133
134
    /**
     * Set options.
     *
     * Accepted keys: collection_name (cast to string) and default_retry,
     * default_at, default_retry_interval, default_interval, queue_size
     * (all cast to int). Each value is written to the property of the same name.
     *
     * @param null|iterable $config option name => value pairs, null leaves the current settings untouched
     *
     * @throws Exception if an unknown option name is given
     *
     * @return Async
     */
    public function setOptions(?iterable $config = null): self
    {
        // The parameter defaults to null; iterating null would raise a warning.
        if (null === $config) {
            return $this;
        }

        foreach ($config as $option => $value) {
            switch ($option) {
                case 'collection_name':
                    $this->{$option} = (string) $value;

                    break;
                case 'default_retry':
                case 'default_at':
                case 'default_retry_interval':
                case 'default_interval':
                case 'queue_size':
                    $this->{$option} = (int) $value;

                    break;
                default:
                    throw new Exception('invalid option '.$option.' given');
            }
        }

        return $this;
    }
164
165
    /**
     * Create the queue collection.
     *
     * The collection is created as a capped collection using the configured
     * collection name and queue size.
     *
     * @return Async
     */
    public function createQueue(): self
    {
        $collection_options = [
            'capped' => true,
            'size' => $this->queue_size,
        ];

        $this->db->createCollection($this->collection_name, $collection_options);

        return $this;
    }
182
183
    /**
     * Get a single job by its ID.
     *
     * @param ObjectId $id job id
     *
     * @throws Exception if no job with the given id exists
     *
     * @return array the job document, with documents and arrays mapped to PHP arrays
     */
    public function getJob(ObjectId $id): array
    {
        $query = ['_id' => $id];
        $options = [
            'typeMap' => [
                'document' => 'array',
                'root' => 'array',
                'array' => 'array',
            ],
        ];

        $job = $this->db->{$this->collection_name}->findOne($query, $options);

        if (null !== $job) {
            return $job;
        }

        throw new Exception('job '.$id.' was not found');
    }
208
209
    /**
     * Cancel a job by setting its status to STATUS_CANCELED.
     *
     * @param ObjectId $id job id
     *
     * @return bool result of updateJob()
     */
    public function cancelJob(ObjectId $id): bool
    {
        $canceled = $this->updateJob($id, self::STATUS_CANCELED);

        return $canceled;
    }
220
221
    /**
     * Get jobs by status.
     *
     * @param array $filter list of job status values to match; when empty, jobs which are
     *                      waiting, processing or postponed are returned
     *
     * @return Traversable matching job documents, mapped to PHP arrays
     */
    public function getJobs(array $filter = []): Traversable
    {
        $statuses = [] === $filter
            ? [self::STATUS_WAITING, self::STATUS_PROCESSING, self::STATUS_POSTPONED]
            : $filter;

        $query = [
            'status' => ['$in' => $statuses],
        ];

        $options = [
            'typeMap' => [
                'document' => 'array',
                'root' => 'array',
                'array' => 'array',
            ],
        ];

        return $this->db->{$this->collection_name}->find($query, $options);
    }
252
253
    /**
     * Validate given job options.
     *
     * Only the OPTION_* keys are allowed and each of them must hold an integer.
     *
     * @param array $options option name => value pairs
     *
     * @throws Exception if an option is unknown or its value is not an integer
     *
     * @return Async
     */
    public function validateOptions(array $options): self
    {
        foreach ($options as $name => $value) {
            switch ($name) {
                case self::OPTION_INTERVAL:
                case self::OPTION_RETRY_INTERVAL:
                case self::OPTION_RETRY:
                case self::OPTION_AT:
                    if (is_int($value)) {
                        // Valid, leave the switch and check the next option.
                        break;
                    }

                    throw new Exception('option '.$name.' must be an integer');
                default:
                    throw new Exception('invalid option '.$name.' given');
            }
        }

        return $this;
    }
280
281
    /**
     * Add job to queue.
     *
     * Missing options are filled from the configured defaults before validation.
     * The job is stored with status STATUS_WAITING.
     *
     * @param string $class   job class name
     * @param mixed  $data    payload stored with the job
     * @param array  $options job options (OPTION_AT, OPTION_INTERVAL, OPTION_RETRY, OPTION_RETRY_INTERVAL);
     *                        OPTION_AT is a value in seconds which is stored as a millisecond
     *                        UTCDateTime, values <= 0 store no execution time
     *
     * @throws Exception if the options do not pass validateOptions()
     *
     * @return ObjectId id of the inserted job
     */
    public function addJob(string $class, $data, array $options = []): ObjectId
    {
        $defaults = [
            self::OPTION_AT => $this->default_at,
            self::OPTION_INTERVAL => $this->default_interval,
            self::OPTION_RETRY => $this->default_retry,
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
        ];

        $options = array_merge($defaults, $options);
        $this->validateOptions($options);

        $at = null;
        if ($options[self::OPTION_AT] > 0) {
            // UTCDateTime expects milliseconds.
            $at = new UTCDateTime($options[self::OPTION_AT] * 1000);
        }

        // No '$isolated' here: it is a query operator, not an insertOne option,
        // and a single document insert is atomic anyway.
        $result = $this->db->{$this->collection_name}->insertOne([
            'class' => $class,
            'status' => self::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
            'at' => $at,
            'retry' => $options[self::OPTION_RETRY],
            'retry_interval' => $options[self::OPTION_RETRY_INTERVAL],
            'interval' => $options[self::OPTION_INTERVAL],
            'data' => $data,
        ]);

        $id = $result->getInsertedId();

        $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        return $id;
    }
327
328
    /**
     * Only add job if not in queue yet.
     *
     * A job counts as queued when a document with the same class and data
     * exists with status STATUS_WAITING or STATUS_POSTPONED.
     *
     * @param string $class   job class name
     * @param mixed  $data    payload stored with the job
     * @param array  $options job options, only used when a new job gets added (see addJob())
     *
     * @return ObjectId id of the existing or the newly added job
     */
    public function addJobOnce(string $class, $data, array $options = []): ObjectId
    {
        $filter = [
            'class' => $class,
            'data' => $data,
            '$or' => [
                ['status' => self::STATUS_WAITING],
                ['status' => self::STATUS_POSTPONED],
            ],
        ];

        // Look up the configured collection (not a hard coded "queue"), otherwise
        // duplicates are never detected once collection_name has been changed.
        $result = $this->db->{$this->collection_name}->findOne($filter);

        if (null === $result) {
            return $this->addJob($class, $data, $options);
        }

        $this->logger->debug('queue job ['.$result['_id'].'] of type ['.$class.'] already exists', [
            'category' => get_class($this),
            'data' => $data,
        ]);

        return $result['_id'];
    }
360
361
    /**
     * Execute job queue as endless loop.
     *
     * Each iteration first processes the local queue of postponed jobs and then
     * takes the current job from the tailable cursor. This method does not return.
     */
    public function startDaemon()
    {
        $cursor = $this->getCursor();

        while (true) {
            $this->processLocalQueue();

            $job = $cursor->current();

            if (null === $job) {
                // isDead() is called on the cursor wrapped by getCursor().
                if ($cursor->getInnerIterator()->isDead()) {
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                    ]);

                    // Re-open the cursor inside this loop instead of calling
                    // startDaemon() recursively, which would nest one endless
                    // loop per dead cursor and grow the call stack without bound.
                    $cursor = $this->getCursor();

                    continue;
                }

                $this->retrieveNextJob($cursor);

                continue;
            }

            // Advance the cursor before the job gets processed.
            $this->retrieveNextJob($cursor);
            $this->queueJob($job);
        }
    }
390
391
    /**
     * Execute job queue once, using a non tailable cursor.
     *
     * Jobs are taken from the cursor until it yields no current job.
     * The loop is only left through one of the return statements below.
     *
     * @return bool false if the cursor reports to be dead once no job is left, true otherwise
     */
    public function startOnce(): bool
    {
        $cursor = $this->getCursor(false);

        for (;;) {
            $this->processLocalQueue();

            $job = $cursor->current();

            if (null !== $job) {
                $cursor->next();
                $this->queueJob($job);

                continue;
            }

            if (!$cursor->getInnerIterator()->isDead()) {
                return true;
            }

            $this->logger->debug('all jobs were processed', [
                'category' => get_class($this),
            ]);

            return false;
        }
    }
420
421
    /**
     * Move the given cursor to the next job.
     *
     * If the driver raises a RuntimeException while advancing, the error gets
     * logged and the daemon loop is started again through startDaemon().
     *
     * @param iterable $cursor cursor to advance, next() is called on it
     */
    protected function retrieveNextJob(Iterable $cursor)
    {
        try {
            $cursor->next();

            return;
        } catch (RuntimeException $e) {
            $context = [
                'category' => get_class($this),
                'exception' => $e,
            ];

            $this->logger->error('job queue cursor failed to retrieve next job, restart daemon', $context);
        }

        // Only reached after a failed next() call.
        $this->startDaemon();
    }
439
440
    /**
     * Queue job.
     *
     * Tries to collect the job (waiting => processing) and processes it on success.
     * A job which could not be collected but is postponed gets appended to the
     * local queue instead.
     *
     * @param array $job job document
     *
     * @return bool always true
     */
    protected function queueJob(array $job): bool
    {
        $collected = $this->collectJob($job['_id'], self::STATUS_PROCESSING);

        if (true === $collected) {
            $this->processJob($job);

            return true;
        }

        if (self::STATUS_POSTPONED === $job['status']) {
            $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
                'category' => get_class($this),
            ]);

            $this->queue[] = $job;
        }

        return true;
    }
459
460
    /**
     * Get a rewound cursor over all waiting and postponed jobs.
     *
     * @param bool $tailable request a tailable cursor without cursor timeout
     *
     * @return IteratorIterator iterator wrapping the cursor, documents are mapped to PHP arrays
     */
    protected function getCursor(bool $tailable = true): IteratorIterator
    {
        $options = [
            'typeMap' => [
                'document' => 'array',
                'root' => 'array',
                'array' => 'array',
            ],
        ];

        if ($tailable) {
            $options += [
                'cursorType' => Find::TAILABLE,
                'noCursorTimeout' => true,
            ];
        }

        $query = [
            '$or' => [
                ['status' => self::STATUS_WAITING],
                ['status' => self::STATUS_POSTPONED],
            ],
        ];

        $jobs = new IteratorIterator(
            $this->db->{$this->collection_name}->find($query, $options)
        );
        $jobs->rewind();

        return $jobs;
    }
494
495
    /**
     * Collect job.
     *
     * Moves the job to the given status, but only if it currently has the
     * expected status. The job timestamp is refreshed as well.
     *
     * @param ObjectId $id          job id
     * @param int      $status      new status
     * @param mixed    $from_status status the job must currently have
     *
     * @return bool true if exactly one document was modified, false otherwise
     */
    protected function collectJob(ObjectId $id, int $status, $from_status = self::STATUS_WAITING): bool
    {
        // No '$isolated' in the filter: the operator was removed in MongoDB 4.0
        // and an update of a single document is atomic without it.
        $result = $this->db->{$this->collection_name}->updateMany([
            '_id' => $id,
            'status' => $from_status,
        ], [
            '$set' => [
                'status' => $status,
                'timestamp' => new UTCDateTime(),
            ],
        ]);

        if (1 === $result->getModifiedCount()) {
            $this->logger->debug('job ['.$id.'] updated to status ['.$status.']', [
                'category' => get_class($this),
            ]);

            return true;
        }

        $this->logger->debug('job ['.$id.'] is already collected with status ['.$status.']', [
            'category' => get_class($this),
        ]);

        return false;
    }
531
532
    /**
     * Update job status.
     *
     * Sets the status regardless of the current one and refreshes the job timestamp.
     *
     * @param ObjectId $id     job id
     * @param int      $status new status
     *
     * @return bool whether the write was acknowledged
     */
    protected function updateJob(ObjectId $id, int $status): bool
    {
        // No '$isolated' in the filter: the operator was removed in MongoDB 4.0
        // and an update of a single document is atomic without it.
        $result = $this->db->{$this->collection_name}->updateMany([
            '_id' => $id,
        ], [
            '$set' => [
                'status' => $status,
                'timestamp' => new UTCDateTime(),
            ],
        ]);

        return $result->isAcknowledged();
    }
554
555
    /**
     * Check local queue for postponed jobs.
     *
     * Every locally queued job whose execution time is not after the current time
     * is removed from the local queue and processed, provided it can be collected
     * (postponed => processing).
     *
     * @return bool always true
     */
    protected function processLocalQueue(): bool
    {
        $now = new UTCDateTime();

        foreach ($this->queue as $key => $job) {
            // Not due yet, keep it in the local queue.
            if (!($job['at'] <= $now)) {
                continue;
            }

            $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
                'category' => get_class($this),
            ]);

            unset($this->queue[$key]);

            // Without an execution time processJob() executes the job instead of postponing it again.
            $job['at'] = null;

            $collected = $this->collectJob($job['_id'], self::STATUS_PROCESSING, self::STATUS_POSTPONED);
            if (true === $collected) {
                $this->processJob($job);
            }
        }

        return true;
    }
580
581
    /**
     * Process job.
     *
     * - A job with an execution time ('at') is marked as postponed and put into the local queue.
     * - Otherwise the job is executed. If it fails, it is marked as failed and, if retries
     *   are left, added again with a decreased retry counter.
     * - A job with an interval >= 0 is added again after its execution.
     *
     * @param array $job job document
     *
     * @return ObjectId id of the processed job, or id of the newly added job for a retry or an interval
     */
    protected function processJob(array $job): ObjectId
    {
        if ($job['at'] instanceof UTCDateTime) {
            $this->updateJob($job['_id'], self::STATUS_POSTPONED);
            $this->queue[] = $job;

            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['at']->toDateTime()->format('c').']', [
                'category' => get_class($this),
            ]);

            return $job['_id'];
        }

        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
            'category' => get_class($this),
            'params' => $job['data'],
        ]);

        try {
            $this->executeJob($job);
        } catch (\Exception $e) {
            $this->logger->error('failed execute job ['.$job['_id'].']', [
                'category' => get_class($this),
                'exception' => $e,
            ]);

            $this->updateJob($job['_id'], self::STATUS_FAILED);

            if ($job['retry'] > 0) {
                // Log the interval itself; the retry counter was printed here before.
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['retry_interval'].']', [
                    'category' => get_class($this),
                ]);

                return $this->addJob($job['class'], $job['data'], [
                    self::OPTION_AT => time() + $job['retry_interval'],
                    self::OPTION_INTERVAL => $job['interval'],
                    self::OPTION_RETRY => $job['retry'] - 1,
                    self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
                ]);
            }
        }

        // Also reached after a failed job without retries left.
        if ($job['interval'] >= 0) {
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['interval'].'s]', [
                'category' => get_class($this),
            ]);

            return $this->addJob($job['class'], $job['data'], [
                self::OPTION_AT => time() + $job['interval'],
                self::OPTION_INTERVAL => $job['interval'],
                self::OPTION_RETRY => $job['retry'],
                self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
            ]);
        }

        return $job['_id'];
    }
645
646
    /**
     * Execute job.
     *
     * The job class is instantiated directly, or resolved through the container
     * if one was given. The job data is passed to the instance before it is
     * started, afterwards the job is marked as done.
     *
     * @param array $job job document
     *
     * @throws Exception if the job class does not exist or does not implement JobInterface
     *
     * @return bool result of updateJob()
     */
    protected function executeJob(array $job): bool
    {
        $class = $job['class'];

        if (!class_exists($class)) {
            throw new Exception('job class does not exists');
        }

        $instance = null === $this->container
            ? new $class()
            : $this->container->get($class);

        if ($instance instanceof JobInterface) {
            $instance->setData($job['data'])->start();

            return $this->updateJob($job['_id'], self::STATUS_DONE);
        }

        throw new Exception('job must implement JobInterface');
    }
674
}
675