Passed
Push — master ( b0131a...01bb58 )
by Raffael
03:28
created

Queue::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 7
c 0
b 0
f 0
ccs 6
cts 6
cp 1
rs 9.4285
cc 1
eloc 5
nc 1
nop 4
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use MongoDB\Driver\Exception\RuntimeException;
20
use MongoDB\Operation\Find;
21
use Psr\Container\ContainerInterface;
22
use Psr\Log\LoggerInterface;
23
24
class Queue
25
{
26
    /**
27
     * Job status.
     *
     * How this class uses the values:
     *  - STATUS_WAITING and STATUS_POSTPONED jobs are selected by getCursor().
     *  - STATUS_PROCESSING is set through collectJob() before a job is processed.
     *  - STATUS_POSTPONED is set by processJob() for jobs whose `at` is a UTCDateTime.
     *  - STATUS_DONE is set by executeJob() after the job's start() returned.
     *  - STATUS_FAILED is set by processJob() when executing the job threw.
     *  - STATUS_CANCELED is not referenced inside this class.
28
     */
29
    const STATUS_WAITING = 0;
30
    const STATUS_POSTPONED = 1;
31
    const STATUS_PROCESSING = 2;
32
    const STATUS_DONE = 3;
33
    const STATUS_FAILED = 4;
34
    const STATUS_CANCELED = 5;
35
36
    /**
37
     * Scheduler, used to re-add jobs that have an interval or remaining retries.
38
     *
39
     * @var Scheduler
40
     */
41
    protected $scheduler;
42
43
    /**
44
     * Database that holds the job collection.
45
     *
46
     * @var Database
47
     */
48
    protected $db;
49
50
    /**
51
     * Logger; every message is logged with the class name as 'category'.
52
     *
53
     * @var LoggerInterface
54
     */
55
    protected $logger;
56
57
    /**
58
     * Local queue: postponed job documents kept in memory until their `at`
     * time is reached (see processLocalQueue()).
59
     *
60
     * @var array
61
     */
62
    protected $queue = [];
63
64
    /**
65
     * Collection name. The default is replaced in the constructor by the
     * value of Scheduler::getCollection().
66
     *
67
     * @var string
68
     */
69
    protected $collection_name = 'queue';
70
71
    /**
72
     * Container used to create job instances; when null, job classes are
     * instantiated directly (see executeJob()).
73
     *
74
     * @var ContainerInterface|null
75
     */
76
    protected $container;
77
78
    /**
     * Create a queue worker bound to a scheduler and its database.
     *
     * The name of the job collection is taken from the scheduler
     * (Scheduler::getCollection()) and replaces the built-in default.
     *
     * @param Scheduler               $scheduler scheduler used to re-add recurring and retried jobs
     * @param Database                $db        database holding the job collection
     * @param LoggerInterface         $logger    logger receiving all queue messages
     * @param null|ContainerInterface $container optional container used to create job instances
     */
    public function __construct(
        Scheduler $scheduler,
        Database $db,
        LoggerInterface $logger,
        ?ContainerInterface $container = null
    ) {
        $this->container = $container;
        $this->logger = $logger;
        $this->db = $db;
        $this->scheduler = $scheduler;
        $this->collection_name = $this->scheduler->getCollection();
    }
95
96
    /**
     * Execute job queue as endless loop.
     *
     * Opens a tailable cursor (getCursor()) and consumes it forever; the
     * method does not return under normal operation. On every iteration the
     * local queue of postponed jobs is checked first, then the job at the
     * current cursor position, if there is one, is queued.
     *
     * When the cursor is reported dead, the problem is logged and a new
     * cursor is opened in place. The loop is not re-entered recursively, so
     * repeated cursor deaths cannot grow the call stack.
     */
    public function process()
    {
        $cursor = $this->getCursor();

        while (true) {
            $this->processLocalQueue();

            $job = $cursor->current();

            if (null === $job) {
                // The inner iterator is the cursor returned by find() in getCursor().
                if ($cursor->getInnerIterator()->isDead()) {
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                    ]);

                    // Start over with a fresh cursor instead of calling process() again.
                    $cursor = $this->getCursor();

                    continue;
                }

                // Nothing available yet: advance the cursor and try again.
                $this->retrieveNextJob($cursor);

                continue;
            }

            // Advance first so the cursor already points past the job being handled.
            $this->retrieveNextJob($cursor);
            $this->queueJob($job);
        }
    }
125
126
    /**
     * Execute job queue once, using a non-tailable cursor.
     *
     * Jobs are queued one after another until the cursor has no current
     * element. The local queue of postponed jobs is checked before every
     * cursor read.
     *
     * @return bool false when the cursor has no current job and is dead
     *              (all jobs were processed), true when it has no current
     *              job but is not dead
     */
    public function processOnce(): bool
    {
        $cursor = $this->getCursor(false);

        for (;;) {
            $this->processLocalQueue();

            $job = $cursor->current();

            if (null !== $job) {
                $cursor->next();
                $this->queueJob($job);

                continue;
            }

            if (!$cursor->getInnerIterator()->isDead()) {
                return true;
            }

            $this->logger->debug('all jobs were processed', [
                'category' => get_class($this),
            ]);

            return false;
        }
    }
155
156
    /**
     * Advance the cursor to the next job.
     *
     * A driver RuntimeException raised while advancing is logged and the
     * processing loop is started again through process().
     *
     * NOTE(review): process() is an endless loop, so this restart runs nested
     * inside the current call instead of replacing it - confirm this is intended.
     *
     * @param iterable $cursor cursor to advance; next() is called on it
     */
    protected function retrieveNextJob(Iterable $cursor)
    {
        try {
            $cursor->next();

            return;
        } catch (RuntimeException $cursor_error) {
            $context = [
                'category' => get_class($this),
                'exception' => $cursor_error,
            ];

            $this->logger->error('job queue cursor failed to retrieve next job, restart daemon', $context);
        }

        $this->process();
    }
174
175
    /**
     * Queue job.
     *
     * Tries to claim the job by moving it from waiting to processing. A
     * claimed job is processed right away. A job that could not be claimed
     * but carries the postponed status is added to the local queue; any
     * other job is left alone.
     *
     * @param array $job job document as read from the cursor
     *
     * @return bool always true
     */
    protected function queueJob(array $job): bool
    {
        $claimed = $this->collectJob($job['_id'], self::STATUS_PROCESSING);

        if (true === $claimed) {
            $this->processJob($job);

            return true;
        }

        if (self::STATUS_POSTPONED !== $job['status']) {
            return true;
        }

        $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
            'category' => get_class($this),
        ]);

        $this->queue[] = $job;

        return true;
    }
194
195
    /**
     * Get cursor.
     *
     * Runs a find() for all waiting and postponed jobs, with every BSON
     * document and array mapped to a PHP array, and returns the result
     * wrapped in an already rewound IteratorIterator.
     *
     * @param bool $tailable request a tailable cursor without cursor timeout
     *
     * @return IteratorIterator
     */
    protected function getCursor(bool $tailable = true): IteratorIterator
    {
        $find_options = [
            'typeMap' => [
                'document' => 'array',
                'root' => 'array',
                'array' => 'array',
            ],
        ];

        if ($tailable) {
            $find_options += [
                'cursorType' => Find::TAILABLE,
                'noCursorTimeout' => true,
            ];
        }

        $pending_filter = [
            '$or' => [
                ['status' => self::STATUS_WAITING],
                ['status' => self::STATUS_POSTPONED],
            ],
        ];

        $collection = $this->db->{$this->collection_name};

        $iterator = new IteratorIterator($collection->find($pending_filter, $find_options));
        $iterator->rewind();

        return $iterator;
    }
229
230
    /**
     * Claim a job by moving it from one status to another.
     *
     * The filter matches the job only while it still has $from_status, so
     * the call reports success only if this update actually modified the
     * document.
     *
     * The filter contains no `$isolated` operator: it only ever affected
     * multi-document writes, this update targets a single `_id`, and
     * MongoDB 4.0 removed the operator, so servers from that version on
     * reject a query containing it.
     *
     * @param ObjectId $id          id of the job to claim
     * @param int      $status      status to set
     * @param mixed    $from_status status the job must currently have
     *
     * @return bool true if exactly one document was modified, false otherwise
     */
    protected function collectJob(ObjectId $id, int $status, $from_status = self::STATUS_WAITING): bool
    {
        $result = $this->db->{$this->collection_name}->updateMany([
            '_id' => $id,
            'status' => $from_status,
        ], [
            '$set' => [
                'status' => $status,
                'timestamp' => new UTCDateTime(),
            ],
        ]);

        if (1 === $result->getModifiedCount()) {
            $this->logger->debug('job ['.$id.'] updated to status ['.$status.']', [
                'category' => get_class($this),
            ]);

            return true;
        }

        // Nothing was modified: the job no longer had $from_status.
        $this->logger->debug('job ['.$id.'] is already collected with status ['.$status.']', [
            'category' => get_class($this),
        ]);

        return false;
    }
266
267
    /**
     * Update job status.
     *
     * Sets the status and a fresh timestamp on the job, regardless of the
     * status it currently has.
     *
     * The filter contains no `$isolated` operator: it only ever affected
     * multi-document writes, this update targets a single `_id`, and
     * MongoDB 4.0 removed the operator, so servers from that version on
     * reject a query containing it.
     *
     * @param ObjectId $id     id of the job to update
     * @param int      $status status to set
     *
     * @return bool whether the write was acknowledged
     */
    protected function updateJob(ObjectId $id, int $status): bool
    {
        $result = $this->db->{$this->collection_name}->updateMany([
            '_id' => $id,
        ], [
            '$set' => [
                'status' => $status,
                'timestamp' => new UTCDateTime(),
            ],
        ]);

        return $result->isAcknowledged();
    }
289
290
    /**
     * Check local queue for postponed jobs.
     *
     * Every locally queued job whose `at` time is not later than now is
     * removed from the local queue, gets its `at` cleared and is processed,
     * provided it can still be moved from postponed to processing.
     *
     * @return bool always true
     */
    protected function processLocalQueue(): bool
    {
        $now = new UTCDateTime();

        foreach ($this->queue as $position => $postponed) {
            if (!($postponed['at'] <= $now)) {
                // Not due yet, keep it in the local queue.
                continue;
            }

            $this->logger->info('postponed job ['.$postponed['_id'].'] ['.$postponed['class'].'] can now be executed', [
                'category' => get_class($this),
            ]);

            unset($this->queue[$position]);

            // Clear the start time so processJob() does not postpone the job again.
            $postponed['at'] = null;

            $claimed = $this->collectJob($postponed['_id'], self::STATUS_PROCESSING, self::STATUS_POSTPONED);

            if (true === $claimed) {
                $this->processJob($postponed);
            }
        }

        return true;
    }
315
316
    /**
     * Process job.
     *
     * - A job whose `at` is a UTCDateTime is marked as postponed and put
     *   into the local queue; it is not executed now.
     * - Otherwise the job is executed. If execution throws, the job is
     *   marked as failed and, when retries are left, re-added through the
     *   scheduler with the retry counter reduced by one.
     * - A job with an interval of zero or more is re-added through the
     *   scheduler after it ran (or failed without retries left).
     *
     * Any Throwable raised by the job is treated as a job failure, not only
     * \Exception. Otherwise an \Error (for example a TypeError) inside a job
     * would leave it in the processing status and abort the worker.
     *
     * @param array $job job document
     *
     * @return ObjectId id of the given job, or the value returned by
     *                  Scheduler::addJob() when the job was re-added
     */
    protected function processJob(array $job): ObjectId
    {
        if ($job['at'] instanceof UTCDateTime) {
            $this->updateJob($job['_id'], self::STATUS_POSTPONED);
            $this->queue[] = $job;

            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['at']->toDateTime()->format('c').']', [
                'category' => get_class($this),
            ]);

            return $job['_id'];
        }

        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
            'category' => get_class($this),
            'params' => $job['data'],
        ]);

        try {
            $this->executeJob($job);
        } catch (\Throwable $e) {
            $this->logger->error('failed execute job ['.$job['_id'].']', [
                'category' => get_class($this),
                'exception' => $e,
            ]);

            $this->updateJob($job['_id'], self::STATUS_FAILED);

            if ($job['retry'] > 0) {
                // Log the interval itself; the retry counter is not an interval.
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['retry_interval'].'s]', [
                    'category' => get_class($this),
                ]);

                return $this->scheduler->addJob($job['class'], $job['data'], [
                    Scheduler::OPTION_AT => time() + $job['retry_interval'],
                    Scheduler::OPTION_INTERVAL => $job['interval'],
                    Scheduler::OPTION_RETRY => $job['retry'] - 1,
                    Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
                ]);
            }
        }

        if ($job['interval'] >= 0) {
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['interval'].'s]', [
                'category' => get_class($this),
            ]);

            return $this->scheduler->addJob($job['class'], $job['data'], [
                Scheduler::OPTION_AT => time() + $job['interval'],
                Scheduler::OPTION_INTERVAL => $job['interval'],
                Scheduler::OPTION_RETRY => $job['retry'],
                Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
            ]);
        }

        return $job['_id'];
    }
380
381
    /**
     * Execute job.
     *
     * Creates the job instance (through the container when one is set,
     * directly otherwise), hands it the job data, starts it and finally
     * marks the job as done.
     *
     * @param array $job job document; `class`, `data` and `_id` are read
     *
     * @throws Exception\InvalidJob if the job class does not exist or the
     *                              instance does not implement JobInterface
     *
     * @return bool result of the status update to done
     */
    protected function executeJob(array $job): bool
    {
        $class = $job['class'];

        if (false === class_exists($class)) {
            throw new Exception\InvalidJob('job class does not exists');
        }

        $instance = null === $this->container
            ? new $class()
            : $this->container->get($class);

        if (false === ($instance instanceof JobInterface)) {
            throw new Exception\InvalidJob('job must implement JobInterface');
        }

        $prepared = $instance->setData($job['data']);
        $prepared->start();

        return $this->updateJob($job['_id'], self::STATUS_DONE);
    }
409
}
410