Completed
Branch dev (93c9bf)
by Raffael
04:08
created

Scheduler::getJobQueue()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
c 0
b 0
f 0
ccs 2
cts 2
cp 1
rs 10
cc 1
nc 1
nop 0
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use Closure;
16
use Generator;
17
use MongoDB\BSON\ObjectId;
18
use MongoDB\BSON\UTCDateTime;
19
use MongoDB\Database;
20
use MongoDB\UpdateResult;
21
use Psr\Log\LoggerInterface;
22
use TaskScheduler\Exception\InvalidArgumentException;
23
use TaskScheduler\Exception\JobNotFoundException;
24
25
class Scheduler
26
{
27
    /**
28
     * Job options.
29
     */
30
    public const OPTION_AT = 'at';
31
    public const OPTION_INTERVAL = 'interval';
32
    public const OPTION_RETRY = 'retry';
33
    public const OPTION_RETRY_INTERVAL = 'retry_interval';
34
    public const OPTION_IGNORE_MAX_CHILDREN = 'ignore_max_children';
35
    public const OPTION_TIMEOUT = 'timeout';
36
    public const OPTION_ID = 'id';
37
38
    /**
39
     * Default job options.
40
     */
41
    public const OPTION_DEFAULT_AT = 'default_at';
42
    public const OPTION_DEFAULT_INTERVAL = 'default_interval';
43
    public const OPTION_DEFAULT_RETRY = 'default_retry';
44
    public const OPTION_DEFAULT_RETRY_INTERVAL = 'default_retry_interval';
45
    public const OPTION_DEFAULT_TIMEOUT = 'default_timeout';
46
47
    /**
48
     * Queue options.
49
     */
50
    public const OPTION_JOB_QUEUE = 'job_queue';
51
    public const OPTION_JOB_QUEUE_SIZE = 'job_queue_size';
52
    public const OPTION_EVENT_QUEUE = 'event_queue';
53
    public const OPTION_EVENT_QUEUE_SIZE = 'event_queue_size';
54
55
    /**
56
     * MongoDB type map.
57
     */
58
    public const TYPE_MAP = [
59
        'document' => 'array',
60
        'root' => 'array',
61
        'array' => 'array',
62
    ];
63
64
    /**
65
     * Database.
66
     *
67
     * @var Database
68
     */
69
    protected $db;
70
71
    /**
72
     * LoggerInterface.
73
     *
74
     * @var LoggerInterface
75
     */
76
    protected $logger;
77
78
    /**
79
     * Job Collection name.
80
     *
81
     * @var string
82
     */
83
    protected $job_queue = 'taskscheduler.jobs';
84
85
    /**
86
     * Event Collection name.
87
     *
88
     * @var string
89
     */
90
    protected $event_queue = 'taskscheduler.events';
91
92
    /**
93
     * Default at (Secconds from now).
94
     *
95
     * @var int
96
     */
97
    protected $default_at = 0;
98
99
    /**
100
     * Default interval (secconds).
101
     *
102
     * @var int
103
     */
104
    protected $default_interval = 0;
105
106
    /**
107
     * Default retry.
108
     *
109
     * @var int
110
     */
111
    protected $default_retry = 0;
112
113
    /**
114
     * Default retry interval (secconds).
115
     *
116
     * @var int
117
     */
118
    protected $default_retry_interval = 300;
119
120
    /**
121
     * Default timeout.
122
     *
123
     * @var int
124
     */
125
    protected $default_timeout = 0;
126
127
    /**
128
     * Job Queue size.
129
     *
130
     * @var int
131
     */
132
    protected $job_queue_size = 1000000;
133
134
    /**
135
     * Event Queue size.
136
     *
137
     * @var int
138
     */
139
    protected $event_queue_size = 5000000;
140
141
    /**
142
     * Events queue.
143
     *
144
     * @var MessageQueue
145
     */
146
    protected $events;
147
148
    /**
149
     * Init queue.
150
     */
151 63
    public function __construct(Database $db, LoggerInterface $logger, array $config = [])
152
    {
153 63
        $this->db = $db;
154 63
        $this->logger = $logger;
155 63
        $this->setOptions($config);
156 63
        $this->events = new MessageQueue($db, $this->getEventQueue(), $this->getEventQueueSize(), $logger);
157 63
    }
158
159
    /**
160
     * Set options.
161
     */
162 63
    public function setOptions(array $config = []): self
163
    {
164 63
        foreach ($config as $option => $value) {
165
            switch ($option) {
166 4
                case self::OPTION_JOB_QUEUE:
167 4
                case self::OPTION_EVENT_QUEUE:
168 2
                    $this->{$option} = (string) $value;
169
170 2
                break;
171 4
                case self::OPTION_DEFAULT_AT:
172 4
                case self::OPTION_DEFAULT_RETRY_INTERVAL:
173 4
                case self::OPTION_DEFAULT_INTERVAL:
174 4
                case self::OPTION_DEFAULT_RETRY:
175 3
                case self::OPTION_DEFAULT_TIMEOUT:
176 3
                case self::OPTION_JOB_QUEUE_SIZE:
177 3
                case self::OPTION_EVENT_QUEUE_SIZE:
178 3
                    $this->{$option} = (int) $value;
179
180 3
                break;
181
                default:
182 4
                    throw new InvalidArgumentException('invalid option '.$option.' given');
183
            }
184
        }
185
186 63
        return $this;
187
    }
188
189
    /**
190
     * Get job Queue size.
191
     */
192 42
    public function getJobQueueSize(): int
193
    {
194 42
        return $this->job_queue_size;
195
    }
196
197
    /**
198
     * Get event Queue size.
199
     */
200 63
    public function getEventQueueSize(): int
201
    {
202 63
        return $this->event_queue_size;
203
    }
204
205
    /**
206
     * Get job collection name.
207
     */
208 42
    public function getJobQueue(): string
209
    {
210 42
        return $this->job_queue;
211
    }
212
213
    /**
214
     * Get event collection name.
215
     */
216 63
    public function getEventQueue(): string
217
    {
218 63
        return $this->event_queue;
219
    }
220
221
    /**
222
     * Get job by Id.
223
     */
224 19
    public function getJob(ObjectId $id): Process
225
    {
226 19
        $result = $this->db->{$this->job_queue}->findOne([
227 19
            '_id' => $id,
228
        ], [
229 19
            'typeMap' => self::TYPE_MAP,
230
        ]);
231
232 19
        if (null === $result) {
233 1
            throw new JobNotFoundException('job '.$id.' was not found');
234
        }
235
236 18
        return new Process($result, $this, $this->events);
237
    }
238
239
    /**
240
     * Cancel job.
241
     */
242 3
    public function cancelJob(ObjectId $id): bool
243
    {
244 3
        $result = $this->updateJob($id, JobInterface::STATUS_CANCELED);
245
246 3
        if (1 !== $result->getModifiedCount()) {
247 1
            throw new JobNotFoundException('job '.$id.' was not found');
248
        }
249
250 2
        $this->db->{$this->event_queue}->insertOne([
251 2
            'job' => $id,
252
            'status' => JobInterface::STATUS_CANCELED,
253 2
            'timestamp' => new UTCDateTime(),
254
        ]);
255
256 2
        return true;
257
    }
258
259
    /**
260
     * Get jobs (Pass a filter which contains job status, by default all active jobs get returned).
261
     */
262 11
    public function getJobs(array $query = []): Generator
263
    {
264 11
        if (0 === count($query)) {
265 10
            $query = ['status' => ['$in' => [
266
                JobInterface::STATUS_WAITING,
267
                JobInterface::STATUS_PROCESSING,
268
                JobInterface::STATUS_POSTPONED,
269
            ]]];
270
        }
271
272 11
        $result = $this->db->{$this->job_queue}->find($query, [
273 11
            'typeMap' => self::TYPE_MAP,
274
        ]);
275
276 11
        foreach ($result as $job) {
277 10
            yield new Process($job, $this, $this->events);
278
        }
279 11
    }
280
281
    /**
282
     * Add job to queue.
283
     */
284 44
    public function addJob(string $class, $data, array $options = []): Process
285
    {
286 44
        $document = $this->prepareInsert($class, $data, $options);
287
288 41
        $result = $this->db->{$this->job_queue}->insertOne($document);
289 41
        $this->logger->debug('queue job ['.$result->getInsertedId().'] added to ['.$class.']', [
290 41
            'category' => get_class($this),
291 41
            'params' => $options,
292 41
            'data' => $data,
293
        ]);
294
295 41
        $this->db->{$this->event_queue}->insertOne([
296 41
            'job' => $result->getInsertedId(),
297
            'status' => JobInterface::STATUS_WAITING,
298 41
            'timestamp' => new UTCDateTime(),
299
        ]);
300
301 41
        $document = $this->db->{$this->job_queue}->findOne(['_id' => $result->getInsertedId()], [
302 41
            'typeMap' => self::TYPE_MAP,
303
        ]);
304
305 41
        $process = new Process($document, $this, $this->events);
306
307 41
        return $process;
308
    }
309
310
    /**
311
     * Only add job if not in queue yet.
312
     */
313 2
    public function addJobOnce(string $class, $data, array $options = []): Process
314
    {
315
        $filter = [
316 2
            'class' => $class,
317 2
            'data' => $data,
318
            '$or' => [
319
                ['status' => JobInterface::STATUS_WAITING],
320
                ['status' => JobInterface::STATUS_POSTPONED],
321
                ['status' => JobInterface::STATUS_PROCESSING],
322
            ],
323
        ];
324
325 2
        $document = $this->prepareInsert($class, $data, $options);
326
327 2
        $result = $this->db->{$this->job_queue}->updateOne($filter, ['$setOnInsert' => $document], [
328 2
            'upsert' => true,
329
            '$isolated' => true,
330
        ]);
331
332 2
        if ($result->getMatchedCount() > 0) {
333 2
            $document = $this->db->{$this->job_queue}->findOne($filter, [
334 2
                'typeMap' => self::TYPE_MAP,
335
            ]);
336
337 2
            if (array_intersect_key($document['options'], $options) !== $options) {
338 1
                $this->logger->debug('job ['.$document['_id'].'] options changed, reschedule new job', [
339 1
                    'category' => get_class($this),
340 1
                    'data' => $data,
341
                ]);
342
343 1
                $this->cancelJob($document['_id']);
344
345 1
                return $this->addJobOnce($class, $data, $options);
346
            }
347
348 1
            return new Process($document, $this, $this->events);
349
        }
350
351 2
        $this->logger->debug('queue job ['.$result->getUpsertedId().'] added to ['.$class.']', [
352 2
            'category' => get_class($this),
353 2
            'params' => $options,
354 2
            'data' => $data,
355
        ]);
356
357 2
        $this->db->{$this->event_queue}->insertOne([
358 2
            'job' => $result->getUpsertedId(),
359
            'status' => JobInterface::STATUS_WAITING,
360 2
            'timestamp' => new UTCDateTime(),
361
        ]);
362
363 2
        $document = $this->db->{$this->job_queue}->findOne(['_id' => $result->getUpsertedId()], [
364 2
            'typeMap' => self::TYPE_MAP,
365
        ]);
366
367 2
        return new Process($document, $this, $this->events);
368
    }
369
370
    /**
371
     * Listen for events.
372
     */
373 1
    public function listen(Closure $callback, array $query = []): self
374
    {
375 1
        if (0 === count($query)) {
376
            $query = [
377
                'timestamp' => ['$gte' => new UTCDateTime()],
378
            ];
379
        }
380
381 1
        $cursor = $this->events->getCursor($query);
382
383 1
        while (true) {
384 1
            if (null === $cursor->current()) {
385
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

385
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
386
                    $this->logger->error('events queue cursor is dead, is it a capped collection?', [
387
                        'category' => get_class($this),
388
                    ]);
389
390
                    $this->events->create();
391
392
                    return $this->listen($callback, $query);
393
                }
394
395
                $this->events->next($cursor, function () use ($callback, $query) {
396
                    return $this->listen($callback, $query);
397
                });
398
399
                continue;
400
            }
401
402 1
            $result = $cursor->current();
403 1
            $this->events->next($cursor, function () use ($callback, $query) {
404
                $this->listen($callback, $query);
405 1
            });
406
407 1
            $process = new Process($result, $this, $this->events);
408 1
            if (true === $callback($process)) {
409 1
                return $this;
410
            }
411
        }
0 ignored issues
show
Bug Best Practice introduced by
In this branch, the function will implicitly return null which is incompatible with the type-hinted return TaskScheduler\Scheduler. Consider adding a return statement or allowing null as return value.

For hinted functions/methods where all return statements with the correct type are only reachable via conditions, ?null? gets implicitly returned which may be incompatible with the hinted type. Let?s take a look at an example:

interface ReturnsInt {
    public function returnsIntHinted(): int;
}

class MyClass implements ReturnsInt {
    public function returnsIntHinted(): int
    {
        if (foo()) {
            return 123;
        }
        // here: null is implicitly returned
    }
}
Loading history...
412
    }
413
414
    /**
415
     * Prepare insert.
416
     */
417 46
    protected function prepareInsert(string $class, $data, array &$options = []): array
418
    {
419
        $defaults = [
420 46
            self::OPTION_AT => $this->default_at,
421 46
            self::OPTION_INTERVAL => $this->default_interval,
422 46
            self::OPTION_RETRY => $this->default_retry,
423 46
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
424 46
            self::OPTION_IGNORE_MAX_CHILDREN => false,
425 46
            self::OPTION_TIMEOUT => $this->default_timeout,
426
        ];
427
428 46
        $options = array_merge($defaults, $options);
429 46
        $options = SchedulerValidator::validateOptions($options);
430
431
        $document = [
432 43
            'class' => $class,
433
            'status' => JobInterface::STATUS_WAITING,
434 43
            'created' => new UTCDateTime(),
435 43
            'started' => new UTCDateTime(0),
436 43
            'ended' => new UTCDateTime(0),
437 43
            'worker' => new ObjectId(),
438 43
            'data' => $data,
439
        ];
440
441 43
        if (isset($options[self::OPTION_ID])) {
442 3
            $id = $options[self::OPTION_ID];
443 3
            unset($options[self::OPTION_ID]);
444 3
            $document['_id'] = $id;
445
        }
446
447 43
        if (is_int($options[self::OPTION_AT]) && $options[self::OPTION_AT] > 0) {
448 14
            $options[self::OPTION_AT] = new UTCDateTime($options[self::OPTION_AT] * 1000);
449
        }
450
451 43
        $document['options'] = $options;
452
453 43
        return $document;
454
    }
455
456
    /**
457
     * Update job status.
458
     */
459 3
    protected function updateJob(ObjectId $id, int $status): UpdateResult
460
    {
461 3
        $result = $this->db->{$this->job_queue}->updateMany([
462 3
            '_id' => $id,
463
            '$isolated' => true,
464
        ], [
465
            '$set' => [
466 3
                'status' => $status,
467
            ],
468
        ]);
469
470 3
        return $result;
471
    }
472
}
473