Passed
Branch dev (266621)
by Raffael
05:09
created

Scheduler   A

Complexity

Total Complexity 40

Size/Duplication

Total Lines 458
Duplicated Lines 0 %

Test Coverage

Coverage 92%

Importance

Changes 0
Metric Value
wmc 40
eloc 184
dl 0
loc 458
ccs 138
cts 150
cp 0.92
rs 9.2
c 0
b 0
f 0

15 Methods

Rating   Name   Duplication   Size   Complexity  
A getJobQueue() 0 3 1
A getJob() 0 13 2
B setOptions() 0 25 11
A __construct() 0 6 1
A getEventQueueSize() 0 3 1
A getEventQueue() 0 3 1
A cancelJob() 0 15 2
A getJobQueueSize() 0 3 1
A addJob() 0 24 1
A prepareInsert() 0 34 2
B addJobOnce() 0 59 6
A updateJob() 0 12 1
A getJobs() 0 16 3
B listen() 0 37 6
A flush() 0 5 1

How to fix   Complexity   

Complex Class

Complex classes like Scheduler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Scheduler, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use Closure;
16
use Generator;
17
use MongoDB\BSON\ObjectId;
18
use MongoDB\BSON\UTCDateTime;
19
use MongoDB\Database;
20
use MongoDB\UpdateResult;
21
use Psr\Log\LoggerInterface;
22
use TaskScheduler\Exception\InvalidArgumentException;
23
use TaskScheduler\Exception\JobNotFoundException;
24
25
class Scheduler
26
{
27
    /**
28
     * Job options.
29
     */
30
    public const OPTION_AT = 'at';
31
    public const OPTION_INTERVAL = 'interval';
32
    public const OPTION_RETRY = 'retry';
33
    public const OPTION_RETRY_INTERVAL = 'retry_interval';
34
    public const OPTION_FORCE_SPAWN = 'force_spawn';
35
    public const OPTION_TIMEOUT = 'timeout';
36
    public const OPTION_ID = 'id';
37
    public const OPTION_IGNORE_DATA = 'ignore_data';
38
39
    /**
40
     * Default job options.
41
     */
42
    public const OPTION_DEFAULT_AT = 'default_at';
43
    public const OPTION_DEFAULT_INTERVAL = 'default_interval';
44
    public const OPTION_DEFAULT_RETRY = 'default_retry';
45
    public const OPTION_DEFAULT_RETRY_INTERVAL = 'default_retry_interval';
46
    public const OPTION_DEFAULT_TIMEOUT = 'default_timeout';
47
48
    /**
49
     * Queue options.
50
     */
51
    public const OPTION_JOB_QUEUE = 'job_queue';
52
    public const OPTION_JOB_QUEUE_SIZE = 'job_queue_size';
53
    public const OPTION_EVENT_QUEUE = 'event_queue';
54
    public const OPTION_EVENT_QUEUE_SIZE = 'event_queue_size';
55
56
    /**
57
     * MongoDB type map.
58
     */
59
    public const TYPE_MAP = [
60
        'document' => 'array',
61
        'root' => 'array',
62
        'array' => 'array',
63
    ];
64
65
    /**
66
     * Database.
67
     *
68
     * @var Database
69
     */
70
    protected $db;
71
72
    /**
73
     * LoggerInterface.
74
     *
75
     * @var LoggerInterface
76
     */
77
    protected $logger;
78
79
    /**
80
     * Job Collection name.
81
     *
82
     * @var string
83
     */
84
    protected $job_queue = 'taskscheduler.jobs';
85
86
    /**
87
     * Event Collection name.
88
     *
89
     * @var string
90
     */
91
    protected $event_queue = 'taskscheduler.events';
92
93
    /**
94
     * Unix time.
95
     *
96
     * @var int
97
     */
98
    protected $default_at = 0;
99
100
    /**
101
     * Default interval (seconds).
102
     *
103
     * @var int
104
     */
105
    protected $default_interval = 0;
106
107
    /**
108
     * Default retry.
109
     *
110
     * @var int
111
     */
112
    protected $default_retry = 0;
113
114
    /**
115
     * Default retry interval (seconds).
116
     *
117
     * @var int
118
     */
119
    protected $default_retry_interval = 300;
120
121
    /**
122
     * Default timeout.
123
     *
124
     * @var int
125
     */
126
    protected $default_timeout = 0;
127
128
    /**
129
     * Job Queue size.
130
     *
131
     * @var int
132
     */
133
    protected $job_queue_size = 1000000;
134
135
    /**
136
     * Event Queue size.
137
     *
138
     * @var int
139
     */
140
    protected $event_queue_size = 5000000;
141
142
    /**
143
     * Events queue.
144
     *
145
     * @var MessageQueue
146
     */
147
    protected $events;
148
149
    /**
     * Create a scheduler bound to a database and a logger.
     *
     * The given configuration is applied through setOptions() first; the
     * event MessageQueue is then built from the resulting event queue name
     * and size.
     *
     * @param array $config Option name => value pairs accepted by setOptions()
     */
    public function __construct(Database $db, LoggerInterface $logger, array $config = [])
    {
        $this->db = $db;
        $this->logger = $logger;

        // Options have to be applied before the event queue is created,
        // because they may override its collection name and size.
        $this->setOptions($config);

        $queueName = $this->getEventQueue();
        $queueSize = $this->getEventQueueSize();
        $this->events = new MessageQueue($db, $queueName, $queueSize, $logger);
    }
159
160
    /**
     * Set options.
     *
     * Queue name options are cast to string, all default_* and *_size
     * options are cast to int. Each accepted option is written to the
     * property of the same name.
     *
     * Option names are matched strictly: a non-string key (for example the
     * integer keys of a list-style array) is always rejected instead of being
     * loosely compared against the option names.
     *
     * @param array $config Option name => value pairs
     *
     * @throws InvalidArgumentException If an unknown option name is given
     *
     * @return self
     */
    public function setOptions(array $config = []): self
    {
        $stringOptions = [
            self::OPTION_JOB_QUEUE,
            self::OPTION_EVENT_QUEUE,
        ];

        $intOptions = [
            self::OPTION_DEFAULT_AT,
            self::OPTION_DEFAULT_RETRY_INTERVAL,
            self::OPTION_DEFAULT_INTERVAL,
            self::OPTION_DEFAULT_RETRY,
            self::OPTION_DEFAULT_TIMEOUT,
            self::OPTION_JOB_QUEUE_SIZE,
            self::OPTION_EVENT_QUEUE_SIZE,
        ];

        foreach ($config as $option => $value) {
            if (in_array($option, $stringOptions, true)) {
                $this->{$option} = (string) $value;
            } elseif (in_array($option, $intOptions, true)) {
                $this->{$option} = (int) $value;
            } else {
                throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        return $this;
    }
189
190
    /**
     * Return the configured size of the job queue.
     *
     * @return int Current value of the job_queue_size option
     */
    public function getJobQueueSize(): int
    {
        return $this->job_queue_size;
    }
197
198
    /**
     * Return the configured size of the event queue.
     *
     * @return int Current value of the event_queue_size option
     */
    public function getEventQueueSize(): int
    {
        return $this->event_queue_size;
    }
205
206
    /**
     * Return the name of the collection that stores jobs.
     *
     * @return string Current value of the job_queue option
     */
    public function getJobQueue(): string
    {
        return $this->job_queue;
    }
213
214
    /**
     * Return the name of the collection that stores events.
     *
     * @return string Current value of the event_queue option
     */
    public function getEventQueue(): string
    {
        return $this->event_queue;
    }
221
222
    /**
     * Load a single job by its id and wrap it in a Process.
     *
     * @param ObjectId $id Id of the job document
     *
     * @throws JobNotFoundException If no document with this id exists in the job queue
     *
     * @return Process
     */
    public function getJob(ObjectId $id): Process
    {
        $collection = $this->db->{$this->job_queue};
        $document = $collection->findOne(['_id' => $id], ['typeMap' => self::TYPE_MAP]);

        if (null !== $document) {
            return new Process($document, $this, $this->events);
        }

        throw new JobNotFoundException('job '.$id.' was not found');
    }
239
240
    /**
     * Cancel a job.
     *
     * Sets the job status to STATUS_CANCELED and, if the job exists, records
     * a matching event in the event queue collection.
     *
     * @param ObjectId $id Id of the job to cancel
     *
     * @throws JobNotFoundException If the status update did not match exactly one job
     *
     * @return bool Always true on success
     */
    public function cancelJob(ObjectId $id): bool
    {
        $update = $this->updateJob($id, JobInterface::STATUS_CANCELED);

        if (1 !== $update->getMatchedCount()) {
            throw new JobNotFoundException('job '.$id.' was not found');
        }

        $event = [
            'job' => $id,
            'status' => JobInterface::STATUS_CANCELED,
            'timestamp' => new UTCDateTime(),
        ];

        $this->db->{$this->event_queue}->insertOne($event);

        return true;
    }
259
260
    /**
     * Flush the scheduler by dropping the job collection and then the
     * event collection.
     *
     * @return Scheduler
     */
    public function flush(): Scheduler
    {
        foreach ([$this->job_queue, $this->event_queue] as $collection) {
            $this->db->{$collection}->drop();
        }

        return $this;
    }
269
270
    /**
     * Iterate over jobs matching a query.
     *
     * With an empty query, only jobs whose status is waiting, processing or
     * postponed are returned. Each matching document is yielded as a Process.
     *
     * @param array $query MongoDB filter applied to the job collection
     *
     * @return Generator Yields Process instances
     */
    public function getJobs(array $query = []): Generator
    {
        if ([] === $query) {
            $activeStates = [
                JobInterface::STATUS_WAITING,
                JobInterface::STATUS_PROCESSING,
                JobInterface::STATUS_POSTPONED,
            ];

            $query = ['status' => ['$in' => $activeStates]];
        }

        $collection = $this->db->{$this->job_queue};
        $cursor = $collection->find($query, ['typeMap' => self::TYPE_MAP]);

        foreach ($cursor as $document) {
            yield new Process($document, $this, $this->events);
        }
    }
291
292
    /**
     * Add a job to the queue.
     *
     * Inserts the prepared job document, logs the insertion, records a
     * STATUS_WAITING event and finally re-reads the stored document so the
     * returned Process reflects what is actually in the collection.
     *
     * @param string $class   Job class name
     * @param mixed  $data    Payload stored with the job
     * @param array  $options Job options, merged over the scheduler defaults by prepareInsert()
     *
     * @return Process
     */
    public function addJob(string $class, $data, array $options = []): Process
    {
        $jobs = $this->db->{$this->job_queue};

        $insert = $jobs->insertOne($this->prepareInsert($class, $data, $options));
        $id = $insert->getInsertedId();

        $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        $this->db->{$this->event_queue}->insertOne([
            'job' => $id,
            'status' => JobInterface::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
        ]);

        $stored = $jobs->findOne(['_id' => $id], ['typeMap' => self::TYPE_MAP]);

        return new Process($stored, $this, $this->events);
    }
320
321
    /**
     * Only add job if not in queue yet.
     *
     * Looks for a waiting, postponed or processing job of the same class
     * (and, unless OPTION_IGNORE_DATA is true, the same data) and inserts a
     * new job via upsert when none exists. If a job exists but was queued
     * with different options (or different data while data is ignored for
     * matching), the existing job is canceled and a new one is scheduled.
     *
     * @param string $class   Job class name
     * @param mixed  $data    Payload stored with the job
     * @param array  $options Job options, merged over the scheduler defaults by prepareInsert()
     *
     * @return Process The existing or the newly created job
     */
    public function addJobOnce(string $class, $data, array $options = []): Process
    {
        $filter = [
            'class' => $class,
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
                ['status' => JobInterface::STATUS_PROCESSING],
            ],
        ];

        // Keep the options exactly as the caller passed them; prepareInsert()
        // rewrites $options by reference (defaults merged in, id removed).
        $requested = $options;
        $document = $this->prepareInsert($class, $data, $options);

        if (true !== $options[self::OPTION_IGNORE_DATA]) {
            $filter = ['data' => $data] + $filter;
        }

        // A single-document upsert needs no isolation flag; "$isolated" is
        // not an updateOne option and the operator no longer exists in
        // MongoDB 4.0+.
        $result = $this->db->{$this->job_queue}->updateOne($filter, ['$setOnInsert' => $document], [
            'upsert' => true,
        ]);

        if ($result->getMatchedCount() > 0) {
            $document = $this->db->{$this->job_queue}->findOne($filter, [
                'typeMap' => self::TYPE_MAP,
            ]);

            // Compare independent of key order, otherwise identical options
            // passed in a different order than stored would count as changed.
            // NOTE(review): a caller-supplied OPTION_ID stays in $requested but
            // is never part of the stored options, so such calls always take
            // the reschedule branch - confirm whether that is intended.
            $stored = array_intersect_key($document['options'], $requested);
            ksort($stored);
            ksort($requested);

            $optionsChanged = $stored !== $requested;
            $dataChanged = $data !== $document['data'] && true === $options[self::OPTION_IGNORE_DATA];

            if ($optionsChanged || $dataChanged) {
                $this->logger->debug('job ['.$document['_id'].'] options/data changed, reschedule new job', [
                    'category' => get_class($this),
                    'data' => $data,
                ]);

                $this->cancelJob($document['_id']);

                return $this->addJobOnce($class, $data, $options);
            }

            return new Process($document, $this, $this->events);
        }

        $this->logger->debug('queue job ['.$result->getUpsertedId().'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        $this->db->{$this->event_queue}->insertOne([
            'job' => $result->getUpsertedId(),
            'status' => JobInterface::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
        ]);

        $document = $this->db->{$this->job_queue}->findOne(['_id' => $result->getUpsertedId()], [
            'typeMap' => self::TYPE_MAP,
        ]);

        return new Process($document, $this, $this->events);
    }
384
385
    /**
     * Listen for events.
     *
     * Reads events from the event queue cursor and hands each one to
     * $callback as a Process. Listening stops as soon as the callback
     * returns exactly true. With an empty query, only events with a
     * timestamp from now on are considered.
     *
     * @param Closure $callback Receives a Process per event; return true to stop listening
     * @param array   $query    Filter for the event cursor
     *
     * @return self
     */
    public function listen(Closure $callback, array $query = []): self
    {
        if ([] === $query) {
            $query = [
                'timestamp' => ['$gte' => new UTCDateTime()],
            ];
        }

        $cursor = $this->events->getCursor($query);

        // Endless loop by design: the only ways out are the return statements
        // below, so the declared return type is always honoured.
        for (;;) {
            $event = $cursor->current();

            if (null !== $event) {
                // Advance before running the callback; the closure is handed
                // to MessageQueue::next() and starts a fresh listener when invoked.
                $this->events->next($cursor, function () use ($callback, $query) {
                    $this->listen($callback, $query);
                });

                $process = new Process($event, $this, $this->events);
                if (true === $callback($process)) {
                    return $this;
                }

                continue;
            }

            // isDead() is not part of the Iterator interface; presumably the
            // inner iterator is a MongoDB driver cursor - verify against
            // MessageQueue::getCursor().
            if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
                $this->logger->error('events queue cursor is dead, is it a capped collection?', [
                    'category' => get_class($this),
                ]);

                $this->events->create();

                return $this->listen($callback, $query);
            }

            $this->events->next($cursor, function () use ($callback, $query) {
                return $this->listen($callback, $query);
            });
        }
    }
428
429
    /**
     * Build the job document to insert.
     *
     * Merges the given options over the scheduler defaults, validates them
     * through SchedulerValidator and assembles the document. $options is
     * taken by reference and is left holding the merged, validated options
     * without OPTION_ID; a given OPTION_ID becomes the document's _id.
     *
     * @param string $class   Job class name
     * @param mixed  $data    Payload stored with the job
     * @param array  $options Job options (modified in place)
     *
     * @return array Job document with status STATUS_WAITING
     */
    protected function prepareInsert(string $class, $data, array &$options = []): array
    {
        $options = array_merge([
            self::OPTION_AT => $this->default_at,
            self::OPTION_INTERVAL => $this->default_interval,
            self::OPTION_RETRY => $this->default_retry,
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
            self::OPTION_FORCE_SPAWN => false,
            self::OPTION_TIMEOUT => $this->default_timeout,
            self::OPTION_IGNORE_DATA => false,
        ], $options);
        $options = SchedulerValidator::validateOptions($options);

        $job = [
            'class' => $class,
            'status' => JobInterface::STATUS_WAITING,
            'created' => new UTCDateTime(),
            'started' => new UTCDateTime(0),
            'ended' => new UTCDateTime(0),
            'worker' => new ObjectId(),
            'data' => $data,
        ];

        // A caller-supplied id is stored as the document id, not as an option.
        if (isset($options[self::OPTION_ID])) {
            $job['_id'] = $options[self::OPTION_ID];
            unset($options[self::OPTION_ID]);
        }

        $job['options'] = $options;

        return $job;
    }
467
468
    /**
     * Update job status.
     *
     * The filter matches on _id only. The former "$isolated" flag is gone:
     * it was removed in MongoDB 4.0, where it makes the query fail as an
     * unknown operator, and an update matched by _id affects at most one
     * document anyway.
     *
     * @param ObjectId $id     Id of the job to update
     * @param int      $status New status value (one of the JobInterface::STATUS_* constants)
     *
     * @return UpdateResult Result of the update; callers check getMatchedCount()
     */
    protected function updateJob(ObjectId $id, int $status): UpdateResult
    {
        $result = $this->db->{$this->job_queue}->updateMany([
            '_id' => $id,
        ], [
            '$set' => [
                'status' => $status,
            ],
        ]);

        return $result;
    }
484
}
485