Completed
Branch dev (05d0ad)
by Raffael
02:25
created

Scheduler::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 6
c 0
b 0
f 0
ccs 5
cts 5
cp 1
rs 10
cc 1
nc 1
nop 3
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use Closure;
16
use Generator;
17
use MongoDB\BSON\ObjectId;
18
use MongoDB\BSON\UTCDateTime;
19
use MongoDB\Database;
20
use MongoDB\UpdateResult;
21
use Psr\Log\LoggerInterface;
22
use TaskScheduler\Exception\InvalidArgumentException;
23
use TaskScheduler\Exception\JobNotFoundException;
24
25
class Scheduler
26
{
27
    /**
28
     * Job options.
29
     */
30
    public const OPTION_AT = 'at';
31
    public const OPTION_INTERVAL = 'interval';
32
    public const OPTION_RETRY = 'retry';
33
    public const OPTION_RETRY_INTERVAL = 'retry_interval';
34
    public const OPTION_FORCE_SPAWN = 'force_spawn';
35
    public const OPTION_TIMEOUT = 'timeout';
36
    public const OPTION_ID = 'id';
37
    public const OPTION_IGNORE_DATA = 'ignore_data';
38
39
    /**
40
     * Default job options.
41
     */
42
    public const OPTION_DEFAULT_AT = 'default_at';
43
    public const OPTION_DEFAULT_INTERVAL = 'default_interval';
44
    public const OPTION_DEFAULT_RETRY = 'default_retry';
45
    public const OPTION_DEFAULT_RETRY_INTERVAL = 'default_retry_interval';
46
    public const OPTION_DEFAULT_TIMEOUT = 'default_timeout';
47
48
    /**
49
     * Queue options.
50
     */
51
    public const OPTION_JOB_QUEUE = 'job_queue';
52
    public const OPTION_JOB_QUEUE_SIZE = 'job_queue_size';
53
    public const OPTION_EVENT_QUEUE = 'event_queue';
54
    public const OPTION_EVENT_QUEUE_SIZE = 'event_queue_size';
55
56
    /**
57
     * MongoDB type map.
58
     */
59
    public const TYPE_MAP = [
60
        'document' => 'array',
61
        'root' => 'array',
62
        'array' => 'array',
63
    ];
64
65
    /**
66
     * Database.
67
     *
68
     * @var Database
69
     */
70
    protected $db;
71
72
    /**
73
     * LoggerInterface.
74
     *
75
     * @var LoggerInterface
76
     */
77
    protected $logger;
78
79
    /**
80
     * Job Collection name.
81
     *
82
     * @var string
83
     */
84
    protected $job_queue = 'taskscheduler.jobs';
85
86
    /**
87
     * Event Collection name.
88
     *
89
     * @var string
90
     */
91
    protected $event_queue = 'taskscheduler.events';
92
93
    /**
94
     * Unix time.
95
     *
96
     * @var int
97
     */
98
    protected $default_at = 0;
99
100
    /**
101
     * Default interval (seconds).
102
     *
103
     * @var int
104
     */
105
    protected $default_interval = 0;
106
107
    /**
108
     * Default retry.
109
     *
110
     * @var int
111
     */
112
    protected $default_retry = 0;
113
114
    /**
115
     * Default retry interval (seconds).
116
     *
117
     * @var int
118
     */
119
    protected $default_retry_interval = 300;
120
121
    /**
122
     * Default timeout.
123
     *
124
     * @var int
125
     */
126
    protected $default_timeout = 0;
127
128
    /**
129
     * Job Queue size.
130
     *
131
     * @var int
132
     */
133
    protected $job_queue_size = 1000000;
134
135
    /**
136
     * Event Queue size.
137
     *
138
     * @var int
139
     */
140
    protected $event_queue_size = 5000000;
141
142
    /**
143
     * Events queue.
144
     *
145
     * @var MessageQueue
146
     */
147
    protected $events;
148
149
    /**
     * Create a scheduler bound to a database and a logger.
     *
     * The configuration is applied through setOptions() first, so that the
     * event message queue is created with the configured event queue name
     * and size rather than the built-in defaults.
     *
     * @param Database        $db     database holding the job and event collections
     * @param LoggerInterface $logger logger, also handed on to the event message queue
     * @param array           $config option name => value pairs, see setOptions()
     */
    public function __construct(Database $db, LoggerInterface $logger, array $config = [])
    {
        $this->db = $db;
        $this->logger = $logger;
        $this->setOptions($config);

        $queue_name = $this->getEventQueue();
        $queue_size = $this->getEventQueueSize();
        $this->events = new MessageQueue($db, $queue_name, $queue_size, $logger);
    }
159
160
    /**
     * Set options.
     *
     * Queue names are cast to string, every other supported option is cast to
     * int, and the value is stored in the property named after the option.
     *
     * Option names are matched strictly. A switch statement compares loosely,
     * which on PHP 7 makes the integer key 0 (as produced by a list-style
     * array) equal to any non-numeric option name; such keys are rejected here
     * instead of silently creating a property named "0".
     *
     * @param array $config option name => value pairs
     *
     * @throws InvalidArgumentException if an unknown option name is given
     *
     * @return self this instance, for chaining
     */
    public function setOptions(array $config = []): self
    {
        $string_options = [
            self::OPTION_JOB_QUEUE,
            self::OPTION_EVENT_QUEUE,
        ];

        $int_options = [
            self::OPTION_DEFAULT_AT,
            self::OPTION_DEFAULT_RETRY_INTERVAL,
            self::OPTION_DEFAULT_INTERVAL,
            self::OPTION_DEFAULT_RETRY,
            self::OPTION_DEFAULT_TIMEOUT,
            self::OPTION_JOB_QUEUE_SIZE,
            self::OPTION_EVENT_QUEUE_SIZE,
        ];

        foreach ($config as $option => $value) {
            if (in_array($option, $string_options, true)) {
                $this->{$option} = (string) $value;
            } elseif (in_array($option, $int_options, true)) {
                $this->{$option} = (int) $value;
            } else {
                throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        return $this;
    }
189
190
    /**
     * Get the configured size of the job queue.
     *
     * @return int current value of the job_queue_size option
     */
    public function getJobQueueSize(): int
    {
        return $this->job_queue_size;
    }
197
198
    /**
     * Get the configured size of the event queue.
     *
     * @return int current value of the event_queue_size option
     */
    public function getEventQueueSize(): int
    {
        return $this->event_queue_size;
    }
205
206
    /**
     * Get the name of the collection used as job queue.
     *
     * @return string current value of the job_queue option
     */
    public function getJobQueue(): string
    {
        return $this->job_queue;
    }
213
214
    /**
     * Get the name of the collection used as event queue.
     *
     * @return string current value of the event_queue option
     */
    public function getEventQueue(): string
    {
        return $this->event_queue;
    }
221
222
    /**
     * Look up a single job by its id.
     *
     * @param ObjectId $id id of the job document
     *
     * @throws JobNotFoundException if the job collection holds no document with this id
     *
     * @return Process process wrapping the job document that was found
     */
    public function getJob(ObjectId $id): Process
    {
        $filter = ['_id' => $id];
        $job = $this->db->{$this->job_queue}->findOne($filter, [
            'typeMap' => self::TYPE_MAP,
        ]);

        if (null !== $job) {
            return new Process($job, $this, $this->events);
        }

        throw new JobNotFoundException('job '.$id.' was not found');
    }
239
240
    /**
     * Cancel a job.
     *
     * Sets the job status to canceled and, once the job is known to exist,
     * records a matching canceled event in the event queue.
     *
     * @param ObjectId $id id of the job to cancel
     *
     * @throws JobNotFoundException if the status update did not match exactly one job
     *
     * @return bool always true when no exception is thrown
     */
    public function cancelJob(ObjectId $id): bool
    {
        $update = $this->updateJob($id, JobInterface::STATUS_CANCELED);

        if (1 !== $update->getMatchedCount()) {
            throw new JobNotFoundException('job '.$id.' was not found');
        }

        $event = [
            'job' => $id,
            'status' => JobInterface::STATUS_CANCELED,
            'timestamp' => new UTCDateTime(),
        ];
        $this->db->{$this->event_queue}->insertOne($event);

        return true;
    }
259
260
    /**
     * Iterate over jobs matching a query.
     *
     * When no query is given, only jobs whose status is waiting, processing or
     * postponed are returned. Each matching document is yielded as a Process.
     *
     * @param array $query filter for the job collection; empty selects the default status filter
     *
     * @return Generator yields one Process per matching job document
     */
    public function getJobs(array $query = []): Generator
    {
        if ([] === $query) {
            $active = [
                JobInterface::STATUS_WAITING,
                JobInterface::STATUS_PROCESSING,
                JobInterface::STATUS_POSTPONED,
            ];
            $query = ['status' => ['$in' => $active]];
        }

        $documents = $this->db->{$this->job_queue}->find($query, [
            'typeMap' => self::TYPE_MAP,
        ]);

        foreach ($documents as $document) {
            yield new Process($document, $this, $this->events);
        }
    }
281
282
    /**
     * Add a job to the queue.
     *
     * Inserts the prepared job document, logs the insert, records a waiting
     * event for the new job and returns the job as it is stored (read back
     * from the job collection).
     *
     * @param string $class   job class name stored with the job
     * @param mixed  $data    payload stored with the job
     * @param array  $options job options, merged with the defaults by prepareInsert()
     *
     * @return Process process wrapping the stored job document
     */
    public function addJob(string $class, $data, array $options = []): Process
    {
        $document = $this->prepareInsert($class, $data, $options);

        $insert = $this->db->{$this->job_queue}->insertOne($document);
        $id = $insert->getInsertedId();

        $this->logger->debug('queue job ['.$id.'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        $this->db->{$this->event_queue}->insertOne([
            'job' => $id,
            'status' => JobInterface::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
        ]);

        $stored = $this->db->{$this->job_queue}->findOne(['_id' => $id], [
            'typeMap' => self::TYPE_MAP,
        ]);

        return new Process($stored, $this, $this->events);
    }
310
311
    /**
     * Only add job if not in queue yet.
     *
     * A job counts as queued when a document of the same class exists with
     * status waiting, postponed or processing. Unless the ignore_data option
     * is true, the document must also carry the same data.
     *
     * If such a job exists but the explicitly requested options differ from
     * the stored ones (or, with ignore_data, the data differs), the existing
     * job is canceled and the job is scheduled again.
     *
     * @param string $class   job class name stored with the job
     * @param mixed  $data    payload stored with the job
     * @param array  $options job options, merged with the defaults by prepareInsert()
     *
     * @return Process process wrapping the existing or the newly stored job document
     */
    public function addJobOnce(string $class, $data, array $options = []): Process
    {
        $filter = [
            'class' => $class,
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
                ['status' => JobInterface::STATUS_PROCESSING],
            ],
        ];

        // prepareInsert() takes $options by reference and replaces it with the
        // merged and validated set, so keep what the caller actually asked for.
        $requested = $options;
        $document = $this->prepareInsert($class, $data, $options);

        if (true !== $options[self::OPTION_IGNORE_DATA]) {
            $filter = ['data' => $data] + $filter;
        }

        // Upsert: insert the prepared document only when nothing matches.
        // ($isolated is not an updateOne() option and has been dropped here.)
        $result = $this->db->{$this->job_queue}->updateOne($filter, ['$setOnInsert' => $document], [
            'upsert' => true,
        ]);

        if ($result->getMatchedCount() > 0) {
            $document = $this->db->{$this->job_queue}->findOne($filter, [
                'typeMap' => self::TYPE_MAP,
            ]);

            if (array_intersect_key($document['options'], $requested) !== $requested || ($data !== $document['data'] && true === $options[self::OPTION_IGNORE_DATA])) {
                $this->logger->debug('job ['.$document['_id'].'] options/data changed, reschedule new job', [
                    'category' => get_class($this),
                    'data' => $data,
                ]);

                $this->cancelJob($document['_id']);

                return $this->addJobOnce($class, $data, $options);
            }

            return new Process($document, $this, $this->events);
        }

        $this->logger->debug('queue job ['.$result->getUpsertedId().'] added to ['.$class.']', [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ]);

        $this->db->{$this->event_queue}->insertOne([
            'job' => $result->getUpsertedId(),
            'status' => JobInterface::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
        ]);

        $document = $this->db->{$this->job_queue}->findOne(['_id' => $result->getUpsertedId()], [
            'typeMap' => self::TYPE_MAP,
        ]);

        return new Process($document, $this, $this->events);
    }
374
375
    /**
     * Listen for events.
     *
     * Reads the event queue through a cursor and calls $callback with a
     * Process for every event found. Listening stops as soon as the callback
     * returns exactly true; otherwise the loop keeps running.
     *
     * When no query is given, only events with a timestamp at or after the
     * moment of the call are requested.
     *
     * @param Closure $callback called with a Process per event; return true to stop listening
     * @param array   $query    filter for the event cursor
     *
     * @return self this instance (or the result of the restarted listen() call)
     */
    public function listen(Closure $callback, array $query = []): self
    {
        if ([] === $query) {
            $query = [
                'timestamp' => ['$gte' => new UTCDateTime()],
            ];
        }

        $cursor = $this->events->getCursor($query);

        while (true) {
            if (null !== $cursor->current()) {
                $event = $cursor->current();

                // The closure is handed to MessageQueue::next(); presumably it is
                // invoked when advancing the cursor fails - confirm in MessageQueue.
                $this->events->next($cursor, function () use ($callback, $query) {
                    $this->listen($callback, $query);
                });

                if (true === $callback(new Process($event, $this, $this->events))) {
                    return $this;
                }

                continue;
            }

            // No event available: either the cursor died or there is nothing new yet.
            if ($cursor->getInnerIterator()->isDead()) {
                $this->logger->error('events queue cursor is dead, is it a capped collection?', [
                    'category' => get_class($this),
                ]);

                // Create the events queue again and start over with a fresh cursor.
                $this->events->create();

                return $this->listen($callback, $query);
            }

            $this->events->next($cursor, function () use ($callback, $query) {
                return $this->listen($callback, $query);
            });
        }
    }
418
419
    /**
     * Build the job document that gets written to the job collection.
     *
     * The given options are merged over the scheduler defaults and validated.
     * $options is passed by reference and holds the merged, validated options
     * afterwards (without the id option, which becomes the document _id).
     *
     * @param string $class   job class name stored with the job
     * @param mixed  $data    payload stored with the job
     * @param array  $options job options; replaced by the merged and validated options
     *
     * @return array job document with status waiting, ready to be inserted
     */
    protected function prepareInsert(string $class, $data, array &$options = []): array
    {
        $fallback = [
            self::OPTION_AT => $this->default_at,
            self::OPTION_INTERVAL => $this->default_interval,
            self::OPTION_RETRY => $this->default_retry,
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
            self::OPTION_FORCE_SPAWN => false,
            self::OPTION_TIMEOUT => $this->default_timeout,
            self::OPTION_IGNORE_DATA => false,
        ];

        // Kept as two assignments: $options is a reference, so the caller sees
        // the merged options even if validation throws.
        $options = array_merge($fallback, $options);
        $options = SchedulerValidator::validateOptions($options);

        $job = [
            'class' => $class,
            'status' => JobInterface::STATUS_WAITING,
            'created' => new UTCDateTime(),
            'started' => new UTCDateTime(0),
            'ended' => new UTCDateTime(0),
            'worker' => new ObjectId(),
            'data' => $data,
        ];

        // An explicit id is used as document _id and is not stored as an option.
        if (isset($options[self::OPTION_ID])) {
            $job['_id'] = $options[self::OPTION_ID];
            unset($options[self::OPTION_ID]);
        }

        $job['options'] = $options;

        return $job;
    }
457
458
    /**
     * Update job status.
     *
     * The job is selected by _id only. The former `$isolated` entry in the
     * filter has been removed: MongoDB 4.0 dropped that operator and rejects
     * queries that use it, and it has no effect on an update that targets a
     * single document.
     *
     * @param ObjectId $id     id of the job to update
     * @param int      $status new status value to store
     *
     * @return UpdateResult result of the update, e.g. for checking the matched count
     */
    protected function updateJob(ObjectId $id, int $status): UpdateResult
    {
        $result = $this->db->{$this->job_queue}->updateMany([
            '_id' => $id,
        ], [
            '$set' => [
                'status' => $status,
            ],
        ]);

        return $result;
    }
474
}
475