Passed
Push — master ( c2ec67...65f694 )
by Raffael
05:00 queued 02:27
created

Async::processLocalQueue()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 19
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
cc 4
eloc 10
nc 4
nop 0
dl 0
loc 19
ccs 0
cts 11
cp 0
crap 20
rs 9.2
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use MongoDB\Driver\Cursor;
20
use MongoDB\Driver\Exception\RuntimeException;
21
use MongoDB\Operation\Find;
22
use Psr\Container\ContainerInterface;
23
use Psr\Log\LoggerInterface;
24
use Traversable;
25
26
class Async
27
{
28
    /**
29
     * Job status.
30
     */
31
    const STATUS_WAITING = 0;
32
    const STATUS_POSTPONED = 1;
33
    const STATUS_PROCESSING = 2;
34
    const STATUS_DONE = 3;
35
    const STATUS_FAILED = 4;
36
    const STATUS_CANCELED = 5;
37
38
    /**
39
     * Job options.
40
     */
41
    const OPTION_AT = 'at';
42
    const OPTION_INTERVAL = 'interval';
43
    const OPTION_RETRY = 'retry';
44
    const OPTION_RETRY_INTERVAL = 'retry_interval';
45
46
    /**
47
     * Database.
48
     *
49
     * @var Database
50
     */
51
    protected $db;
52
53
    /**
54
     * LoggerInterface.
55
     *
56
     * @var LoggerInterface
57
     */
58
    protected $logger;
59
60
    /**
61
     * Local queue.
62
     *
63
     * @var array
64
     */
65
    protected $queue = [];
66
67
    /**
68
     * Collection name.
69
     *
70
     * @var string
71
     */
72
    protected $collection_name = 'queue';
73
74
    /**
75
     * Container.
76
     *
77
     * @var ContainerInterface
78
     */
79
    protected $container;
80
81
    /**
82
     * Default at (Secconds from now).
83
     *
84
     * @var int
85
     */
86
    protected $default_at = 0;
87
88
    /**
89
     * Default interval (secconds).
90
     *
91
     * @var int
92
     */
93
    protected $default_interval = -1;
94
95
    /**
96
     * Default retry.
97
     *
98
     * @var int
99
     */
100
    protected $default_retry = 0;
101
102
    /**
103
     * Default retry interval (secconds).
104
     *
105
     * @var int
106
     */
107
    protected $default_retry_interval = 300;
108
109
    /**
110
     * Queue size.
111
     *
112
     * @var int
113
     */
114
    protected $queue_size = 100000;
115
116
    /**
117
     * Init queue.
118
     *
119
     * @param Database           $db
120
     * @param LoggerInterface    $logger
121
     * @param ContainerInterface $container
122
     * @param iterable           $config
123
     */
124 16
    public function __construct(Database $db, LoggerInterface $logger, ?ContainerInterface $container = null, ?Iterable $config = null)
125
    {
126 16
        $this->db = $db;
127 16
        $this->logger = $logger;
128 16
        $this->container = $container;
129 16
        $this->setOptions($config);
130 16
    }
131
132
    /**
133
     * Set options.
134
     *
135
     * @param iterable $config
136
     *
137
     * @return Async
138
     */
139 16
    public function setOptions(? Iterable $config = null): self
140
    {
141 16
        if (null === $config) {
142 16
            return $this;
143
        }
144
145
        foreach ($config as $option => $value) {
146
            switch ($option) {
147
                case 'collection_name':
148
                    $this->{$option} = (string) $value;
149
150
                break;
151
                case 'default_retry':
152
                case 'default_at':
153
                case 'default_retry_interval':
154
                case 'default_interval':
155
                case 'queue_size':
156
                    $this->{$option} = (int) $value;
157
158
                break;
159
                default:
160
                    throw new Exception('invalid option '.$option.' given');
161
            }
162
        }
163
164
        return $this;
165
    }
166
167
    /**
168
     * Create queue collection.
169
     *
170
     * @return Async
171
     */
172
    public function createQueue(): self
173
    {
174
        $this->db->createCollection(
175
            $this->collection_name,
176
            [
177
                'capped' => true,
178
                'size' => $this->queue_size,
179
            ]
180
        );
181
182
        return $this;
183
    }
184
185
    /**
186
     * Get job by ID.
187
     *
188
     * @param ObjectId
189
     *
190
     * @return array
191
     */
192 11
    public function getJob(ObjectId $id): array
193
    {
194 11
        $result = $this->db->{$this->collection_name}->findOne([
195 11
            '_id' => $id,
196
        ], [
197 11
            'typeMap' => [
198
                'document' => 'array',
199
                'root' => 'array',
200
                'array' => 'array',
201
            ],
202
        ]);
203
204 11
        if (null === $result) {
205 1
            throw new Exception('job '.$id.' was not found');
206
        }
207
208 10
        return $result;
209
    }
210
211
    /**
212
     * Cancel job.
213
     *
214
     *  @param ObjectId $id
215
     */
216 1
    public function cancelJob(ObjectId $id): bool
217
    {
218 1
        return $this->updateJob($id, self::STATUS_CANCELED);
219
    }
220
221
    /**
222
     * Get jobs (Pass a filter which contains job status, by default all active jobs get returned).
223
     *
224
     * @param array $filter
225
     *
226
     * @return Traversable
227
     */
228 3
    public function getJobs(array $filter = []): Traversable
229
    {
230 3
        if (0 === count($filter)) {
231
            $filter = [
232 2
                self::STATUS_WAITING,
233 2
                self::STATUS_PROCESSING,
234 2
                self::STATUS_POSTPONED,
235
            ];
236
        }
237
238 3
        $result = $this->db->{$this->collection_name}->find([
239
            'status' => [
240 3
                '$in' => $filter,
241
            ],
242
        ], [
243 3
            'typeMap' => [
244
                'document' => 'array',
245
                'root' => 'array',
246
                'array' => 'array',
247
            ],
248
        ]);
249
250 3
        return $result;
251
    }
252
253
    /**
254
     * Validate given job options.
255
     *
256
     * @param array $options
257
     *
258
     * @return Async
259
     */
260 15
    public function validateOptions(array $options): self
261
    {
262 15
        foreach ($options as $option => $value) {
263
            switch ($option) {
264 15
                case self::OPTION_AT:
265 15
                case self::OPTION_RETRY:
266 15
                case self::OPTION_RETRY_INTERVAL:
267 15
                case self::OPTION_INTERVAL:
268 15
                    if (!is_int($value)) {
269 1
                        throw new Exception('option '.$option.' must be an integer');
270
                    }
271
272 15
                break;
273
                default:
274 15
                    throw new Exception('invalid option '.$option.' given');
275
            }
276
        }
277
278 14
        return $this;
279
    }
280
281
    /**
282
     * Add job to queue.
283
     *
284
     * @param string $class
285
     * @param mixed  $data
286
     * @param array  $options
287
     *
288
     * @return ObjectId
289
     */
290 15
    public function addJob(string $class, $data, array $options = []): ObjectId
291
    {
292
        $defaults = [
293 15
            self::OPTION_AT => $this->default_at,
294 15
            self::OPTION_INTERVAL => $this->default_interval,
295 15
            self::OPTION_RETRY => $this->default_retry,
296 15
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
297
        ];
298
299 15
        $options = array_merge($defaults, $options);
300 15
        $this->validateOptions($options);
301
302 14
        if ($options[self::OPTION_AT] > 0) {
303 1
            $at = new UTCDateTime($options[self::OPTION_AT] * 1000);
304
        } else {
305 13
            $at = null;
306
        }
307
308 14
        $result = $this->db->{$this->collection_name}->insertOne([
309 14
            'class' => $class,
310 14
            'status' => self::STATUS_WAITING,
311 14
            'timestamp' => new UTCDateTime(),
312 14
            'at' => $at,
313 14
            'retry' => $options[self::OPTION_RETRY],
314 14
            'retry_interval' => $options[self::OPTION_RETRY_INTERVAL],
315 14
            'interval' => $options[self::OPTION_INTERVAL],
316 14
            'data' => $data,
317 14
        ], ['$isolated' => true]);
318
319 14
        $this->logger->debug('queue job ['.$result->getInsertedId().'] added to ['.$class.']', [
320 14
            'category' => get_class($this),
321 14
            'params' => $options,
322 14
            'data' => $data,
323
        ]);
324
325 14
        return $result->getInsertedId();
326
    }
327
328
    /**
329
     * Only add job if not in queue yet.
330
     *
331
     * @param string $class
332
     * @param mixed  $data
333
     * @param array  $options
334
     *
335
     * @return ObjectId
336
     */
337 1
    public function addJobOnce(string $class, $data, array $options = []): ObjectId
338
    {
339
        $filter = [
340 1
            'class' => $class,
341 1
            'data' => $data,
342
            '$or' => [
343 1
                ['status' => self::STATUS_WAITING],
344 1
                ['status' => self::STATUS_POSTPONED],
345
            ],
346
        ];
347
348 1
        $result = $this->db->queue->findOne($filter);
349
350 1
        if (null === $result) {
351 1
            return $this->addJob($class, $data, $options);
352
        }
353 1
        $this->logger->debug('queue job ['.$result['_id'].'] of type ['.$class.'] already exists', [
354 1
                'category' => get_class($this),
355 1
                'data' => $data,
356
            ]);
357
358 1
        return $result['_id'];
359
    }
360
361
    /**
362
     * Execute job queue as endless loop.
363
     */
364
    public function startDaemon()
365
    {
366
        $cursor = $this->getCursor();
367
368
        while (true) {
369
            $this->processLocalQueue();
370
371
            if (null === $cursor->current()) {
372
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

372
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
373
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
374
                        'category' => get_class($this),
375
                    ]);
376
377
                    return $this->startDaemon();
0 ignored issues
show
Bug introduced by
Are you sure the usage of $this->startDaemon() targeting TaskScheduler\Async::startDaemon() seems to always return null.

This check looks for function or method calls that always return null and whose return value is used.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
if ($a->getObject()) {

The method getObject() can return nothing but null, so it makes no sense to use the return value.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
378
                }
379
380
                $this->retrieveNextJob($cursor);
0 ignored issues
show
Bug introduced by
$cursor of type IteratorIterator is incompatible with the type MongoDB\Driver\Cursor expected by parameter $cursor of TaskScheduler\Async::retrieveNextJob(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

380
                $this->retrieveNextJob(/** @scrutinizer ignore-type */ $cursor);
Loading history...
381
382
                continue;
383
            }
384
385
            $job = $cursor->current();
386
            $this->retrieveNextJob($cursor);
387
            $this->queueJob($job);
388
        }
389
    }
390
391
    /**
392
     * Execute job queue.
393
     *
394
     * @return bool
395
     */
396
    public function startOnce(): bool
397
    {
398
        $cursor = $this->getCursor(false);
399
400
        while (true) {
401
            $this->processLocalQueue();
402
403
            if (null === $cursor->current()) {
404
                if ($cursor->getInnerIterator()->isDead()) {
405
                    $this->logger->debug('all jobs were processed', [
406
                        'category' => get_class($this),
407
                    ]);
408
409
                    return false;
410
                }
411
412
                return true;
413
            }
414
415
            $job = $cursor->current();
416
            $cursor->next();
417
            $this->queueJob($job);
418
        }
0 ignored issues
show
Bug Best Practice introduced by
In this branch, the function will implicitly return null which is incompatible with the type-hinted return boolean. Consider adding a return statement or allowing null as return value.

For hinted functions/methods where all return statements with the correct type are only reachable via conditions, ?null? gets implicitly returned which may be incompatible with the hinted type. Let?s take a look at an example:

interface ReturnsInt {
    public function returnsIntHinted(): int;
}

class MyClass implements ReturnsInt {
    public function returnsIntHinted(): int
    {
        if (foo()) {
            return 123;
        }
        // here: null is implicitly returned
    }
}
Loading history...
419
    }
420
421
    /**
422
     * Retrieve next job.
423
     *
424
     * @param Cursor $cursor
425
     */
426
    protected function retrieveNextJob(Cursor $cursor)
427
    {
428
        try {
429
            $cursor->next();
0 ignored issues
show
Bug introduced by
The method next() does not exist on MongoDB\Driver\Cursor. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

429
            $cursor->/** @scrutinizer ignore-call */ 
430
                     next();

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
430
        } catch (RuntimeException $e) {
431
            $this->logger->error('job queue cursor failed to retrieve next job, restart daemon', [
432
                'category' => get_class($this),
433
                'exception' => $e,
434
            ]);
435
436
            $this->startDaemon();
437
        }
438
    }
439
440
    /**
441
     * Queue job.
442
     *
443
     * @param array $job
444
     */
445
    protected function queueJob(array $job): bool
446
    {
447
        if (true === $this->collectJob($job['_id'], self::STATUS_PROCESSING)) {
448
            $this->processJob($job);
449
        } elseif (self::STATUS_POSTPONED === $job['status']) {
450
            $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
451
                'category' => get_class($this),
452
            ]);
453
454
            $this->queue[] = $job;
455
        }
456
457
        return true;
458
    }
459
460
    /**
461
     * Get cursor.
462
     *
463
     * @param bool $tailable
464
     *
465
     * @return IteratorIterator
466
     */
467
    protected function getCursor(bool $tailable = true): IteratorIterator
468
    {
469
        $options = [
470
            'typeMap' => [
471
                'document' => 'array',
472
                'root' => 'array',
473
                'array' => 'array',
474
            ],
475
        ];
476
477
        if (true === $tailable) {
478
            $options['cursorType'] = Find::TAILABLE;
479
            $options['noCursorTimeout'] = true;
480
        }
481
482
        $cursor = $this->db->{$this->collection_name}->find([
483
            '$or' => [
484
                ['status' => self::STATUS_WAITING],
485
                ['status' => self::STATUS_POSTPONED],
486
            ],
487
        ], $options);
488
489
        $iterator = new IteratorIterator($cursor);
490
        $iterator->rewind();
491
492
        return $iterator;
493
    }
494
495
    /**
496
     * Update job status.
497
     *
498
     * @param ObjectId $id
499
     * @param int      $status
500
     * @param mixed    $from_status
501
     *
502
     * @return bool
503
     */
504
    protected function collectJob(ObjectId $id, int $status, $from_status = self::STATUS_WAITING): bool
505
    {
506
        $result = $this->db->{$this->collection_name}->updateMany([
507
            '_id' => $id,
508
            'status' => $from_status,
509
            '$isolated' => true,
510
        ], [
511
            '$set' => [
512
                'status' => $status,
513
                'timestamp' => new UTCDateTime(),
514
            ],
515
        ]);
516
517
        if (1 === $result->getModifiedCount()) {
518
            $this->logger->debug('job ['.$id.'] updated to status ['.$status.']', [
519
                'category' => get_class($this),
520
            ]);
521
522
            return true;
523
        }
524
525
        $this->logger->debug('job ['.$id.'] is already collected with status ['.$status.']', [
526
            'category' => get_class($this),
527
        ]);
528
529
        return false;
530
    }
531
532
    /**
533
     * Update job status.
534
     *
535
     * @param ObjectId $id
536
     * @param int      $status
537
     *
538
     * @return bool
539
     */
540 5
    protected function updateJob(ObjectId $id, int $status): bool
541
    {
542 5
        $result = $this->db->{$this->collection_name}->updateMany([
543 5
            '_id' => $id,
544
            '$isolated' => true,
545
        ], [
546
            '$set' => [
547 5
                'status' => $status,
548 5
                'timestamp' => new UTCDateTime(),
549
            ],
550
        ]);
551
552 5
        return $result->isAcknowledged();
553
    }
554
555
    /**
556
     * Check local queue for postponed jobs.
557
     *
558
     * @return bool
559
     */
560
    protected function processLocalQueue(): bool
561
    {
562
        $now = new UTCDateTime();
563
        foreach ($this->queue as $key => $job) {
564
            if ($job['at'] <= $now) {
565
                $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
566
                    'category' => get_class($this),
567
                ]);
568
569
                unset($this->queue[$key]);
570
                $job['at'] = null;
571
572
                if (true === $this->collectJob($job['_id'], self::STATUS_PROCESSING, self::STATUS_POSTPONED)) {
573
                    $this->processJob($job);
574
                }
575
            }
576
        }
577
578
        return true;
579
    }
580
581
    /**
582
     * Process job.
583
     *
584
     * @param array $job
585
     *
586
     * @return bool
587
     */
588 3
    protected function processJob(array $job): bool
589
    {
590 3
        if ($job['at'] instanceof UTCDateTime) {
591 1
            $this->updateJob($job['_id'], self::STATUS_POSTPONED);
0 ignored issues
show
Bug introduced by
It seems like $job['_id'] can also be of type MongoDB\BSON\UTCDateTime; however, parameter $id of TaskScheduler\Async::updateJob() does only seem to accept MongoDB\BSON\ObjectId, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

591
            $this->updateJob(/** @scrutinizer ignore-type */ $job['_id'], self::STATUS_POSTPONED);
Loading history...
592 1
            $this->queue[] = $job;
593
594 1
            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['at']->toDateTime()->format('c').']', [
595 1
                'category' => get_class($this),
596
            ]);
597
598 1
            return true;
599
        }
600
601 2
        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
602 2
            'category' => get_class($this),
603 2
            'params' => $job['data'],
604
        ]);
605
606
        try {
607 2
            $this->executeJob($job);
608 1
        } catch (\Exception $e) {
609 1
            $this->logger->error('failed execute job ['.$job['_id'].']', [
610 1
                'category' => get_class($this),
611 1
                'exception' => $e,
612
            ]);
613
614 1
            $this->updateJob($job['_id'], self::STATUS_FAILED);
615
616 1
            if ($job['retry'] > 0) {
617
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['retry'].']', [
618
                    'category' => get_class($this),
619
                ]);
620
621
                $this->addJob($job['class'], $job['data'], [
622
                    self::OPTION_AT => time() + $job['retry_interval'],
623
                    self::OPTION_INTERVAL => $job['interval'],
624
                    self::OPTION_RETRY => --$job['retry'],
625
                    self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
626
                ]);
627
            }
628
        }
629
630 2
        if ($job['interval'] >= 0) {
631
            $this->addJob($job['class'], $job['data'], [
632
                self::OPTION_AT => time() + $job['interval'],
633
                self::OPTION_INTERVAL => $job['interval'],
634
                self::OPTION_RETRY => $job['retry'],
635
                self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
636
            ]);
637
        }
638
639 2
        return true;
640
    }
641
642
    /**
643
     * Execute job.
644
     *
645
     * @param array $job
646
     *
647
     * @return bool
648
     */
649 5
    protected function executeJob(array $job): bool
650
    {
651 5
        if (!class_exists($job['class'])) {
652 1
            throw new Exception('job class does not exists');
653
        }
654
655 4
        if (null === $this->container) {
656 4
            $instance = new $job['class']();
657
        } else {
658
            $instance = $this->container->get($job['class']);
659
        }
660
661 4
        if (!($instance instanceof JobInterface)) {
662
            throw new Exception('job must implement JobInterface');
663
        }
664
665 4
        $instance->setData($job['data'])
666 4
            ->start();
667
668 2
        return $this->updateJob($job['_id'], self::STATUS_DONE);
669
    }
670
}
671