Passed
Branch dev (b82c9a)
by Raffael
05:11
created

Scheduler::getQueueSize()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 1
nc 1
nop 0
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use Closure;
16
use Generator;
17
use MongoDB\BSON\ObjectId;
18
use MongoDB\BSON\UTCDateTime;
19
use MongoDB\Database;
20
use MongoDB\UpdateResult;
21
use Psr\Log\LoggerInterface;
22
use TaskScheduler\Exception\InvalidArgumentException;
23
use TaskScheduler\Exception\JobNotFoundException;
24
25
class Scheduler
26
{
27
    /**
28
     * Job options.
29
     */
30
    public const OPTION_AT = 'at';
31
    public const OPTION_INTERVAL = 'interval';
32
    public const OPTION_RETRY = 'retry';
33
    public const OPTION_RETRY_INTERVAL = 'retry_interval';
34
    public const OPTION_IGNORE_MAX_CHILDREN = 'ignore_max_children';
35
    public const OPTION_DEFAULT_AT = 'default_at';
36
    public const OPTION_DEFAULT_INTERVAL = 'default_interval';
37
    public const OPTION_DEFAULT_RETRY = 'default_retry';
38
    public const OPTION_DEFAULT_RETRY_INTERVAL = 'default_retry_interval';
39
    public const OPTION_JOB_QUEUE = 'job_queue';
40
    public const OPTION_JOB_QUEUE_SIZE = 'job_queue_size';
41
    public const OPTION_EVENT_QUEUE = 'event_queue';
42
    public const OPTION_EVENT_QUEUE_SIZE = 'event_queue_size';
43
44
    /**
45
     * MongoDB type map.
46
     */
47
    public const TYPE_MAP = [
48
        'document' => 'array',
49
        'root' => 'array',
50
        'array' => 'array',
51
    ];
52
53
    /**
54
     * Database.
55
     *
56
     * @var Database
57
     */
58
    protected $db;
59
60
    /**
61
     * LoggerInterface.
62
     *
63
     * @var LoggerInterface
64
     */
65
    protected $logger;
66
67
    /**
68
     * Job Collection name.
69
     *
70
     * @var string
71
     */
72
    protected $job_queue = 'takscheduler.jobs';
73
74
    /**
75
     * Event Collection name.
76
     *
77
     * @var string
78
     */
79
    protected $event_queue = 'takscheduler.events';
80
81
    /**
82
     * Default at (Secconds from now).
83
     *
84
     * @var int
85
     */
86
    protected $default_at = 0;
87
88
    /**
89
     * Default interval (secconds).
90
     *
91
     * @var int
92
     */
93
    protected $default_interval = -1;
94
95
    /**
96
     * Default retry.
97
     *
98
     * @var int
99
     */
100
    protected $default_retry = -1;
101
102
    /**
103
     * Default retry interval (secconds).
104
     *
105
     * @var int
106
     */
107
    protected $default_retry_interval = 300;
108
109
    /**
110
     * Job Queue size.
111
     *
112
     * @var int
113
     */
114
    protected $job_queue_size = 100000;
115
116
    /**
117
     * Event Queue size.
118
     *
119
     * @var int
120
     */
121
    protected $event_queue_size = 500000;
122
123
    /**
124
     * Events queue.
125
     *
126
     * @var MessageQueue
127
     */
128
    protected $events;
129
130
    /**
131
     * Init queue.
132
     */
133 49
    public function __construct(Database $db, LoggerInterface $logger, array $config = [])
134
    {
135 49
        $this->db = $db;
136 49
        $this->logger = $logger;
137 49
        $this->setOptions($config);
138 49
        $this->events = new MessageQueue($db, $this->getEventQueue(), $this->getEventQueueSize(), $logger);
139 49
    }
140
141
    /**
142
     * Set options.
143
     *
144
     *
145
     * @return Scheduler
146
     */
147 49
    public function setOptions(array $config = []): self
148
    {
149 49
        foreach ($config as $option => $value) {
150
            switch ($option) {
151 4
                case self::OPTION_JOB_QUEUE:
152 4
                case self::OPTION_EVENT_QUEUE:
153 2
                    $this->{$option} = (string) $value;
154
155 2
                break;
156 4
                case self::OPTION_DEFAULT_AT:
157 4
                case self::OPTION_DEFAULT_RETRY_INTERVAL:
158 4
                case self::OPTION_DEFAULT_INTERVAL:
159 4
                case self::OPTION_DEFAULT_RETRY:
160 3
                case self::OPTION_JOB_QUEUE_SIZE:
161 3
                case self::OPTION_EVENT_QUEUE_SIZE:
162 3
                    $this->{$option} = (int) $value;
163
164 3
                break;
165
                default:
166 4
                    throw new InvalidArgumentException('invalid option '.$option.' given');
167
            }
168
        }
169
170 49
        return $this;
171
    }
172
173
    /**
174
     * Get job Queue size.
175
     */
176 31
    public function getJobQueueSize(): int
177
    {
178 31
        return $this->job_queue_size;
179
    }
180
181
    /**
182
     * Get event Queue size.
183
     */
184 49
    public function getEventQueueSize(): int
185
    {
186 49
        return $this->event_queue_size;
187
    }
188
189
    /**
190
     * Get job collection name.
191
     */
192 31
    public function getJobQueue(): string
193
    {
194 31
        return $this->job_queue;
195
    }
196
197
    /**
198
     * Get event collection name.
199
     */
200 49
    public function getEventQueue(): string
201
    {
202 49
        return $this->event_queue;
203
    }
204
205
    /**
206
     * Get job by Id.
207
     *
208
     * @param ObjectId
209
     */
210 17
    public function getJob(ObjectId $id): Process
211
    {
212 17
        $result = $this->db->{$this->job_queue}->findOne([
213 17
            '_id' => $id,
214
        ], [
215 17
            'typeMap' => self::TYPE_MAP,
216
        ]);
217
218 17
        if (null === $result) {
219 1
            throw new JobNotFoundException('job '.$id.' was not found');
220
        }
221
222 16
        return new Process($result, $this, $this->events);
223
    }
224
225
    /**
226
     * Cancel job.
227
     */
228 3
    public function cancelJob(ObjectId $id): bool
229
    {
230 3
        $result = $this->updateJob($id, JobInterface::STATUS_CANCELED);
231
232 3
        if (1 !== $result->getModifiedCount()) {
233 1
            throw new JobNotFoundException('job '.$id.' was not found');
234
        }
235
236 2
        return true;
237
    }
238
239
    /**
240
     * Get jobs (Pass a filter which contains job status, by default all active jobs get returned).
241
     */
242 4
    public function getJobs(array $query = []): Generator
243
    {
244 4
        if (0 === count($query)) {
245 3
            $query = ['status' => ['$in' => [
246
                JobInterface::STATUS_WAITING,
247
                JobInterface::STATUS_PROCESSING,
248
                JobInterface::STATUS_POSTPONED,
249
            ]]];
250
        }
251
252 4
        $result = $this->db->{$this->job_queue}->find($query, [
253 4
            'typeMap' => self::TYPE_MAP,
254
        ]);
255
256 4
        foreach ($result as $job) {
257 3
            yield new Process($job, $this, $this->events);
258
        }
259 4
    }
260
261
    /**
262
     * Add job to queue.
263
     */
264 33
    public function addJob(string $class, $data, array $options = []): Process
265
    {
266
        $defaults = [
267 33
            self::OPTION_AT => $this->default_at,
268 33
            self::OPTION_INTERVAL => $this->default_interval,
269 33
            self::OPTION_RETRY => $this->default_retry,
270 33
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
271 33
            self::OPTION_IGNORE_MAX_CHILDREN => false,
272
        ];
273
274 33
        $options = array_merge($defaults, $options);
275 33
        $this->validateOptions($options);
276 31
        $at = null;
277
278 31
        if ($options[self::OPTION_AT] > 0) {
279 8
            $at = new UTCDateTime($options[self::OPTION_AT] * 1000);
280
        }
281
282 31
        $result = $this->db->{$this->job_queue}->insertOne([
283 31
            'class' => $class,
284
            'status' => JobInterface::STATUS_WAITING,
285 31
            'created' => new UTCDateTime(),
286 31
            'started' => new UTCDateTime(0),
287 31
            'ended' => new UTCDateTime(0),
288 31
            'at' => $at,
289 31
            'retry' => $options[self::OPTION_RETRY],
290 31
            'retry_interval' => $options[self::OPTION_RETRY_INTERVAL],
291 31
            'interval' => $options[self::OPTION_INTERVAL],
292 31
            'ignore_max_children' => $options[self::OPTION_IGNORE_MAX_CHILDREN],
293 31
            'data' => $data,
294 31
        ], ['$isolated' => true]);
295
296 31
        $this->logger->debug('queue job ['.$result->getInsertedId().'] added to ['.$class.']', [
297 31
            'category' => get_class($this),
298 31
            'params' => $options,
299 31
            'data' => $data,
300
        ]);
301
302 31
        $this->db->{$this->event_queue}->insertOne([
303 31
            'job' => $result->getInsertedId(),
304
            'status' => JobInterface::STATUS_WAITING,
305 31
            'timestamp' => new UTCDateTime(),
306
        ]);
307
308 31
        $document = $this->db->{$this->job_queue}->findOne(['_id' => $result->getInsertedId()], [
309 31
            'typeMap' => self::TYPE_MAP,
310
        ]);
311
312 31
        $process = new Process($document, $this, $this->events);
313
314 31
        return $process;
315
    }
316
317
    /**
318
     * Only add job if not in queue yet.
319
     */
320 2
    public function addJobOnce(string $class, $data, array $options = []): Process
321
    {
322
        $filter = [
323 2
            'class' => $class,
324 2
            'data' => $data,
325
            '$or' => [
326
                ['status' => JobInterface::STATUS_WAITING],
327
                ['status' => JobInterface::STATUS_POSTPONED],
328
            ],
329
        ];
330
331 2
        $result = $this->db->{$this->job_queue}->findOne($filter, [
332 2
            'typeMap' => self::TYPE_MAP,
333
        ]);
334
335 2
        if (null !== $result && array_intersect_key($result, $options) !== $options) {
336 1
            $this->logger->debug('job ['.$result['_id'].'] options changed, reschedule new job', [
337 1
                'category' => get_class($this),
338 1
                'data' => $data,
339
            ]);
340
341 1
            $this->cancelJob($result['_id']);
342 1
            $result = null;
343
        }
344
345 2
        if (null === $result) {
346 2
            return $this->addJob($class, $data, $options);
347
        }
348
349 1
        $this->logger->debug('queue job ['.$result['_id'].'] of type ['.$class.'] already exists', [
350 1
            'category' => get_class($this),
351 1
            'data' => $data,
352
        ]);
353
354 1
        return new Process($result, $this, $this->events);
355
    }
356
357
    /**
358
     * Listen for events.
359
     */
360 1
    public function listen(Closure $callback, array $query = []): self
361
    {
362 1
        $cursor = $this->events->getCursor($query);
363
364 1
        while (true) {
365 1
            if (null === $cursor->current()) {
366
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

366
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
367
                    $this->logger->error('events queue cursor is dead, is it a capped collection?', [
368
                        'category' => get_class($this),
369
                    ]);
370
371
                    $this->events->create();
372
373
                    return $this->listen($callback, $query);
374
                }
375
376
                $this->events->next($cursor, function () use ($callback, $query) {
377
                    return $this->listen($callback, $query);
378
                });
379
380
                continue;
381
            }
382
383 1
            $result = $cursor->current();
384 1
            $this->events->next($cursor, function () use ($callback, $query) {
385
                $this->listen($callback, $query);
386 1
            });
387
388 1
            $process = new Process($result, $this, $this->events);
389 1
            if (true === $callback($process)) {
390 1
                return $this;
391
            }
392
        }
0 ignored issues
show
Bug Best Practice introduced by
In this branch, the function will implicitly return null which is incompatible with the type-hinted return TaskScheduler\Scheduler. Consider adding a return statement or allowing null as return value.

For hinted functions/methods where all return statements with the correct type are only reachable via conditions, ?null? gets implicitly returned which may be incompatible with the hinted type. Let?s take a look at an example:

interface ReturnsInt {
    public function returnsIntHinted(): int;
}

class MyClass implements ReturnsInt {
    public function returnsIntHinted(): int
    {
        if (foo()) {
            return 123;
        }
        // here: null is implicitly returned
    }
}
Loading history...
393
    }
394
395
    /**
396
     * Validate given job options.
397
     */
398 33
    protected function validateOptions(array $options): self
399
    {
400 33
        foreach ($options as $option => $value) {
401
            switch ($option) {
402 33
                case self::OPTION_AT:
403 33
                case self::OPTION_INTERVAL:
404 33
                case self::OPTION_RETRY:
405 32
                case self::OPTION_RETRY_INTERVAL:
406 33
                    if (!is_int($value)) {
407 1
                        throw new InvalidArgumentException('option '.$option.' must be an integer');
408
                    }
409
410 33
                break;
411 32
                case self::OPTION_IGNORE_MAX_CHILDREN:
412 32
                    if (!is_bool($value)) {
413
                        throw new InvalidArgumentException('option '.$option.' must be a boolean');
414
                    }
415
416 32
                break;
417
                default:
418 33
                    throw new InvalidArgumentException('invalid option '.$option.' given');
419
            }
420
        }
421
422 31
        return $this;
423
    }
424
425
    /**
426
     * Update job status.
427
     */
428 3
    protected function updateJob(ObjectId $id, int $status): UpdateResult
429
    {
430 3
        $result = $this->db->{$this->job_queue}->updateMany([
431 3
            '_id' => $id,
432
            '$isolated' => true,
433
        ], [
434
            '$set' => [
435 3
                'status' => $status,
436
            ],
437
        ]);
438
439 3
        return $result;
440
    }
441
}
442