Completed
Push — master ( 469b01...74e00a )
by Raffael
02:23
created

Async   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 480
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
dl 0
loc 480
rs 8.3396
c 0
b 0
f 0
wmc 44

12 Methods

Rating   Name   Duplication   Size   Complexity  
D setOptions() 0 26 9
B startOnce() 0 22 4
B startDaemon() 0 24 4
A executeJob() 0 20 4
B processJob() 0 54 5
B addJob() 0 37 2
B validateOptions() 0 19 7
A updateJob() 0 13 1
A getCursor() 0 22 2
A __construct() 0 7 1
A processLocalQueue() 0 17 3
A addJobOnce() 0 22 2

How to fix   Complexity   

Complex Class

Complex classes like Async often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Async, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use MongoDB\Driver\Cursor;
20
use MongoDB\Operation\Find;
21
use Psr\Container\ContainerInterface;
22
use Psr\Log\LoggerInterface;
23
24
class Async
{
    /**
     * Job status values stored in the `status` field of each queue document.
     */
    public const STATUS_WAITING = 0;
    public const STATUS_POSTPONED = 1;
    public const STATUS_PROCESSING = 2;
    public const STATUS_DONE = 3;
    public const STATUS_FAILED = 4;

    /**
     * Keys accepted in the $options array of addJob() / addJobOnce().
     */
    public const OPTION_AT = 'at';
    public const OPTION_INTERVAL = 'interval';
    public const OPTION_RETRY = 'retry';
    public const OPTION_RETRY_INTERVAL = 'retry_interval';

    /**
     * Database holding the queue collection.
     *
     * @var Database
     */
    protected $db;

    /**
     * Logger.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Local (in-memory) list of postponed jobs waiting for their `at` time.
     *
     * @var array
     */
    protected $queue = [];

    /**
     * Name of this node; written to the `node` field of jobs it adds or updates.
     *
     * @var string
     */
    protected $node_name;

    /**
     * Name of the queue collection inside $db.
     *
     * @var string
     */
    protected $collection_name = 'queue';

    /**
     * Optional container used to build job instances.
     *
     * @var ContainerInterface|null
     */
    protected $container;

    /**
     * Default execution time as an absolute Unix timestamp in seconds
     * (0 or less = no fixed execution time).
     *
     * @var int
     */
    protected $default_at = 0;

    /**
     * Default repeat interval in seconds (-1 = do not repeat).
     *
     * @var int
     */
    protected $default_interval = -1;

    /**
     * Default number of retries after a failed execution.
     *
     * @var int
     */
    protected $default_retry = 0;

    /**
     * Default delay in seconds before a failed job is retried.
     *
     * @var int
     */
    protected $default_retry_interval = 300;

    /**
     * Init queue.
     *
     * @param Database                $db        Database holding the queue collection
     * @param LoggerInterface         $logger    Logger
     * @param ContainerInterface|null $container Optional container used to build job instances
     * @param iterable|null           $config    Options passed to setOptions()
     */
    public function __construct(Database $db, LoggerInterface $logger, ?ContainerInterface $container = null, ?iterable $config = null)
    {
        $this->db = $db;
        $this->logger = $logger;
        $this->container = $container;
        $this->node_name = gethostname();
        $this->setOptions($config);
    }

    /**
     * Set options.
     *
     * String options: node_name, collection_name.
     * Integer options: default_at, default_interval, default_retry, default_retry_interval.
     *
     * @param iterable|null $config Option name => value; null is a no-op
     *
     * @throws Exception if an unknown option is given
     *
     * @return Async
     */
    public function setOptions(?iterable $config = null): self
    {
        if (null === $config) {
            return $this;
        }

        foreach ($config as $option => $value) {
            // Cast the key so an integer key (e.g. from a list) cannot loosely
            // match a string case label under PHP 7 comparison rules.
            switch ((string) $option) {
                case 'node_name':
                case 'collection_name':
                    $this->{$option} = (string) $value;

                    // Must not fall through: the integer cast below would
                    // overwrite the string value (e.g. 'jobs' -> 0).
                    break;
                case 'default_retry':
                case 'default_at':
                case 'default_retry_interval':
                case 'default_interval':
                    $this->{$option} = (int) $value;

                    break;
                default:
                    throw new Exception('invalid option '.$option.' given');
            }
        }

        return $this;
    }

    /**
     * Validate given job options.
     *
     * @param array $options Job options keyed by the OPTION_* constants
     *
     * @throws Exception if an option is unknown or its value is not an integer
     *
     * @return Async
     */
    public function validateOptions(array $options): self
    {
        foreach ($options as $option => $value) {
            // String cast: see setOptions(), avoids loose int/string key matches.
            switch ((string) $option) {
                case self::OPTION_AT:
                case self::OPTION_RETRY:
                case self::OPTION_RETRY_INTERVAL:
                case self::OPTION_INTERVAL:
                    if (!is_int($value)) {
                        throw new Exception('option '.$option.' must be an integer');
                    }

                    break;
                default:
                    throw new Exception('invalid option '.$option.' given');
            }
        }

        return $this;
    }

    /**
     * Add job to queue.
     *
     * Missing options are filled from the default_* properties.
     *
     * @param string $class   Job class name (must implement JobInterface when executed)
     * @param mixed  $data    Payload passed to the job's setData()
     * @param array  $options Job options keyed by the OPTION_* constants
     *
     * @throws Exception if the merged options are invalid
     *
     * @return bool Whether the insert was acknowledged by the server
     */
    public function addJob(string $class, $data, array $options = []): bool
    {
        $defaults = [
            self::OPTION_AT => $this->default_at,
            self::OPTION_INTERVAL => $this->default_interval,
            self::OPTION_RETRY => $this->default_retry,
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
        ];

        $options = array_merge($defaults, $options);
        $this->validateOptions($options);

        // `at` is an absolute Unix timestamp in seconds; UTCDateTime expects milliseconds.
        if ($options[self::OPTION_AT] > 0) {
            $at = new UTCDateTime($options[self::OPTION_AT] * 1000);
        } else {
            $at = null;
        }

        $result = $this->db->{$this->collection_name}->insertOne([
            'class' => $class,
            'status' => self::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
            'at' => $at,
            'retry' => $options[self::OPTION_RETRY],
            'retry_interval' => $options[self::OPTION_RETRY_INTERVAL],
            'interval' => $options[self::OPTION_INTERVAL],
            'node' => $this->node_name,
            'data' => $data,
        ]);

        $this->logger->debug('queue job ['.$result->getInsertedId().'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        return $result->isAcknowledged();
    }

    /**
     * Only add job if not in queue yet.
     *
     * A job counts as "in queue" when a waiting or postponed document with the
     * same class and data exists in the configured queue collection.
     *
     * @param string $class   Job class name
     * @param mixed  $data    Payload passed to the job's setData()
     * @param array  $options Job options keyed by the OPTION_* constants
     *
     * @return bool True if the job already exists, otherwise the result of addJob()
     */
    public function addJobOnce(string $class, $data, array $options = []): bool
    {
        $filter = [
            'class' => $class,
            'data' => $data,
            '$or' => [
                ['status' => self::STATUS_WAITING],
                ['status' => self::STATUS_POSTPONED],
            ],
        ];

        // Use the configured collection (not a hard-coded "queue") so lookups
        // hit the same collection addJob() writes to.
        $result = $this->db->{$this->collection_name}->findOne($filter);

        if (null === $result) {
            return $this->addJob($class, $data, $options);
        }

        $this->logger->debug('queue job ['.$result['_id'].'] of type ['.$class.'] already exists', [
            'category' => get_class($this),
            'data' => $data,
        ]);

        return true;
    }

    /**
     * Execute job queue as endless loop.
     *
     * Tails the queue collection and processes jobs as they arrive. The method
     * does not return under normal operation; the bool return type is kept for
     * interface compatibility.
     *
     * @return bool
     */
    public function startDaemon(): bool
    {
        $cursor = $this->getCursor();

        while (true) {
            $this->processLocalQueue();

            if (null === $cursor->current()) {
                if ($cursor->getInnerIterator()->isDead()) {
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                    ]);

                    // Re-open the cursor in place instead of recursing into
                    // startDaemon(), which grew the call stack on every reconnect.
                    $cursor = $this->getCursor();

                    continue;
                }

                $cursor->next();

                continue;
            }

            $job = $cursor->current();
            $cursor->next();
            $this->processJob($job);
        }
    }

    /**
     * Execute job queue.
     *
     * Processes the jobs matched by a non-tailable cursor, then stops.
     *
     * @return bool False once the cursor is exhausted and dead ("all jobs were
     *              processed"); true if no current job is available but the
     *              cursor is not dead
     */
    public function startOnce(): bool
    {
        $cursor = $this->getCursor(false);

        while (true) {
            $this->processLocalQueue();

            if (null === $cursor->current()) {
                if ($cursor->getInnerIterator()->isDead()) {
                    $this->logger->debug('all jobs were processed', [
                        'category' => get_class($this),
                    ]);

                    return false;
                }

                return true;
            }

            $job = $cursor->current();
            $cursor->next();
            $this->processJob($job);
        }
    }

    /**
     * Get cursor.
     *
     * Matches waiting jobs, postponed jobs owned by this node, and postponed
     * jobs whose `at` is at or after the current time.
     * NOTE(review): the `$gte` condition selects *future* postponed jobs of any
     * node; confirm this is intended rather than `$lte` (overdue jobs).
     *
     * @param bool $tailable Open a tailable cursor (requires a capped collection)
     *
     * @return IteratorIterator Rewound iterator over the matching documents
     */
    protected function getCursor(bool $tailable = true): IteratorIterator
    {
        $options = [];
        if (true === $tailable) {
            $options['cursorType'] = Find::TAILABLE;
            $options['noCursorTimeout'] = true;
        }

        $cursor = $this->db->{$this->collection_name}->find([
            '$or' => [
                ['status' => self::STATUS_WAITING],
                ['status' => self::STATUS_POSTPONED,
                 'node' => $this->node_name, ],
                ['status' => self::STATUS_POSTPONED,
                 'at' => ['$gte' => new UTCDateTime()], ],
            ],
        ], $options);

        $iterator = new IteratorIterator($cursor);
        $iterator->rewind();

        return $iterator;
    }

    /**
     * Update job status.
     *
     * Also stamps the job with this node's name and the current time.
     *
     * @param ObjectId $id     Job document id
     * @param int      $status One of the STATUS_* constants
     *
     * @return bool Whether the update was acknowledged by the server
     */
    protected function updateJob(ObjectId $id, int $status): bool
    {
        // `_id` is unique, so updateOne is sufficient; the former `$isolated`
        // query operator was removed in MongoDB 4.0 and is rejected there.
        $result = $this->db->{$this->collection_name}->updateOne(['_id' => $id], ['$set' => [
            'status' => $status,
            'node' => $this->node_name,
            'timestamp' => new UTCDateTime(),
        ]]);

        $this->logger->debug('job ['.$id.'] updated to status ['.$status.']', [
            'category' => get_class($this),
        ]);

        return $result->isAcknowledged();
    }

    /**
     * Check local queue for postponed jobs.
     *
     * Removes every locally held job whose `at` time has passed and executes it.
     *
     * @return bool Always true
     */
    protected function processLocalQueue()
    {
        $now = new UTCDateTime();
        foreach ($this->queue as $key => $job) {
            if ($job['at'] <= $now) {
                $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
                    'category' => get_class($this),
                ]);

                unset($this->queue[$key]);

                // Clearing `at` makes processJob() execute the job instead of
                // postponing it again.
                $job['at'] = null;

                $this->processJob($job);
            }
        }

        return true;
    }

    /**
     * Process job.
     *
     * Jobs with an `at` time are marked postponed and held in the local queue.
     * All other jobs are executed immediately. On failure a one-off retry is
     * scheduled while retries remain. Repeating jobs (interval >= 0) are always
     * re-scheduled `interval` seconds from now.
     * NOTE(review): assumes the cursor yields plain arrays (root type map
     * "array"); confirm the Database/Client type map.
     *
     * @param array $job Queue document
     *
     * @return bool Always true
     */
    protected function processJob(array $job): bool
    {
        if ($job['at'] instanceof UTCDateTime) {
            $this->updateJob($job['_id'], self::STATUS_POSTPONED);
            $this->queue[] = $job;

            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['at'].']', [
                'category' => get_class($this),
            ]);

            return true;
        }

        $this->updateJob($job['_id'], self::STATUS_PROCESSING);

        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
            'category' => get_class($this),
            'params' => $job['data'],
        ]);

        try {
            $this->executeJob($job);
        } catch (\Throwable $e) {
            // Catch \Throwable so an \Error thrown by a job (e.g. TypeError)
            // marks the job failed instead of killing the worker loop.
            $this->logger->error('failed execute job ['.$job['_id'].']', [
                'category' => get_class($this),
                'exception' => $e,
            ]);

            $this->updateJob($job['_id'], self::STATUS_FAILED);

            if ($job['retry'] > 0) {
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['retry'].']', [
                    'category' => get_class($this),
                ]);

                // The retry is a one-off (interval -1): the regular interval
                // re-schedule below already continues a repeating job, so
                // carrying the interval here would spawn a duplicate chain.
                // Decrement without mutating $job so that re-schedule keeps
                // the job's full retry budget.
                $this->addJob($job['class'], $job['data'], [
                    self::OPTION_AT => time() + $job['retry_interval'],
                    self::OPTION_INTERVAL => -1,
                    self::OPTION_RETRY => $job['retry'] - 1,
                    self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
                ]);
            }
        }

        if ($job['interval'] >= 0) {
            $this->addJob($job['class'], $job['data'], [
                self::OPTION_AT => time() + $job['interval'],
                self::OPTION_INTERVAL => $job['interval'],
                self::OPTION_RETRY => $job['retry'],
                self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
            ]);
        }

        return true;
    }

    /**
     * Execute job.
     *
     * Instantiates the job class (directly, or via the container when one is
     * set), passes it the job data, starts it and marks the job done.
     * NOTE(review): getNew() is not part of PSR-11 ContainerInterface; the
     * injected container is assumed to provide it.
     *
     * @param array $job Queue document
     *
     * @throws Exception if the class does not exist or does not implement JobInterface
     *
     * @return bool Whether the final status update was acknowledged
     */
    protected function executeJob(array $job): bool
    {
        if (!class_exists($job['class'])) {
            throw new Exception('job class does not exists');
        }

        if (null === $this->container) {
            $instance = new $job['class']();
        } else {
            $instance = $this->container->getNew($job['class']);
        }

        if (!($instance instanceof JobInterface)) {
            throw new Exception('job must implement JobInterface');
        }

        $instance->setData($job['data'])
            ->start();

        return $this->updateJob($job['_id'], self::STATUS_DONE);
    }
}
506