Passed
Branch master (c8d412)
by Raffael
02:48
created

Async::startOnce()   B

Complexity

Conditions 4
Paths 4

Size

Total Lines 22
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 4.1967

Importance

Changes 0
Metric Value
cc 4
eloc 12
nc 4
nop 0
dl 0
loc 22
ccs 10
cts 13
cp 0.7692
crap 4.1967
rs 8.9197
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use MongoDB\Driver\Exception\RuntimeException;
20
use MongoDB\Operation\Find;
21
use Psr\Container\ContainerInterface;
22
use Psr\Log\LoggerInterface;
23
use Traversable;
24
25
class Async
26
{
27
    /**
28
     * Job status.
29
     */
30
    const STATUS_WAITING = 0;
31
    const STATUS_POSTPONED = 1;
32
    const STATUS_PROCESSING = 2;
33
    const STATUS_DONE = 3;
34
    const STATUS_FAILED = 4;
35
    const STATUS_CANCELED = 5;
36
37
    /**
38
     * Job options.
39
     */
40
    const OPTION_AT = 'at';
41
    const OPTION_INTERVAL = 'interval';
42
    const OPTION_RETRY = 'retry';
43
    const OPTION_RETRY_INTERVAL = 'retry_interval';
44
45
    /**
46
     * Database.
47
     *
48
     * @var Database
49
     */
50
    protected $db;
51
52
    /**
53
     * LoggerInterface.
54
     *
55
     * @var LoggerInterface
56
     */
57
    protected $logger;
58
59
    /**
60
     * Local queue.
61
     *
62
     * @var array
63
     */
64
    protected $queue = [];
65
66
    /**
67
     * Collection name.
68
     *
69
     * @var string
70
     */
71
    protected $collection_name = 'queue';
72
73
    /**
74
     * Container.
75
     *
76
     * @var ContainerInterface
77
     */
78
    protected $container;
79
80
    /**
81
     * Default at (Seconds from now).
82
     *
83
     * @var int
84
     */
85
    protected $default_at = 0;
86
87
    /**
88
     * Default interval (seconds).
89
     *
90
     * @var int
91
     */
92
    protected $default_interval = -1;
93
94
    /**
95
     * Default retry.
96
     *
97
     * @var int
98
     */
99
    protected $default_retry = 0;
100
101
    /**
102
     * Default retry interval (seconds).
103
     *
104
     * @var int
105
     */
106
    protected $default_retry_interval = 300;
107
108
    /**
109
     * Queue size.
110
     *
111
     * @var int
112
     */
113
    protected $queue_size = 100000;
114
115
    /**
116
     * Init queue.
117
     *
118
     * @param Database           $db
119
     * @param LoggerInterface    $logger
120
     * @param ContainerInterface $container
121
     * @param iterable           $config
122
     */
123 28
    public function __construct(Database $db, LoggerInterface $logger, ?ContainerInterface $container = null, ?Iterable $config = null)
0 ignored issues
show
Unused Code introduced by
The parameter $config is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

123
    public function __construct(Database $db, LoggerInterface $logger, ?ContainerInterface $container = null, /** @scrutinizer ignore-unused */ ?Iterable $config = null)

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
124
    {
125 28
        $this->db = $db;
126 28
        $this->logger = $logger;
127 28
        $this->container = $container;
128
        //$this->setOptions($config);
129 28
    }
130
131
    /**
132
     * Set options.
133
     *
134
     * @param iterable $config
135
     *
136
     * @return Async
137
     */
138
    /*public function setOptions(? Iterable $config = null): self
139
    {
140
        if (null === $config) {
141
            return $this;
142
        }
143
144
        foreach ($config as $option => $value) {
145
            switch ($option) {
146
                case 'collection_name':
147
                    $this->{$option} = (string) $value;
148
149
                break;
150
                case 'default_retry':
151
                case 'default_at':
152
                case 'default_retry_interval':
153
                case 'default_interval':
154
                case 'queue_size':
155
                    $this->{$option} = (int) $value;
156
157
                break;
158
                default:
159
                    throw new Exception('invalid option '.$option.' given');
160
            }
161
        }
162
163
        return $this;
164
    }*/
165
166
    /**
     * Create the queue collection.
     *
     * The collection named by $collection_name is created as a capped
     * collection; its size option is taken from $queue_size.
     *
     * @return Async
     */
    public function createQueue(): self
    {
        $options = [
            'capped' => true,
            'size' => $this->queue_size,
        ];

        $this->db->createCollection($this->collection_name, $options);

        return $this;
    }
183
184
    /**
     * Get job by ID.
     *
     * The job document is requested with a type map which decodes the root
     * document, embedded documents and arrays into plain PHP arrays.
     *
     * @param ObjectId $id job identifier
     *
     * @throws Exception if no job with the given ID exists
     *
     * @return array
     */
    public function getJob(ObjectId $id): array
    {
        $type_map = [
            'document' => 'array',
            'root' => 'array',
            'array' => 'array',
        ];

        $job = $this->db->{$this->collection_name}->findOne(
            ['_id' => $id],
            ['typeMap' => $type_map]
        );

        if (null !== $job) {
            return $job;
        }

        throw new Exception('job '.$id.' was not found');
    }
209
210
    /**
     * Cancel job.
     *
     * Sets the status of the given job to STATUS_CANCELED via updateJob()
     * and hands back whatever updateJob() reports.
     *
     * @param ObjectId $id job identifier
     *
     * @return bool
     */
    public function cancelJob(ObjectId $id): bool
    {
        $updated = $this->updateJob($id, self::STATUS_CANCELED);

        return $updated;
    }
219
220
    /**
     * Get jobs.
     *
     * $filter is a list of job status values. If it is empty, the jobs with
     * status waiting, processing or postponed are requested. Documents are
     * decoded into plain PHP arrays.
     *
     * @param array $filter list of STATUS_* values
     *
     * @return Traversable
     */
    public function getJobs(array $filter = []): Traversable
    {
        $status = [] === $filter
            ? [self::STATUS_WAITING, self::STATUS_PROCESSING, self::STATUS_POSTPONED]
            : $filter;

        $query = [
            'status' => ['$in' => $status],
        ];

        $options = [
            'typeMap' => [
                'document' => 'array',
                'root' => 'array',
                'array' => 'array',
            ],
        ];

        return $this->db->{$this->collection_name}->find($query, $options);
    }
251
252
    /**
     * Validate given job options.
     *
     * Every key must be one of the OPTION_* names and every value must be an
     * integer. Option names are matched strictly: a loose switch comparison
     * would let integer keys (e.g. 0) match a string case label on PHP 7 and
     * slip through validation.
     *
     * @param array $options option name => value pairs
     *
     * @throws Exception if an option name is unknown or a value is not an integer
     *
     * @return Async
     */
    public function validateOptions(array $options): self
    {
        $known = [
            self::OPTION_AT,
            self::OPTION_RETRY,
            self::OPTION_RETRY_INTERVAL,
            self::OPTION_INTERVAL,
        ];

        foreach ($options as $option => $value) {
            if (!in_array($option, $known, true)) {
                throw new Exception('invalid option '.$option.' given');
            }

            if (!is_int($value)) {
                throw new Exception('option '.$option.' must be an integer');
            }
        }

        return $this;
    }
279
280
    /**
     * Add job to queue.
     *
     * The given options are merged over the configured defaults and validated.
     * The job is inserted with status waiting. An "at" option greater than 0
     * is taken as a unix timestamp in seconds and stored as UTCDateTime
     * (milliseconds); otherwise "at" is stored as null.
     *
     * @param string $class   job class name
     * @param mixed  $data    payload stored with the job
     * @param array  $options OPTION_* => int pairs
     *
     * @throws Exception if the options do not pass validateOptions()
     *
     * @return ObjectId
     */
    public function addJob(string $class, $data, array $options = []): ObjectId
    {
        $defaults = [
            self::OPTION_AT => $this->default_at,
            self::OPTION_INTERVAL => $this->default_interval,
            self::OPTION_RETRY => $this->default_retry,
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
        ];

        $options = array_merge($defaults, $options);
        $this->validateOptions($options);

        $at = $options[self::OPTION_AT] > 0
            ? new UTCDateTime($options[self::OPTION_AT] * 1000)
            : null;

        // No insert options are passed: '$isolated' is an update/query
        // operator and has no meaning as an insertOne() option.
        $result = $this->db->{$this->collection_name}->insertOne([
            'class' => $class,
            'status' => self::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
            'at' => $at,
            'retry' => $options[self::OPTION_RETRY],
            'retry_interval' => $options[self::OPTION_RETRY_INTERVAL],
            'interval' => $options[self::OPTION_INTERVAL],
            'data' => $data,
        ]);

        $id = $result->getInsertedId();

        $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        return $id;
    }
326
327
    /**
     * Only add job if not in queue yet.
     *
     * Looks for a job with the same class and data whose status is waiting or
     * postponed. If one exists its ID is returned, otherwise the job is added
     * through addJob(). The lookup runs against the configured collection
     * ($collection_name), the same collection addJob() writes to.
     *
     * @param string $class   job class name
     * @param mixed  $data    payload stored with the job
     * @param array  $options OPTION_* => int pairs, only used if the job gets added
     *
     * @return ObjectId
     */
    public function addJobOnce(string $class, $data, array $options = []): ObjectId
    {
        $filter = [
            'class' => $class,
            'data' => $data,
            '$or' => [
                ['status' => self::STATUS_WAITING],
                ['status' => self::STATUS_POSTPONED],
            ],
        ];

        $result = $this->db->{$this->collection_name}->findOne($filter);

        if (null === $result) {
            return $this->addJob($class, $data, $options);
        }

        $this->logger->debug('queue job ['.$result['_id'].'] of type ['.$class.'] already exists', [
            'category' => get_class($this),
            'data' => $data,
        ]);

        return $result['_id'];
    }
359
360
    /**
     * Execute job queue as endless loop.
     *
     * Each iteration first processes the local queue of postponed jobs, then
     * looks at the current cursor position:
     *  - no current job and the cursor is dead: an error is logged and a fresh
     *    cursor is opened,
     *  - no current job and the cursor is alive: the cursor is advanced,
     *  - otherwise the cursor is advanced and the job is handed to queueJob().
     *
     * This method does not return by itself.
     */
    public function startDaemon()
    {
        $cursor = $this->getCursor();

        while (true) {
            $this->processLocalQueue();

            $job = $cursor->current();

            if (null === $job) {
                if ($cursor->getInnerIterator()->isDead()) {
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                    ]);

                    // Re-open the cursor in place instead of calling
                    // startDaemon() recursively: the call sequence is the
                    // same, but the call stack does not grow with every
                    // restart of this long-running loop.
                    $cursor = $this->getCursor();

                    continue;
                }

                $this->retrieveNextJob($cursor);

                continue;
            }

            $this->retrieveNextJob($cursor);
            $this->queueJob($job);
        }
    }
389
390
    /**
     * Execute job queue.
     *
     * Uses a non-tailable cursor. The local queue is processed before every
     * look at the cursor; every job found is handed to queueJob(). Once the
     * cursor yields no further job, false is returned if the underlying
     * cursor reports dead (after logging that all jobs were processed),
     * true otherwise.
     *
     * @return bool
     */
    public function startOnce(): bool
    {
        $cursor = $this->getCursor(false);
        $this->processLocalQueue();

        while (null !== ($job = $cursor->current())) {
            $cursor->next();
            $this->queueJob($job);
            $this->processLocalQueue();
        }

        if ($cursor->getInnerIterator()->isDead()) {
            $this->logger->debug('all jobs were processed', [
                'category' => get_class($this),
            ]);

            return false;
        }

        return true;
    }
419
420
    /**
     * Retrieve next job.
     *
     * Advances the given cursor. If the driver raises a RuntimeException
     * while doing so, the error is logged and startDaemon() is called.
     *
     * @param iterable $cursor cursor to advance
     */
    protected function retrieveNextJob(Iterable $cursor)
    {
        try {
            $cursor->next();
        } catch (RuntimeException $exception) {
            $context = [
                'category' => get_class($this),
                'exception' => $exception,
            ];

            $this->logger->error('job queue cursor failed to retrieve next job, restart daemon', $context);
            $this->startDaemon();
        }
    }
438
439
    /**
     * Queue job.
     *
     * Tries to collect the job (waiting => processing). If that succeeds the
     * job is processed. If it does not and the job document carries the
     * postponed status, the job is appended to the local queue.
     *
     * @param array $job job document
     *
     * @return bool always true
     */
    protected function queueJob(array $job): bool
    {
        $collected = $this->collectJob($job['_id'], self::STATUS_PROCESSING);

        if (true === $collected) {
            $this->processJob($job);

            return true;
        }

        if (self::STATUS_POSTPONED === $job['status']) {
            $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
                'category' => get_class($this),
            ]);

            $this->queue[] = $job;
        }

        return true;
    }
458
459
    /**
     * Get cursor.
     *
     * Opens a cursor over all jobs with status waiting or postponed, decoded
     * into plain PHP arrays. If $tailable is true, the cursor is requested as
     * tailable cursor without cursor timeout. The cursor is wrapped in an
     * IteratorIterator which is rewound before it is returned.
     *
     * @param bool $tailable request a tailable cursor
     *
     * @return IteratorIterator
     */
    protected function getCursor(bool $tailable = true): IteratorIterator
    {
        $find_options = [
            'typeMap' => [
                'document' => 'array',
                'root' => 'array',
                'array' => 'array',
            ],
        ];

        if ($tailable) {
            $find_options['cursorType'] = Find::TAILABLE;
            $find_options['noCursorTimeout'] = true;
        }

        $query = [
            '$or' => [
                ['status' => self::STATUS_WAITING],
                ['status' => self::STATUS_POSTPONED],
            ],
        ];

        $wrapped = new IteratorIterator(
            $this->db->{$this->collection_name}->find($query, $find_options)
        );
        $wrapped->rewind();

        return $wrapped;
    }
493
494
    /**
     * Collect job.
     *
     * Moves the job from $from_status to $status, but only if the job
     * currently has $from_status. The filter matches a single document by its
     * _id, so no '$isolated' flag is sent; that operator was removed in
     * MongoDB 4.0 and is rejected by such servers.
     *
     * @param ObjectId $id          job identifier
     * @param int      $status      status to set
     * @param mixed    $from_status status the job must currently have
     *
     * @return bool true if exactly one document was modified, false otherwise
     */
    protected function collectJob(ObjectId $id, int $status, $from_status = self::STATUS_WAITING): bool
    {
        $result = $this->db->{$this->collection_name}->updateMany([
            '_id' => $id,
            'status' => $from_status,
        ], [
            '$set' => [
                'status' => $status,
                'timestamp' => new UTCDateTime(),
            ],
        ]);

        if (1 === $result->getModifiedCount()) {
            $this->logger->debug('job ['.$id.'] updated to status ['.$status.']', [
                'category' => get_class($this),
            ]);

            return true;
        }

        $this->logger->debug('job ['.$id.'] is already collected with status ['.$status.']', [
            'category' => get_class($this),
        ]);

        return false;
    }
530
531
    /**
     * Update job status.
     *
     * Sets the status and refreshes the timestamp of the job with the given
     * ID, regardless of its current status. The filter matches a single
     * document by its _id, so no '$isolated' flag is sent; that operator was
     * removed in MongoDB 4.0 and is rejected by such servers.
     *
     * @param ObjectId $id     job identifier
     * @param int      $status status to set
     *
     * @return bool whether the write was acknowledged (not whether a job matched)
     */
    protected function updateJob(ObjectId $id, int $status): bool
    {
        $result = $this->db->{$this->collection_name}->updateMany([
            '_id' => $id,
        ], [
            '$set' => [
                'status' => $status,
                'timestamp' => new UTCDateTime(),
            ],
        ]);

        return $result->isAcknowledged();
    }
553
554
    /**
     * Check local queue for postponed jobs.
     *
     * Every locally queued job whose "at" is not later than now is removed
     * from the local queue. Its "at" is reset to null, so processJob() does
     * not postpone it again, and it is processed if it can be collected
     * (postponed => processing).
     *
     * @return bool always true
     */
    protected function processLocalQueue(): bool
    {
        $now = new UTCDateTime();

        foreach ($this->queue as $key => $job) {
            if (!($job['at'] <= $now)) {
                continue;
            }

            $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
                'category' => get_class($this),
            ]);

            unset($this->queue[$key]);
            $job['at'] = null;

            $collected = $this->collectJob($job['_id'], self::STATUS_PROCESSING, self::STATUS_POSTPONED);

            if (true === $collected) {
                $this->processJob($job);
            }
        }

        return true;
    }
579
580
    /**
     * Process job.
     *
     * - A job which carries an "at" date is marked as postponed and appended
     *   to the local queue instead of being executed.
     * - Otherwise the job is executed. Anything thrown by the job is logged
     *   and the job is marked as failed; if retries are left, the job is
     *   re-added with a decremented retry counter.
     * - A job with an interval >= 0 is re-added after execution.
     *
     * @param array $job job document
     *
     * @return ObjectId ID of the given job, or ID of the job which was re-added for it
     */
    protected function processJob(array $job): ObjectId
    {
        if ($job['at'] instanceof UTCDateTime) {
            $this->updateJob($job['_id'], self::STATUS_POSTPONED);
            $this->queue[] = $job;

            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['at']->toDateTime()->format('c').']', [
                'category' => get_class($this),
            ]);

            return $job['_id'];
        }

        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
            'category' => get_class($this),
            'params' => $job['data'],
        ]);

        try {
            $this->executeJob($job);
        } catch (\Throwable $e) {
            // \Throwable instead of \Exception: a job which raises an \Error
            // (e.g. a TypeError) must be recorded as failed as well, otherwise
            // the error escapes the queue loop and the job keeps the
            // processing status.
            $this->logger->error('failed execute job ['.$job['_id'].']', [
                'category' => get_class($this),
                'exception' => $e,
            ]);

            $this->updateJob($job['_id'], self::STATUS_FAILED);

            if ($job['retry'] > 0) {
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['retry'].']', [
                    'category' => get_class($this),
                ]);

                return $this->addJob($job['class'], $job['data'], [
                    self::OPTION_AT => time() + $job['retry_interval'],
                    self::OPTION_INTERVAL => $job['interval'],
                    self::OPTION_RETRY => --$job['retry'],
                    self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
                ]);
            }
        }

        if ($job['interval'] >= 0) {
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['interval'].'s]', [
                'category' => get_class($this),
            ]);

            return $this->addJob($job['class'], $job['data'], [
                self::OPTION_AT => time() + $job['interval'],
                self::OPTION_INTERVAL => $job['interval'],
                self::OPTION_RETRY => $job['retry'],
                self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
            ]);
        }

        return $job['_id'];
    }
644
645
    /**
     * Execute job.
     *
     * The job class is instantiated directly if no container is set,
     * otherwise it is requested from the container. The instance receives the
     * job data and is started; afterwards the job is marked as done.
     *
     * @param array $job job document
     *
     * @throws Exception if the job class does not exist or does not implement JobInterface
     *
     * @return bool result of updateJob() for the done status
     */
    protected function executeJob(array $job): bool
    {
        $class = $job['class'];

        if (!class_exists($class)) {
            throw new Exception('job class does not exists');
        }

        $instance = null === $this->container
            ? new $class()
            : $this->container->get($class);

        if (!($instance instanceof JobInterface)) {
            throw new Exception('job must implement JobInterface');
        }

        $instance->setData($job['data'])->start();

        return $this->updateJob($job['_id'], self::STATUS_DONE);
    }
673
}
674