Passed
Push — master ( 29ccfd...b0131a )
by Raffael
02:40
created

Async::__construct()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 5
nc 2
nop 4
dl 0
loc 8
ccs 6
cts 6
cp 1
crap 2
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use MongoDB\Driver\Exception\RuntimeException;
20
use MongoDB\Operation\Find;
21
use Psr\Container\ContainerInterface;
22
use Psr\Log\LoggerInterface;
23
use Traversable;
24
25
class Async
26
{
27
    /**
28
     * Job status.
29
     */
30
    const STATUS_WAITING = 0;
31
    const STATUS_POSTPONED = 1;
32
    const STATUS_PROCESSING = 2;
33
    const STATUS_DONE = 3;
34
    const STATUS_FAILED = 4;
35
    const STATUS_CANCELED = 5;
36
37
    /**
38
     * Job options.
39
     */
40
    const OPTION_AT = 'at';
41
    const OPTION_INTERVAL = 'interval';
42
    const OPTION_RETRY = 'retry';
43
    const OPTION_RETRY_INTERVAL = 'retry_interval';
44
45
    /**
46
     * Database.
47
     *
48
     * @var Database
49
     */
50
    protected $db;
51
52
    /**
53
     * LoggerInterface.
54
     *
55
     * @var LoggerInterface
56
     */
57
    protected $logger;
58
59
    /**
60
     * Local queue.
61
     *
62
     * @var array
63
     */
64
    protected $queue = [];
65
66
    /**
67
     * Collection name.
68
     *
69
     * @var string
70
     */
71
    protected $collection_name = 'queue';
72
73
    /**
74
     * Container.
75
     *
76
     * @var ContainerInterface
77
     */
78
    protected $container;
79
80
    /**
81
     * Default at (seconds from now).
82
     *
83
     * @var int
84
     */
85
    protected $default_at = 0;
86
87
    /**
88
     * Default interval (seconds).
89
     *
90
     * @var int
91
     */
92
    protected $default_interval = -1;
93
94
    /**
95
     * Default retry.
96
     *
97
     * @var int
98
     */
99
    protected $default_retry = 0;
100
101
    /**
102
     * Default retry interval (seconds).
103
     *
104
     * @var int
105
     */
106
    protected $default_retry_interval = 300;
107
108
    /**
109
     * Queue size.
110
     *
111
     * @var int
112
     */
113
    protected $queue_size = 100000;
114
115
    /**
     * Create a scheduler bound to a database, a logger and an optional container.
     *
     * When $config is given it is handed to setOptions(); otherwise the
     * defaults declared on the class properties stay in place.
     *
     * @param Database                $db        database handle kept for all queue access
     * @param LoggerInterface         $logger    logger kept for later use
     * @param null|ContainerInterface $container stored as given, may be null
     * @param null|iterable           $config    option name => value pairs, or null to skip
     */
    public function __construct(Database $db, LoggerInterface $logger, ?ContainerInterface $container = null, ?Iterable $config = null)
    {
        $this->container = $container;
        $this->logger = $logger;
        $this->db = $db;

        // Nothing to configure: keep the property defaults.
        if (null === $config) {
            return;
        }

        $this->setOptions($config);
    }
133
134
    /**
     * Apply configuration options to this instance.
     *
     * Recognised option names are 'collection_name' (cast to string) and
     * 'default_retry', 'default_at', 'default_retry_interval',
     * 'default_interval', 'queue_size' (each cast to int).
     *
     * Option names are matched strictly. A loose comparison would, under
     * PHP 7, treat an integer key such as 0 as equal to 'collection_name'
     * and write to a property named "0" instead of rejecting the key.
     *
     * @param iterable $config option name => value pairs
     *
     * @throws Exception if an option name is not one of the names listed above
     *
     * @return Async
     */
    public function setOptions(Iterable $config = []): self
    {
        $integer_options = [
            'default_retry',
            'default_at',
            'default_retry_interval',
            'default_interval',
            'queue_size',
        ];

        foreach ($config as $option => $value) {
            if ('collection_name' === $option) {
                $this->collection_name = (string) $value;

                continue;
            }

            if (in_array($option, $integer_options, true)) {
                $this->{$option} = (int) $value;

                continue;
            }

            throw new Exception('invalid option '.$option.' given');
        }

        return $this;
    }
164
165
    /**
     * Create the queue collection as a capped collection.
     *
     * The collection is named after $collection_name and its 'size' option
     * is taken from $queue_size.
     *
     * @return Async
     */
    public function createQueue(): self
    {
        $settings = [
            'capped' => true,
            'size' => $this->queue_size,
        ];

        $this->db->createCollection($this->collection_name, $settings);

        return $this;
    }
182
183
    /**
     * Fetch a single job document by its id.
     *
     * The lookup requests plain arrays for the root, embedded documents and
     * arrays via the typeMap option.
     *
     * @param ObjectId $id id of the job to look up
     *
     * @throws Exception if no document with that id exists
     *
     * @return array
     */
    public function getJob(ObjectId $id): array
    {
        $type_map = [
            'document' => 'array',
            'root' => 'array',
            'array' => 'array',
        ];

        $job = $this->db->{$this->collection_name}->findOne(
            ['_id' => $id],
            ['typeMap' => $type_map]
        );

        if (null !== $job) {
            return $job;
        }

        throw new Exception('job '.$id.' was not found');
    }
208
209
    /**
     * Mark a job as canceled.
     *
     * Delegates to updateJob() with STATUS_CANCELED and returns its result.
     *
     * @param ObjectId $id id of the job to cancel
     *
     * @return bool
     */
    public function cancelJob(ObjectId $id): bool
    {
        $updated = $this->updateJob($id, self::STATUS_CANCELED);

        return $updated;
    }
220
221
    /**
     * List jobs whose status is one of the given values.
     *
     * An empty filter selects the waiting, processing and postponed jobs.
     * Documents are requested as plain arrays via the typeMap option.
     *
     * @param array $filter list of STATUS_* values to match
     *
     * @return Traversable
     */
    public function getJobs(array $filter = []): Traversable
    {
        $states = [] === $filter
            ? [self::STATUS_WAITING, self::STATUS_PROCESSING, self::STATUS_POSTPONED]
            : $filter;

        $query = [
            'status' => ['$in' => $states],
        ];

        $options = [
            'typeMap' => [
                'document' => 'array',
                'root' => 'array',
                'array' => 'array',
            ],
        ];

        return $this->db->{$this->collection_name}->find($query, $options);
    }
252
253
    /**
     * Insert a new job document into the queue collection.
     *
     * Caller options are merged over the configured defaults and validated
     * before anything is written. The job is stored with STATUS_WAITING.
     *
     * @param string $class   job class name stored with the job
     * @param mixed  $data    payload stored with the job
     * @param array  $options any of the OPTION_* keys; values must be integers
     *
     * @throws Exception if validateOptions() rejects the merged options
     *
     * @return ObjectId id of the inserted document
     */
    public function addJob(string $class, $data, array $options = []): ObjectId
    {
        $defaults = [
            self::OPTION_AT => $this->default_at,
            self::OPTION_INTERVAL => $this->default_interval,
            self::OPTION_RETRY => $this->default_retry,
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
        ];

        $options = array_merge($defaults, $options);
        $this->validateOptions($options);

        // A positive OPTION_AT is scaled by 1000 (seconds to milliseconds)
        // for UTCDateTime; anything else is stored as null.
        $at = null;
        if ($options[self::OPTION_AT] > 0) {
            $at = new UTCDateTime($options[self::OPTION_AT] * 1000);
        }

        // No write options are passed: '$isolated' is a query operator, not
        // an insertOne option, and a single-document insert does not need it.
        $result = $this->db->{$this->collection_name}->insertOne([
            'class' => $class,
            'status' => self::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
            'at' => $at,
            'retry' => $options[self::OPTION_RETRY],
            'retry_interval' => $options[self::OPTION_RETRY_INTERVAL],
            'interval' => $options[self::OPTION_INTERVAL],
            'data' => $data,
        ]);

        $id = $result->getInsertedId();

        $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        return $id;
    }
298
299
    /**
     * Add a job only if no equal job is already waiting or postponed.
     *
     * A job counts as equal when both its class and its data match. The
     * lookup runs against the configured collection ($collection_name), the
     * same collection addJob() writes to.
     *
     * @param string $class   job class name
     * @param mixed  $data    job payload, also used for the duplicate lookup
     * @param array  $options options forwarded to addJob() when a job is created
     *
     * @return ObjectId id of the existing job, or of the newly added one
     */
    public function addJobOnce(string $class, $data, array $options = []): ObjectId
    {
        $filter = [
            'class' => $class,
            'data' => $data,
            '$or' => [
                ['status' => self::STATUS_WAITING],
                ['status' => self::STATUS_POSTPONED],
            ],
        ];

        $result = $this->db->{$this->collection_name}->findOne($filter);

        if (null === $result) {
            return $this->addJob($class, $data, $options);
        }

        $this->logger->debug('queue job ['.$result['_id'].'] of type ['.$class.'] already exists', [
            'category' => get_class($this),
            'data' => $data,
        ]);

        return $result['_id'];
    }
331
332
    /**
     * Execute the job queue as an endless loop.
     *
     * Each iteration first processes the local queue of postponed jobs and
     * then looks at the current cursor entry. The loop has no exit
     * condition, so this method does not return normally.
     *
     * When the cursor is reported dead, an error is logged and a fresh cursor
     * is opened in place. The loop is not restarted by calling this method
     * recursively, which would add a stack frame on every dead cursor.
     */
    public function startDaemon()
    {
        $cursor = $this->getCursor();

        while (true) {
            $this->processLocalQueue();

            $job = $cursor->current();

            if (null === $job) {
                if ($cursor->getInnerIterator()->isDead()) {
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                    ]);

                    // Reopen the cursor and carry on with the next iteration.
                    $cursor = $this->getCursor();

                    continue;
                }

                // Nothing available yet: advance and poll again.
                $this->retrieveNextJob($cursor);

                continue;
            }

            // Advance before handling the job that was just read.
            $this->retrieveNextJob($cursor);
            $this->queueJob($job);
        }
    }
361
362
    /**
     * Run through the queue once using a non-tailable cursor.
     *
     * Each iteration processes the local queue, then handles the current
     * cursor entry. The loop only ends once the cursor yields no entry.
     *
     * @return bool false if the cursor is reported dead at that point (after
     *              logging that all jobs were processed), true otherwise
     */
    public function startOnce(): bool
    {
        $cursor = $this->getCursor(false);

        for (;;) {
            $this->processLocalQueue();

            $job = $cursor->current();

            if (null !== $job) {
                $cursor->next();
                $this->queueJob($job);

                continue;
            }

            if (!$cursor->getInnerIterator()->isDead()) {
                return true;
            }

            $this->logger->debug('all jobs were processed', [
                'category' => get_class($this),
            ]);

            return false;
        }
    }
391
392
    /**
     * Check a merged job option set.
     *
     * The set is accepted only when it holds at most four entries and
     * exactly four of them are integers.
     *
     * @param array $options option name => value pairs
     *
     * @throws Exception if more than four entries are given, or if the number
     *                   of integer values is not exactly four
     *
     * @return Async
     */
    protected function validateOptions(array $options): self
    {
        if (count($options) > 4) {
            throw new Exception('invalid option given');
        }

        $integers = 0;
        foreach ($options as $value) {
            if (is_int($value)) {
                ++$integers;
            }
        }

        if (4 !== $integers) {
            throw new Exception('Only integers are allowed to passed');
        }

        return $this;
    }
411
412
    /**
     * Advance the given cursor by one position.
     *
     * A driver RuntimeException raised while advancing is logged, after
     * which startDaemon() is invoked.
     *
     * NOTE(review): startDaemon() loops without an exit condition, so the
     * call in the catch branch is not expected to return to the caller.
     *
     * @param iterable $cursor cursor to advance; next() is called on it
     */
    protected function retrieveNextJob(Iterable $cursor)
    {
        try {
            $cursor->next();
        } catch (RuntimeException $exception) {
            $context = [
                'category' => get_class($this),
                'exception' => $exception,
            ];

            $this->logger->error('job queue cursor failed to retrieve next job, restart daemon', $context);
            $this->startDaemon();
        }
    }
430
431
    /**
     * Claim a job read from the cursor and dispatch it.
     *
     * If the job can be moved to STATUS_PROCESSING via collectJob() it is
     * processed right away. Otherwise, a job whose status is STATUS_POSTPONED
     * is appended to the local queue; any other job is left alone.
     *
     * @param array $job job document as read from the cursor
     *
     * @return bool always true
     */
    protected function queueJob(array $job): bool
    {
        if (true === $this->collectJob($job['_id'], self::STATUS_PROCESSING)) {
            $this->processJob($job);

            return true;
        }

        if (self::STATUS_POSTPONED !== $job['status']) {
            return true;
        }

        $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
            'category' => get_class($this),
        ]);

        $this->queue[] = $job;

        return true;
    }
450
451
    /**
     * Open a cursor over the waiting and postponed jobs.
     *
     * Documents are requested as plain arrays. In tailable mode the find
     * options additionally set the tailable cursor type and disable the
     * cursor timeout. The cursor is wrapped in an IteratorIterator that has
     * already been rewound.
     *
     * @param bool $tailable whether to request a tailable cursor
     *
     * @return IteratorIterator
     */
    protected function getCursor(bool $tailable = true): IteratorIterator
    {
        $type_map = [
            'document' => 'array',
            'root' => 'array',
            'array' => 'array',
        ];

        $options = true === $tailable
            ? ['typeMap' => $type_map, 'cursorType' => Find::TAILABLE, 'noCursorTimeout' => true]
            : ['typeMap' => $type_map];

        $query = [
            '$or' => [
                ['status' => self::STATUS_WAITING],
                ['status' => self::STATUS_POSTPONED],
            ],
        ];

        $iterator = new IteratorIterator(
            $this->db->{$this->collection_name}->find($query, $options)
        );
        $iterator->rewind();

        return $iterator;
    }
485
486
    /**
     * Claim a job by moving it from one status to another.
     *
     * The update only matches while the job still has $from_status, so a job
     * whose status has already changed is not claimed.
     *
     * The filter carries no '$isolated' key: the match is on a single _id,
     * and the operator is rejected by servers that no longer support it.
     *
     * @param ObjectId $id          id of the job to claim
     * @param int      $status      status to set
     * @param mixed    $from_status status the job must currently have
     *
     * @return bool true if exactly one document was modified, false otherwise
     */
    protected function collectJob(ObjectId $id, int $status, $from_status = self::STATUS_WAITING): bool
    {
        $result = $this->db->{$this->collection_name}->updateMany([
            '_id' => $id,
            'status' => $from_status,
        ], [
            '$set' => [
                'status' => $status,
                'timestamp' => new UTCDateTime(),
            ],
        ]);

        if (1 === $result->getModifiedCount()) {
            $this->logger->debug('job ['.$id.'] updated to status ['.$status.']', [
                'category' => get_class($this),
            ]);

            return true;
        }

        $this->logger->debug('job ['.$id.'] is already collected with status ['.$status.']', [
            'category' => get_class($this),
        ]);

        return false;
    }
522
523
    /**
     * Set the status of a job unconditionally.
     *
     * The job's timestamp is refreshed along with the status. The filter
     * carries no '$isolated' key: the match is on a single _id, and the
     * operator is rejected by servers that no longer support it.
     *
     * @param ObjectId $id     id of the job to update
     * @param int      $status status to set
     *
     * @return bool whether the write was acknowledged
     */
    protected function updateJob(ObjectId $id, int $status): bool
    {
        $result = $this->db->{$this->collection_name}->updateMany([
            '_id' => $id,
        ], [
            '$set' => [
                'status' => $status,
                'timestamp' => new UTCDateTime(),
            ],
        ]);

        return $result->isAcknowledged();
    }
545
546
    /**
     * Run the postponed jobs in the local queue that have become due.
     *
     * A job is due when its 'at' value is not later than the current time.
     * Due jobs are removed from the local queue, have their 'at' cleared and
     * are processed if they can be moved from STATUS_POSTPONED to
     * STATUS_PROCESSING.
     *
     * @return bool always true
     */
    protected function processLocalQueue(): bool
    {
        $now = new UTCDateTime();

        foreach ($this->queue as $key => $job) {
            // Not due yet: leave it in the local queue.
            if (!($job['at'] <= $now)) {
                continue;
            }

            $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
                'category' => get_class($this),
            ]);

            unset($this->queue[$key]);

            // Clear 'at' so processJob() does not postpone the job again.
            $job['at'] = null;

            $claimed = $this->collectJob($job['_id'], self::STATUS_PROCESSING, self::STATUS_POSTPONED);
            if (true === $claimed) {
                $this->processJob($job);
            }
        }

        return true;
    }
571
572
    /**
     * Process a claimed job: postpone it, execute it, and re-add it if needed.
     *
     * - A job whose 'at' is a UTCDateTime is marked postponed, appended to
     *   the local queue and not executed now.
     * - Otherwise the job is executed. Any Throwable raised by the job is
     *   logged and the job is marked failed, so a PHP Error from a job does
     *   not escape and leave the job in the processing state. If retries
     *   remain, a new job is added with the retry counter decremented.
     * - A job with an interval of zero or more is added again afterwards.
     *
     * @param array $job job document
     *
     * @return ObjectId id of the processed job, or of the job added in its place
     */
    protected function processJob(array $job): ObjectId
    {
        if ($job['at'] instanceof UTCDateTime) {
            $this->updateJob($job['_id'], self::STATUS_POSTPONED);
            $this->queue[] = $job;

            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['at']->toDateTime()->format('c').']', [
                'category' => get_class($this),
            ]);

            return $job['_id'];
        }

        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
            'category' => get_class($this),
            'params' => $job['data'],
        ]);

        try {
            $this->executeJob($job);
        } catch (\Throwable $e) {
            $this->logger->error('failed execute job ['.$job['_id'].']', [
                'category' => get_class($this),
                'exception' => $e,
            ]);

            $this->updateJob($job['_id'], self::STATUS_FAILED);

            if ($job['retry'] > 0) {
                // Log the interval itself; the retry counter is not an interval.
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['retry_interval'].']', [
                    'category' => get_class($this),
                ]);

                return $this->addJob($job['class'], $job['data'], [
                    self::OPTION_AT => time() + $job['retry_interval'],
                    self::OPTION_INTERVAL => $job['interval'],
                    self::OPTION_RETRY => --$job['retry'],
                    self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
                ]);
            }
        }

        // Reached after a successful run and after a failure without retries left.
        if ($job['interval'] >= 0) {
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['interval'].'s]', [
                'category' => get_class($this),
            ]);

            return $this->addJob($job['class'], $job['data'], [
                self::OPTION_AT => time() + $job['interval'],
                self::OPTION_INTERVAL => $job['interval'],
                self::OPTION_RETRY => $job['retry'],
                self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
            ]);
        }

        return $job['_id'];
    }
636
637
    /**
     * Instantiate the job class, run it and mark the job as done.
     *
     * The instance is created directly when no container is set, otherwise
     * it is fetched from the container. setData() receives the job payload
     * and start() is called on whatever setData() returns.
     *
     * @param array $job job document
     *
     * @throws Exception if the job class does not exist or the instance does
     *                   not implement JobInterface
     *
     * @return bool result of updating the job to STATUS_DONE
     */
    protected function executeJob(array $job): bool
    {
        $class = $job['class'];

        if (!class_exists($class)) {
            throw new Exception('job class does not exists');
        }

        $instance = null === $this->container
            ? new $class()
            : $this->container->get($class);

        if (!($instance instanceof JobInterface)) {
            throw new Exception('job must implement JobInterface');
        }

        $prepared = $instance->setData($job['data']);
        $prepared->start();

        return $this->updateJob($job['_id'], self::STATUS_DONE);
    }
665
}
666