Passed
Push — master ( 8f7a9d...8d41c2 )
by Raffael
03:49
created

Async::processLocalQueue()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 17
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 9
nc 3
nop 0
dl 0
loc 17
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use IteratorIterator;
16
use MongoDB\BSON\ObjectId;
17
use MongoDB\BSON\UTCDateTime;
18
use MongoDB\Database;
19
use MongoDB\Driver\Cursor;
20
use MongoDB\Operation\Find;
21
use Psr\Container\ContainerInterface;
22
use Psr\Log\LoggerInterface;
23
24
class Async
25
{
26
    /**
     * Job states written to the `status` field of a queue document.
     */
    public const STATUS_WAITING = 0;
    public const STATUS_POSTPONED = 1;
    public const STATUS_PROCESSING = 2;
    public const STATUS_DONE = 3;
    public const STATUS_FAILED = 4;

    /**
     * Keys accepted in the per-job options array.
     */
    public const OPTION_AT = 'at';
    public const OPTION_INTERVAL = 'interval';
    public const OPTION_RETRY = 'retry';
    public const OPTION_RETRY_INTERVAL = 'retry_interval';

    /**
     * @var Database database that holds the queue collection
     */
    protected $db;

    /**
     * @var LoggerInterface logger receiving the scheduler's messages
     */
    protected $logger;

    /**
     * @var array postponed jobs kept in memory until their `at` time is reached
     */
    protected $queue = [];

    /**
     * @var string name of the queue collection
     */
    protected $collection_name = 'queue';

    /**
     * @var ContainerInterface container used to resolve job instances (may be null)
     */
    protected $container;

    /**
     * Default `at` option in seconds; addJob() multiplies a positive value by
     * 1000 to build a UTCDateTime and stores null for values <= 0.
     *
     * @var int
     */
    protected $default_at = 0;

    /**
     * Default interval in seconds; processJob() re-queues a job only when
     * its interval is >= 0, so -1 means "do not repeat".
     *
     * @var int
     */
    protected $default_interval = -1;

    /**
     * @var int default number of retries for a failed job
     */
    protected $default_retry = 0;

    /**
     * @var int default delay in seconds before a failed job is retried
     */
    protected $default_retry_interval = 300;

    /**
     * @var int value passed as `size` when the capped queue collection is created
     */
    protected $queue_size = 100000;
112
113
    /**
     * Wire up the scheduler with its collaborators and apply optional settings.
     *
     * @param Database                $db        database that holds the queue collection
     * @param LoggerInterface         $logger    logger receiving the scheduler's messages
     * @param null|ContainerInterface $container optional container used to resolve job instances
     * @param null|iterable           $config    optional settings, handed to setOptions()
     */
    public function __construct(Database $db, LoggerInterface $logger, ?ContainerInterface $container = null, ?Iterable $config = null)
    {
        // Collaborators are stored before the options are applied.
        $this->container = $container;
        $this->logger = $logger;
        $this->db = $db;

        $this->setOptions($config);
    }
128
129
    /**
     * Apply scheduler settings.
     *
     * Recognised keys: `collection_name` (cast to string) and `default_retry`,
     * `default_at`, `default_retry_interval`, `default_interval`, `queue_size`
     * (each cast to int). Passing null leaves the current settings untouched.
     *
     * @param iterable $config option name => value pairs
     *
     * @throws Exception if an unknown option name is given
     *
     * @return Async
     */
    public function setOptions(? Iterable $config = null): self
    {
        if (null === $config) {
            return $this;
        }

        $integerOptions = [
            'default_retry',
            'default_at',
            'default_retry_interval',
            'default_interval',
            'queue_size',
        ];

        foreach ($config as $option => $value) {
            // Option names are matched strictly: a loose comparison lets an
            // integer key such as 0 equal any non-numeric string on PHP 7,
            // which would silently accept list-style configs.
            if ('collection_name' === $option) {
                $this->collection_name = (string) $value;
            } elseif (in_array($option, $integerOptions, true)) {
                $this->{$option} = (int) $value;
            } else {
                throw new Exception('invalid option '.$option.' given');
            }
        }

        return $this;
    }
163
164
    /**
     * Create the queue collection as a capped collection.
     *
     * The collection is named after `collection_name` and sized by `queue_size`.
     *
     * @return Async
     */
    public function createQueue(): self
    {
        $settings = [
            'capped' => true,
            'size' => $this->queue_size,
        ];

        $this->db->createCollection($this->collection_name, $settings);

        return $this;
    }
181
182
    /**
     * Validate per-job options.
     *
     * Every key must be one of the OPTION_* constants and every value must be
     * an integer.
     *
     * @param array $options option name => value pairs
     *
     * @throws Exception if an option name is unknown or its value is not an integer
     *
     * @return Async
     */
    public function validateOptions(array $options): self
    {
        $known = [
            self::OPTION_AT,
            self::OPTION_RETRY,
            self::OPTION_RETRY_INTERVAL,
            self::OPTION_INTERVAL,
        ];

        foreach ($options as $option => $value) {
            // Strict lookup: with a loose comparison an integer key such as 0
            // equals 'at' on PHP 7 and would pass as a valid option name.
            if (!in_array($option, $known, true)) {
                throw new Exception('invalid option '.$option.' given');
            }

            if (!is_int($value)) {
                throw new Exception('option '.$option.' must be an integer');
            }
        }

        return $this;
    }
209
210
    /**
     * Insert a new job document into the queue collection.
     *
     * Missing options are filled from the scheduler defaults and the merged
     * set is validated before anything is written.
     *
     * @param string $class   job class name stored with the document
     * @param mixed  $data    payload stored with the document
     * @param array  $options per-job options (OPTION_* keys)
     *
     * @return bool whether the insert was acknowledged
     */
    public function addJob(string $class, $data, array $options = []): bool
    {
        $options = array_merge([
            self::OPTION_AT => $this->default_at,
            self::OPTION_INTERVAL => $this->default_interval,
            self::OPTION_RETRY => $this->default_retry,
            self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
        ], $options);

        $this->validateOptions($options);

        // `at` is given in seconds; UTCDateTime is built from milliseconds.
        // Non-positive values are stored as null.
        $at = $options[self::OPTION_AT] > 0
            ? new UTCDateTime($options[self::OPTION_AT] * 1000)
            : null;

        $document = [
            'class' => $class,
            'status' => self::STATUS_WAITING,
            'timestamp' => new UTCDateTime(),
            'at' => $at,
            'retry' => $options[self::OPTION_RETRY],
            'retry_interval' => $options[self::OPTION_RETRY_INTERVAL],
            'interval' => $options[self::OPTION_INTERVAL],
            'data' => $data,
        ];

        $result = $this->db->{$this->collection_name}->insertOne($document);

        $context = [
            'category' => get_class($this),
            'params' => $options,
            'data' => $data,
        ];
        $this->logger->debug('queue job ['.$result->getInsertedId().'] added to ['.$class.']', $context);

        return $result->isAcknowledged();
    }
256
257
    /**
     * Add a job unless an identical one is already pending.
     *
     * A job counts as pending when a document with the same class and data
     * exists in the queue collection with status WAITING or POSTPONED.
     *
     * @param string $class   job class name
     * @param mixed  $data    job payload
     * @param array  $options per-job options (OPTION_* keys)
     *
     * @return bool true if a matching job already exists, otherwise the result of addJob()
     */
    public function addJobOnce(string $class, $data, array $options = []): bool
    {
        $filter = [
            'class' => $class,
            'data' => $data,
            '$or' => [
                ['status' => self::STATUS_WAITING],
                ['status' => self::STATUS_POSTPONED],
            ],
        ];

        // Look in the configured collection (the same one addJob() writes to),
        // not in a hard-coded "queue" collection.
        $result = $this->db->{$this->collection_name}->findOne($filter);

        if (null === $result) {
            return $this->addJob($class, $data, $options);
        }

        $this->logger->debug('queue job ['.$result['_id'].'] of type ['.$class.'] already exists', [
            'category' => get_class($this),
            'data' => $data,
        ]);

        return true;
    }
289
290
    /**
     * Process the job queue in an endless loop.
     *
     * Each iteration first runs due jobs from the local queue, then handles
     * the next document from a tailable cursor. This method does not return
     * under normal operation.
     *
     * @return bool
     */
    public function startDaemon(): bool
    {
        $cursor = $this->getCursor();

        while (true) {
            $this->processLocalQueue();

            if (null === $cursor->current()) {
                if ($cursor->getInnerIterator()->isDead()) {
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                    ]);

                    // Re-open the cursor inside the loop. Calling startDaemon()
                    // recursively here would add a stack frame every time the
                    // cursor dies and eventually exhaust memory.
                    $cursor = $this->getCursor();

                    continue;
                }

                // Nothing available yet: advance the cursor to poll for new documents.
                $cursor->next();

                continue;
            }

            $job = $cursor->current();
            $cursor->next();
            $this->processJob($job);
        }
    }
321
322
    /**
     * Process the job queue once using a non-tailable cursor.
     *
     * Runs due local jobs and cursor jobs until the cursor yields no current
     * document.
     *
     * @return bool false if the cursor is dead at that point (all jobs were
     *              processed), true otherwise
     */
    public function startOnce(): bool
    {
        $cursor = $this->getCursor(false);

        for (;;) {
            $this->processLocalQueue();

            $job = $cursor->current();

            if (null !== $job) {
                $cursor->next();
                $this->processJob($job);

                continue;
            }

            if (!$cursor->getInnerIterator()->isDead()) {
                return true;
            }

            $this->logger->debug('all jobs were processed', [
                'category' => get_class($this),
            ]);

            return false;
        }
    }
351
352
    /**
     * Open a cursor over the jobs that still need handling.
     *
     * Selects documents with status WAITING, plus POSTPONED documents whose
     * `at` is greater than or equal to the current time. The returned iterator
     * is already rewound.
     *
     * NOTE(review): POSTPONED documents whose `at` already lies in the past do
     * not match this filter - confirm that this is intended.
     *
     * @param bool $tailable open a tailable cursor without cursor timeout
     *
     * @return IteratorIterator
     */
    protected function getCursor(bool $tailable = true): IteratorIterator
    {
        $options = $tailable
            ? ['cursorType' => Find::TAILABLE, 'noCursorTimeout' => true]
            : [];

        $filter = [
            '$or' => [
                ['status' => self::STATUS_WAITING],
                [
                    'status' => self::STATUS_POSTPONED,
                    'at' => ['$gte' => new UTCDateTime()],
                ],
            ],
        ];

        $iterator = new IteratorIterator(
            $this->db->{$this->collection_name}->find($filter, $options)
        );
        $iterator->rewind();

        return $iterator;
    }
380
381
    /**
     * Set the status of a job and refresh its timestamp.
     *
     * @param ObjectId $id     id of the job document
     * @param int      $status one of the STATUS_* constants
     *
     * @return bool whether the update was acknowledged
     */
    protected function updateJob(ObjectId $id, int $status): bool
    {
        // The filter matches a single document by _id, so updateOne() is
        // sufficient. The former `$isolated` flag is not needed for a
        // single-document write and is rejected by MongoDB 4.0 and later.
        $result = $this->db->{$this->collection_name}->updateOne(
            ['_id' => $id],
            ['$set' => [
                'status' => $status,
                'timestamp' => new UTCDateTime(),
            ]]
        );

        $this->logger->debug('job ['.$id.'] updated to status ['.$status.']', [
            'category' => get_class($this),
        ]);

        return $result->isAcknowledged();
    }
402
403
    /**
     * Run every locally queued (postponed) job whose `at` time has been reached.
     *
     * Due jobs are removed from the local queue, their `at` is cleared and they
     * are handed to processJob().
     *
     * @return bool always true
     */
    protected function processLocalQueue()
    {
        $now = new UTCDateTime();

        foreach ($this->queue as $key => $job) {
            // Not due yet: leave it in the local queue.
            if (!($job['at'] <= $now)) {
                continue;
            }

            $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
                'category' => get_class($this),
            ]);

            unset($this->queue[$key]);

            // Clearing `at` makes processJob() execute the job instead of postponing it again.
            $job['at'] = null;
            $this->processJob($job);
        }

        return true;
    }
426
427
    /**
     * Process a single job.
     *
     * A job that carries an `at` date is marked POSTPONED and parked in the
     * local queue. Otherwise it is marked PROCESSING and executed. If execution
     * fails, the job is marked FAILED and, while retries remain, re-queued
     * after `retry_interval` seconds. A job with an interval >= 0 is re-queued
     * for its next run.
     *
     * @param array $job job document
     *
     * @return bool always true
     */
    protected function processJob(array $job): bool
    {
        if ($job['at'] instanceof UTCDateTime) {
            $this->updateJob($job['_id'], self::STATUS_POSTPONED);
            $this->queue[] = $job;

            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['at'].']', [
                'category' => get_class($this),
            ]);

            return true;
        }

        $this->updateJob($job['_id'], self::STATUS_PROCESSING);

        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
            'category' => get_class($this),
            'params' => $job['data'],
        ]);

        try {
            $this->executeJob($job);
        } catch (\Exception $e) {
            $this->logger->error('failed execute job ['.$job['_id'].']', [
                'category' => get_class($this),
                'exception' => $e,
            ]);

            $this->updateJob($job['_id'], self::STATUS_FAILED);

            if ($job['retry'] > 0) {
                // Log the interval the message talks about, not the retry counter.
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['retry_interval'].']', [
                    'category' => get_class($this),
                ]);

                $this->addJob($job['class'], $job['data'], [
                    self::OPTION_AT => time() + $job['retry_interval'],
                    self::OPTION_INTERVAL => $job['interval'],
                    self::OPTION_RETRY => $job['retry'] - 1,
                    self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
                ]);

                // The retry job carries the interval itself. Falling through to
                // the interval branch below would enqueue the job a second time.
                return true;
            }
        }

        if ($job['interval'] >= 0) {
            $this->addJob($job['class'], $job['data'], [
                self::OPTION_AT => time() + $job['interval'],
                self::OPTION_INTERVAL => $job['interval'],
                self::OPTION_RETRY => $job['retry'],
                self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
            ]);
        }

        return true;
    }
489
490
    /**
     * Instantiate the job class, run it and mark the job as DONE.
     *
     * The instance is created directly when no container is configured,
     * otherwise it is fetched from the container.
     *
     * @param array $job job document
     *
     * @throws Exception if the class does not exist or does not implement JobInterface
     *
     * @return bool result of the final status update
     */
    protected function executeJob(array $job): bool
    {
        $class = $job['class'];

        if (!class_exists($class)) {
            throw new Exception('job class does not exists');
        }

        $instance = null === $this->container
            ? new $class()
            : $this->container->get($class);

        if (!($instance instanceof JobInterface)) {
            throw new Exception('job must implement JobInterface');
        }

        $prepared = $instance->setData($job['data']);
        $prepared->start();

        return $this->updateJob($job['_id'], self::STATUS_DONE);
    }
518
}
519