Passed
Push — master ( 01bb58...63e0fd )
by Raffael
02:41
created

Queue   A

Complexity

Total Complexity 35

Size/Duplication

Total Lines 389
Duplicated Lines 0 %

Test Coverage

Coverage 79.29%

Importance

Changes 0
Metric Value
wmc 35
dl 0
loc 389
ccs 111
cts 140
cp 0.7929
rs 9
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
B processOnce() 0 22 4
B process() 0 24 4
A queueJob() 0 13 3
A retrieveNextJob() 0 11 2
B getCursor() 0 26 2
A __construct() 0 7 1
B collectJob() 0 26 3
A executeJob() 0 20 4
A processLocalQueue() 0 19 4
B processJob() 0 56 5
A updateJob() 0 18 3
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use MongoDB\Driver\Exception\RuntimeException;
20
use MongoDB\Operation\Find;
21
use Psr\Container\ContainerInterface;
22
use Psr\Log\LoggerInterface;
23
24
class Queue
25
{
26
    /**
27
     * Job status.
28
     */
29
    const STATUS_WAITING = 0;
30
    const STATUS_POSTPONED = 1;
31
    const STATUS_PROCESSING = 2;
32
    const STATUS_DONE = 3;
33
    const STATUS_FAILED = 4;
34
    const STATUS_CANCELED = 5;
35
36
    /**
37
     * Scheduler.
38
     *
39
     * @param Scheduler
40
     */
41
    protected $scheduler;
42
43
    /**
44
     * Database.
45
     *
46
     * @var Database
47
     */
48
    protected $db;
49
50
    /**
51
     * LoggerInterface.
52
     *
53
     * @var LoggerInterface
54
     */
55
    protected $logger;
56
57
    /**
58
     * Local queue.
59
     *
60
     * @var array
61
     */
62
    protected $queue = [];
63
64
    /**
65
     * Collection name.
66
     *
67
     * @var string
68
     */
69
    protected $collection_name = 'queue';
70
71
    /**
72
     * Container.
73
     *
74
     * @var ContainerInterface
75
     */
76
    protected $container;
77
78
    /**
79
     * Init queue.
80
     *
81
     * @param Scheduler          $scheduler
82
     * @param Database           $db
83
     * @param LoggerInterface    $logger
84
     * @param ContainerInterface $container
85
     * @param iterable           $config
86
     */
87 19
    public function __construct(Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
88
    {
89 19
        $this->scheduler = $scheduler;
90 19
        $this->db = $db;
91 19
        $this->logger = $logger;
92 19
        $this->container = $container;
93 19
        $this->collection_name = $scheduler->getCollection();
94 19
    }
95
96
    /**
97
     * Execute job queue as endless loop.
98
     */
99
    public function process()
100
    {
101
        $cursor = $this->getCursor();
102
103
        while (true) {
104
            $this->processLocalQueue();
105
106
            if (null === $cursor->current()) {
107
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

107
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
108
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
109
                        'category' => get_class($this),
110
                    ]);
111
112
                    return $this->process();
0 ignored issues
show
Bug introduced by
Are you sure the usage of $this->process() targeting TaskScheduler\Queue::process() seems to always return null.

This check looks for function or method calls that always return null and whose return value is used.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
if ($a->getObject()) {

The method getObject() can return nothing but null, so it makes no sense to use the return value.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
113
                }
114
115
                $this->retrieveNextJob($cursor);
0 ignored issues
show
Bug introduced by
$cursor of type IteratorIterator is incompatible with the type iterable expected by parameter $cursor of TaskScheduler\Queue::retrieveNextJob(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

115
                $this->retrieveNextJob(/** @scrutinizer ignore-type */ $cursor);
Loading history...
116
117
                continue;
118
            }
119
120
            $job = $cursor->current();
121
            $this->retrieveNextJob($cursor);
122
            $this->queueJob($job);
123
        }
124
    }
125
126
    /**
127
     * Execute job queue.
128
     *
129
     * @return bool
130
     */
131 1
    public function processOnce(): bool
132
    {
133 1
        $cursor = $this->getCursor(false);
134
135 1
        while (true) {
136 1
            $this->processLocalQueue();
137
138 1
            if (null === $cursor->current()) {
139 1
                if ($cursor->getInnerIterator()->isDead()) {
140
                    $this->logger->debug('all jobs were processed', [
141
                        'category' => get_class($this),
142
                    ]);
143
144
                    return false;
145
                }
146
147 1
                return true;
148
            }
149
150 1
            $job = $cursor->current();
151 1
            $cursor->next();
152 1
            $this->queueJob($job);
153
        }
0 ignored issues
show
Bug Best Practice introduced by
In this branch, the function will implicitly return null which is incompatible with the type-hinted return boolean. Consider adding a return statement or allowing null as return value.

For hinted functions/methods where all return statements with the correct type are only reachable via conditions, ?null? gets implicitly returned which may be incompatible with the hinted type. Let?s take a look at an example:

interface ReturnsInt {
    public function returnsIntHinted(): int;
}

class MyClass implements ReturnsInt {
    public function returnsIntHinted(): int
    {
        if (foo()) {
            return 123;
        }
        // here: null is implicitly returned
    }
}
Loading history...
154
    }
155
156
    /**
157
     * Retrieve next job.
158
     *
159
     * @param iterable $cursor
160
     */
161 1
    protected function retrieveNextJob(Iterable $cursor)
162
    {
163
        try {
164 1
            $cursor->next();
165
        } catch (RuntimeException $e) {
166
            $this->logger->error('job queue cursor failed to retrieve next job, restart daemon', [
167
                'category' => get_class($this),
168
                'exception' => $e,
169
            ]);
170
171
            $this->process();
172
        }
173 1
    }
174
175
    /**
176
     * Queue job.
177
     *
178
     * @param array $job
179
     */
180 1
    protected function queueJob(array $job): bool
181
    {
182 1
        if (true === $this->collectJob($job['_id'], self::STATUS_PROCESSING)) {
183 1
            $this->processJob($job);
184
        } elseif (self::STATUS_POSTPONED === $job['status']) {
185
            $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
186
                'category' => get_class($this),
187
            ]);
188
189
            $this->queue[] = $job;
190
        }
191
192 1
        return true;
193
    }
194
195
    /**
196
     * Get cursor.
197
     *
198
     * @param bool $tailable
199
     *
200
     * @return IteratorIterator
201
     */
202 4
    protected function getCursor(bool $tailable = true): IteratorIterator
203
    {
204
        $options = [
205 4
            'typeMap' => [
206
                'document' => 'array',
207
                'root' => 'array',
208
                'array' => 'array',
209
            ],
210
        ];
211
212 4
        if (true === $tailable) {
213 3
            $options['cursorType'] = Find::TAILABLE;
214 3
            $options['noCursorTimeout'] = true;
215
        }
216
217 4
        $cursor = $this->db->{$this->collection_name}->find([
218
            '$or' => [
219 4
                ['status' => self::STATUS_WAITING],
220 4
                ['status' => self::STATUS_POSTPONED],
221
            ],
222 4
        ], $options);
223
224 4
        $iterator = new IteratorIterator($cursor);
225 4
        $iterator->rewind();
226
227 4
        return $iterator;
228
    }
229
230
    /**
231
     * Update job status.
232
     *
233
     * @param ObjectId $id
234
     * @param int      $status
235
     * @param mixed    $from_status
236
     *
237
     * @return bool
238
     */
239 5
    protected function collectJob(ObjectId $id, int $status, $from_status = self::STATUS_WAITING): bool
240
    {
241 5
        $result = $this->db->{$this->collection_name}->updateMany([
242 5
            '_id' => $id,
243 5
            'status' => $from_status,
244
            '$isolated' => true,
245
        ], [
246
            '$set' => [
247 5
                'status' => $status,
248 5
                'started' => self::STATUS_PROCESSING === $status ? new UTCDateTime() : new UTCDateTime(0),
249
            ],
250
        ]);
251
252 5
        if (1 === $result->getModifiedCount()) {
253 5
            $this->logger->debug('job ['.$id.'] updated to status ['.$status.']', [
254 5
                'category' => get_class($this),
255
            ]);
256
257 5
            return true;
258
        }
259
260 1
        $this->logger->debug('job ['.$id.'] is already collected with status ['.$status.']', [
261 1
            'category' => get_class($this),
262
        ]);
263
264 1
        return false;
265
    }
266
267
    /**
268
     * Update job status.
269
     *
270
     * @param ObjectId $id
271
     * @param int      $status
272
     *
273
     * @return bool
274
     */
275 13
    protected function updateJob(ObjectId $id, int $status): bool
276
    {
277
        $set = [
278 13
            'status' => $status,
279
        ];
280
281 13
        if (self::STATUS_DONE === $status || self::STATUS_FAILED === $status) {
282 10
            $set['ended'] = new UTCDateTime();
283
        }
284
285 13
        $result = $this->db->{$this->collection_name}->updateMany([
286 13
            '_id' => $id,
287
            '$isolated' => true,
288
        ], [
289 13
            '$set' => $set,
290
        ]);
291
292 13
        return $result->isAcknowledged();
293
    }
294
295
    /**
296
     * Check local queue for postponed jobs.
297
     *
298
     * @return bool
299
     */
300 4
    protected function processLocalQueue(): bool
301
    {
302 4
        $now = new UTCDateTime();
303 4
        foreach ($this->queue as $key => $job) {
304 3
            if ($job['at'] <= $now) {
305 2
                $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
306 2
                    'category' => get_class($this),
307
                ]);
308
309 2
                unset($this->queue[$key]);
310 2
                $job['at'] = null;
311
312 2
                if (true === $this->collectJob($job['_id'], self::STATUS_PROCESSING, self::STATUS_POSTPONED)) {
313 3
                    $this->processJob($job);
314
                }
315
            }
316
        }
317
318 4
        return true;
319
    }
320
321
    /**
322
     * Process job.
323
     *
324
     * @param array $job
325
     *
326
     * @return ObjectId
327
     */
328 8
    protected function processJob(array $job): ObjectId
329
    {
330 8
        if ($job['at'] instanceof UTCDateTime) {
331 1
            $this->updateJob($job['_id'], self::STATUS_POSTPONED);
0 ignored issues
show
Bug introduced by
It seems like $job['_id'] can also be of type MongoDB\BSON\UTCDateTime; however, parameter $id of TaskScheduler\Queue::updateJob() does only seem to accept MongoDB\BSON\ObjectId, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

331
            $this->updateJob(/** @scrutinizer ignore-type */ $job['_id'], self::STATUS_POSTPONED);
Loading history...
332 1
            $this->queue[] = $job;
333
334 1
            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['at']->toDateTime()->format('c').']', [
335 1
                'category' => get_class($this),
336
            ]);
337
338 1
            return $job['_id'];
0 ignored issues
show
Bug Best Practice introduced by
The expression return $job['_id'] could return the type MongoDB\BSON\UTCDateTime which is incompatible with the type-hinted return MongoDB\BSON\ObjectId. Consider adding an additional type-check to rule them out.
Loading history...
339
        }
340
341 7
        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
342 7
            'category' => get_class($this),
343 7
            'params' => $job['data'],
344
        ]);
345
346
        try {
347 7
            $this->executeJob($job);
348 5
        } catch (\Exception $e) {
349 5
            $this->logger->error('failed execute job ['.$job['_id'].']', [
350 5
                'category' => get_class($this),
351 5
                'exception' => $e,
352
            ]);
353
354 5
            $this->updateJob($job['_id'], self::STATUS_FAILED);
355
356 5
            if ($job['retry'] > 0) {
357 1
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['retry'].']', [
358 1
                    'category' => get_class($this),
359
                ]);
360
361 1
                return $this->scheduler->addJob($job['class'], $job['data'], [
362 1
                    Scheduler::OPTION_AT => time() + $job['retry_interval'],
363 1
                    Scheduler::OPTION_INTERVAL => $job['interval'],
364 1
                    Scheduler::OPTION_RETRY => --$job['retry'],
365 1
                    Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
366
                ]);
367
            }
368
        }
369
370 6
        if ($job['interval'] >= 0) {
371 1
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['interval'].'s]', [
372 1
                'category' => get_class($this),
373
            ]);
374
375 1
            return $this->scheduler->addJob($job['class'], $job['data'], [
376 1
                Scheduler::OPTION_AT => time() + $job['interval'],
377 1
                Scheduler::OPTION_INTERVAL => $job['interval'],
378 1
                Scheduler::OPTION_RETRY => $job['retry'],
379 1
                Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
380
            ]);
381
        }
382
383 5
        return $job['_id'];
384
    }
385
386
    /**
387
     * Execute job.
388
     *
389
     * @param array $job
390
     *
391
     * @return bool
392
     */
393 11
    protected function executeJob(array $job): bool
394
    {
395 11
        if (!class_exists($job['class'])) {
396 4
            throw new Exception\InvalidJob('job class does not exists');
397
        }
398
399 7
        if (null === $this->container) {
400 6
            $instance = new $job['class']();
401
        } else {
402 1
            $instance = $this->container->get($job['class']);
403
        }
404
405 7
        if (!($instance instanceof JobInterface)) {
406
            throw new Exception\InvalidJob('job must implement JobInterface');
407
        }
408
409 7
        $instance->setData($job['data'])
410 7
            ->start();
411
412 4
        return $this->updateJob($job['_id'], self::STATUS_DONE);
413
    }
414
}
415