Scheduler::getEventQueueSize()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
c 0
b 0
f 0
ccs 2
cts 2
cp 1
rs 10
cc 1
nc 1
nop 0
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use Closure;
16
use Generator;
17
use MongoDB\BSON\ObjectId;
18
use MongoDB\BSON\UTCDateTime;
19
use MongoDB\Database;
20
use MongoDB\UpdateResult;
21
use Psr\Log\LoggerInterface;
22
use TaskScheduler\Exception\InvalidArgumentException;
23
use TaskScheduler\Exception\JobNotFoundException;
24
25
class Scheduler
26
{
27
    /**
28
     * Job options.
29
     */
30
    public const OPTION_AT = 'at';
31
    public const OPTION_INTERVAL = 'interval';
32
    public const OPTION_RETRY = 'retry';
33
    public const OPTION_RETRY_INTERVAL = 'retry_interval';
34
    public const OPTION_FORCE_SPAWN = 'force_spawn';
35
    public const OPTION_TIMEOUT = 'timeout';
36
    public const OPTION_ID = 'id';
37
    public const OPTION_IGNORE_DATA = 'ignore_data';
38
39
    /**
40
     * Operation options:
41
     */
42
    public const OPTION_THROW_EXCEPTION = 1;
43
44
    /**
45
     * Default job options.
46
     */
47
    public const OPTION_DEFAULT_AT = 'default_at';
48
    public const OPTION_DEFAULT_INTERVAL = 'default_interval';
49
    public const OPTION_DEFAULT_RETRY = 'default_retry';
50
    public const OPTION_DEFAULT_RETRY_INTERVAL = 'default_retry_interval';
51
    public const OPTION_DEFAULT_TIMEOUT = 'default_timeout';
52
53
    /**
54
     * Queue options.
55
     */
56
    public const OPTION_JOB_QUEUE = 'job_queue';
57
    public const OPTION_JOB_QUEUE_SIZE = 'job_queue_size';
58
    public const OPTION_EVENT_QUEUE = 'event_queue';
59
    public const OPTION_EVENT_QUEUE_SIZE = 'event_queue_size';
60
61
    /**
62
     * MongoDB type map.
63
     */
64
    public const TYPE_MAP = [
65
        'document' => 'array',
66
        'root' => 'array',
67
        'array' => 'array',
68
    ];
69
70
    /**
71
     * Database.
72
     *
73
     * @var Database
74
     */
75
    protected $db;
76
77
    /**
78
     * LoggerInterface.
79
     *
80
     * @var LoggerInterface
81
     */
82
    protected $logger;
83
84
    /**
85
     * Job Collection name.
86
     *
87
     * @var string
88
     */
89
    protected $job_queue = 'taskscheduler.jobs';
90
91
    /**
92
     * Event Collection name.
93
     *
94
     * @var string
95
     */
96
    protected $event_queue = 'taskscheduler.events';
97
98
    /**
99
     * Unix time.
100
     *
101
     * @var int
102
     */
103
    protected $default_at = 0;
104
105
    /**
106
     * Default interval (seconds).
107
     *
108
     * @var int
109
     */
110
    protected $default_interval = 0;
111
112
    /**
113
     * Default retry.
114
     *
115
     * @var int
116
     */
117
    protected $default_retry = 0;
118
119
    /**
120
     * Default retry interval (seconds).
121
     *
122
     * @var int
123
     */
124
    protected $default_retry_interval = 300;
125
126
    /**
127
     * Default timeout.
128
     *
129
     * @var int
130
     */
131
    protected $default_timeout = 0;
132
133
    /**
134
     * Job Queue size.
135
     *
136
     * @var int
137
     */
138
    protected $job_queue_size = 1000000;
139
140
    /**
141
     * Event Queue size.
142
     *
143
     * @var int
144
     */
145
    protected $event_queue_size = 5000000;
146
147
    /**
148
     * Events queue.
149
     *
150
     * @var MessageQueue
151
     */
152
    protected $events;
153
154
    /**
     * Create a scheduler bound to a database and a logger.
     *
     * The options are applied before the event message queue is created, so
     * an event queue name or size given in $config is used for that queue.
     *
     * @param Database        $db     database holding the job and event collections
     * @param LoggerInterface $logger logger, also handed to the event message queue
     * @param array           $config option name => value pairs, passed to setOptions()
     */
    public function __construct(Database $db, LoggerInterface $logger, array $config = [])
    {
        $this->logger = $logger;
        $this->db = $db;
        $this->setOptions($config);

        $queueName = $this->getEventQueue();
        $queueSize = $this->getEventQueueSize();
        $this->events = new MessageQueue($db, $queueName, $queueSize, $logger);
    }
164
165
    /**
     * Apply scheduler options.
     *
     * Queue names are cast to string, every other recognised option to int.
     * Option names are matched strictly: a `switch` compares loosely, which on
     * PHP 7 lets an integer key such as 0 match the first string case and be
     * written to a dynamic property instead of being rejected.
     *
     * @param array $config option name (queue or default OPTION_* constant) => value
     *
     * @throws InvalidArgumentException if an option name is not recognised
     *
     * @return self
     */
    public function setOptions(array $config = []): self
    {
        $stringOptions = [
            self::OPTION_JOB_QUEUE,
            self::OPTION_EVENT_QUEUE,
        ];

        $intOptions = [
            self::OPTION_DEFAULT_AT,
            self::OPTION_DEFAULT_RETRY_INTERVAL,
            self::OPTION_DEFAULT_INTERVAL,
            self::OPTION_DEFAULT_RETRY,
            self::OPTION_DEFAULT_TIMEOUT,
            self::OPTION_JOB_QUEUE_SIZE,
            self::OPTION_EVENT_QUEUE_SIZE,
        ];

        foreach ($config as $option => $value) {
            // The option name doubles as the property name it is stored in.
            if (in_array($option, $stringOptions, true)) {
                $this->{$option} = (string) $value;
            } elseif (in_array($option, $intOptions, true)) {
                $this->{$option} = (int) $value;
            } else {
                throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        return $this;
    }
194
195
    /**
     * Return the configured job queue size (the `job_queue_size` option).
     *
     * @return int
     */
    public function getJobQueueSize(): int
    {
        return $this->job_queue_size;
    }
202
203
    /**
     * Return the configured event queue size (the `event_queue_size` option).
     *
     * @return int
     */
    public function getEventQueueSize(): int
    {
        return $this->event_queue_size;
    }
210
211
    /**
     * Return the name of the collection that stores jobs.
     *
     * @return string
     */
    public function getJobQueue(): string
    {
        return $this->job_queue;
    }
218
219
    /**
     * Return the name of the collection that stores job events.
     *
     * @return string
     */
    public function getEventQueue(): string
    {
        return $this->event_queue;
    }
226
227
    /**
     * Load a single job from the job queue by its id.
     *
     * @param ObjectId $id id of the job document
     *
     * @throws JobNotFoundException if no job document with this id exists
     *
     * @return Process process built from the stored job document
     */
    public function getJob(ObjectId $id): Process
    {
        $collection = $this->db->{$this->job_queue};
        $job = $collection->findOne(['_id' => $id], ['typeMap' => self::TYPE_MAP]);

        if (null !== $job) {
            return new Process($job, $this, $this->events);
        }

        throw new JobNotFoundException('job '.$id.' was not found');
    }
244
245
    /**
     * Cancel a job.
     *
     * Sets the job status to JobInterface::STATUS_CANCELED and records a
     * matching event in the event queue.
     *
     * @param ObjectId $id id of the job to cancel
     *
     * @throws JobNotFoundException if the status update matched no job
     *
     * @return bool always true; failure is reported by exception
     */
    public function cancelJob(ObjectId $id): bool
    {
        $matched = $this->updateJob($id, JobInterface::STATUS_CANCELED)->getMatchedCount();

        if (1 !== $matched) {
            throw new JobNotFoundException('job '.$id.' was not found');
        }

        $event = [
            'job' => $id,
            'status' => JobInterface::STATUS_CANCELED,
            'timestamp' => new UTCDateTime(),
        ];
        $this->db->{$this->event_queue}->insertOne($event);

        return true;
    }
264
265
    /**
     * Drop the job collection and then the event collection.
     *
     * @return Scheduler
     */
    public function flush(): Scheduler
    {
        foreach ([$this->job_queue, $this->event_queue] as $collection) {
            $this->db->{$collection}->drop();
        }

        return $this;
    }
275
276
    /**
     * Iterate over jobs matching a query.
     *
     * With an empty query, all jobs that are waiting, processing or postponed
     * are returned. Because this is a generator, the query is only sent once
     * the result is iterated.
     *
     * @param array $query MongoDB filter for the job collection
     *
     * @return Generator yields one Process per matching job document
     */
    public function getJobs(array $query = []): Generator
    {
        if ([] === $query) {
            $query = [
                'status' => [
                    '$in' => [
                        JobInterface::STATUS_WAITING,
                        JobInterface::STATUS_PROCESSING,
                        JobInterface::STATUS_POSTPONED,
                    ],
                ],
            ];
        }

        $cursor = $this->db->{$this->job_queue}->find($query, ['typeMap' => self::TYPE_MAP]);

        foreach ($cursor as $document) {
            yield new Process($document, $this, $this->events);
        }
    }
297
298
    /**
     * Add a job to the queue.
     *
     * Inserts the job document, records a JobInterface::STATUS_WAITING event
     * for it, and reads the stored document back to build the result.
     *
     * @param string $class   job class name
     * @param mixed  $data    job payload
     * @param array  $options job options (OPTION_* constants)
     *
     * @return Process process built from the stored job document
     */
    public function addJob(string $class, $data, array $options = []): Process
    {
        // prepareInsert() rewrites $options by reference; the log entry below
        // therefore shows the effective options, not only the ones passed in.
        $document = $this->prepareInsert($class, $data, $options);

        $jobs = $this->db->{$this->job_queue};
        $id = $jobs->insertOne($document)->getInsertedId();

        $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        $this->db->{$this->event_queue}->insertOne([
            'job' => $id,
            'status' => JobInterface::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
        ]);

        $stored = $jobs->findOne(['_id' => $id], ['typeMap' => self::TYPE_MAP]);

        return new Process($stored, $this, $this->events);
    }
326
327
    /**
     * Add a job unless an equivalent one is already queued.
     *
     * A job counts as queued when a document of the same class (and, unless
     * OPTION_IGNORE_DATA is true, the same data) is waiting, postponed or
     * processing. If such a job exists but its stored options differ from the
     * requested ones (or, with OPTION_IGNORE_DATA, its data differs), it is
     * canceled and a new job is queued in its place.
     *
     * @param string $class   job class name
     * @param mixed  $data    job payload
     * @param array  $options job options (OPTION_* constants)
     *
     * @throws JobNotFoundException if the job to replace can no longer be canceled
     *
     * @return Process the already queued job or the newly queued one
     */
    public function addJobOnce(string $class, $data, array $options = []): Process
    {
        $filter = [
            'class' => $class,
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
                ['status' => JobInterface::STATUS_PROCESSING],
            ],
        ];

        // Remember what the caller asked for; prepareInsert() rewrites $options
        // by reference. OPTION_ID is left out of the comparison set because it
        // is stored as the document _id and never as a stored option, so
        // keeping it would make every existing job look changed.
        $requested = $options;
        unset($requested[self::OPTION_ID]);
        $document = $this->prepareInsert($class, $data, $options);

        if (true !== $options[self::OPTION_IGNORE_DATA]) {
            $filter = ['data' => $data] + $filter;
        }

        // Upsert: insert the prepared document only when no queued job matches.
        $result = $this->db->{$this->job_queue}->updateOne($filter, ['$setOnInsert' => $document], [
            'upsert' => true,
        ]);

        if ($result->getMatchedCount() > 0) {
            $document = $this->db->{$this->job_queue}->findOne($filter, [
                'typeMap' => self::TYPE_MAP,
            ]);

            // Compare by key, not by position: the stored options follow the
            // order of the defaults, the requested ones the caller's order.
            $stored = array_intersect_key($document['options'], $requested);
            ksort($stored);
            ksort($requested);

            $optionsChanged = $stored !== $requested;
            $dataChanged = $data !== $document['data'] && true === $options[self::OPTION_IGNORE_DATA];

            if ($optionsChanged || $dataChanged) {
                $this->logger->debug('job ['.$document['_id'].'] options/data changed, reschedule new job', [
                    'category' => get_class($this),
                    'data' => $data,
                ]);

                $this->cancelJob($document['_id']);

                // $options no longer carries OPTION_ID here, so the replacement
                // job gets a new id instead of colliding with the canceled one.
                return $this->addJobOnce($class, $data, $options);
            }

            return new Process($document, $this, $this->events);
        }

        $id = $result->getUpsertedId();

        $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        $this->db->{$this->event_queue}->insertOne([
            'job' => $id,
            'status' => JobInterface::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
        ]);

        $document = $this->db->{$this->job_queue}->findOne(['_id' => $id], [
            'typeMap' => self::TYPE_MAP,
        ]);

        return new Process($document, $this, $this->events);
    }
390
391
392
    /**
     * Wait for jobs being executed.
     *
     * Follows the event queue for events of the given jobs whose status is
     * JobInterface::STATUS_DONE or greater and returns once as many such
     * events as there are entries in $stack have been seen. An empty $stack
     * returns immediately.
     *
     * @param Process[] $stack   processes to wait for
     * @param int       $options bitmask; with OPTION_THROW_EXCEPTION the exception
     *                           recorded on a failed job's event is re-thrown
     *
     * @throws InvalidArgumentException if $stack contains anything but Process instances
     *
     * @return Scheduler
     */
    public function waitFor(array $stack, int $options = 0): Scheduler
    {
        // array_values(): `$in` needs a list, whatever keys $stack was built with.
        $jobs = array_values(array_map(function ($job) {
            if (!($job instanceof Process)) {
                throw new InvalidArgumentException('waitFor() requires a stack of Process[]');
            }

            return $job->getId();
        }, $stack));

        // Nothing to wait for: an empty `$in` matches no event, so the loop
        // below would never see one and would block indefinitely.
        if ([] === $jobs) {
            return $this;
        }

        $cursor = $this->events->getCursor([
            'job' => ['$in' => $jobs],
            'status' => ['$gte' => JobInterface::STATUS_DONE],
        ]);

        // Handed to MessageQueue::next() on every advance of the cursor.
        $restart = function () use ($stack, $options) {
            $this->waitFor($stack, $options);
        };

        $expected = count($stack);
        $done = 0;

        while ($done < $expected) {
            if (null === $cursor->current()) {
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
                    // Dead cursor: (re)create the event queue and start over.
                    $this->events->create();

                    return $this->waitFor($stack, $options);
                }

                $this->events->next($cursor, $restart);

                continue;
            }

            $event = $cursor->current();
            $this->events->next($cursor, $restart);

            $rethrow = 0 !== ($options & self::OPTION_THROW_EXCEPTION);

            if (JobInterface::STATUS_FAILED === $event['status'] && isset($event['exception']) && $rethrow) {
                // NOTE(review): the class name is read from the event document
                // and instantiated as-is; this trusts whatever wrote the event.
                throw new $event['exception']['class'](
                    $event['exception']['message'],
                    $event['exception']['code']
                );
            }

            ++$done;
        }

        return $this;
    }
449
450
    /**
     * Listen for events.
     *
     * Follows the event queue and invokes $callback with a Process built from
     * each event document. Listening stops as soon as the callback returns
     * exactly true. With an empty $query, only events whose timestamp is at or
     * after the moment of the call are followed.
     *
     * @param Closure $callback receives a Process; return true to stop listening
     * @param array   $query    MongoDB filter for the event queue
     *
     * @return self
     */
    public function listen(Closure $callback, array $query = []): self
    {
        if ([] === $query) {
            $query = ['timestamp' => ['$gte' => new UTCDateTime()]];
        }

        $cursor = $this->events->getCursor($query);
        $stop = false;

        while (!$stop) {
            $event = $cursor->current();

            if (null === $event) {
                if ($cursor->getInnerIterator()->isDead()) {
                    $this->logger->error('events queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                    ]);

                    // (Re)create the event queue and start listening again.
                    $this->events->create();

                    return $this->listen($callback, $query);
                }

                $this->events->next($cursor, function () use ($callback, $query) {
                    return $this->listen($callback, $query);
                });

                continue;
            }

            // Advance before invoking the callback, as the cursor position
            // must not depend on what the callback does.
            $this->events->next($cursor, function () use ($callback, $query) {
                $this->listen($callback, $query);
            });

            $stop = true === $callback(new Process($event, $this, $this->events));
        }

        return $this;
    }
493
494
    /**
     * Build the job document for an insert and normalise the job options.
     *
     * $options is taken by reference and rewritten: the scheduler defaults are
     * merged underneath the given options, the result is passed through
     * SchedulerValidator::validateOptions(), and OPTION_ID (if set) is removed
     * because it becomes the document `_id` rather than a stored option.
     *
     * @param string $class   job class name
     * @param mixed  $data    job payload
     * @param array  $options job options, modified in place
     *
     * @return array job document with status JobInterface::STATUS_WAITING
     */
    protected function prepareInsert(string $class, $data, array &$options = []): array
    {
        $options = array_merge([
            self::OPTION_AT => $this->default_at,
            self::OPTION_INTERVAL => $this->default_interval,
            self::OPTION_RETRY => $this->default_retry,
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
            self::OPTION_FORCE_SPAWN => false,
            self::OPTION_TIMEOUT => $this->default_timeout,
            self::OPTION_IGNORE_DATA => false,
        ], $options);
        $options = SchedulerValidator::validateOptions($options);

        $job = [
            'class' => $class,
            'status' => JobInterface::STATUS_WAITING,
            'created' => new UTCDateTime(),
            'started' => new UTCDateTime(),
            'ended' => new UTCDateTime(),
            'worker' => new ObjectId(),
            'data' => $data,
        ];

        if (isset($options[self::OPTION_ID])) {
            $job['_id'] = $options[self::OPTION_ID];
            unset($options[self::OPTION_ID]);
        }

        $job['options'] = $options;

        return $job;
    }
532
533
    /**
     * Update job status.
     *
     * The filter matches on `_id` only. The former `$isolated` entry was
     * dropped: MongoDB 4.0 removed that operator and rejects filters that
     * contain it, and an update matched by `_id` touches at most one document.
     *
     * @param ObjectId $id     id of the job to update
     * @param int      $status new status (a JobInterface::STATUS_* value)
     *
     * @return UpdateResult result of the update; callers check the matched count
     */
    protected function updateJob(ObjectId $id, int $status): UpdateResult
    {
        return $this->db->{$this->job_queue}->updateMany(
            ['_id' => $id],
            ['$set' => ['status' => $status]]
        );
    }
549
}
550