Passed
Push — master ( ecf44b...de2af5 )
by Raffael
02:46
created

Queue::__destroy()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 3
ccs 0
cts 2
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 1
nc 1
nop 0
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use MongoDB\Driver\Exception\ConnectionException;
20
use MongoDB\Driver\Exception\RuntimeException;
21
use MongoDB\Operation\Find;
22
use Psr\Container\ContainerInterface;
23
use Psr\Log\LoggerInterface;
24
25
class Queue
26
{
27
    /**
28
     * Job status.
29
     */
30
    const STATUS_WAITING = 0;
31
    const STATUS_POSTPONED = 1;
32
    const STATUS_PROCESSING = 2;
33
    const STATUS_DONE = 3;
34
    const STATUS_FAILED = 4;
35
    const STATUS_CANCELED = 5;
36
37
    /**
38
     * Scheduler.
39
     *
40
     * @param Scheduler
41
     */
42
    protected $scheduler;
43
44
    /**
45
     * Database.
46
     *
47
     * @var Database
48
     */
49
    protected $db;
50
51
    /**
52
     * LoggerInterface.
53
     *
54
     * @var LoggerInterface
55
     */
56
    protected $logger;
57
58
    /**
59
     * Local queue.
60
     *
61
     * @var array
62
     */
63
    protected $queue = [];
64
65
    /**
66
     * Collection name.
67
     *
68
     * @var string
69
     */
70
    protected $collection_name = 'queue';
71
72
    /**
73
     * Container.
74
     *
75
     * @var ContainerInterface
76
     */
77
    protected $container;
78
79
    /**
80
     * Current processing job.
81
     *
82
     * @var array
83
     */
84
    protected $current_job;
85
86
    /**
87
     * Init queue.
88
     *
89
     * @param Scheduler          $scheduler
90
     * @param Database           $db
91
     * @param LoggerInterface    $logger
92
     * @param ContainerInterface $container
93
     */
94 24
    public function __construct(Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
95
    {
96 24
        $this->scheduler = $scheduler;
97 24
        $this->db = $db;
98 24
        $this->logger = $logger;
99 24
        $this->container = $container;
100 24
        $this->collection_name = $scheduler->getCollection();
101 24
    }
102
103
    /**
104
     * Execute job queue as endless loop.
105
     */
106
    public function process()
107
    {
108
        $cursor = $this->getCursor();
109
        $this->catchSignal();
110
111
        while (true) {
112
            $this->processLocalQueue();
113
114
            if (null === $cursor->current()) {
115
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

115
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
116
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
117
                        'category' => get_class($this),
118
                    ]);
119
120
                    $this->createQueue();
121
122
                    return $this->process();
0 ignored issues
show
Bug introduced by
Are you sure the usage of $this->process() targeting TaskScheduler\Queue::process() seems to always return null.

This check looks for function or method calls that always return null and whose return value is used.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
if ($a->getObject()) {

The method getObject() can return nothing but null, so it makes no sense to use the return value.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
123
                }
124
125
                $this->retrieveNextJob($cursor);
0 ignored issues
show
Bug introduced by
$cursor of type IteratorIterator is incompatible with the type iterable expected by parameter $cursor of TaskScheduler\Queue::retrieveNextJob(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

125
                $this->retrieveNextJob(/** @scrutinizer ignore-type */ $cursor);
Loading history...
126
127
                continue;
128
            }
129
130
            $job = $cursor->current();
131
            $this->retrieveNextJob($cursor);
132
            $this->queueJob($job);
133
        }
134
    }
135
136
    /**
137
     * Execute job queue.
138
     *
139
     * @return bool
140
     */
141 1
    public function processOnce(): bool
142
    {
143 1
        $cursor = $this->getCursor(false);
144
145 1
        while (true) {
146 1
            $this->processLocalQueue();
147
148 1
            if (null === $cursor->current()) {
149 1
                if ($cursor->getInnerIterator()->isDead()) {
150
                    $this->logger->debug('all jobs were processed', [
151
                        'category' => get_class($this),
152
                    ]);
153
154
                    return false;
155
                }
156
157 1
                return true;
158
            }
159
160 1
            $job = $cursor->current();
161 1
            $cursor->next();
162 1
            $this->queueJob($job);
163
        }
0 ignored issues
show
Bug Best Practice introduced by
In this branch, the function will implicitly return null which is incompatible with the type-hinted return boolean. Consider adding a return statement or allowing null as return value.

For hinted functions/methods where all return statements with the correct type are only reachable via conditions, ?null? gets implicitly returned which may be incompatible with the hinted type. Let?s take a look at an example:

interface ReturnsInt {
    public function returnsIntHinted(): int;
}

class MyClass implements ReturnsInt {
    public function returnsIntHinted(): int
    {
        if (foo()) {
            return 123;
        }
        // here: null is implicitly returned
    }
}
Loading history...
164
    }
165
166
    /**
167
     * Catch signals and cleanup.
168
     *
169
     * @return Queue
170
     */
171 1
    protected function catchSignal(): self
172
    {
173 1
        pcntl_async_signals(true);
174 1
        pcntl_signal(SIGTERM, [$this, 'cleanup']);
175 1
        pcntl_signal(SIGINT, [$this, 'cleanup']);
176
177 1
        return $this;
178
    }
179
180
    /**
181
     * Cleanup and exit.
182
     *
183
     * @param int $sig
184
     *
185
     * @return ObjectId
186
     */
187 2
    protected function handleSignal(int $sig): ?ObjectId
188
    {
189 2
        if (null === $this->current_job) {
190 2
            $this->logger->debug('received signal ['.$sig.'], no job is currently processing, exit now', [
191 2
                'category' => get_class($this),
192
            ]);
193
194 2
            return null;
195
        }
196
197
        $this->logger->debug('received signal ['.$sig.'], reschedule current processing job ['.$this->current_job['_id'].']', [
198
            'category' => get_class($this),
199
        ]);
200
201
        $this->updateJob($this->current_job['_id'], self::STATUS_CANCELED);
202
203
        return $this->scheduler->addJob($this->current_job['class'], $this->current_job['data'], [
204
            Scheduler::OPTION_AT => $this->current_job['retry_interval'],
205
            Scheduler::OPTION_INTERVAL => $this->current_job['interval'],
206
            Scheduler::OPTION_RETRY => --$this->current_job['retry'],
207
            Scheduler::OPTION_RETRY_INTERVAL => $this->current_job['retry_interval'],
208
        ]);
209
    }
210
211
    /**
212
     * Cleanup and exit.
213
     *
214
     * @param int $sig
215
     */
216
    protected function cleanup(int $sig)
217
    {
218
        $this->handleSignal($sig);
219
        exit();
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
220
    }
221
222
    /**
223
     * Create queue and insert a dummy object to start cursor
224
     * Dummy object is required, otherwise we would get a dead cursor.
225
     *
226
     * @return Queue
227
     */
228 1
    protected function createQueue(): self
229
    {
230 1
        $this->logger->info('create new queue ['.$this->collection_name.']', [
231 1
            'category' => get_class($this),
232
        ]);
233
234
        try {
235 1
            $this->db->createCollection(
236 1
                $this->collection_name,
237
                [
238 1
                    'capped' => true,
239 1
                    'size' => $this->scheduler->getQueueSize(),
240
                ]
241
            );
242
243 1
            $this->db->{$this->collection_name}->insertOne(['class' => 'dummy']);
244
        } catch (RuntimeException $e) {
245
            if (48 !== $e->getCode()) {
246
                throw $e;
247
            }
248
        }
249
250 1
        return $this;
251
    }
252
253
    /**
254
     * Create queue and insert a dummy object to start cursor
255
     * Dummy object is required, otherwise we would get a dead cursor.
256
     *
257
     * @return Queue
258
     */
259 1
    protected function convertQueue(): self
260
    {
261 1
        $this->logger->info('convert existing queue collection ['.$this->collection_name.'] into a capped collection', [
262 1
            'category' => get_class($this),
263
        ]);
264
265 1
        $this->db->command([
266 1
            'convertToCapped' => $this->collection_name,
267 1
            'size' => $this->scheduler->getQueueSize(),
268
        ]);
269
270 1
        $this->db->{$this->collection_name}->insertOne(['class' => 'dummy']);
271
272 1
        return $this;
273
    }
274
275
    /**
276
     * Retrieve next job.
277
     *
278
     * @param iterable $cursor
279
     */
280 1
    protected function retrieveNextJob(Iterable $cursor)
281
    {
282
        try {
283 1
            $cursor->next();
284
        } catch (RuntimeException $e) {
285
            $this->logger->error('job queue cursor failed to retrieve next job, restart daemon', [
286
                'category' => get_class($this),
287
                'exception' => $e,
288
            ]);
289
290
            $this->process();
291
        }
292 1
    }
293
294
    /**
295
     * Queue job.
296
     *
297
     * @param array $job
298
     */
299 1
    protected function queueJob(array $job): bool
300
    {
301 1
        if (true === $this->collectJob($job['_id'], self::STATUS_PROCESSING)) {
302 1
            $this->processJob($job);
303
        } elseif (self::STATUS_POSTPONED === $job['status']) {
304
            $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
305
                'category' => get_class($this),
306
            ]);
307
308
            $this->queue[] = $job;
309
        }
310
311 1
        return true;
312
    }
313
314
    /**
315
     * Get cursor.
316
     *
317
     * @param bool $tailable
318
     *
319
     * @return IteratorIterator
320
     */
321 4
    protected function getCursor(bool $tailable = true): IteratorIterator
322
    {
323
        $options = [
324 4
            'typeMap' => [
325
                'document' => 'array',
326
                'root' => 'array',
327
                'array' => 'array',
328
            ],
329
        ];
330
331 4
        if (true === $tailable) {
332 3
            $options['cursorType'] = Find::TAILABLE;
333 3
            $options['noCursorTimeout'] = true;
334
        }
335
336
        try {
337 4
            $cursor = $this->db->{$this->collection_name}->find([
338
                '$or' => [
339 4
                    ['status' => self::STATUS_WAITING],
340 4
                    ['status' => self::STATUS_POSTPONED],
341
                ],
342 4
            ], $options);
343
        } catch (ConnectionException $e) {
344
            if (2 === $e->getCode()) {
345
                $this->convertQueue();
346
347
                return $this->getCursor($tailable);
348
            }
349
350
            throw $e;
351
        }
352
353 4
        $iterator = new IteratorIterator($cursor);
354 4
        $iterator->rewind();
355
356 4
        return $iterator;
357
    }
358
359
    /**
360
     * Update job status.
361
     *
362
     * @param ObjectId $id
363
     * @param int      $status
364
     * @param mixed    $from_status
365
     *
366
     * @return bool
367
     */
368 5
    protected function collectJob(ObjectId $id, int $status, $from_status = self::STATUS_WAITING): bool
369
    {
370 5
        $result = $this->db->{$this->collection_name}->updateMany([
371 5
            '_id' => $id,
372 5
            'status' => $from_status,
373
            '$isolated' => true,
374
        ], [
375
            '$set' => [
376 5
                'status' => $status,
377 5
                'started' => self::STATUS_PROCESSING === $status ? new UTCDateTime() : new UTCDateTime(0),
378
            ],
379
        ]);
380
381 5
        if (1 === $result->getModifiedCount()) {
382 5
            $this->logger->debug('job ['.$id.'] updated to status ['.$status.']', [
383 5
                'category' => get_class($this),
384
            ]);
385
386 5
            return true;
387
        }
388
389 1
        $this->logger->debug('job ['.$id.'] is already collected with status ['.$status.']', [
390 1
            'category' => get_class($this),
391
        ]);
392
393 1
        return false;
394
    }
395
396
    /**
397
     * Update job status.
398
     *
399
     * @param ObjectId $id
400
     * @param int      $status
401
     *
402
     * @return bool
403
     */
404 13
    protected function updateJob(ObjectId $id, int $status): bool
405
    {
406
        $set = [
407 13
            'status' => $status,
408
        ];
409
410 13
        if (self::STATUS_DONE === $status || self::STATUS_FAILED === $status) {
411 10
            $set['ended'] = new UTCDateTime();
412
        }
413
414 13
        $result = $this->db->{$this->collection_name}->updateMany([
415 13
            '_id' => $id,
416
            '$isolated' => true,
417
        ], [
418 13
            '$set' => $set,
419
        ]);
420
421 13
        return $result->isAcknowledged();
422
    }
423
424
    /**
425
     * Check local queue for postponed jobs.
426
     *
427
     * @return bool
428
     */
429 4
    protected function processLocalQueue(): bool
430
    {
431 4
        $now = new UTCDateTime();
432 4
        foreach ($this->queue as $key => $job) {
433 3
            if ($job['at'] <= $now) {
434 2
                $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
435 2
                    'category' => get_class($this),
436
                ]);
437
438 2
                unset($this->queue[$key]);
439 2
                $job['at'] = null;
440
441 2
                if (true === $this->collectJob($job['_id'], self::STATUS_PROCESSING, self::STATUS_POSTPONED)) {
442 3
                    $this->processJob($job);
443
                }
444
            }
445
        }
446
447 4
        return true;
448
    }
449
450
    /**
451
     * Process job.
452
     *
453
     * @param array $job
454
     *
455
     * @return ObjectId
456
     */
457 8
    protected function processJob(array $job): ObjectId
458
    {
459 8
        if ($job['at'] instanceof UTCDateTime) {
460 1
            $this->updateJob($job['_id'], self::STATUS_POSTPONED);
0 ignored issues
show
Bug introduced by
It seems like $job['_id'] can also be of type MongoDB\BSON\UTCDateTime; however, parameter $id of TaskScheduler\Queue::updateJob() does only seem to accept MongoDB\BSON\ObjectId, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

460
            $this->updateJob(/** @scrutinizer ignore-type */ $job['_id'], self::STATUS_POSTPONED);
Loading history...
461 1
            $this->queue[] = $job;
462
463 1
            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['at']->toDateTime()->format('c').']', [
464 1
                'category' => get_class($this),
465
            ]);
466
467 1
            return $job['_id'];
0 ignored issues
show
Bug Best Practice introduced by
The expression return $job['_id'] could return the type MongoDB\BSON\UTCDateTime which is incompatible with the type-hinted return MongoDB\BSON\ObjectId. Consider adding an additional type-check to rule them out.
Loading history...
468
        }
469
470 7
        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
471 7
            'category' => get_class($this),
472 7
            'params' => $job['data'],
473
        ]);
474
475 7
        $this->current_job = $job;
476
477
        try {
478 7
            $this->executeJob($job);
479 2
            $this->current_job = null;
480 5
        } catch (\Exception $e) {
481 5
            $this->logger->error('failed execute job ['.$job['_id'].']', [
482 5
                'category' => get_class($this),
483 5
                'exception' => $e,
484
            ]);
485
486 5
            $this->updateJob($job['_id'], self::STATUS_FAILED);
487 5
            $this->current_job = null;
488
489 5
            if ($job['retry'] >= 0) {
490 1
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['retry'].']', [
491 1
                    'category' => get_class($this),
492
                ]);
493
494 1
                return $this->scheduler->addJob($job['class'], $job['data'], [
495 1
                    Scheduler::OPTION_AT => time() + $job['retry_interval'],
496 1
                    Scheduler::OPTION_INTERVAL => $job['interval'],
497 1
                    Scheduler::OPTION_RETRY => --$job['retry'],
498 1
                    Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
499
                ]);
500
            }
501
        }
502
503 6
        if ($job['interval'] >= 0) {
504 1
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['interval'].'s]', [
505 1
                'category' => get_class($this),
506
            ]);
507
508 1
            return $this->scheduler->addJob($job['class'], $job['data'], [
509 1
                Scheduler::OPTION_AT => time() + $job['interval'],
510 1
                Scheduler::OPTION_INTERVAL => $job['interval'],
511 1
                Scheduler::OPTION_RETRY => $job['retry'],
512 1
                Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
513
            ]);
514
        }
515
516 5
        return $job['_id'];
517
    }
518
519
    /**
520
     * Execute job.
521
     *
522
     * @param array $job
523
     *
524
     * @return bool
525
     */
526 11
    protected function executeJob(array $job): bool
527
    {
528 11
        if (!class_exists($job['class'])) {
529 4
            throw new Exception\InvalidJob('job class does not exists');
530
        }
531
532 7
        if (null === $this->container) {
533 6
            $instance = new $job['class']();
534
        } else {
535 1
            $instance = $this->container->get($job['class']);
536
        }
537
538 7
        if (!($instance instanceof JobInterface)) {
539
            throw new Exception\InvalidJob('job must implement JobInterface');
540
        }
541
542 7
        $instance->setData($job['data'])
543 7
            ->start();
544
545 4
        return $this->updateJob($job['_id'], self::STATUS_DONE);
546
    }
547
}
548