Completed
Pull Request — master (#2)
by Raffael
04:59
created

Queue::executeJob()   A

Complexity

Conditions 4
Paths 5

Size

Total Lines 22
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 4.0092

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 22
c 0
b 0
f 0
ccs 11
cts 12
cp 0.9167
rs 9.8333
cc 4
nc 5
nop 1
crap 4.0092
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use MongoDB\Driver\Exception\ConnectionException;
20
use MongoDB\Driver\Exception\RuntimeException;
21
use MongoDB\Operation\Find;
22
use Psr\Container\ContainerInterface;
23
use Psr\Log\LoggerInterface;
24
25
class Queue
26
{
27
    /**
28
     * Job status.
29
     */
30
    const STATUS_WAITING = 0;
31
    const STATUS_POSTPONED = 1;
32
    const STATUS_PROCESSING = 2;
33
    const STATUS_DONE = 3;
34
    const STATUS_FAILED = 4;
35
    const STATUS_CANCELED = 5;
36
37
    /**
38
     * Scheduler.
39
     *
40
     * @param Scheduler
41
     */
42
    protected $scheduler;
43
44
    /**
45
     * Database.
46
     *
47
     * @var Database
48
     */
49
    protected $db;
50
51
    /**
52
     * LoggerInterface.
53
     *
54
     * @var LoggerInterface
55
     */
56
    protected $logger;
57
58
    /**
59
     * Local queue.
60
     *
61
     * @var array
62
     */
63
    protected $queue = [];
64
65
    /**
66
     * Collection name.
67
     *
68
     * @var string
69
     */
70
    protected $collection_name = 'queue';
71
72
    /**
73
     * Container.
74
     *
75
     * @var ContainerInterface
76
     */
77
    protected $container;
78
79
    /**
80
     * Current processing job.
81
     *
82
     * @var array
83
     */
84
    protected $current_job;
85
86
    /**
87
     * Init queue.
88
     *
89
     * @param Scheduler          $scheduler
90
     * @param Database           $db
91
     * @param LoggerInterface    $logger
92
     * @param ContainerInterface $container
93
     */
94 28
    public function __construct(Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
95
    {
96 28
        $this->scheduler = $scheduler;
97 28
        $this->db = $db;
98 28
        $this->logger = $logger;
99 28
        $this->container = $container;
100 28
        $this->collection_name = $scheduler->getCollection();
101 28
    }
102
103
    /**
104
     * Execute job queue as endless loop.
105
     */
106
    public function process()
107
    {
108
        $cursor = $this->getCursor();
109
        $this->catchSignal();
110
111
        while (true) {
112
            $this->processLocalQueue();
113
114
            if (null === $cursor->current()) {
115
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

115
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
116
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
117
                        'category' => get_class($this),
118
                    ]);
119
120
                    $this->createQueue();
121
122
                    return $this->process();
0 ignored issues
show
Bug introduced by
Are you sure the usage of $this->process() targeting TaskScheduler\Queue::process() seems to always return null.

This check looks for function or method calls that always return null and whose return value is used.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
if ($a->getObject()) {

The method getObject() can return nothing but null, so it makes no sense to use the return value.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
123
                }
124
125
                $this->retrieveNextJob($cursor);
126
127
                continue;
128
            }
129
130
            $job = $cursor->current();
131
            $this->retrieveNextJob($cursor);
132
            $this->queueJob($job);
133
        }
134
    }
135
136
    /**
137
     * Execute job queue.
138
     *
139
     * @return bool
140
     */
141 2
    public function processOnce(): bool
142
    {
143 2
        $cursor = $this->getCursor(false);
144
145 1
        while (true) {
146 1
            $this->processLocalQueue();
147
148 1
            if (null === $cursor->current()) {
149 1
                if ($cursor->getInnerIterator()->isDead()) {
150
                    $this->logger->debug('all jobs were processed', [
151
                        'category' => get_class($this),
152
                    ]);
153
154
                    return false;
155
                }
156
157 1
                return true;
158
            }
159
160 1
            $job = $cursor->current();
161 1
            $cursor->next();
162 1
            $this->queueJob($job);
163
        }
0 ignored issues
show
Bug Best Practice introduced by
In this branch, the function will implicitly return null which is incompatible with the type-hinted return boolean. Consider adding a return statement or allowing null as return value.

For hinted functions/methods where all return statements with the correct type are only reachable via conditions, ?null? gets implicitly returned which may be incompatible with the hinted type. Let?s take a look at an example:

interface ReturnsInt {
    public function returnsIntHinted(): int;
}

class MyClass implements ReturnsInt {
    public function returnsIntHinted(): int
    {
        if (foo()) {
            return 123;
        }
        // here: null is implicitly returned
    }
}
Loading history...
164
    }
165
166
    /**
167
     * Cleanup and exit.
168
     *
169
     * @param int $sig
170
     */
171
    public function cleanup(int $sig)
172
    {
173
        $this->handleSignal($sig);
174
        exit();
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
175
    }
176
177
    /**
178
     * Catch signals and cleanup.
179
     *
180
     * @return Queue
181
     */
182 1
    protected function catchSignal(): self
183
    {
184 1
        pcntl_async_signals(true);
185 1
        pcntl_signal(SIGTERM, [$this, 'cleanup']);
186 1
        pcntl_signal(SIGINT, [$this, 'cleanup']);
187
188 1
        return $this;
189
    }
190
191
    /**
192
     * Cleanup and exit.
193
     *
194
     * @param int $sig
195
     *
196
     * @return ObjectId
197
     */
198 2
    protected function handleSignal(int $sig): ?ObjectId
199
    {
200 2
        if (null === $this->current_job) {
201 2
            $this->logger->debug('received signal ['.$sig.'], no job is currently processing, exit now', [
202 2
                'category' => get_class($this),
203
            ]);
204
205 2
            return null;
206
        }
207
208
        $this->logger->debug('received signal ['.$sig.'], reschedule current processing job ['.$this->current_job['_id'].']', [
209
            'category' => get_class($this),
210
        ]);
211
212
        $this->updateJob($this->current_job['_id'], self::STATUS_CANCELED);
213
214
        return $this->scheduler->addJob($this->current_job['class'], $this->current_job['data'], [
215
            Scheduler::OPTION_AT => $this->current_job['retry_interval'],
216
            Scheduler::OPTION_INTERVAL => $this->current_job['interval'],
217
            Scheduler::OPTION_RETRY => --$this->current_job['retry'],
218
            Scheduler::OPTION_RETRY_INTERVAL => $this->current_job['retry_interval'],
219
        ]);
220
    }
221
222
    /**
223
     * Create queue and insert a dummy object to start cursor
224
     * Dummy object is required, otherwise we would get a dead cursor.
225
     *
226
     * @return Queue
227
     */
228 3
    protected function createQueue(): self
229
    {
230 3
        $this->logger->info('create new queue ['.$this->collection_name.']', [
231 3
            'category' => get_class($this),
232
        ]);
233
234
        try {
235 3
            $this->db->createCollection(
236 3
                $this->collection_name,
237
                [
238 3
                    'capped' => true,
239 3
                    'size' => $this->scheduler->getQueueSize(),
240
                ]
241
            );
242
243 2
            $this->db->{$this->collection_name}->insertOne(['class' => 'dummy']);
244 2
        } catch (RuntimeException $e) {
245 2
            if (48 !== $e->getCode()) {
246 1
                throw $e;
247
            }
248
        }
249
250 2
        return $this;
251
    }
252
253
    /**
254
     * Create queue and insert a dummy object to start cursor
255
     * Dummy object is required, otherwise we would get a dead cursor.
256
     *
257
     * @return Queue
258
     */
259 1
    protected function convertQueue(): self
260
    {
261 1
        $this->logger->info('convert existing queue collection ['.$this->collection_name.'] into a capped collection', [
262 1
            'category' => get_class($this),
263
        ]);
264
265 1
        $this->db->command([
266 1
            'convertToCapped' => $this->collection_name,
267 1
            'size' => $this->scheduler->getQueueSize(),
268
        ]);
269
270 1
        $this->db->{$this->collection_name}->insertOne(['class' => 'dummy']);
271
272 1
        return $this;
273
    }
274
275
    /**
276
     * Retrieve next job.
277
     *
278
     * @param iterable $cursor
279
     */
280 1
    protected function retrieveNextJob(Iterable $cursor)
281
    {
282
        try {
283 1
            $cursor->next();
284
        } catch (RuntimeException $e) {
285
            $this->logger->error('job queue cursor failed to retrieve next job, restart daemon', [
286
                'category' => get_class($this),
287
                'exception' => $e,
288
            ]);
289
290
            $this->process();
291
        }
292 1
    }
293
294
    /**
295
     * Queue job.
296
     *
297
     * @param array $job
298
     */
299 1
    protected function queueJob(array $job): bool
300
    {
301 1
        if (true === $this->collectJob($job['_id'], self::STATUS_PROCESSING)) {
302 1
            $this->processJob($job);
303
        } elseif (self::STATUS_POSTPONED === $job['status']) {
304
            $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
305
                'category' => get_class($this),
306
            ]);
307
308
            $this->queue[] = $job;
309
        }
310
311 1
        return true;
312
    }
313
314
    /**
315
     * Get cursor.
316
     *
317
     * @param bool $tailable
318
     *
319
     * @return IteratorIterator
320
     */
321 6
    protected function getCursor(bool $tailable = true): IteratorIterator
322
    {
323 6
        $options = ['typeMap' => Scheduler::TYPE_MAP];
324
325 6
        if (true === $tailable) {
326 4
            $options['cursorType'] = Find::TAILABLE;
327 4
            $options['noCursorTimeout'] = true;
328
        }
329
330
        try {
331 6
            $cursor = $this->db->{$this->collection_name}->find([
332
                '$or' => [
333 6
                    ['status' => self::STATUS_WAITING],
334 6
                    ['status' => self::STATUS_POSTPONED],
335
                ],
336 6
            ], $options);
337 1
        } catch (ConnectionException $e) {
338 1
            if (2 === $e->getCode()) {
339
                $this->convertQueue();
340
341
                return $this->getCursor($tailable);
342
            }
343
344 1
            throw $e;
345
        }
346
347 5
        $iterator = new IteratorIterator($cursor);
348 5
        $iterator->rewind();
349
350 5
        return $iterator;
351
    }
352
353
    /**
354
     * Update job status.
355
     *
356
     * @param ObjectId $id
357
     * @param int      $status
358
     * @param mixed    $from_status
359
     *
360
     * @return bool
361
     */
362 5
    protected function collectJob(ObjectId $id, int $status, $from_status = self::STATUS_WAITING): bool
363
    {
364
        $set = [
365 5
             'status' => $status,
366
        ];
367
368 5
        if (self::STATUS_PROCESSING === $status) {
369 5
            $set['started'] = new UTCDateTime();
370
        }
371
372 5
        $result = $this->db->{$this->collection_name}->updateMany([
373 5
            '_id' => $id,
374 5
            'status' => $from_status,
375
            '$isolated' => true,
376
        ], [
377 5
            '$set' => $set,
378
        ]);
379
380 5
        if (1 === $result->getModifiedCount()) {
381 5
            $this->logger->debug('job ['.$id.'] updated to status ['.$status.']', [
382 5
                'category' => get_class($this),
383
            ]);
384
385 5
            return true;
386
        }
387
388 1
        $this->logger->debug('job ['.$id.'] is already collected with status ['.$status.']', [
389 1
            'category' => get_class($this),
390
        ]);
391
392 1
        return false;
393
    }
394
395
    /**
396
     * Update job status.
397
     *
398
     * @param ObjectId $id
399
     * @param int      $status
400
     *
401
     * @return bool
402
     */
403 13
    protected function updateJob(ObjectId $id, int $status): bool
404
    {
405
        $set = [
406 13
            'status' => $status,
407
        ];
408
409 13
        if ($status >= self::STATUS_DONE) {
410 10
            $set['ended'] = new UTCDateTime();
411
        }
412
413 13
        $result = $this->db->{$this->collection_name}->updateMany([
414 13
            '_id' => $id,
415
            '$isolated' => true,
416
        ], [
417 13
            '$set' => $set,
418
        ]);
419
420 13
        return $result->isAcknowledged();
421
    }
422
423
    /**
424
     * Check local queue for postponed jobs.
425
     *
426
     * @return bool
427
     */
428 4
    protected function processLocalQueue(): bool
429
    {
430 4
        $now = new UTCDateTime();
431 4
        foreach ($this->queue as $key => $job) {
432 3
            if ($job['at'] <= $now) {
433 2
                $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
434 2
                    'category' => get_class($this),
435
                ]);
436
437 2
                unset($this->queue[$key]);
438 2
                $job['at'] = null;
439
440 2
                if (true === $this->collectJob($job['_id'], self::STATUS_PROCESSING, self::STATUS_POSTPONED)) {
441 3
                    $this->processJob($job);
442
                }
443
            }
444
        }
445
446 4
        return true;
447
    }
448
449
    /**
450
     * Process job.
451
     *
452
     * @param array $job
453
     *
454
     * @return ObjectId
455
     */
456 8
    protected function processJob(array $job): ObjectId
457
    {
458 8
        if ($job['at'] instanceof UTCDateTime) {
459 1
            $this->updateJob($job['_id'], self::STATUS_POSTPONED);
460 1
            $this->queue[] = $job;
461
462 1
            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['at']->toDateTime()->format('c').']', [
463 1
                'category' => get_class($this),
464
            ]);
465
466 1
            return $job['_id'];
467
        }
468
469 7
        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
470 7
            'category' => get_class($this),
471 7
            'params' => $job['data'],
472
        ]);
473
474 7
        $this->current_job = $job;
475
476
        try {
477 7
            $this->executeJob($job);
478 2
            $this->current_job = null;
479 5
        } catch (\Exception $e) {
480 5
            $this->logger->error('failed execute job ['.$job['_id'].']', [
481 5
                'category' => get_class($this),
482 5
                'exception' => $e,
483
            ]);
484
485 5
            $this->updateJob($job['_id'], self::STATUS_FAILED);
486 5
            $this->current_job = null;
487
488 5
            if ($job['retry'] >= 0) {
489 1
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['retry'].']', [
490 1
                    'category' => get_class($this),
491
                ]);
492
493 1
                return $this->scheduler->addJob($job['class'], $job['data'], [
494 1
                    Scheduler::OPTION_AT => time() + $job['retry_interval'],
495 1
                    Scheduler::OPTION_INTERVAL => $job['interval'],
496 1
                    Scheduler::OPTION_RETRY => --$job['retry'],
497 1
                    Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
498
                ]);
499
            }
500
        }
501
502 6
        if ($job['interval'] >= 0) {
503 1
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['interval'].'s]', [
504 1
                'category' => get_class($this),
505
            ]);
506
507 1
            return $this->scheduler->addJob($job['class'], $job['data'], [
508 1
                Scheduler::OPTION_AT => time() + $job['interval'],
509 1
                Scheduler::OPTION_INTERVAL => $job['interval'],
510 1
                Scheduler::OPTION_RETRY => $job['retry'],
511 1
                Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
512
            ]);
513
        }
514
515 5
        return $job['_id'];
516
    }
517
518
    /**
519
     * Execute job.
520
     *
521
     * @param array $job
522
     *
523
     * @return bool
524
     */
525 11
    protected function executeJob(array $job): bool
526
    {
527 11
        if (!class_exists($job['class'])) {
528 4
            throw new Exception\InvalidJob('job class does not exists');
529
        }
530
531 7
        if (null === $this->container) {
532 6
            $instance = new $job['class']();
533
        } else {
534 1
            $instance = $this->container->get($job['class']);
535
        }
536
537 7
        if (!($instance instanceof JobInterface)) {
538
            throw new Exception\InvalidJob('job must implement JobInterface');
539
        }
540
541
        $instance
542 7
            ->setData($job['data'])
543 7
            ->setId($job['_id'])
544 7
            ->start();
545
546 4
        return $this->updateJob($job['_id'], self::STATUS_DONE);
547
    }
548
}
549