Completed
Branch dev (c58bbf)
by Raffael
03:36
created

Scheduler::getJob()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 13
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 13
c 0
b 0
f 0
ccs 7
cts 7
cp 1
rs 10
cc 2
nc 2
nop 1
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use Closure;
16
use Generator;
17
use MongoDB\BSON\ObjectId;
18
use MongoDB\BSON\UTCDateTime;
19
use MongoDB\Database;
20
use MongoDB\UpdateResult;
21
use Psr\Log\LoggerInterface;
22
use TaskScheduler\Exception\InvalidArgumentException;
23
use TaskScheduler\Exception\JobNotFoundException;
24
25
class Scheduler
26
{
27
    /**
28
     * Job options.
29
     */
30
    public const OPTION_AT = 'at';
31
    public const OPTION_INTERVAL = 'interval';
32
    public const OPTION_RETRY = 'retry';
33
    public const OPTION_RETRY_INTERVAL = 'retry_interval';
34
    public const OPTION_IGNORE_MAX_CHILDREN = 'ignore_max_children';
35
    public const OPTION_TIMEOUT = 'timeout';
36
    public const OPTION_DEFAULT_AT = 'default_at';
37
    public const OPTION_DEFAULT_INTERVAL = 'default_interval';
38
    public const OPTION_DEFAULT_RETRY = 'default_retry';
39
    public const OPTION_DEFAULT_RETRY_INTERVAL = 'default_retry_interval';
40
    public const OPTION_DEFAULT_TIMEOUT = 'default_timeout';
41
    public const OPTION_JOB_QUEUE = 'job_queue';
42
    public const OPTION_JOB_QUEUE_SIZE = 'job_queue_size';
43
    public const OPTION_EVENT_QUEUE = 'event_queue';
44
    public const OPTION_EVENT_QUEUE_SIZE = 'event_queue_size';
45
46
    /**
47
     * MongoDB type map.
48
     */
49
    public const TYPE_MAP = [
50
        'document' => 'array',
51
        'root' => 'array',
52
        'array' => 'array',
53
    ];
54
55
    /**
56
     * Database.
57
     *
58
     * @var Database
59
     */
60
    protected $db;
61
62
    /**
63
     * LoggerInterface.
64
     *
65
     * @var LoggerInterface
66
     */
67
    protected $logger;
68
69
    /**
70
     * Job Collection name.
71
     *
72
     * @var string
73
     */
74
    protected $job_queue = 'takscheduler.jobs';
75
76
    /**
77
     * Event Collection name.
78
     *
79
     * @var string
80
     */
81
    protected $event_queue = 'takscheduler.events';
82
83
    /**
84
     * Default at (Seconds from now).
85
     *
86
     * @var int
87
     */
88
    protected $default_at = 0;
89
90
    /**
91
     * Default interval (seconds).
92
     *
93
     * @var int
94
     */
95
    protected $default_interval = -1;
96
97
    /**
98
     * Default retry.
99
     *
100
     * @var int
101
     */
102
    protected $default_retry = -1;
103
104
    /**
105
     * Default retry interval (seconds).
106
     *
107
     * @var int
108
     */
109
    protected $default_retry_interval = 300;
110
111
    /**
112
     * Default timeout.
113
     *
114
     * @var int
115
     */
116
    protected $default_timeout = 0;
117
118
    /**
119
     * Job Queue size.
120
     *
121
     * @var int
122
     */
123
    protected $job_queue_size = 100000;
124
125
    /**
126
     * Event Queue size.
127
     *
128
     * @var int
129
     */
130
    protected $event_queue_size = 500000;
131
132
    /**
133
     * Events queue.
134
     *
135
     * @var MessageQueue
136
     */
137
    protected $events;
138
139
    /**
     * Create a scheduler bound to a database and a logger.
     *
     * The given options are applied before the event message queue is built,
     * because the queue is created from the configured event collection name
     * and size.
     *
     * @param Database        $db     database holding the job and event collections
     * @param LoggerInterface $logger logger used by the scheduler and passed on to the event queue
     * @param array           $config option name => value pairs, forwarded to setOptions()
     *
     * @throws InvalidArgumentException if $config contains an unknown option
     */
    public function __construct(Database $db, LoggerInterface $logger, array $config = [])
    {
        $this->logger = $logger;
        $this->db = $db;
        $this->setOptions($config);

        $collection = $this->getEventQueue();
        $size = $this->getEventQueueSize();
        $this->events = new MessageQueue($db, $collection, $size, $logger);
    }
149
150
    /**
     * Set scheduler options.
     *
     * Every key of $config must be one of the queue/default OPTION_* constants
     * handled below; the value is cast and stored in the property of the same
     * name. Options are applied in iteration order, so options preceding an
     * invalid one have already been applied when the exception is thrown.
     *
     * @param array $config option name => value pairs
     *
     * @throws InvalidArgumentException if an unknown option name is given
     *
     * @return Scheduler
     */
    public function setOptions(array $config = []): self
    {
        $strings = [
            self::OPTION_JOB_QUEUE,
            self::OPTION_EVENT_QUEUE,
        ];

        $integers = [
            self::OPTION_DEFAULT_AT,
            self::OPTION_DEFAULT_RETRY_INTERVAL,
            self::OPTION_DEFAULT_INTERVAL,
            self::OPTION_DEFAULT_RETRY,
            self::OPTION_DEFAULT_TIMEOUT,
            self::OPTION_JOB_QUEUE_SIZE,
            self::OPTION_EVENT_QUEUE_SIZE,
        ];

        foreach ($config as $option => $value) {
            // Strict matching on purpose: a switch compares loosely, which on
            // PHP 7 lets an integer key such as 0 match the first string case.
            if (in_array($option, $strings, true)) {
                $this->{$option} = (string) $value;
            } elseif (in_array($option, $integers, true)) {
                $this->{$option} = (int) $value;
            } else {
                throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        return $this;
    }
182
183
    /**
     * Return the configured size of the job queue.
     *
     * @return int value of the job_queue_size option (100000 unless overridden)
     */
    public function getJobQueueSize(): int
    {
        return $this->job_queue_size;
    }
190
191
    /**
     * Return the configured size of the event queue.
     *
     * @return int value of the event_queue_size option (500000 unless overridden)
     */
    public function getEventQueueSize(): int
    {
        return $this->event_queue_size;
    }
198
199
    /**
     * Return the name of the collection jobs are stored in.
     *
     * @return string value of the job_queue option
     */
    public function getJobQueue(): string
    {
        return $this->job_queue;
    }
206
207
    /**
     * Return the name of the collection events are stored in.
     *
     * @return string value of the event_queue option
     */
    public function getEventQueue(): string
    {
        return $this->event_queue;
    }
214
215
    /**
     * Load a single job by its id and wrap it in a Process.
     *
     * @param ObjectId $id id of the job document
     *
     * @throws JobNotFoundException if the job collection holds no document with this id
     *
     * @return Process
     */
    public function getJob(ObjectId $id): Process
    {
        $filter = ['_id' => $id];
        $options = ['typeMap' => self::TYPE_MAP];

        $document = $this->db->{$this->job_queue}->findOne($filter, $options);

        if (null !== $document) {
            return new Process($document, $this, $this->events);
        }

        throw new JobNotFoundException('job '.$id.' was not found');
    }
234
235
    /**
     * Cancel a job by setting its status to STATUS_CANCELED.
     *
     * @param ObjectId $id id of the job document
     *
     * @throws JobNotFoundException if no job with this id exists
     *
     * @return bool always true when the job exists
     */
    public function cancelJob(ObjectId $id): bool
    {
        $result = $this->updateJob($id, JobInterface::STATUS_CANCELED);

        // Check the matched count, not the modified count: a job which is
        // already canceled is matched but not modified, and must not be
        // reported as missing.
        if (1 !== $result->getMatchedCount()) {
            throw new JobNotFoundException('job '.$id.' was not found');
        }

        return true;
    }
248
249
    /**
     * Iterate over jobs matching a query.
     *
     * With an empty query the jobs in status waiting, processing or postponed
     * are selected. As this is a generator, the database is only queried once
     * iteration starts.
     *
     * @param array $query MongoDB filter applied to the job collection
     *
     * @return Generator yields one Process per matching job document
     */
    public function getJobs(array $query = []): Generator
    {
        if ([] === $query) {
            $active = [
                JobInterface::STATUS_WAITING,
                JobInterface::STATUS_PROCESSING,
                JobInterface::STATUS_POSTPONED,
            ];

            $query = ['status' => ['$in' => $active]];
        }

        $cursor = $this->db->{$this->job_queue}->find($query, ['typeMap' => self::TYPE_MAP]);

        foreach ($cursor as $document) {
            yield new Process($document, $this, $this->events);
        }
    }
270
271
    /**
     * Add a job to the queue.
     *
     * The given options are merged over the scheduler defaults and validated.
     * The job document is inserted with status waiting, a matching event is
     * inserted into the event collection, and the stored job is read back and
     * returned as a Process.
     *
     * @param string $class   value stored in the job's "class" field
     * @param mixed  $data    payload stored in the job's "data" field
     * @param array  $options job options (OPTION_AT, OPTION_INTERVAL, OPTION_RETRY,
     *                        OPTION_RETRY_INTERVAL, OPTION_IGNORE_MAX_CHILDREN, OPTION_TIMEOUT)
     *
     * @throws InvalidArgumentException if an option is unknown or has the wrong type
     *
     * @return Process
     */
    public function addJob(string $class, $data, array $options = []): Process
    {
        $defaults = [
            self::OPTION_AT => $this->default_at,
            self::OPTION_INTERVAL => $this->default_interval,
            self::OPTION_RETRY => $this->default_retry,
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
            self::OPTION_IGNORE_MAX_CHILDREN => false,
            self::OPTION_TIMEOUT => $this->default_timeout,
        ];

        $options = array_merge($defaults, $options);
        $this->validateOptions($options);

        // A positive "at" is stored as a BSON date (value * 1000 milliseconds),
        // 0 or a negative value is stored unchanged.
        if ($options[self::OPTION_AT] > 0) {
            $options[self::OPTION_AT] = new UTCDateTime($options[self::OPTION_AT] * 1000);
        }

        // No '$isolated' option here: $isolated is a query operator for
        // multi-document writes, not an insertOne option.
        $result = $this->db->{$this->job_queue}->insertOne([
            'class' => $class,
            'status' => JobInterface::STATUS_WAITING,
            'created' => new UTCDateTime(),
            'started' => new UTCDateTime(0),
            'ended' => new UTCDateTime(0),
            'options' => $options,
            'data' => $data,
        ]);

        $id = $result->getInsertedId();

        $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        $this->db->{$this->event_queue}->insertOne([
            'job' => $id,
            'status' => JobInterface::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
        ]);

        // Read the job back so the Process is built from the stored document
        // using the scheduler's type map.
        $document = $this->db->{$this->job_queue}->findOne(['_id' => $id], [
            'typeMap' => self::TYPE_MAP,
        ]);

        return new Process($document, $this, $this->events);
    }
322
323
    /**
     * Only add the job if no equal job is queued yet.
     *
     * A queued job is one with the same class and data in status waiting or
     * postponed. If one exists and every explicitly requested option equals
     * the stored one, that job is returned. If a requested option differs,
     * the existing job is canceled and a new one is added.
     *
     * @param string $class   value of the job's "class" field
     * @param mixed  $data    payload of the job
     * @param array  $options job options, see addJob()
     *
     * @throws InvalidArgumentException if a new job is added and an option is invalid
     *
     * @return Process
     */
    public function addJobOnce(string $class, $data, array $options = []): Process
    {
        $filter = [
            'class' => $class,
            'data' => $data,
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
            ],
        ];

        $result = $this->db->{$this->job_queue}->findOne($filter, [
            'typeMap' => self::TYPE_MAP,
        ]);

        if (null !== $result) {
            // Compare against the stored job options, not against the
            // top-level document keys (_id, class, status, ...), which never
            // intersect with option names. The comparison is per key, so it
            // does not depend on the order in which options were passed.
            $stored = $result['options'] ?? [];
            $changed = false;

            foreach ($options as $option => $value) {
                if (!array_key_exists($option, $stored)) {
                    $changed = true;

                    break;
                }

                $current = $stored[$option];

                // addJob() stores a positive "at" as UTCDateTime; convert it
                // back to seconds so it is comparable with the requested int.
                if ($current instanceof UTCDateTime) {
                    $current = $current->toDateTime()->getTimestamp();
                }

                if ($current !== $value) {
                    $changed = true;

                    break;
                }
            }

            if ($changed) {
                $this->logger->debug('job ['.$result['_id'].'] options changed, reschedule new job', [
                    'category' => get_class($this),
                    'data' => $data,
                ]);

                $this->cancelJob($result['_id']);
                $result = null;
            }
        }

        if (null === $result) {
            return $this->addJob($class, $data, $options);
        }

        $this->logger->debug('queue job ['.$result['_id'].'] of type ['.$class.'] already exists', [
            'category' => get_class($this),
            'data' => $data,
        ]);

        return new Process($result, $this, $this->events);
    }
362
363
    /**
     * Listen for events.
     *
     * Reads the event queue cursor in an endless loop and calls $callback with
     * a Process for every event. Listening stops as soon as the callback
     * returns exactly true. The loop has no other exit, so the method never
     * falls through without returning.
     *
     * @param Closure $callback receives a Process; return true to stop listening
     * @param array   $query    filter passed to the event queue cursor
     *
     * @return Scheduler
     */
    public function listen(Closure $callback, array $query = []): self
    {
        $cursor = $this->events->getCursor($query);

        while (true) {
            $event = $cursor->current();

            if (null === $event) {
                // NOTE(review): isDead() is not part of Iterator; the inner
                // iterator is presumably the MongoDB driver cursor - confirm
                // against MessageQueue::getCursor().
                if ($cursor->getInnerIterator()->isDead()) {
                    $this->logger->error('events queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                    ]);

                    $this->events->create();

                    // Re-acquire the cursor and keep looping instead of
                    // recursing into listen(), so repeated cursor deaths do
                    // not grow the call stack.
                    $cursor = $this->events->getCursor($query);

                    continue;
                }

                // NOTE(review): when MessageQueue::next() invokes the closure
                // is not visible here - presumably if advancing fails; confirm.
                $this->events->next($cursor, function () use ($callback, $query) {
                    return $this->listen($callback, $query);
                });

                continue;
            }

            // Advance before the callback runs; $event already holds the
            // current document.
            $this->events->next($cursor, function () use ($callback, $query) {
                $this->listen($callback, $query);
            });

            $process = new Process($event, $this, $this->events);
            if (true === $callback($process)) {
                return $this;
            }
        }
    }
400
401
    /**
     * Validate given job options.
     *
     * OPTION_AT, OPTION_INTERVAL, OPTION_RETRY, OPTION_RETRY_INTERVAL and
     * OPTION_TIMEOUT must be integers, OPTION_IGNORE_MAX_CHILDREN must be a
     * boolean. Any other option name is rejected.
     *
     * @param array $options option name => value pairs
     *
     * @throws InvalidArgumentException if an option is unknown or has the wrong type
     *
     * @return Scheduler
     */
    protected function validateOptions(array $options): self
    {
        $integers = [
            self::OPTION_AT,
            self::OPTION_INTERVAL,
            self::OPTION_RETRY,
            self::OPTION_RETRY_INTERVAL,
            self::OPTION_TIMEOUT,
        ];

        foreach ($options as $option => $value) {
            // Strict matching on purpose: a switch compares loosely, which on
            // PHP 7 lets an integer key such as 0 pass as a known option name.
            if (in_array($option, $integers, true)) {
                if (!is_int($value)) {
                    throw new InvalidArgumentException('option '.$option.' must be an integer');
                }

                continue;
            }

            if (self::OPTION_IGNORE_MAX_CHILDREN === $option) {
                if (!is_bool($value)) {
                    throw new InvalidArgumentException('option '.$option.' must be a boolean');
                }

                continue;
            }

            throw new InvalidArgumentException('invalid option '.$option.' given');
        }

        return $this;
    }
431
432
    /**
     * Update job status.
     *
     * @param ObjectId $id     id of the job document
     * @param int      $status new value of the job's "status" field
     *
     * @return UpdateResult result of the update; callers inspect its counts
     */
    protected function updateJob(ObjectId $id, int $status): UpdateResult
    {
        // The filter targets one document by _id, which is updated atomically.
        // The former '$isolated' operator is not needed for that and is
        // rejected by MongoDB 4.0 and newer.
        return $this->db->{$this->job_queue}->updateOne([
            '_id' => $id,
        ], [
            '$set' => [
                'status' => $status,
            ],
        ]);
    }
448
}
449