Passed
Branch dev (7112df)
by Raffael
03:08
created

Scheduler::getJobQueueSize()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
c 0
b 0
f 0
ccs 2
cts 2
cp 1
rs 10
cc 1
nc 1
nop 0
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use Closure;
16
use Generator;
17
use MongoDB\BSON\ObjectId;
18
use MongoDB\BSON\UTCDateTime;
19
use MongoDB\Database;
20
use MongoDB\UpdateResult;
21
use Psr\Log\LoggerInterface;
22
use TaskScheduler\Exception\InvalidArgumentException;
23
use TaskScheduler\Exception\JobNotFoundException;
24
25
class Scheduler
26
{
27
    /**
28
     * Job options.
29
     */
30
    public const OPTION_AT = 'at';
31
    public const OPTION_INTERVAL = 'interval';
32
    public const OPTION_RETRY = 'retry';
33
    public const OPTION_RETRY_INTERVAL = 'retry_interval';
34
    public const OPTION_IGNORE_MAX_CHILDREN = 'ignore_max_children';
35
    public const OPTION_TIMEOUT = 'timeout';
36
    public const OPTION_ID = 'id';
37
    public const OPTION_IGNORE_DATA = 'ignore_data';
38
39
    /**
40
     * Default job options.
41
     */
42
    public const OPTION_DEFAULT_AT = 'default_at';
43
    public const OPTION_DEFAULT_INTERVAL = 'default_interval';
44
    public const OPTION_DEFAULT_RETRY = 'default_retry';
45
    public const OPTION_DEFAULT_RETRY_INTERVAL = 'default_retry_interval';
46
    public const OPTION_DEFAULT_TIMEOUT = 'default_timeout';
47
48
    /**
49
     * Queue options.
50
     */
51
    public const OPTION_JOB_QUEUE = 'job_queue';
52
    public const OPTION_JOB_QUEUE_SIZE = 'job_queue_size';
53
    public const OPTION_EVENT_QUEUE = 'event_queue';
54
    public const OPTION_EVENT_QUEUE_SIZE = 'event_queue_size';
55
56
    /**
57
     * MongoDB type map.
58
     */
59
    public const TYPE_MAP = [
60
        'document' => 'array',
61
        'root' => 'array',
62
        'array' => 'array',
63
    ];
64
65
    /**
66
     * Database.
67
     *
68
     * @var Database
69
     */
70
    protected $db;
71
72
    /**
73
     * LoggerInterface.
74
     *
75
     * @var LoggerInterface
76
     */
77
    protected $logger;
78
79
    /**
80
     * Job Collection name.
81
     *
82
     * @var string
83
     */
84
    protected $job_queue = 'taskscheduler.jobs';
85
86
    /**
87
     * Event Collection name.
88
     *
89
     * @var string
90
     */
91
    protected $event_queue = 'taskscheduler.events';
92
93
    /**
94
     * Unix time.
95
     *
96
     * @var int
97
     */
98
    protected $default_at = 0;
99
100
    /**
101
     * Default interval (seconds).
102
     *
103
     * @var int
104
     */
105
    protected $default_interval = 0;
106
107
    /**
108
     * Default retry.
109
     *
110
     * @var int
111
     */
112
    protected $default_retry = 0;
113
114
    /**
115
     * Default retry interval (seconds).
116
     *
117
     * @var int
118
     */
119
    protected $default_retry_interval = 300;
120
121
    /**
122
     * Default timeout.
123
     *
124
     * @var int
125
     */
126
    protected $default_timeout = 0;
127
128
    /**
129
     * Job Queue size.
130
     *
131
     * @var int
132
     */
133
    protected $job_queue_size = 1000000;
134
135
    /**
136
     * Event Queue size.
137
     *
138
     * @var int
139
     */
140
    protected $event_queue_size = 5000000;
141
142
    /**
143
     * Events queue.
144
     *
145
     * @var MessageQueue
146
     */
147
    protected $events;
148
149
    /**
     * Create a scheduler bound to a database and a logger.
     *
     * The given options are applied through setOptions() before the event
     * message queue is instantiated, so a configured event queue name and
     * size are the ones handed to MessageQueue.
     *
     * @param Database        $db     database that holds the job and event collections
     * @param LoggerInterface $logger logger, also passed on to the event message queue
     * @param array           $config option name => value pairs, see setOptions()
     *
     * @throws InvalidArgumentException if $config contains an unknown option
     */
    public function __construct(Database $db, LoggerInterface $logger, array $config = [])
    {
        $this->db = $db;
        $this->logger = $logger;
        $this->setOptions($config);

        // Read name and size through the getters only after the options were applied.
        $event_queue = $this->getEventQueue();
        $event_queue_size = $this->getEventQueueSize();
        $this->events = new MessageQueue($db, $event_queue, $event_queue_size, $logger);
    }
159
160
    /**
     * Apply scheduler options.
     *
     * Each option name is identical to the name of the property it configures,
     * which is why the value is assigned through $this->{$option}. Queue names
     * are cast to string, all other supported options are cast to int.
     *
     * Option names are matched strictly. A loose switch comparison would, on
     * PHP 7, let an integer key such as 0 match the first string case and
     * silently create a stray property instead of rejecting the option.
     *
     * @param array $config option name => value pairs
     *
     * @throws InvalidArgumentException if an option name is not supported
     *
     * @return self this instance, to allow chaining
     */
    public function setOptions(array $config = []): self
    {
        $string_options = [
            self::OPTION_JOB_QUEUE,
            self::OPTION_EVENT_QUEUE,
        ];

        $int_options = [
            self::OPTION_DEFAULT_AT,
            self::OPTION_DEFAULT_RETRY_INTERVAL,
            self::OPTION_DEFAULT_INTERVAL,
            self::OPTION_DEFAULT_RETRY,
            self::OPTION_DEFAULT_TIMEOUT,
            self::OPTION_JOB_QUEUE_SIZE,
            self::OPTION_EVENT_QUEUE_SIZE,
        ];

        foreach ($config as $option => $value) {
            if (in_array($option, $string_options, true)) {
                $this->{$option} = (string) $value;
            } elseif (in_array($option, $int_options, true)) {
                $this->{$option} = (int) $value;
            } else {
                throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        return $this;
    }
189
190
    /**
     * Return the configured size of the job queue.
     *
     * @return int current value of the job_queue_size option
     */
    public function getJobQueueSize(): int
    {
        $size = $this->job_queue_size;

        return $size;
    }
197
198
    /**
     * Return the configured size of the event queue.
     *
     * @return int current value of the event_queue_size option
     */
    public function getEventQueueSize(): int
    {
        $size = $this->event_queue_size;

        return $size;
    }
205
206
    /**
     * Return the name of the collection that stores the jobs.
     *
     * @return string current value of the job_queue option
     */
    public function getJobQueue(): string
    {
        $name = $this->job_queue;

        return $name;
    }
213
214
    /**
     * Return the name of the collection that stores the events.
     *
     * @return string current value of the event_queue option
     */
    public function getEventQueue(): string
    {
        $name = $this->event_queue;

        return $name;
    }
221
222
    /**
     * Load a single job by its id and wrap it in a Process.
     *
     * @param ObjectId $id id of the job document in the job collection
     *
     * @throws JobNotFoundException if the job collection holds no document with that id
     *
     * @return Process process built from the stored document
     */
    public function getJob(ObjectId $id): Process
    {
        $jobs = $this->db->{$this->job_queue};
        $document = $jobs->findOne(['_id' => $id], ['typeMap' => self::TYPE_MAP]);

        if (null !== $document) {
            return new Process($document, $this, $this->events);
        }

        throw new JobNotFoundException('job '.$id.' was not found');
    }
239
240
    /**
     * Cancel a job.
     *
     * Sets the job status to canceled and, once exactly one job document
     * matched, records a canceled event in the event collection.
     *
     * @param ObjectId $id id of the job to cancel
     *
     * @throws JobNotFoundException if the status update did not match exactly one job
     *
     * @return bool always true when no exception was thrown
     */
    public function cancelJob(ObjectId $id): bool
    {
        $update = $this->updateJob($id, JobInterface::STATUS_CANCELED);
        $matched = $update->getMatchedCount();

        if (1 !== $matched) {
            throw new JobNotFoundException('job '.$id.' was not found');
        }

        $event = [
            'job' => $id,
            'status' => JobInterface::STATUS_CANCELED,
            'timestamp' => new UTCDateTime(),
        ];

        $this->db->{$this->event_queue}->insertOne($event);

        return true;
    }
259
260
    /**
     * Iterate over jobs matching a query.
     *
     * With an empty query only jobs whose status is waiting, processing or
     * postponed are selected. Any non-empty query is passed to the job
     * collection unchanged.
     *
     * @param array $query filter for the job collection
     *
     * @return Generator yields one Process per matching job document
     */
    public function getJobs(array $query = []): Generator
    {
        if ([] === $query) {
            $active = [
                JobInterface::STATUS_WAITING,
                JobInterface::STATUS_PROCESSING,
                JobInterface::STATUS_POSTPONED,
            ];

            $query = ['status' => ['$in' => $active]];
        }

        $cursor = $this->db->{$this->job_queue}->find($query, ['typeMap' => self::TYPE_MAP]);

        foreach ($cursor as $document) {
            yield new Process($document, $this, $this->events);
        }
    }
281
282
    /**
     * Add a job to the queue.
     *
     * Inserts the prepared job document, records a waiting event for it and
     * reads the stored document back to build the returned Process.
     *
     * @param string $class   job class name, stored in the document's "class" field
     * @param mixed  $data    job payload, stored in the document's "data" field
     * @param array  $options job options; missing ones are filled by prepareInsert()
     *
     * @return Process process built from the document as read back from the job collection
     */
    public function addJob(string $class, $data, array $options = []): Process
    {
        // prepareInsert() receives $options by reference and leaves the merged,
        // validated options in it; those are what get logged below.
        $document = $this->prepareInsert($class, $data, $options);

        $jobs = $this->db->{$this->job_queue};
        $id = $jobs->insertOne($document)->getInsertedId();

        $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        $this->db->{$this->event_queue}->insertOne([
            'job' => $id,
            'status' => JobInterface::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
        ]);

        $stored = $jobs->findOne(['_id' => $id], ['typeMap' => self::TYPE_MAP]);

        return new Process($stored, $this, $this->events);
    }
310
311
    /**
     * Add a job only if no equivalent job is queued yet.
     *
     * A job counts as queued when a document of the same class exists with
     * status waiting, postponed or processing. Unless the ignore_data option
     * is true, the document's data must match as well. The lookup and the
     * insert happen in one upsert.
     *
     * If a queued job is found but its stored options differ from the
     * requested ones (or, with ignore_data, its data differs), that job is
     * canceled and this method calls itself again to schedule a new one.
     *
     * @param string $class   job class name
     * @param mixed  $data    job payload
     * @param array  $options job options; missing ones are filled by prepareInsert()
     *
     * @return Process the already queued job, or the newly added one
     */
    public function addJobOnce(string $class, $data, array $options = []): Process
    {
        $filter = [
            'class' => $class,
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
                ['status' => JobInterface::STATUS_PROCESSING],
            ],
        ];

        // $options is passed by reference and holds the merged options afterwards.
        $document = $this->prepareInsert($class, $data, $options);
        $ignore_data = true === $options[self::OPTION_IGNORE_DATA];

        if (!$ignore_data) {
            $filter = ['data' => $data] + $filter;
        }

        $jobs = $this->db->{$this->job_queue};

        // NOTE(review): '$isolated' is not a documented updateOne() option and is
        // presumably ignored by the driver - confirm before removing it.
        $result = $jobs->updateOne($filter, ['$setOnInsert' => $document], [
            'upsert' => true,
            '$isolated' => true,
        ]);

        if ($result->getMatchedCount() > 0) {
            $existing = $jobs->findOne($filter, ['typeMap' => self::TYPE_MAP]);

            $options_changed = array_intersect_key($existing['options'], $options) !== $options;
            $data_changed = $data !== $existing['data'] && $ignore_data;

            if ($options_changed || $data_changed) {
                $this->logger->debug('job ['.$existing['_id'].'] options/data changed, reschedule new job', [
                    'category' => get_class($this),
                    'data' => $data,
                ]);

                $this->cancelJob($existing['_id']);

                return $this->addJobOnce($class, $data, $options);
            }

            return new Process($existing, $this, $this->events);
        }

        $id = $result->getUpsertedId();

        $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        $this->db->{$this->event_queue}->insertOne([
            'job' => $id,
            'status' => JobInterface::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
        ]);

        $stored = $jobs->findOne(['_id' => $id], ['typeMap' => self::TYPE_MAP]);

        return new Process($stored, $this, $this->events);
    }
373
374
    /**
     * Listen for events and hand each one to a callback.
     *
     * With an empty query only events whose timestamp is not older than the
     * moment of the call are selected. The loop runs until the callback
     * returns exactly true.
     *
     * When the cursor has no current event and its inner iterator reports
     * being dead, an error is logged, the event queue is (re)created through
     * MessageQueue::create() and listening restarts with the same arguments.
     *
     * @param Closure $callback receives a Process per event; return true to stop listening
     * @param array   $query    filter for the event queue
     *
     * @return self this instance once the callback asked to stop
     */
    public function listen(Closure $callback, array $query = []): self
    {
        if ([] === $query) {
            $query = [
                'timestamp' => ['$gte' => new UTCDateTime()],
            ];
        }

        $cursor = $this->events->getCursor($query);

        // Endless by design: the only exits are the return statements below.
        for (;;) {
            $event = $cursor->current();

            if (null !== $event) {
                // Advance before running the callback.
                // NOTE(review): the closure is presumably invoked by MessageQueue::next()
                // when advancing fails - confirm. It deliberately returns nothing.
                $this->events->next($cursor, function () use ($callback, $query) {
                    $this->listen($callback, $query);
                });

                $process = new Process($event, $this, $this->events);

                if (true === $callback($process)) {
                    return $this;
                }

                continue;
            }

            // isDead() is not part of the Iterator interface; the inner iterator
            // is expected to be a cursor type that provides it.
            if ($cursor->getInnerIterator()->isDead()) {
                $this->logger->error('events queue cursor is dead, is it a capped collection?', [
                    'category' => get_class($this),
                ]);

                $this->events->create();

                return $this->listen($callback, $query);
            }

            $this->events->next($cursor, function () use ($callback, $query) {
                return $this->listen($callback, $query);
            });
        }
    }
417
418
    /**
     * Build the document that represents a new job.
     *
     * The given options are merged over the scheduler defaults and passed
     * through SchedulerValidator::validateOptions(). Because $options is taken
     * by reference, the caller sees the merged and validated options after
     * the call, without the id option if one was set.
     *
     * A set id option becomes the document's _id and is not stored among the
     * document's options.
     *
     * @param string $class   job class name
     * @param mixed  $data    job payload
     * @param array  $options job options, updated in place
     *
     * @return array job document in status waiting
     */
    protected function prepareInsert(string $class, $data, array &$options = []): array
    {
        $fallbacks = [
            self::OPTION_AT => $this->default_at,
            self::OPTION_INTERVAL => $this->default_interval,
            self::OPTION_RETRY => $this->default_retry,
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
            self::OPTION_IGNORE_MAX_CHILDREN => false,
            self::OPTION_TIMEOUT => $this->default_timeout,
            self::OPTION_IGNORE_DATA => false,
        ];

        // Kept as two assignments so the caller's array already holds the
        // merged options if validation throws.
        $options = array_merge($fallbacks, $options);
        $options = SchedulerValidator::validateOptions($options);

        $job = [
            'class' => $class,
            'status' => JobInterface::STATUS_WAITING,
            'created' => new UTCDateTime(),
            'started' => new UTCDateTime(0),
            'ended' => new UTCDateTime(0),
            'worker' => new ObjectId(),
            'data' => $data,
        ];

        if (isset($options[self::OPTION_ID])) {
            $job['_id'] = $options[self::OPTION_ID];
            unset($options[self::OPTION_ID]);
        }

        $job['options'] = $options;

        return $job;
    }
456
457
    /**
     * Set the status of a job.
     *
     * The filter consists of the job's _id only, so at most one document is
     * affected. The former '$isolated' entry in the filter was dropped: the
     * operator was removed in MongoDB 4.0, where the server rejects it as an
     * unknown operator, and it has no effect on a write that touches a
     * single document.
     *
     * @param ObjectId $id     id of the job to update
     * @param int      $status new status value
     *
     * @return UpdateResult result of the update, used by callers to check the matched count
     */
    protected function updateJob(ObjectId $id, int $status): UpdateResult
    {
        return $this->db->{$this->job_queue}->updateMany(
            ['_id' => $id],
            ['$set' => ['status' => $status]]
        );
    }
473
}
474