Passed
Push — master ( 8d41c2...8a1226 )
by Raffael
03:15
created

Async::getJob()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 17
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 9
nc 2
nop 1
dl 0
loc 17
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use MongoDB\Driver\Cursor;
20
use MongoDB\Operation\Find;
21
use Psr\Container\ContainerInterface;
22
use Psr\Log\LoggerInterface;
23
use Traversable;
24
25
class Async
26
{
27
    /**
28
     * Job status.
29
     */
30
    const STATUS_WAITING = 0;
31
    const STATUS_POSTPONED = 1;
32
    const STATUS_PROCESSING = 2;
33
    const STATUS_DONE = 3;
34
    const STATUS_FAILED = 4;
35
36
    /**
37
     * Job options.
38
     */
39
    const OPTION_AT = 'at';
40
    const OPTION_INTERVAL = 'interval';
41
    const OPTION_RETRY = 'retry';
42
    const OPTION_RETRY_INTERVAL = 'retry_interval';
43
44
    /**
45
     * Database.
46
     *
47
     * @var Database
48
     */
49
    protected $db;
50
51
    /**
52
     * LoggerInterface.
53
     *
54
     * @var LoggerInterface
55
     */
56
    protected $logger;
57
58
    /**
59
     * Local queue.
60
     *
61
     * @var array
62
     */
63
    protected $queue = [];
64
65
    /**
66
     * Collection name.
67
     *
68
     * @var string
69
     */
70
    protected $collection_name = 'queue';
71
72
    /**
73
     * Container.
74
     *
75
     * @var ContainerInterface
76
     */
77
    protected $container;
78
79
    /**
80
     * Default at (Secconds from now).
81
     *
82
     * @var int
83
     */
84
    protected $default_at = 0;
85
86
    /**
87
     * Default interval (secconds).
88
     *
89
     * @var int
90
     */
91
    protected $default_interval = -1;
92
93
    /**
94
     * Default retry.
95
     *
96
     * @var int
97
     */
98
    protected $default_retry = 0;
99
100
    /**
101
     * Default retry interval (secconds).
102
     *
103
     * @var int
104
     */
105
    protected $default_retry_interval = 300;
106
107
    /**
108
     * Queue size.
109
     *
110
     * @var int
111
     */
112
    protected $queue_size = 100000;
113
114
    /**
115
     * Init queue.
116
     *
117
     * @param Database           $db
118
     * @param LoggerInterface    $logger
119
     * @param ContainerInterface $container
120
     * @param iterable           $config
121
     */
122
    public function __construct(Database $db, LoggerInterface $logger, ?ContainerInterface $container = null, ?Iterable $config = null)
123
    {
124
        $this->db = $db;
125
        $this->logger = $logger;
126
        $this->container = $container;
127
        $this->setOptions($config);
128
    }
129
130
    /**
131
     * Set options.
132
     *
133
     * @param iterable $config
134
     *
135
     * @return Async
136
     */
137
    public function setOptions(? Iterable $config = null): self
138
    {
139
        if (null === $config) {
140
            return $this;
141
        }
142
143
        foreach ($config as $option => $value) {
144
            switch ($option) {
145
                case 'collection_name':
146
                    $this->{$option} = (string) $value;
147
148
                break;
149
                case 'default_retry':
150
                case 'default_at':
151
                case 'default_retry_interval':
152
                case 'default_interval':
153
                case 'queue_size':
154
                    $this->{$option} = (int) $value;
155
156
                break;
157
                default:
158
                    throw new Exception('invalid option '.$option.' given');
159
            }
160
        }
161
162
        return $this;
163
    }
164
165
    /**
166
     * Create queue collection.
167
     *
168
     * @return Async
169
     */
170
    public function createQueue(): self
171
    {
172
        $this->db->createCollection(
173
            $this->collection_name,
174
            [
175
                'capped' => true,
176
                'size' => $this->queue_size,
177
            ]
178
        );
179
180
        return $this;
181
    }
182
183
    /**
184
     * Get job by ID.
185
     *
186
     * @param ObjectId
187
     *
188
     * @return array
189
     */
190
    public function getJob(ObjectId $id): array
191
    {
192
        $result = $this->db->{$this->collection_name}->findOne([
193
            '_id' => $id,
194
        ], [
195
            'typeMap' => [
196
                'document' => 'array',
197
                'root' => 'array',
198
                'array' => 'array',
199
            ],
200
        ]);
201
202
        if (null === $result) {
203
            throw new Exception('job '.$id.' was not found');
204
        }
205
206
        return $result;
207
    }
208
209
    /**
210
     * Get jobs (Pass a filter which contains job status, by default all active jobs get returned).
211
     *
212
     * @param array $filter
213
     *
214
     * @return Traversable
215
     */
216
    public function getJobs(array $filter = []): Traversable
217
    {
218
        if (0 === count($filter)) {
219
            $filter = [
220
                self::STATUS_WAITING,
221
                self::STATUS_PROCESSING,
222
                self::STATUS_POSTPONED,
223
            ];
224
        }
225
226
        $result = $this->db->{$this->collection_name}->find([
227
            'status' => [
228
                '$in' => $filter,
229
            ],
230
        ]);
231
232
        return $result;
233
    }
234
235
    /**
236
     * Validate given job options.
237
     *
238
     * @param array $options
239
     *
240
     * @return Async
241
     */
242
    public function validateOptions(array $options): self
243
    {
244
        foreach ($options as $option => $value) {
245
            switch ($option) {
246
                case self::OPTION_AT:
247
                case self::OPTION_RETRY:
248
                case self::OPTION_RETRY_INTERVAL:
249
                case self::OPTION_INTERVAL:
250
                    if (!is_int($value)) {
251
                        throw new Exception('option '.$option.' must be an integer');
252
                    }
253
254
                break;
255
                default:
256
                    throw new Exception('invalid option '.$option.' given');
257
            }
258
        }
259
260
        return $this;
261
    }
262
263
    /**
264
     * Add job to queue.
265
     *
266
     * @param string $class
267
     * @param mixed  $data
268
     * @param array  $options
269
     *
270
     * @return ObjectId
271
     */
272
    public function addJob(string $class, $data, array $options = []): ObjectId
273
    {
274
        $defaults = [
275
            self::OPTION_AT => $this->default_at,
276
            self::OPTION_INTERVAL => $this->default_interval,
277
            self::OPTION_RETRY => $this->default_retry,
278
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
279
        ];
280
281
        $options = array_merge($defaults, $options);
282
        $this->validateOptions($options);
283
284
        if ($options[self::OPTION_AT] > 0) {
285
            $at = new UTCDateTime($options[self::OPTION_AT] * 1000);
286
        } else {
287
            $at = null;
288
        }
289
290
        $result = $this->db->{$this->collection_name}->insertOne([
291
            'class' => $class,
292
            'status' => self::STATUS_WAITING,
293
            'timestamp' => new UTCDateTime(),
294
            'at' => $at,
295
            'retry' => $options[self::OPTION_RETRY],
296
            'retry_interval' => $options[self::OPTION_RETRY_INTERVAL],
297
            'interval' => $options[self::OPTION_INTERVAL],
298
            'data' => $data,
299
        ], ['$isolated' => true]);
300
301
        $this->logger->debug('queue job ['.$result->getInsertedId().'] added to ['.$class.']', [
302
            'category' => get_class($this),
303
            'params' => $options,
304
            'data' => $data,
305
        ]);
306
307
        return $result->getInsertedId();
308
    }
309
310
    /**
311
     * Only add job if not in queue yet.
312
     *
313
     * @param string $class
314
     * @param mixed  $data
315
     * @param array  $options
316
     *
317
     * @return ObjectId
318
     */
319
    public function addJobOnce(string $class, $data, array $options = []): ObjectId
320
    {
321
        $filter = [
322
            'class' => $class,
323
            'data' => $data,
324
            '$or' => [
325
                ['status' => self::STATUS_WAITING],
326
                ['status' => self::STATUS_POSTPONED],
327
            ],
328
        ];
329
330
        $result = $this->db->queue->findOne($filter);
331
332
        if (null === $result) {
333
            return $this->addJob($class, $data, $options);
334
        }
335
        $this->logger->debug('queue job ['.$result['_id'].'] of type ['.$class.'] already exists', [
336
                'category' => get_class($this),
337
                'data' => $data,
338
            ]);
339
340
        return $result['_id'];
341
    }
342
343
    /**
344
     * Execute job queue as endless loop.
345
     *
346
     * @return bool
347
     */
348
    public function startDaemon(): bool
349
    {
350
        $cursor = $this->getCursor();
351
352
        while (true) {
353
            $this->processLocalQueue();
354
355
            if (null === $cursor->current()) {
356
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

356
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
357
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
358
                        'category' => get_class($this),
359
                    ]);
360
361
                    return $this->startDaemon();
362
                }
363
364
                $cursor->next();
365
366
                continue;
367
            }
368
369
            $job = $cursor->current();
370
            $cursor->next();
371
            $this->queueJob($job);
372
        }
0 ignored issues
show
Bug Best Practice introduced by
In this branch, the function will implicitly return null which is incompatible with the type-hinted return boolean. Consider adding a return statement or allowing null as return value.

For hinted functions/methods where all return statements with the correct type are only reachable via conditions, ?null? gets implicitly returned which may be incompatible with the hinted type. Let?s take a look at an example:

interface ReturnsInt {
    public function returnsIntHinted(): int;
}

class MyClass implements ReturnsInt {
    public function returnsIntHinted(): int
    {
        if (foo()) {
            return 123;
        }
        // here: null is implicitly returned
    }
}
Loading history...
373
    }
374
375
    /**
376
     * Execute job queue.
377
     *
378
     * @return bool
379
     */
380
    public function startOnce(): bool
381
    {
382
        $cursor = $this->getCursor(false);
383
384
        while (true) {
385
            $this->processLocalQueue();
386
387
            if (null === $cursor->current()) {
388
                if ($cursor->getInnerIterator()->isDead()) {
389
                    $this->logger->debug('all jobs were processed', [
390
                        'category' => get_class($this),
391
                    ]);
392
393
                    return false;
394
                }
395
396
                return true;
397
            }
398
399
            $job = $cursor->current();
400
            $cursor->next();
401
            $this->queueJob($job);
402
        }
0 ignored issues
show
Bug Best Practice introduced by
In this branch, the function will implicitly return null which is incompatible with the type-hinted return boolean. Consider adding a return statement or allowing null as return value.

For hinted functions/methods where all return statements with the correct type are only reachable via conditions, ?null? gets implicitly returned which may be incompatible with the hinted type. Let?s take a look at an example:

interface ReturnsInt {
    public function returnsIntHinted(): int;
}

class MyClass implements ReturnsInt {
    public function returnsIntHinted(): int
    {
        if (foo()) {
            return 123;
        }
        // here: null is implicitly returned
    }
}
Loading history...
403
    }
404
405
    /**
406
     * Queue job.
407
     *
408
     * @param array $job
409
     */
410
    protected function queueJob(array $job): bool
411
    {
412
        if (true === $this->collectJob($job['_id'], self::STATUS_PROCESSING)) {
413
            $this->processJob($job);
414
        } elseif (self::STATUS_POSTPONED === $job['status']) {
415
            $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
416
                'category' => get_class($this),
417
            ]);
418
419
            $this->queue[] = $job;
420
        }
421
422
        return true;
423
    }
424
425
    /**
426
     * Get cursor.
427
     *
428
     * @param bool $tailable
429
     *
430
     * @return IteratorIterator
431
     */
432
    protected function getCursor(bool $tailable = true): IteratorIterator
433
    {
434
        $options = [];
435
        if (true === $tailable) {
436
            $options['cursorType'] = Find::TAILABLE;
437
            $options['noCursorTimeout'] = true;
438
        }
439
440
        $cursor = $this->db->{$this->collection_name}->find([
441
            '$or' => [
442
                ['status' => self::STATUS_WAITING],
443
                ['status' => self::STATUS_POSTPONED],
444
            ],
445
        ], $options);
446
447
        $iterator = new IteratorIterator($cursor);
448
        $iterator->rewind();
449
450
        return $iterator;
451
    }
452
453
    /**
454
     * Update job status.
455
     *
456
     * @param ObjectId $id
457
     * @param int      $status
458
     * @param mixed    $from_status
459
     *
460
     * @return bool
461
     */
462
    protected function collectJob(ObjectId $id, int $status, $from_status = self::STATUS_WAITING): bool
463
    {
464
        $result = $this->db->{$this->collection_name}->updateMany([
465
            '_id' => $id,
466
            'status' => $from_status,
467
            '$isolated' => true,
468
        ], [
469
            '$set' => [
470
                'status' => $status,
471
                'timestamp' => new UTCDateTime(),
472
            ],
473
        ]);
474
475
        if (1 === $result->getModifiedCount()) {
476
            $this->logger->debug('job ['.$id.'] updated to status ['.$status.']', [
477
                'category' => get_class($this),
478
            ]);
479
480
            return true;
481
        }
482
483
        $this->logger->debug('job ['.$id.'] is already collected with status ['.$status.']', [
484
            'category' => get_class($this),
485
        ]);
486
487
        return false;
488
    }
489
490
    /**
491
     * Update job status.
492
     *
493
     * @param ObjectId $id
494
     * @param int      $status
495
     *
496
     * @return bool
497
     */
498
    protected function updateJob(ObjectId $id, int $status): bool
499
    {
500
        $result = $this->db->{$this->collection_name}->updateMany([
501
            '_id' => $id,
502
            '$isolated' => true,
503
        ], [
504
            '$set' => [
505
                'status' => $status,
506
                'timestamp' => new UTCDateTime(),
507
            ],
508
        ]);
509
510
        return $result->isAcknowledged();
511
    }
512
513
    /**
514
     * Check local queue for postponed jobs.
515
     *
516
     * @return bool
517
     */
518
    protected function processLocalQueue(): bool
519
    {
520
        $now = new UTCDateTime();
521
        foreach ($this->queue as $key => $job) {
522
            if ($job['at'] <= $now) {
523
                $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
524
                    'category' => get_class($this),
525
                ]);
526
527
                unset($this->queue[$key]);
528
                $job['at'] = null;
529
530
                if (true === $this->collectJob($job['_id'], self::STATUS_PROCESSING, self::STATUS_POSTPONED)) {
531
                    $this->processJob($job);
532
                }
533
            }
534
        }
535
536
        return true;
537
    }
538
539
    /**
540
     * Process job.
541
     *
542
     * @param array $job
543
     *
544
     * @return bool
545
     */
546
    protected function processJob(array $job): bool
547
    {
548
        if ($job['at'] instanceof UTCDateTime) {
549
            $this->updateJob($job['_id'], self::STATUS_POSTPONED);
0 ignored issues
show
Bug introduced by
It seems like $job['_id'] can also be of type MongoDB\BSON\UTCDateTime; however, parameter $id of TaskScheduler\Async::updateJob() does only seem to accept MongoDB\BSON\ObjectId, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

549
            $this->updateJob(/** @scrutinizer ignore-type */ $job['_id'], self::STATUS_POSTPONED);
Loading history...
550
            $this->queue[] = $job;
551
552
            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['at']->toDateTime()->format('c').']', [
553
                'category' => get_class($this),
554
            ]);
555
556
            return true;
557
        }
558
559
        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
560
            'category' => get_class($this),
561
            'params' => $job['data'],
562
        ]);
563
564
        try {
565
            $this->executeJob($job);
566
        } catch (\Exception $e) {
567
            $this->logger->error('failed execute job ['.$job['_id'].']', [
568
                'category' => get_class($this),
569
                'exception' => $e,
570
            ]);
571
572
            $this->updateJob($job['_id'], self::STATUS_FAILED);
573
574
            if ($job['retry'] > 0) {
575
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['retry'].']', [
576
                    'category' => get_class($this),
577
                ]);
578
579
                $this->addJob($job['class'], $job['data'], [
580
                    self::OPTION_AT => time() + $job['retry_interval'],
581
                    self::OPTION_INTERVAL => $job['interval'],
582
                    self::OPTION_RETRY => --$job['retry'],
583
                    self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
584
                ]);
585
            }
586
        }
587
588
        if ($job['interval'] >= 0) {
589
            $this->addJob($job['class'], $job['data'], [
590
                self::OPTION_AT => time() + $job['interval'],
591
                self::OPTION_INTERVAL => $job['interval'],
592
                self::OPTION_RETRY => $job['retry'],
593
                self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
594
            ]);
595
        }
596
597
        return true;
598
    }
599
600
    /**
601
     * Execute job.
602
     *
603
     * @param array $job
604
     *
605
     * @return bool
606
     */
607
    protected function executeJob(array $job): bool
608
    {
609
        if (!class_exists($job['class'])) {
610
            throw new Exception('job class does not exists');
611
        }
612
613
        if (null === $this->container) {
614
            $instance = new $job['class']();
615
        } else {
616
            $instance = $this->container->get($job['class']);
617
        }
618
619
        if (!($instance instanceof JobInterface)) {
620
            throw new Exception('job must implement JobInterface');
621
        }
622
623
        $instance->setData($job['data'])
624
            ->start();
625
626
        return $this->updateJob($job['_id'], self::STATUS_DONE);
627
    }
628
}
629