Completed
Branch dev (ed868c)
by Raffael
03:59
created

Scheduler   C

Complexity

Total Complexity 53

Size/Duplication

Total Lines 572
Duplicated Lines 0 %

Test Coverage

Coverage 89.05%

Importance

Changes 7
Bugs 3 Features 0
Metric Value
wmc 53
eloc 241
dl 0
loc 572
ccs 179
cts 201
cp 0.8905
rs 6.96
c 7
b 3
f 0

17 Methods

Rating   Name   Duplication   Size   Complexity  
A getJobQueue() 0 3 1
A getJob() 0 13 2
B setOptions() 0 25 11
A addJob() 0 24 1
A __construct() 0 7 1
A prepareInsert() 0 35 2
B addJobOnce() 0 59 6
A getEventQueueSize() 0 3 1
A getEventQueue() 0 3 1
A updateJob() 0 12 1
A cancelJob() 0 15 2
A updateJobProgress() 0 15 3
A getJobs() 0 16 3
B listen() 0 39 6
B waitFor() 0 59 10
A flush() 0 6 1
A getJobQueueSize() 0 3 1

How to fix   Complexity   

Complex Class

Complex classes like Scheduler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Scheduler, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use Closure;
16
use Generator;
17
use MongoDB\BSON\ObjectId;
18
use MongoDB\BSON\UTCDateTime;
19
use MongoDB\Database;
20
use MongoDB\UpdateResult;
21
use Psr\Log\LoggerInterface;
22
use TaskScheduler\Exception\InvalidArgumentException;
23
use TaskScheduler\Exception\LogicException;
24
use TaskScheduler\Exception\JobNotFoundException;
25
use League\Event\Emitter;
26
27
class Scheduler
28
{
29
    use EventsTrait;
30
31
    /**
32
     * Job options.
33
     */
34
    public const OPTION_AT = 'at';
35
    public const OPTION_INTERVAL = 'interval';
36
    public const OPTION_RETRY = 'retry';
37
    public const OPTION_RETRY_INTERVAL = 'retry_interval';
38
    public const OPTION_FORCE_SPAWN = 'force_spawn';
39
    public const OPTION_TIMEOUT = 'timeout';
40
    public const OPTION_ID = 'id';
41
    public const OPTION_IGNORE_DATA = 'ignore_data';
42
43
    /**
44
     * Operation options:
45
     */
46
    public const OPTION_THROW_EXCEPTION = 1;
47
48
    /**
49
     * Default job options.
50
     */
51
    public const OPTION_DEFAULT_AT = 'default_at';
52
    public const OPTION_DEFAULT_INTERVAL = 'default_interval';
53
    public const OPTION_DEFAULT_RETRY = 'default_retry';
54
    public const OPTION_DEFAULT_RETRY_INTERVAL = 'default_retry_interval';
55
    public const OPTION_DEFAULT_TIMEOUT = 'default_timeout';
56
57
    /**
58
     * Queue options.
59
     */
60
    public const OPTION_JOB_QUEUE = 'job_queue';
61
    public const OPTION_JOB_QUEUE_SIZE = 'job_queue_size';
62
    public const OPTION_EVENT_QUEUE = 'event_queue';
63
    public const OPTION_EVENT_QUEUE_SIZE = 'event_queue_size';
64
65
    /**
66
     * MongoDB type map.
67
     */
68
    public const TYPE_MAP = [
69
        'document' => 'array',
70
        'root' => 'array',
71
        'array' => 'array',
72
    ];
73
74
    /**
75
     * Valid events
76
     */
77
    public const VALID_EVENTS = [
78
        'taskscheduler.onStart',
79
        'taskscheduler.onDone',
80
        'taskscheduler.onPostponed',
81
        'taskscheduler.onFailed',
82
        'taskscheduler.onTimeout',
83
        'taskscheduler.onCancel',
84
        'taskscheduler.*',
85
    ];
86
87
    /**
88
     * Database.
89
     *
90
     * @var Database
91
     */
92
    protected $db;
93
94
    /**
95
     * LoggerInterface.
96
     *
97
     * @var LoggerInterface
98
     */
99
    protected $logger;
100
101
    /**
102
     * Job Collection name.
103
     *
104
     * @var string
105
     */
106
    protected $job_queue = 'taskscheduler.jobs';
107
108
    /**
109
     * Event Collection name.
110
     *
111
     * @var string
112
     */
113
    protected $event_queue = 'taskscheduler.events';
114
115
    /**
116
     * Unix time.
117
     *
118
     * @var int
119
     */
120
    protected $default_at = 0;
121
122
    /**
123
     * Default interval (seconds).
124
     *
125
     * @var int
126
     */
127
    protected $default_interval = 0;
128
129
    /**
130
     * Default retry.
131
     *
132
     * @var int
133
     */
134
    protected $default_retry = 0;
135
136
    /**
137
     * Default retry interval (seconds).
138
     *
139
     * @var int
140
     */
141
    protected $default_retry_interval = 300;
142
143
    /**
144
     * Default timeout.
145
     *
146
     * @var int
147
     */
148
    protected $default_timeout = 0;
149
150
    /**
151
     * Job Queue size.
152
     *
153
     * @var int
154
     */
155
    protected $job_queue_size = 1000000;
156
157
    /**
158
     * Event Queue size.
159
     *
160
     * @var int
161
     */
162
    protected $event_queue_size = 5000000;
163
164
    /**
165
     * Events queue.
166
     *
167
     * @var MessageQueue
168
     */
169
    protected $events;
170
171
    /**
     * Create a scheduler bound to a database and a logger.
     *
     * The options in $config are applied first, so the event queue name and
     * size handed to the MessageQueue are the configured ones.
     *
     * @param Database        $db      database holding the queue collections
     * @param LoggerInterface $logger  logger, also passed on to the event MessageQueue
     * @param array           $config  scheduler options, see setOptions()
     * @param Emitter|null    $emitter event emitter; a new Emitter is created when null
     *
     * @throws InvalidArgumentException if $config contains an unknown option
     */
    public function __construct(Database $db, LoggerInterface $logger, array $config = [], ?Emitter $emitter = null)
    {
        $this->db = $db;
        $this->logger = $logger;
        $this->setOptions($config);

        $this->events = new MessageQueue(
            $db,
            $this->getEventQueue(),
            $this->getEventQueueSize(),
            $logger
        );

        if (null === $emitter) {
            $emitter = new Emitter();
        }

        $this->emitter = $emitter;
    }
182
183
    /**
     * Set options.
     *
     * Queue names are cast to string, all other known options are cast to int
     * and stored in the property of the same name. Options are applied in
     * iteration order, so options preceding an invalid one stay applied.
     *
     * Option names are matched strictly: a non-string key (for example the
     * integer keys of a plain list) is rejected instead of being loosely
     * compared against the option names.
     *
     * @param array $config option name => value
     *
     * @throws InvalidArgumentException if an unknown option name is given
     *
     * @return self this instance, for chaining
     */
    public function setOptions(array $config = []): self
    {
        $string_options = [
            self::OPTION_JOB_QUEUE,
            self::OPTION_EVENT_QUEUE,
        ];

        $int_options = [
            self::OPTION_DEFAULT_AT,
            self::OPTION_DEFAULT_RETRY_INTERVAL,
            self::OPTION_DEFAULT_INTERVAL,
            self::OPTION_DEFAULT_RETRY,
            self::OPTION_DEFAULT_TIMEOUT,
            self::OPTION_JOB_QUEUE_SIZE,
            self::OPTION_EVENT_QUEUE_SIZE,
        ];

        foreach ($config as $option => $value) {
            if (in_array($option, $string_options, true)) {
                $this->{$option} = (string) $value;
            } elseif (in_array($option, $int_options, true)) {
                $this->{$option} = (int) $value;
            } else {
                throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        return $this;
    }
212
213
    /**
     * Return the configured size of the job queue.
     *
     * @return int value of the job_queue_size option
     */
    public function getJobQueueSize(): int
    {
        return $this->job_queue_size;
    }
220
221
    /**
     * Return the configured size of the event queue.
     *
     * @return int value of the event_queue_size option
     */
    public function getEventQueueSize(): int
    {
        return $this->event_queue_size;
    }
228
229
    /**
     * Return the name of the collection that stores the jobs.
     *
     * @return string value of the job_queue option
     */
    public function getJobQueue(): string
    {
        return $this->job_queue;
    }
236
237
    /**
     * Return the name of the collection that stores the events.
     *
     * @return string value of the event_queue option
     */
    public function getEventQueue(): string
    {
        return $this->event_queue;
    }
244
245
    /**
     * Load a single job by its id and wrap it in a Process.
     *
     * @param ObjectId $id id of the job document
     *
     * @throws JobNotFoundException if the job collection has no document with that id
     *
     * @return Process the job found
     */
    public function getJob(ObjectId $id): Process
    {
        $document = $this->db->{$this->job_queue}->findOne(
            ['_id' => $id],
            ['typeMap' => self::TYPE_MAP]
        );

        if (null !== $document) {
            return new Process($document, $this);
        }

        throw new JobNotFoundException('job '.$id.' was not found');
    }
262
263
    /**
     * Cancel a job.
     *
     * Sets the job status to canceled and, if exactly one job matched, appends
     * a canceled event for it to the event collection.
     *
     * @param ObjectId $id id of the job to cancel
     *
     * @throws JobNotFoundException if the status update did not match exactly one job
     *
     * @return bool always true when no exception is thrown
     */
    public function cancelJob(ObjectId $id): bool
    {
        $matched = $this->updateJob($id, JobInterface::STATUS_CANCELED)->getMatchedCount();

        if (1 !== $matched) {
            throw new JobNotFoundException('job '.$id.' was not found');
        }

        $event = [
            'job' => $id,
            'status' => JobInterface::STATUS_CANCELED,
            'timestamp' => new UTCDateTime(),
        ];

        $this->db->{$this->event_queue}->insertOne($event);

        return true;
    }
282
283
    /**
     * Drop the job collection and then the event collection.
     *
     * @return Scheduler this instance, for chaining
     */
    public function flush(): Scheduler
    {
        // Same order as before: jobs first, events second.
        foreach ([$this->job_queue, $this->event_queue] as $collection) {
            $this->db->{$collection}->drop();
        }

        return $this;
    }
293
294
    /**
     * Iterate over jobs matching a query.
     *
     * With an empty query, jobs whose status is waiting, processing or
     * postponed are returned. This is a generator, so nothing is queried
     * until the result is iterated.
     *
     * @param array $query filter for the job collection
     *
     * @return Generator yields one Process per matching job
     */
    public function getJobs(array $query = []): Generator
    {
        if ([] === $query) {
            $active = [
                JobInterface::STATUS_WAITING,
                JobInterface::STATUS_PROCESSING,
                JobInterface::STATUS_POSTPONED,
            ];

            $query = ['status' => ['$in' => $active]];
        }

        $cursor = $this->db->{$this->job_queue}->find($query, ['typeMap' => self::TYPE_MAP]);

        foreach ($cursor as $document) {
            yield new Process($document, $this);
        }
    }
315
316
    /**
     * Add a job to the queue.
     *
     * Inserts the job document, appends a waiting event for it to the event
     * collection and returns the job as read back from the job collection.
     *
     * @param string $class   job class name stored with the job
     * @param mixed  $data    payload stored with the job
     * @param array  $options job options, merged with the scheduler defaults by prepareInsert()
     *
     * @return Process the newly queued job
     */
    public function addJob(string $class, $data, array $options = []): Process
    {
        $document = $this->prepareInsert($class, $data, $options);
        $id = $this->db->{$this->job_queue}->insertOne($document)->getInsertedId();

        $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        $this->db->{$this->event_queue}->insertOne([
            'job' => $id,
            'status' => JobInterface::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
        ]);

        $stored = $this->db->{$this->job_queue}->findOne(
            ['_id' => $id],
            ['typeMap' => self::TYPE_MAP]
        );

        return new Process($stored, $this);
    }
344
345
    /**
     * Only add job if not in queue yet.
     *
     * A job counts as queued when a job of the same class (and, unless the
     * ignore_data option is true, the same data) is waiting, postponed or
     * processing. If such a job exists and the requested options are
     * unchanged, it is returned. If the requested options differ (or the data
     * differs while ignore_data is true), the existing job is canceled and a
     * new one is scheduled.
     *
     * @param string $class   job class name
     * @param mixed  $data    payload stored with the job
     * @param array  $options job options, merged with the scheduler defaults by prepareInsert()
     *
     * @return Process the existing or newly queued job
     */
    public function addJobOnce(string $class, $data, array $options = []): Process
    {
        $filter = [
            'class' => $class,
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
                ['status' => JobInterface::STATUS_PROCESSING],
            ],
        ];

        // Keep the caller's options; prepareInsert() rewrites $options by reference.
        $requested = $options;
        $document = $this->prepareInsert($class, $data, $options);

        if (true !== $options[self::OPTION_IGNORE_DATA]) {
            $filter = ['data' => $data] + $filter;
        }

        $result = $this->db->{$this->job_queue}->updateOne($filter, ['$setOnInsert' => $document], [
            'upsert' => true,
        ]);

        if ($result->getMatchedCount() > 0) {
            $document = $this->db->{$this->job_queue}->findOne($filter, [
                'typeMap' => self::TYPE_MAP,
            ]);

            // Array identity (!==) is key-order sensitive; sort both sides so that
            // the same options passed in a different order are not seen as a change.
            $stored = array_intersect_key($document['options'], $requested);
            ksort($stored);
            ksort($requested);

            $options_changed = $stored !== $requested;
            $data_changed = $data !== $document['data'] && true === $options[self::OPTION_IGNORE_DATA];

            if ($options_changed || $data_changed) {
                $this->logger->debug('job ['.$document['_id'].'] options/data changed, reschedule new job', [
                    'category' => get_class($this),
                    'data' => $data,
                ]);

                $this->cancelJob($document['_id']);

                return $this->addJobOnce($class, $data, $options);
            }

            return new Process($document, $this);
        }

        $id = $result->getUpsertedId();

        $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        $this->db->{$this->event_queue}->insertOne([
            'job' => $id,
            'status' => JobInterface::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
        ]);

        $document = $this->db->{$this->job_queue}->findOne(['_id' => $id], [
            'typeMap' => self::TYPE_MAP,
        ]);

        return new Process($document, $this);
    }
408
409
410
    /**
     * Wait for jobs being executed.
     *
     * Follows the event queue for the given jobs. Every event updates the
     * status of the matching Process in $stack and is emitted via emit().
     * Returns once as many events with a status of at least STATUS_DONE have
     * been seen as there are jobs in $stack. An empty $stack returns
     * immediately since there is nothing to wait for.
     *
     * @param Process[] $stack   jobs to wait for
     * @param int       $options bitmask; with OPTION_THROW_EXCEPTION a failed
     *                           event carrying exception details is rethrown
     *
     * @throws InvalidArgumentException if $stack contains anything but Process instances
     * @throws \Throwable               with OPTION_THROW_EXCEPTION: an instance of the class
     *                                  named in the failed event, built from its message and code
     *
     * @return Scheduler this instance (or the result of the restarted wait after a dead cursor)
     */
    public function waitFor(array $stack, int $options = 0): Scheduler
    {
        $orig = [];
        $jobs = [];

        foreach ($stack as $job) {
            if (!($job instanceof Process)) {
                throw new InvalidArgumentException('waitFor() requires a stack of Process[]');
            }

            $orig[(string) $job->getId()] = $job;
            // Always a plain list, whatever keys $stack uses, so $in receives an array.
            $jobs[] = $job->getId();
        }

        if ([] === $jobs) {
            // No job could ever produce an event; do not block forever.
            return $this;
        }

        $cursor = $this->events->getCursor([
            'job' => ['$in' => $jobs],
        ]);

        $expected = count($jobs);
        $done = 0;

        while (true) {
            if (null === $cursor->current()) {
                if ($cursor->getInnerIterator()->isDead()) {
                    // Recreate the event queue and start waiting from scratch.
                    $this->events->create();

                    return $this->waitFor($stack, $options);
                }

                // NOTE(review): the callback is presumably invoked by MessageQueue::next()
                // when the cursor cannot be advanced - confirm against MessageQueue.
                $this->events->next($cursor, function () use ($stack, $options) {
                    $this->waitFor($stack, $options);
                });

                continue;
            }

            $event = $cursor->current();
            $this->events->next($cursor, function () use ($stack, $options) {
                $this->waitFor($stack, $options);
            });

            // Reflect the new status in the caller's Process object.
            $process = $orig[(string) $event['job']];
            $data = $process->toArray();
            $data['status'] = $event['status'];
            $process->replace(new Process($data, $this));
            $this->emit($process);

            if ($event['status'] < JobInterface::STATUS_DONE) {
                continue;
            }

            if (JobInterface::STATUS_FAILED === $event['status'] && isset($event['exception']) && $options & self::OPTION_THROW_EXCEPTION) {
                throw new $event['exception']['class'](
                    $event['exception']['message'],
                    $event['exception']['code']
                );
            }

            ++$done;

            if ($done >= $expected) {
                return $this;
            }
        }
    }
477
478
    /**
     * Listen for events.
     *
     * With an empty query only events with a timestamp from now on are
     * followed. Each event is wrapped in a Process, emitted via emit() and
     * handed to $callback; listening ends as soon as the callback returns
     * exactly true.
     *
     * If the cursor turns out to be dead, an error is logged, the event queue
     * is recreated and listening restarts with the same callback and query.
     *
     * @param Closure $callback receives the Process of every event
     * @param array   $query    filter for the event queue
     *
     * @return self this instance (or the result of the restarted listener after a dead cursor)
     */
    public function listen(Closure $callback, array $query = []): self
    {
        if ([] === $query) {
            $query = [
                'timestamp' => ['$gte' => new UTCDateTime()],
            ];
        }

        $cursor = $this->events->getCursor($query);

        while (true) {
            $event = $cursor->current();

            if (null !== $event) {
                $this->events->next($cursor, function () use ($callback, $query) {
                    $this->listen($callback, $query);
                });

                $process = new Process($event, $this);
                $this->emit($process);

                if (true === $callback($process)) {
                    return $this;
                }

                continue;
            }

            if ($cursor->getInnerIterator()->isDead()) {
                $this->logger->error('events queue cursor is dead, is it a capped collection?', [
                    'category' => get_class($this),
                ]);

                $this->events->create();

                return $this->listen($callback, $query);
            }

            $this->events->next($cursor, function () use ($callback, $query) {
                return $this->listen($callback, $query);
            });
        }
    }
523
524
    /**
     * Build the job document to insert.
     *
     * $options is taken by reference: after the call it holds the given
     * options merged over the scheduler defaults and passed through
     * SchedulerValidator::validateOptions(), with the id option removed.
     * A given id option becomes the _id of the document instead.
     *
     * NOTE(review): validateOptions() presumably throws on invalid options - confirm.
     *
     * @param string $class   job class name
     * @param mixed  $data    payload stored with the job
     * @param array  $options job options (modified in place)
     *
     * @return array document for the job collection
     */
    protected function prepareInsert(string $class, $data, array &$options = []): array
    {
        $options = SchedulerValidator::validateOptions(array_merge([
            self::OPTION_AT => $this->default_at,
            self::OPTION_INTERVAL => $this->default_interval,
            self::OPTION_RETRY => $this->default_retry,
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
            self::OPTION_FORCE_SPAWN => false,
            self::OPTION_TIMEOUT => $this->default_timeout,
            self::OPTION_IGNORE_DATA => false,
        ], $options));

        $job = [
            'class' => $class,
            'status' => JobInterface::STATUS_WAITING,
            'created' => new UTCDateTime(),
            'started' => new UTCDateTime(),
            'ended' => new UTCDateTime(),
            'worker' => new ObjectId(),
            'progress' => 0.0,
            'data' => $data,
        ];

        $id = $options[self::OPTION_ID] ?? null;

        if (null !== $id) {
            // The id is stored as the document key, not as a job option.
            unset($options[self::OPTION_ID]);
            $job['_id'] = $id;
        }

        $job['options'] = $options;

        return $job;
    }
563
564
    /**
     * Update job status.
     *
     * The job is selected by its _id only. The former `$isolated` flag is no
     * longer sent: it was removed from MongoDB in 4.0 and has no effect on an
     * update that matches a single document by _id.
     *
     * @param ObjectId $id     id of the job to update
     * @param int      $status new status value
     *
     * @return UpdateResult result of the update, e.g. for checking the matched count
     */
    protected function updateJob(ObjectId $id, int $status): UpdateResult
    {
        return $this->db->{$this->job_queue}->updateMany([
            '_id' => $id,
        ], [
            '$set' => [
                'status' => $status,
            ],
        ]);
    }
580
581
    /**
     * Update job progress.
     *
     * @param JobInterface $job      job whose progress is stored (selected by its id)
     * @param float        $progress progress value between 0 and 100 (inclusive)
     *
     * @throws LogicException if $progress is outside 0..100 or is not a number (NAN)
     *
     * @return self this instance, for chaining
     */
    public function updateJobProgress(JobInterface $job, float $progress): self
    {
        // Written as a positive range check so that NAN, which fails every
        // comparison, is rejected as well.
        if (!($progress >= 0 && $progress <= 100)) {
            throw new LogicException('progress may only be between 0 to 100');
        }

        $this->db->{$this->job_queue}->updateOne([
            '_id' => $job->getId(),
        ], [
            '$set' => [
                'progress' => $progress,
            ],
        ]);

        return $this;
    }
600
}
601