Completed
Branch dev (58fe22)
by Raffael
02:55
created

Scheduler::listen()   B

Complexity

Conditions 6
Paths 10

Size

Total Lines 37
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 10.5

Importance

Changes 0
Metric Value
eloc 20
dl 0
loc 37
ccs 11
cts 22
cp 0.5
rs 8.9777
c 0
b 0
f 0
cc 6
nc 10
nop 2
crap 10.5
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use Closure;
16
use Generator;
17
use MongoDB\BSON\ObjectId;
18
use MongoDB\BSON\UTCDateTime;
19
use MongoDB\Database;
20
use MongoDB\UpdateResult;
21
use Psr\Log\LoggerInterface;
22
use TaskScheduler\Exception\InvalidArgumentException;
23
use TaskScheduler\Exception\JobNotFoundException;
24
25
class Scheduler
26
{
27
    /**
28
     * Job options.
29
     */
30
    public const OPTION_AT = 'at';
31
    public const OPTION_INTERVAL = 'interval';
32
    public const OPTION_RETRY = 'retry';
33
    public const OPTION_RETRY_INTERVAL = 'retry_interval';
34
    public const OPTION_IGNORE_MAX_CHILDREN = 'ignore_max_children';
35
    public const OPTION_TIMEOUT = 'timeout';
36
    public const OPTION_DEFAULT_AT = 'default_at';
37
    public const OPTION_DEFAULT_INTERVAL = 'default_interval';
38
    public const OPTION_DEFAULT_RETRY = 'default_retry';
39
    public const OPTION_DEFAULT_RETRY_INTERVAL = 'default_retry_interval';
40
    public const OPTION_DEFAULT_TIMEOUT = 'default_timeout';
41
    public const OPTION_JOB_QUEUE = 'job_queue';
42
    public const OPTION_JOB_QUEUE_SIZE = 'job_queue_size';
43
    public const OPTION_EVENT_QUEUE = 'event_queue';
44
    public const OPTION_EVENT_QUEUE_SIZE = 'event_queue_size';
45
46
    /**
47
     * MongoDB type map.
48
     */
49
    public const TYPE_MAP = [
50
        'document' => 'array',
51
        'root' => 'array',
52
        'array' => 'array',
53
    ];
54
55
    /**
56
     * Database.
57
     *
58
     * @var Database
59
     */
60
    protected $db;
61
62
    /**
63
     * LoggerInterface.
64
     *
65
     * @var LoggerInterface
66
     */
67
    protected $logger;
68
69
    /**
70
     * Job Collection name.
71
     *
72
     * @var string
73
     */
74
    protected $job_queue = 'takscheduler.jobs';
75
76
    /**
77
     * Event Collection name.
78
     *
79
     * @var string
80
     */
81
    protected $event_queue = 'takscheduler.events';
82
83
    /**
84
     * Default at (Secconds from now).
85
     *
86
     * @var int
87
     */
88
    protected $default_at = 0;
89
90
    /**
91
     * Default interval (secconds).
92
     *
93
     * @var int
94
     */
95
    protected $default_interval = -1;
96
97
    /**
98
     * Default retry.
99
     *
100
     * @var int
101
     */
102
    protected $default_retry = -1;
103
104
    /**
105
     * Default retry interval (secconds).
106
     *
107
     * @var int
108
     */
109
    protected $default_retry_interval = 300;
110
111
    /**
112
     * Default timeout.
113
     *
114
     * @var int
115
     */
116
    protected $default_timeout = 0;
117
118
    /**
119
     * Job Queue size.
120
     *
121
     * @var int
122
     */
123
    protected $job_queue_size = 100000;
124
125
    /**
126
     * Event Queue size.
127
     *
128
     * @var int
129
     */
130
    protected $event_queue_size = 500000;
131
132
    /**
133
     * Events queue.
134
     *
135
     * @var MessageQueue
136
     */
137
    protected $events;
138
139
    /**
140
     * Init queue.
141
     */
142 54
    public function __construct(Database $db, LoggerInterface $logger, array $config = [])
143
    {
144 54
        $this->db = $db;
145 54
        $this->logger = $logger;
146 54
        $this->setOptions($config);
147 54
        $this->events = new MessageQueue($db, $this->getEventQueue(), $this->getEventQueueSize(), $logger);
148 54
    }
149
150
    /**
151
     * Set options.
152
     */
153 54
    public function setOptions(array $config = []): self
154
    {
155 54
        foreach ($config as $option => $value) {
156
            switch ($option) {
157 4
                case self::OPTION_JOB_QUEUE:
158 4
                case self::OPTION_EVENT_QUEUE:
159 2
                    $this->{$option} = (string) $value;
160
161 2
                break;
162 4
                case self::OPTION_DEFAULT_AT:
163 4
                case self::OPTION_DEFAULT_RETRY_INTERVAL:
164 4
                case self::OPTION_DEFAULT_INTERVAL:
165 4
                case self::OPTION_DEFAULT_RETRY:
166 3
                case self::OPTION_DEFAULT_TIMEOUT:
167 3
                case self::OPTION_JOB_QUEUE_SIZE:
168 3
                case self::OPTION_EVENT_QUEUE_SIZE:
169 3
                    $this->{$option} = (int) $value;
170
171 3
                break;
172
                default:
173 4
                    throw new InvalidArgumentException('invalid option '.$option.' given');
174
            }
175
        }
176
177 54
        return $this;
178
    }
179
180
    /**
181
     * Get job Queue size.
182
     */
183 36
    public function getJobQueueSize(): int
184
    {
185 36
        return $this->job_queue_size;
186
    }
187
188
    /**
189
     * Get event Queue size.
190
     */
191 54
    public function getEventQueueSize(): int
192
    {
193 54
        return $this->event_queue_size;
194
    }
195
196
    /**
197
     * Get job collection name.
198
     */
199 36
    public function getJobQueue(): string
200
    {
201 36
        return $this->job_queue;
202
    }
203
204
    /**
205
     * Get event collection name.
206
     */
207 54
    public function getEventQueue(): string
208
    {
209 54
        return $this->event_queue;
210
    }
211
212
    /**
213
     * Get job by Id.
214
     */
215 16
    public function getJob(ObjectId $id): Process
216
    {
217 16
        $result = $this->db->{$this->job_queue}->findOne([
218 16
            '_id' => $id,
219
        ], [
220 16
            'typeMap' => self::TYPE_MAP,
221
        ]);
222
223 16
        if (null === $result) {
224 1
            throw new JobNotFoundException('job '.$id.' was not found');
225
        }
226
227 15
        return new Process($result, $this, $this->events);
228
    }
229
230
    /**
231
     * Cancel job.
232
     */
233 3
    public function cancelJob(ObjectId $id): bool
234
    {
235 3
        $result = $this->updateJob($id, JobInterface::STATUS_CANCELED);
236
237 3
        if (1 !== $result->getModifiedCount()) {
238 1
            throw new JobNotFoundException('job '.$id.' was not found');
239
        }
240
241 2
        $this->db->{$this->event_queue}->insertOne([
242 2
            'job' => $id,
243
            'status' => JobInterface::STATUS_CANCELED,
244 2
            'timestamp' => new UTCDateTime(),
245
        ]);
246
247 2
        return true;
248
    }
249
250
    /**
251
     * Get jobs (Pass a filter which contains job status, by default all active jobs get returned).
252
     */
253 7
    public function getJobs(array $query = []): Generator
254
    {
255 7
        if (0 === count($query)) {
256 6
            $query = ['status' => ['$in' => [
257
                JobInterface::STATUS_WAITING,
258
                JobInterface::STATUS_PROCESSING,
259
                JobInterface::STATUS_POSTPONED,
260
            ]]];
261
        }
262
263 7
        $result = $this->db->{$this->job_queue}->find($query, [
264 7
            'typeMap' => self::TYPE_MAP,
265
        ]);
266
267 7
        foreach ($result as $job) {
268 6
            yield new Process($job, $this, $this->events);
269
        }
270 7
    }
271
272
    /**
273
     * Add job to queue.
274
     */
275 37
    public function addJob(string $class, $data, array $options = []): Process
276
    {
277
        $defaults = [
278 37
            self::OPTION_AT => $this->default_at,
279 37
            self::OPTION_INTERVAL => $this->default_interval,
280 37
            self::OPTION_RETRY => $this->default_retry,
281 37
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
282 37
            self::OPTION_IGNORE_MAX_CHILDREN => false,
283 37
            self::OPTION_TIMEOUT => $this->default_timeout,
284
        ];
285
286 37
        $options = array_merge($defaults, $options);
287 37
        $this->validateOptions($options);
288
289 35
        if ($options[self::OPTION_AT] > 0) {
290 9
            $options[self::OPTION_AT] = new UTCDateTime($options[self::OPTION_AT] * 1000);
291
        }
292
293 35
        $result = $this->db->{$this->job_queue}->insertOne([
294 35
            'class' => $class,
295
            'status' => JobInterface::STATUS_WAITING,
296 35
            'created' => new UTCDateTime(),
297 35
            'started' => new UTCDateTime(0),
298 35
            'ended' => new UTCDateTime(0),
299 35
            'worker' => new ObjectId(),
300 35
            'options' => $options,
301 35
            'data' => $data,
302 35
        ], ['$isolated' => true]);
303
304 35
        $this->logger->debug('queue job ['.$result->getInsertedId().'] added to ['.$class.']', [
305 35
            'category' => get_class($this),
306 35
            'params' => $options,
307 35
            'data' => $data,
308
        ]);
309
310 35
        $this->db->{$this->event_queue}->insertOne([
311 35
            'job' => $result->getInsertedId(),
312
            'status' => JobInterface::STATUS_WAITING,
313 35
            'timestamp' => new UTCDateTime(),
314
        ]);
315
316 35
        $document = $this->db->{$this->job_queue}->findOne(['_id' => $result->getInsertedId()], [
317 35
            'typeMap' => self::TYPE_MAP,
318
        ]);
319
320 35
        $process = new Process($document, $this, $this->events);
321
322 35
        return $process;
323
    }
324
325
    /**
326
     * Only add job if not in queue yet.
327
     */
328 2
    public function addJobOnce(string $class, $data, array $options = []): Process
329
    {
330
        $filter = [
331 2
            'class' => $class,
332 2
            'data' => $data,
333
            '$or' => [
334
                ['status' => JobInterface::STATUS_WAITING],
335
                ['status' => JobInterface::STATUS_POSTPONED],
336
            ],
337
        ];
338
339 2
        $result = $this->db->{$this->job_queue}->findOne($filter, [
340 2
            'typeMap' => self::TYPE_MAP,
341
        ]);
342
343 2
        if (null !== $result && array_intersect_key($result, $options) !== $options) {
344 1
            $this->logger->debug('job ['.$result['_id'].'] options changed, reschedule new job', [
345 1
                'category' => get_class($this),
346 1
                'data' => $data,
347
            ]);
348
349 1
            $this->cancelJob($result['_id']);
350 1
            $result = null;
351
        }
352
353 2
        if (null === $result) {
354 2
            return $this->addJob($class, $data, $options);
355
        }
356
357 1
        $this->logger->debug('queue job ['.$result['_id'].'] of type ['.$class.'] already exists', [
358 1
            'category' => get_class($this),
359 1
            'data' => $data,
360
        ]);
361
362 1
        return new Process($result, $this, $this->events);
363
    }
364
365
    /**
366
     * Listen for events.
367
     */
368 1
    public function listen(Closure $callback, array $query = []): self
369
    {
370 1
        if (0 === count($query)) {
371
            $query = [
372
                'timestamp' => ['$gte' => new UTCDateTime()],
373
            ];
374
        }
375
376 1
        $cursor = $this->events->getCursor($query);
377
378 1
        while (true) {
379 1
            if (null === $cursor->current()) {
380
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

380
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
381
                    $this->logger->error('events queue cursor is dead, is it a capped collection?', [
382
                        'category' => get_class($this),
383
                    ]);
384
385
                    $this->events->create();
386
387
                    return $this->listen($callback, $query);
388
                }
389
390
                $this->events->next($cursor, function () use ($callback, $query) {
391
                    return $this->listen($callback, $query);
392
                });
393
394
                continue;
395
            }
396
397 1
            $result = $cursor->current();
398 1
            $this->events->next($cursor, function () use ($callback, $query) {
399
                $this->listen($callback, $query);
400 1
            });
401
402 1
            $process = new Process($result, $this, $this->events);
403 1
            if (true === $callback($process)) {
404 1
                return $this;
405
            }
406
        }
0 ignored issues
show
Bug Best Practice introduced by
In this branch, the function will implicitly return null which is incompatible with the type-hinted return TaskScheduler\Scheduler. Consider adding a return statement or allowing null as return value.

For hinted functions/methods where all return statements with the correct type are only reachable via conditions, ?null? gets implicitly returned which may be incompatible with the hinted type. Let?s take a look at an example:

interface ReturnsInt {
    public function returnsIntHinted(): int;
}

class MyClass implements ReturnsInt {
    public function returnsIntHinted(): int
    {
        if (foo()) {
            return 123;
        }
        // here: null is implicitly returned
    }
}
Loading history...
407
    }
408
409
    /**
410
     * Validate given job options.
411
     */
412 37
    protected function validateOptions(array $options): self
413
    {
414 37
        foreach ($options as $option => $value) {
415
            switch ($option) {
416 37
                case self::OPTION_AT:
417 37
                case self::OPTION_INTERVAL:
418 37
                case self::OPTION_RETRY:
419 36
                case self::OPTION_RETRY_INTERVAL:
420 36
                case self::OPTION_TIMEOUT:
421 37
                    if (!is_int($value)) {
422 1
                        throw new InvalidArgumentException('option '.$option.' must be an integer');
423
                    }
424
425 37
                break;
426 36
                case self::OPTION_IGNORE_MAX_CHILDREN:
427 36
                    if (!is_bool($value)) {
428
                        throw new InvalidArgumentException('option '.$option.' must be a boolean');
429
                    }
430
431 36
                break;
432
                default:
433 37
                    throw new InvalidArgumentException('invalid option '.$option.' given');
434
            }
435
        }
436
437 35
        return $this;
438
    }
439
440
    /**
441
     * Update job status.
442
     */
443 3
    protected function updateJob(ObjectId $id, int $status): UpdateResult
444
    {
445 3
        $result = $this->db->{$this->job_queue}->updateMany([
446 3
            '_id' => $id,
447
            '$isolated' => true,
448
        ], [
449
            '$set' => [
450 3
                'status' => $status,
451
            ],
452
        ]);
453
454 3
        return $result;
455
    }
456
}
457