Completed
Push — master ( 7dab27...ecf44b )
by Raffael
03:22
created

Queue::__destroy()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 3
ccs 0
cts 2
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 1
nc 1
nop 0
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use MongoDB\Driver\Exception\ConnectionException;
20
use MongoDB\Driver\Exception\RuntimeException;
21
use MongoDB\Operation\Find;
22
use Psr\Container\ContainerInterface;
23
use Psr\Log\LoggerInterface;
24
25
class Queue
26
{
27
    /**
28
     * Job status.
29
     */
30
    const STATUS_WAITING = 0;
31
    const STATUS_POSTPONED = 1;
32
    const STATUS_PROCESSING = 2;
33
    const STATUS_DONE = 3;
34
    const STATUS_FAILED = 4;
35
    const STATUS_CANCELED = 5;
36
37
    /**
38
     * Scheduler.
39
     *
40
     * @param Scheduler
41
     */
42
    protected $scheduler;
43
44
    /**
45
     * Database.
46
     *
47
     * @var Database
48
     */
49
    protected $db;
50
51
    /**
52
     * LoggerInterface.
53
     *
54
     * @var LoggerInterface
55
     */
56
    protected $logger;
57
58
    /**
59
     * Local queue.
60
     *
61
     * @var array
62
     */
63
    protected $queue = [];
64
65
    /**
66
     * Collection name.
67
     *
68
     * @var string
69
     */
70
    protected $collection_name = 'queue';
71
72
    /**
73
     * Container.
74
     *
75
     * @var ContainerInterface
76
     */
77
    protected $container;
78
79
    /**
80
     * Current processing job.
81
     *
82
     * @var array
83
     */
84
    protected $current_job;
85
86
    /**
87
     * Init queue.
88
     *
89
     * @param Scheduler          $scheduler
90
     * @param Database           $db
91
     * @param LoggerInterface    $logger
92
     * @param ContainerInterface $container
93
     */
94 22
    public function __construct(Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
95
    {
96 22
        $this->scheduler = $scheduler;
97 22
        $this->db = $db;
98 22
        $this->logger = $logger;
99 22
        $this->container = $container;
100 22
        $this->collection_name = $scheduler->getCollection();
101 22
    }
102
103
    /**
104
     * Cleanup and exit.
105
     */
106
    public function __destroy()
107
    {
108
        $this->cleanup(SIGTERM);
109
    }
110
111
    /**
112
     * Execute job queue as endless loop.
113
     */
114
    public function process()
115
    {
116
        $cursor = $this->getCursor();
117
        $this->catchSignal();
118
119
        while (true) {
120
            $this->processLocalQueue();
121
122
            if (null === $cursor->current()) {
123
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

123
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
124
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
125
                        'category' => get_class($this),
126
                    ]);
127
128
                    $this->createQueue();
129
130
                    return $this->process();
0 ignored issues
show
Bug introduced by
Are you sure the usage of $this->process() targeting TaskScheduler\Queue::process() seems to always return null.

This check looks for function or method calls that always return null and whose return value is used.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
if ($a->getObject()) {

The method getObject() can return nothing but null, so it makes no sense to use the return value.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
131
                }
132
133
                $this->retrieveNextJob($cursor);
0 ignored issues
show
Bug introduced by
$cursor of type IteratorIterator is incompatible with the type iterable expected by parameter $cursor of TaskScheduler\Queue::retrieveNextJob(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

133
                $this->retrieveNextJob(/** @scrutinizer ignore-type */ $cursor);
Loading history...
134
135
                continue;
136
            }
137
138
            $job = $cursor->current();
139
            $this->retrieveNextJob($cursor);
140
            $this->queueJob($job);
141
        }
142
    }
143
144
    /**
145
     * Execute job queue.
146
     *
147
     * @return bool
148
     */
149 1
    public function processOnce(): bool
150
    {
151 1
        $cursor = $this->getCursor(false);
152
153 1
        while (true) {
154 1
            $this->processLocalQueue();
155
156 1
            if (null === $cursor->current()) {
157 1
                if ($cursor->getInnerIterator()->isDead()) {
158
                    $this->logger->debug('all jobs were processed', [
159
                        'category' => get_class($this),
160
                    ]);
161
162
                    return false;
163
                }
164
165 1
                return true;
166
            }
167
168 1
            $job = $cursor->current();
169 1
            $cursor->next();
170 1
            $this->queueJob($job);
171
        }
0 ignored issues
show
Bug Best Practice introduced by
In this branch, the function will implicitly return null which is incompatible with the type-hinted return boolean. Consider adding a return statement or allowing null as return value.

For hinted functions/methods where all return statements with the correct type are only reachable via conditions, ?null? gets implicitly returned which may be incompatible with the hinted type. Let?s take a look at an example:

interface ReturnsInt {
    public function returnsIntHinted(): int;
}

class MyClass implements ReturnsInt {
    public function returnsIntHinted(): int
    {
        if (foo()) {
            return 123;
        }
        // here: null is implicitly returned
    }
}
Loading history...
172
    }
173
174
    /**
175
     * Catch signals and cleanup.
176
     *
177
     * @return Queue
178
     */
179 1
    protected function catchSignal(): self
180
    {
181 1
        pcntl_async_signals(true);
182 1
        pcntl_signal(SIGTERM, [$this, 'cleanup']);
183 1
        pcntl_signal(SIGINT, [$this, 'cleanup']);
184
185 1
        return $this;
186
    }
187
188
    /**
189
     * Cleanup and exit.
190
     *
191
     * @param mixed $sig
192
     *
193
     * @return ObjectId
194
     */
195 2
    protected function handleSignal($sig): ?ObjectId
196
    {
197 2
        if (null === $this->current_job) {
198 2
            $this->logger->debug('received signal ['.$sig.'], no job is currently processing, exit now', [
199 2
                'category' => get_class($this),
200
            ]);
201
202 2
            return null;
203
        }
204
205
        $this->logger->debug('received signal ['.$sig.'], reschedule current processing job ['.$this->current_job['_id'].']', [
206
            'category' => get_class($this),
207
        ]);
208
209
        $this->updateJob($this->current_job['_id'], self::STATUS_CANCELED);
210
211
        return $this->scheduler->addJob($this->current_job['class'], $this->current_job['data'], [
212
            Scheduler::OPTION_AT => $this->current_job['retry_interval'],
213
            Scheduler::OPTION_INTERVAL => $this->current_job['interval'],
214
            Scheduler::OPTION_RETRY => --$this->current_job['retry'],
215
            Scheduler::OPTION_RETRY_INTERVAL => $this->current_job['retry_interval'],
216
        ]);
217
    }
218
219
    /**
220
     * Cleanup and exit.
221
     */
222
    protected function cleanup(int $sig)
223
    {
224
        $this->handleSignal($sig);
225
        exit();
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
226
    }
227
228
    /**
229
     * Create queue and insert a dummy object to start cursor
230
     * Dummy object is required, otherwise we would get a dead cursor.
231
     *
232
     * @return Queue
233
     */
234
    protected function createQueue(): self
235
    {
236
        $this->logger->info('create new queue ['.$this->collection_name.']', [
237
            'category' => get_class($this),
238
        ]);
239
240
        try {
241
            $this->db->createCollection(
242
                $this->collection_name,
243
                [
244
                    'capped' => true,
245
                    'size' => $this->scheduler->getQueueSize(),
246
                ]
247
            );
248
249
            $this->db->{$this->collection_name}->insertOne(['class' => 'dummy']);
250
        } catch (RuntimeException $e) {
251
            if (48 !== $e->getCode()) {
252
                throw $e;
253
            }
254
        }
255
256
        return $this;
257
    }
258
259
    /**
260
     * Create queue and insert a dummy object to start cursor
261
     * Dummy object is required, otherwise we would get a dead cursor.
262
     *
263
     * @return Queue
264
     */
265
    protected function convertQueue(): self
266
    {
267
        $this->logger->info('convert existing queue collection ['.$this->collection_name.'] into a capped collection', [
268
            'category' => get_class($this),
269
        ]);
270
271
        $this->db->command([
272
            'convertToCapped' => $this->collection_name,
273
            'size' => $this->scheduler->getQueueSize(),
274
        ]);
275
276
        $this->db->{$this->collection_name}->insertOne(['class' => 'dummy']);
277
278
        return $this;
279
    }
280
281
    /**
282
     * Retrieve next job.
283
     *
284
     * @param iterable $cursor
285
     */
286 1
    protected function retrieveNextJob(Iterable $cursor)
287
    {
288
        try {
289 1
            $cursor->next();
290
        } catch (RuntimeException $e) {
291
            $this->logger->error('job queue cursor failed to retrieve next job, restart daemon', [
292
                'category' => get_class($this),
293
                'exception' => $e,
294
            ]);
295
296
            $this->process();
297
        }
298 1
    }
299
300
    /**
301
     * Queue job.
302
     *
303
     * @param array $job
304
     */
305 1
    protected function queueJob(array $job): bool
306
    {
307 1
        if (true === $this->collectJob($job['_id'], self::STATUS_PROCESSING)) {
308 1
            $this->processJob($job);
309
        } elseif (self::STATUS_POSTPONED === $job['status']) {
310
            $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
311
                'category' => get_class($this),
312
            ]);
313
314
            $this->queue[] = $job;
315
        }
316
317 1
        return true;
318
    }
319
320
    /**
321
     * Get cursor.
322
     *
323
     * @param bool $tailable
324
     *
325
     * @return IteratorIterator
326
     */
327 4
    protected function getCursor(bool $tailable = true): IteratorIterator
328
    {
329
        $options = [
330 4
            'typeMap' => [
331
                'document' => 'array',
332
                'root' => 'array',
333
                'array' => 'array',
334
            ],
335
        ];
336
337 4
        if (true === $tailable) {
338 3
            $options['cursorType'] = Find::TAILABLE;
339 3
            $options['noCursorTimeout'] = true;
340
        }
341
342
        try {
343 4
            $cursor = $this->db->{$this->collection_name}->find([
344
                '$or' => [
345 4
                    ['status' => self::STATUS_WAITING],
346 4
                    ['status' => self::STATUS_POSTPONED],
347
                ],
348 4
            ], $options);
349
        } catch (ConnectionException $e) {
350
            if (2 === $e->getCode()) {
351
                $this->convertQueue();
352
353
                return $this->getCursor($tailable);
354
            }
355
356
            throw $e;
357
        }
358
359 4
        $iterator = new IteratorIterator($cursor);
360 4
        $iterator->rewind();
361
362 4
        return $iterator;
363
    }
364
365
    /**
366
     * Update job status.
367
     *
368
     * @param ObjectId $id
369
     * @param int      $status
370
     * @param mixed    $from_status
371
     *
372
     * @return bool
373
     */
374 5
    protected function collectJob(ObjectId $id, int $status, $from_status = self::STATUS_WAITING): bool
375
    {
376 5
        $result = $this->db->{$this->collection_name}->updateMany([
377 5
            '_id' => $id,
378 5
            'status' => $from_status,
379
            '$isolated' => true,
380
        ], [
381
            '$set' => [
382 5
                'status' => $status,
383 5
                'started' => self::STATUS_PROCESSING === $status ? new UTCDateTime() : new UTCDateTime(0),
384
            ],
385
        ]);
386
387 5
        if (1 === $result->getModifiedCount()) {
388 5
            $this->logger->debug('job ['.$id.'] updated to status ['.$status.']', [
389 5
                'category' => get_class($this),
390
            ]);
391
392 5
            return true;
393
        }
394
395 1
        $this->logger->debug('job ['.$id.'] is already collected with status ['.$status.']', [
396 1
            'category' => get_class($this),
397
        ]);
398
399 1
        return false;
400
    }
401
402
    /**
403
     * Update job status.
404
     *
405
     * @param ObjectId $id
406
     * @param int      $status
407
     *
408
     * @return bool
409
     */
410 13
    protected function updateJob(ObjectId $id, int $status): bool
411
    {
412
        $set = [
413 13
            'status' => $status,
414
        ];
415
416 13
        if (self::STATUS_DONE === $status || self::STATUS_FAILED === $status) {
417 10
            $set['ended'] = new UTCDateTime();
418
        }
419
420 13
        $result = $this->db->{$this->collection_name}->updateMany([
421 13
            '_id' => $id,
422
            '$isolated' => true,
423
        ], [
424 13
            '$set' => $set,
425
        ]);
426
427 13
        return $result->isAcknowledged();
428
    }
429
430
    /**
431
     * Check local queue for postponed jobs.
432
     *
433
     * @return bool
434
     */
435 4
    protected function processLocalQueue(): bool
436
    {
437 4
        $now = new UTCDateTime();
438 4
        foreach ($this->queue as $key => $job) {
439 3
            if ($job['at'] <= $now) {
440 2
                $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
441 2
                    'category' => get_class($this),
442
                ]);
443
444 2
                unset($this->queue[$key]);
445 2
                $job['at'] = null;
446
447 2
                if (true === $this->collectJob($job['_id'], self::STATUS_PROCESSING, self::STATUS_POSTPONED)) {
448 3
                    $this->processJob($job);
449
                }
450
            }
451
        }
452
453 4
        return true;
454
    }
455
456
    /**
457
     * Process job.
458
     *
459
     * @param array $job
460
     *
461
     * @return ObjectId
462
     */
463 8
    protected function processJob(array $job): ObjectId
464
    {
465 8
        if ($job['at'] instanceof UTCDateTime) {
466 1
            $this->updateJob($job['_id'], self::STATUS_POSTPONED);
0 ignored issues
show
Bug introduced by
It seems like $job['_id'] can also be of type MongoDB\BSON\UTCDateTime; however, parameter $id of TaskScheduler\Queue::updateJob() does only seem to accept MongoDB\BSON\ObjectId, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

466
            $this->updateJob(/** @scrutinizer ignore-type */ $job['_id'], self::STATUS_POSTPONED);
Loading history...
467 1
            $this->queue[] = $job;
468
469 1
            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['at']->toDateTime()->format('c').']', [
470 1
                'category' => get_class($this),
471
            ]);
472
473 1
            return $job['_id'];
0 ignored issues
show
Bug Best Practice introduced by
The expression return $job['_id'] could return the type MongoDB\BSON\UTCDateTime which is incompatible with the type-hinted return MongoDB\BSON\ObjectId. Consider adding an additional type-check to rule them out.
Loading history...
474
        }
475
476 7
        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
477 7
            'category' => get_class($this),
478 7
            'params' => $job['data'],
479
        ]);
480
481 7
        $this->current_job = $job;
482
483
        try {
484 7
            $this->executeJob($job);
485 2
            $this->current_job = null;
486 5
        } catch (\Exception $e) {
487 5
            $this->logger->error('failed execute job ['.$job['_id'].']', [
488 5
                'category' => get_class($this),
489 5
                'exception' => $e,
490
            ]);
491
492 5
            $this->updateJob($job['_id'], self::STATUS_FAILED);
493 5
            $this->current_job = null;
494
495 5
            if ($job['retry'] >= 0) {
496 1
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['retry'].']', [
497 1
                    'category' => get_class($this),
498
                ]);
499
500 1
                return $this->scheduler->addJob($job['class'], $job['data'], [
501 1
                    Scheduler::OPTION_AT => time() + $job['retry_interval'],
502 1
                    Scheduler::OPTION_INTERVAL => $job['interval'],
503 1
                    Scheduler::OPTION_RETRY => --$job['retry'],
504 1
                    Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
505
                ]);
506
            }
507
        }
508
509 6
        if ($job['interval'] >= 0) {
510 1
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['interval'].'s]', [
511 1
                'category' => get_class($this),
512
            ]);
513
514 1
            return $this->scheduler->addJob($job['class'], $job['data'], [
515 1
                Scheduler::OPTION_AT => time() + $job['interval'],
516 1
                Scheduler::OPTION_INTERVAL => $job['interval'],
517 1
                Scheduler::OPTION_RETRY => $job['retry'],
518 1
                Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
519
            ]);
520
        }
521
522 5
        return $job['_id'];
523
    }
524
525
    /**
526
     * Execute job.
527
     *
528
     * @param array $job
529
     *
530
     * @return bool
531
     */
532 11
    protected function executeJob(array $job): bool
533
    {
534 11
        if (!class_exists($job['class'])) {
535 4
            throw new Exception\InvalidJob('job class does not exists');
536
        }
537
538 7
        if (null === $this->container) {
539 6
            $instance = new $job['class']();
540
        } else {
541 1
            $instance = $this->container->get($job['class']);
542
        }
543
544 7
        if (!($instance instanceof JobInterface)) {
545
            throw new Exception\InvalidJob('job must implement JobInterface');
546
        }
547
548 7
        $instance->setData($job['data'])
549 7
            ->start();
550
551 4
        return $this->updateJob($job['_id'], self::STATUS_DONE);
552
    }
553
}
554