Completed
Push — master ( ed1e2a...e1c7c2 )
by Raffael
02:40
created

Queue::queueJob()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 13
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 4.125

Importance

Changes 0
Metric Value
dl 0
loc 13
c 0
b 0
f 0
ccs 4
cts 8
cp 0.5
rs 9.4285
cc 3
eloc 7
nc 3
nop 1
crap 4.125
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use MongoDB\Driver\Exception\ConnectionException;
20
use MongoDB\Driver\Exception\RuntimeException;
21
use MongoDB\Operation\Find;
22
use Psr\Container\ContainerInterface;
23
use Psr\Log\LoggerInterface;
24
25
class Queue
26
{
27
    /**
28
     * Job status.
29
     */
30
    const STATUS_WAITING = 0;
31
    const STATUS_POSTPONED = 1;
32
    const STATUS_PROCESSING = 2;
33
    const STATUS_DONE = 3;
34
    const STATUS_FAILED = 4;
35
    const STATUS_CANCELED = 5;
36
37
    /**
38
     * Scheduler.
39
     *
40
     * @var Scheduler
41
     */
42
    protected $scheduler;
43
44
    /**
45
     * Database.
46
     *
47
     * @var Database
48
     */
49
    protected $db;
50
51
    /**
52
     * LoggerInterface.
53
     *
54
     * @var LoggerInterface
55
     */
56
    protected $logger;
57
58
    /**
59
     * Local queue.
60
     *
61
     * @var array
62
     */
63
    protected $queue = [];
64
65
    /**
66
     * Collection name.
67
     *
68
     * @var string
69
     */
70
    protected $collection_name = 'queue';
71
72
    /**
73
     * Container.
74
     *
75
     * @var ContainerInterface
76
     */
77
    protected $container;
78
79
    /**
80
     * Current processing job.
81
     *
82
     * @var array
83
     */
84
    protected $current_job;
85
86
    /**
     * Create a queue worker bound to a scheduler and a database.
     *
     * The collection name is read from the scheduler (getCollection()) and
     * overrides the default value of the collection_name property.
     *
     * @param Scheduler               $scheduler
     * @param Database                $db
     * @param LoggerInterface         $logger
     * @param null|ContainerInterface $container optional; when null, executeJob() instantiates job classes directly
     */
    public function __construct(Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
    {
        $this->collection_name = $scheduler->getCollection();
        $this->container = $container;
        $this->logger = $logger;
        $this->db = $db;
        $this->scheduler = $scheduler;
    }
102
103
    /**
     * Execute job queue as endless loop.
     *
     * Opens a tailable cursor on the queue collection, installs the signal
     * handlers and then loops forever: on every iteration the local queue of
     * postponed jobs is checked first, then the job currently under the cursor
     * (if any) is handed to queueJob().
     *
     * When the cursor is reported dead, the queue collection is (re)created
     * and a fresh cursor is opened within the same loop. This replaces the
     * former `return $this->process()` restart, which added one stack frame
     * per dead cursor in a method that never returns and used the return
     * value of a method that returns nothing.
     */
    public function process()
    {
        $cursor = $this->getCursor();
        $this->catchSignal();

        while (true) {
            $this->processLocalQueue();

            $job = $cursor->current();

            if (null === $job) {
                // getCursor() wraps the result of find() in an IteratorIterator,
                // the dead/alive state has to be asked from the wrapped cursor.
                if ($cursor->getInnerIterator()->isDead()) {
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                    ]);

                    $this->createQueue();

                    // Restart on a fresh cursor; the signal handlers installed
                    // above stay registered, so they are not installed again.
                    $cursor = $this->getCursor();

                    continue;
                }

                $this->retrieveNextJob($cursor);

                continue;
            }

            // Advance first, then process the job which was under the cursor.
            $this->retrieveNextJob($cursor);
            $this->queueJob($job);
        }
    }
135
136
    /**
     * Execute job queue once.
     *
     * Walks a non-tailable cursor over the queue collection. Before each
     * cursor read the local queue of postponed jobs is processed; every job
     * found under the cursor is passed to queueJob().
     *
     * The loop only ends through one of the two return statements, so the
     * declared bool return type is always honoured.
     *
     * @return bool false if the cursor is dead when no job is left under it, true otherwise
     */
    public function processOnce(): bool
    {
        $cursor = $this->getCursor(false);

        for (;;) {
            $this->processLocalQueue();

            $job = $cursor->current();

            if (null !== $job) {
                $cursor->next();
                $this->queueJob($job);

                continue;
            }

            if (!$cursor->getInnerIterator()->isDead()) {
                return true;
            }

            $this->logger->debug('all jobs were processed', [
                'category' => get_class($this),
            ]);

            return false;
        }
    }
165
166
    /**
     * Signal handler: let handleSignal() deal with the job in progress, then terminate.
     *
     * Registered by catchSignal() as callback for SIGTERM and SIGINT.
     * Terminating the script is intentional here, the worker is stopped by signal.
     *
     * @param int $sig number of the received signal
     */
    public function cleanup(int $sig)
    {
        $this->handleSignal($sig);

        exit;
    }
176
177
    /**
     * Install cleanup() as handler for SIGTERM and SIGINT.
     *
     * Asynchronous signal dispatching is switched on first, so no manual
     * dispatch call is needed inside the processing loop.
     *
     * @return Queue
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);

        foreach ([SIGTERM, SIGINT] as $signal) {
            pcntl_signal($signal, [$this, 'cleanup']);
        }

        return $this;
    }
190
191
    /**
     * Handle a received signal.
     *
     * If no job is currently processing only a debug message is written.
     * Otherwise the current job is marked as canceled and added again
     * through the scheduler with its retry counter decremented.
     *
     * @param int $sig number of the received signal
     *
     * @return null|ObjectId id returned by Scheduler::addJob(), or null if no job was processing
     */
    protected function handleSignal(int $sig): ?ObjectId
    {
        $context = ['category' => get_class($this)];

        if (null === $this->current_job) {
            $this->logger->debug("received signal [{$sig}], no job is currently processing, exit now", $context);

            return null;
        }

        $this->logger->debug("received signal [{$sig}], reschedule current processing job [{$this->current_job['_id']}]", $context);

        $this->updateJob($this->current_job['_id'], self::STATUS_CANCELED);

        // NOTE(review): OPTION_AT receives the plain retry_interval here, while
        // processJob() passes time() + retry_interval - confirm this is intended.
        $options = [
            Scheduler::OPTION_AT => $this->current_job['retry_interval'],
            Scheduler::OPTION_INTERVAL => $this->current_job['interval'],
            Scheduler::OPTION_RETRY => --$this->current_job['retry'],
            Scheduler::OPTION_RETRY_INTERVAL => $this->current_job['retry_interval'],
        ];

        return $this->scheduler->addJob($this->current_job['class'], $this->current_job['data'], $options);
    }
221
222
    /**
     * Create the queue as capped collection and insert a dummy object to start the cursor.
     * The dummy object is required, otherwise we would get a dead cursor.
     *
     * The size of the collection is taken from the scheduler (getQueueSize()).
     * A driver RuntimeException with code 48 is ignored (presumably the
     * "collection already exists" server error - verify against the MongoDB
     * error code list); any other code is thrown again.
     *
     * @return Queue
     */
    protected function createQueue(): self
    {
        $this->logger->info("create new queue [{$this->collection_name}]", [
            'category' => get_class($this),
        ]);

        try {
            $this->db->createCollection($this->collection_name, [
                'capped' => true,
                'size' => $this->scheduler->getQueueSize(),
            ]);

            $this->db->{$this->collection_name}->insertOne(['class' => 'dummy']);
        } catch (RuntimeException $failure) {
            if (48 === $failure->getCode()) {
                return $this;
            }

            throw $failure;
        }

        return $this;
    }
252
253
    /**
     * Convert the existing queue collection into a capped collection
     * and insert a dummy object afterwards.
     *
     * Runs the convertToCapped database command with the queue size taken
     * from the scheduler (getQueueSize()).
     *
     * @return Queue
     */
    protected function convertQueue(): self
    {
        $name = $this->collection_name;

        $this->logger->info("convert existing queue collection [{$name}] into a capped collection", [
            'category' => get_class($this),
        ]);

        $command = [
            'convertToCapped' => $name,
            'size' => $this->scheduler->getQueueSize(),
        ];

        $this->db->command($command);
        $this->db->{$name}->insertOne(['class' => 'dummy']);

        return $this;
    }
274
275
    /**
     * Advance the cursor to the next job.
     *
     * If the driver throws a RuntimeException while advancing, the error is
     * logged and process() is entered again.
     *
     * NOTE(review): process() is an endless loop, so this call nests a new
     * processing loop inside the caller instead of returning to it.
     *
     * @param iterable $cursor cursor as returned by getCursor(); next() is called on it
     */
    protected function retrieveNextJob(Iterable $cursor)
    {
        try {
            $cursor->next();

            return;
        } catch (RuntimeException $failure) {
            $this->logger->error('job queue cursor failed to retrieve next job, restart daemon', [
                'category' => get_class($this),
                'exception' => $failure,
            ]);
        }

        $this->process();
    }
293
294
    /**
     * Queue job.
     *
     * Tries to collect the job (waiting => processing) and processes it on
     * success. If the job could not be collected but carries the postponed
     * status, it is appended to the local queue instead.
     *
     * @param array $job job document, needs at least the keys _id and status
     *
     * @return bool always true
     */
    protected function queueJob(array $job): bool
    {
        if (true === $this->collectJob($job['_id'], self::STATUS_PROCESSING)) {
            $this->processJob($job);

            return true;
        }

        if (self::STATUS_POSTPONED === $job['status']) {
            $this->logger->debug("found postponed job [{$job['_id']}] to requeue", [
                'category' => get_class($this),
            ]);

            $this->queue[] = $job;
        }

        return true;
    }
313
314
    /**
     * Get cursor.
     *
     * Queries the queue collection for jobs with status waiting or postponed,
     * returns documents as plain arrays (typeMap) and hands the result back
     * wrapped in an already rewound IteratorIterator.
     *
     * A ConnectionException with code 2 triggers convertQueue() followed by a
     * new attempt (presumably the error raised when tailing a collection which
     * is not capped - TODO confirm); every other code is thrown again.
     *
     * @param bool $tailable true requests a tailable cursor without cursor timeout
     *
     * @return IteratorIterator
     */
    protected function getCursor(bool $tailable = true): IteratorIterator
    {
        $options = [
            'typeMap' => [
                'document' => 'array',
                'root' => 'array',
                'array' => 'array',
            ],
        ];

        if ($tailable) {
            $options += [
                'cursorType' => Find::TAILABLE,
                'noCursorTimeout' => true,
            ];
        }

        $filter = [
            '$or' => [
                ['status' => self::STATUS_WAITING],
                ['status' => self::STATUS_POSTPONED],
            ],
        ];

        try {
            $cursor = $this->db->{$this->collection_name}->find($filter, $options);
        } catch (ConnectionException $failure) {
            if (2 !== $failure->getCode()) {
                throw $failure;
            }

            $this->convertQueue();

            return $this->getCursor($tailable);
        }

        $iterator = new IteratorIterator($cursor);
        $iterator->rewind();

        return $iterator;
    }
358
359
    /**
     * Collect a job: move it from an expected status to a new one.
     *
     * The update only matches if the job still has the status $from_status,
     * therefore only one worker can collect a given job. If the new status is
     * STATUS_PROCESSING the field started is set to the current time.
     *
     * @param ObjectId $id          id of the job
     * @param int      $status      status to set
     * @param mixed    $from_status status the job must currently have
     *
     * @return bool true if exactly one document was modified, false otherwise
     */
    protected function collectJob(ObjectId $id, int $status, $from_status = self::STATUS_WAITING): bool
    {
        $set = [
             'status' => $status,
        ];

        if (self::STATUS_PROCESSING === $status) {
            $set['started'] = new UTCDateTime();
        }

        // The former `$isolated` flag has been dropped from the filter: the
        // operator was removed in MongoDB 4.0 (the server rejects it) and it
        // never had an effect here, since the filter matches at most one
        // document by _id and single document updates are atomic.
        $result = $this->db->{$this->collection_name}->updateMany([
            '_id' => $id,
            'status' => $from_status,
        ], [
            '$set' => $set,
        ]);

        if (1 === $result->getModifiedCount()) {
            $this->logger->debug('job ['.$id.'] updated to status ['.$status.']', [
                'category' => get_class($this),
            ]);

            return true;
        }

        $this->logger->debug('job ['.$id.'] is already collected with status ['.$status.']', [
            'category' => get_class($this),
        ]);

        return false;
    }
400
401
    /**
     * Update job status.
     *
     * Sets the status of the job regardless of its current status. For
     * STATUS_DONE and every status above it (failed, canceled) the field
     * ended is set to the current time.
     *
     * @param ObjectId $id     id of the job
     * @param int      $status status to set
     *
     * @return bool whether the write was acknowledged
     */
    protected function updateJob(ObjectId $id, int $status): bool
    {
        $set = [
            'status' => $status,
        ];

        if ($status >= self::STATUS_DONE) {
            $set['ended'] = new UTCDateTime();
        }

        // The former `$isolated` flag has been dropped from the filter: the
        // operator was removed in MongoDB 4.0 (the server rejects it) and it
        // never had an effect here, since the filter matches at most one
        // document by _id and single document updates are atomic.
        $result = $this->db->{$this->collection_name}->updateMany([
            '_id' => $id,
        ], [
            '$set' => $set,
        ]);

        return $result->isAcknowledged();
    }
428
429
    /**
     * Check local queue for postponed jobs.
     *
     * Every locally queued job whose execution time (at) is not after the
     * current time is removed from the local queue, its at value is reset to
     * null and - if it can be collected from postponed to processing - it is
     * processed.
     *
     * @return bool always true
     */
    protected function processLocalQueue(): bool
    {
        $now = new UTCDateTime();

        foreach ($this->queue as $key => $job) {
            if (!($job['at'] <= $now)) {
                continue;
            }

            $this->logger->info("postponed job [{$job['_id']}] [{$job['class']}] can now be executed", [
                'category' => get_class($this),
            ]);

            unset($this->queue[$key]);

            // Reset at, otherwise processJob() would postpone the job again.
            $job['at'] = null;

            $collected = $this->collectJob($job['_id'], self::STATUS_PROCESSING, self::STATUS_POSTPONED);

            if (true === $collected) {
                $this->processJob($job);
            }
        }

        return true;
    }
454
455
    /**
     * Process job.
     *
     * - A job with an execution time (at is a UTCDateTime) is marked as
     *   postponed and appended to the local queue, it is not executed now.
     * - Otherwise the job is executed. If the execution throws, the job is
     *   marked as failed and, if its retry value is >= 0, added again with a
     *   decremented retry value.
     * - A job with an interval >= 0 is added again after it has been handled
     *   (this is also reached by a failed job which was not retried).
     *
     * @param array $job job document (keys used: _id, class, data, at, retry, retry_interval, interval)
     *
     * @return ObjectId id of the newly added job if the job was rescheduled, otherwise the id of $job
     */
    protected function processJob(array $job): ObjectId
    {
        $id = $job['_id'];
        $context = ['category' => get_class($this)];

        if ($job['at'] instanceof UTCDateTime) {
            $this->updateJob($id, self::STATUS_POSTPONED);
            $this->queue[] = $job;

            $when = $job['at']->toDateTime()->format('c');
            $this->logger->debug("execution of job [{$id}] [{$job['class']}] is postponed at [{$when}]", $context);

            return $id;
        }

        $this->logger->debug("execute job [{$id}] [{$job['class']}]", $context + [
            'params' => $job['data'],
        ]);

        // Remembered for handleSignal(), which reschedules the job in progress.
        $this->current_job = $job;

        try {
            $this->executeJob($job);
            $this->current_job = null;
        } catch (\Exception $failure) {
            $this->logger->error("failed execute job [{$id}]", $context + [
                'exception' => $failure,
            ]);

            $this->updateJob($id, self::STATUS_FAILED);
            $this->current_job = null;

            if ($job['retry'] >= 0) {
                $this->logger->debug("failed job [{$id}] has a retry interval of [{$job['retry']}]", $context);

                return $this->scheduler->addJob($job['class'], $job['data'], [
                    Scheduler::OPTION_AT => time() + $job['retry_interval'],
                    Scheduler::OPTION_INTERVAL => $job['interval'],
                    Scheduler::OPTION_RETRY => --$job['retry'],
                    Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
                ]);
            }
        }

        if (!($job['interval'] >= 0)) {
            return $id;
        }

        $this->logger->debug("job [{$id}] has an interval of [{$job['interval']}s]", $context);

        return $this->scheduler->addJob($job['class'], $job['data'], [
            Scheduler::OPTION_AT => time() + $job['interval'],
            Scheduler::OPTION_INTERVAL => $job['interval'],
            Scheduler::OPTION_RETRY => $job['retry'],
            Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
        ]);
    }
523
524
    /**
     * Execute job.
     *
     * Resolves the job class (through the container if one was given,
     * otherwise by plain instantiation), feeds it the job data, starts it and
     * finally marks the job as done.
     *
     * @param array $job job document (keys used: _id, class, data)
     *
     * @throws Exception\InvalidJob if the class does not exist or does not implement JobInterface
     *
     * @return bool result of updateJob() for the status done
     */
    protected function executeJob(array $job): bool
    {
        $class = $job['class'];

        if (!class_exists($class)) {
            throw new Exception\InvalidJob('job class does not exists');
        }

        $instance = null === $this->container
            ? new $class()
            : $this->container->get($class);

        if (!($instance instanceof JobInterface)) {
            throw new Exception\InvalidJob('job must implement JobInterface');
        }

        $prepared = $instance->setData($job['data']);
        $prepared->start();

        return $this->updateJob($job['_id'], self::STATUS_DONE);
    }
552
}
553