Completed
Branch dev (6d5d8c)
by Raffael
22:30 queued 21:18
created

Worker::queueJob()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 18
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 4.0119

Importance

Changes 0
Metric Value
eloc 10
dl 0
loc 18
ccs 10
cts 11
cp 0.9091
rs 9.9332
c 0
b 0
f 0
cc 4
nc 4
nop 1
crap 4.0119
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\BSON\UTCDateTime;
17
use MongoDB\Database;
18
use Psr\Container\ContainerInterface;
19
use Psr\Log\LoggerInterface;
20
use TaskScheduler\Exception\InvalidJobException;
21
22
class Worker extends AbstractHandler
23
{
24
    /**
25
     * Scheduler.
26
     *
27
     * @var Scheduler
28
     */
29
    protected $scheduler;
30
31
    /**
32
     * Database.
33
     *
34
     * @var Database
35
     */
36
    protected $db;
37
38
    /**
39
     * Logger.
40
     *
41
     * @var LoggerInterface
42
     */
43
    protected $logger;
44
45
    /**
46
     * Container.
47
     *
48
     * @var null|ContainerInterface
49
     */
50
    protected $container;
51
52
    /**
53
     * Local queue.
54
     *
55
     * @var array
56
     */
57
    protected $queue = [];
58
59
    /**
60
     * Current processing job.
61
     *
62
     * @var null|array
63
     */
64
    protected $current_job;
65
66
    /**
67
     * Process ID (fork posix pid).
68
     *
69
     * @var int
70
     */
71
    protected $process;
72
73
    /**
74
     * Jobs queue.
75
     *
76
     * @var MessageQueue
77
     */
78
    protected $jobs;
79
80
    /**
81
     * Worker ID.
82
     *
83
     * @var ObjectId
84
     */
85
    protected $id;
86
87
    /**
     * Create a worker.
     *
     * Stores the injected collaborators, remembers the pid of the current
     * process and builds the MessageQueue from the scheduler's job queue
     * name and size. The container is optional; when it is null, job classes
     * are instantiated directly (see executeJob()).
     */
    public function __construct(ObjectId $id, Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
    {
        // Injected collaborators.
        $this->scheduler = $scheduler;
        $this->db = $db;
        $this->logger = $logger;
        $this->container = $container;

        // Worker identity.
        $this->id = $id;
        $this->process = getmypid();

        $queue_name = $scheduler->getJobQueue();
        $queue_size = $scheduler->getJobQueueSize();
        $this->jobs = new MessageQueue($db, $queue_name, $queue_size, $logger);
    }
100
101
    /**
     * Handle worker timeout.
     *
     * Registered as the SIGALRM handler in catchSignal(); the alarm itself is
     * armed in processJob() with the job's 'timeout' option.
     *
     * When a job is currently processing it is marked as timed out, a timeout
     * event is written to the event queue and, depending on the job options,
     * a follow-up job is scheduled:
     *  - options.retry !== 0:    retry counter is decremented and the job is
     *                            re-added at now + options.retry_interval
     *  - options.interval > 0:   job is re-added at now + options.interval
     *  - options.interval <= -1: job is re-added without an 'at' option
     *
     * In every one of these cases the worker then clears its current job and
     * sends SIGTERM to its own process. Without that, returning from this
     * handler would simply resume the timed-out job, which would keep running
     * next to its rescheduled copy and eventually be flagged as done.
     *
     * @return null|ObjectId id of the newly scheduled job, or null if nothing
     *                       was processing or the job is not rescheduled
     */
    public function timeout(): ?ObjectId
    {
        if (null === $this->current_job) {
            $this->logger->debug('reached worker timeout signal, no job is currently processing, ignore it', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            return null;
        }

        $job = $this->current_job;

        $this->logger->debug('received timeout signal, reschedule current processing job ['.$job['_id'].']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        $this->updateJob($job, JobInterface::STATUS_TIMEOUT);

        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
            'job' => $job['_id'],
            'worker' => $this->id,
            'status' => JobInterface::STATUS_TIMEOUT,
            'timestamp' => new UTCDateTime(),
        ]);

        $options = $job['options'];
        $rescheduled = null;

        if (0 !== $options['retry']) {
            $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$options['retry'].']', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            --$options['retry'];
            $options['at'] = time() + $options['retry_interval'];
            $rescheduled = $this->scheduler->addJob($job['class'], $job['data'], $options)->getId();
        } elseif ($options['interval'] > 0) {
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$options['interval'].'s]', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $options['at'] = time() + $options['interval'];
            $rescheduled = $this->scheduler->addJob($job['class'], $job['data'], $options)->getId();
        } elseif ($options['interval'] <= -1) {
            $this->logger->debug('job ['.$job['_id'].'] has an endless interval', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            unset($options['at']);
            $rescheduled = $this->scheduler->addJob($job['class'], $job['data'], $options)->getId();
        }

        // Stop the worker on every path. current_job is cleared first so that
        // the SIGTERM handler (cleanup()) does not reschedule the job again.
        $this->current_job = null;
        posix_kill($this->process, SIGTERM);

        return $rescheduled;
    }
171
172
    /**
     * Start worker.
     *
     * Opens a cursor on waiting and postponed jobs, installs the signal
     * handlers and then loops for as long as loop() returns true: each round
     * first handles the local queue of postponed jobs, then either queues the
     * job under the cursor or advances the cursor when there is none.
     *
     * If the cursor turns out to be dead, the job queue is (re)created and
     * the worker starts over.
     */
    public function start(): void
    {
        $filter = [
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
            ],
        ];
        $cursor = $this->jobs->getCursor($filter);

        // Callback handed to MessageQueue::next(); it restarts the worker.
        $restart = function () {
            $this->start();
        };

        $this->catchSignal();

        while ($this->loop()) {
            $this->processLocalQueue();

            $job = $cursor->current();

            if (null !== $job) {
                $this->logger->debug('found job ['.$job['_id'].'] in queue with status ['.$job['status'].']', [
                    'category' => get_class($this),
                ]);

                $this->jobs->next($cursor, $restart);
                $this->queueJob($job);

                continue;
            }

            if (!$cursor->getInnerIterator()->isDead()) {
                $this->jobs->next($cursor, $restart);

                continue;
            }

            $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $this->jobs->create();
            $this->start();

            break;
        }
    }
223
224
    /**
     * Cleanup and exit.
     *
     * Registered as the SIGTERM/SIGINT handler in catchSignal(). Persists the
     * local queue first. If a job is currently processing it is marked as
     * canceled, a cancel event is recorded and the job is re-added with
     * 'at' reset to 0 before the worker exits.
     *
     * @return null|ObjectId id of the re-added job, or null when no job was
     *                       processing
     */
    public function cleanup()
    {
        $this->saveState();

        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];
        $job = $this->current_job;

        if (null === $job) {
            $this->logger->debug('received cleanup call on worker ['.$this->id.'], no job is currently processing, exit now', $context);
            $this->exit();

            return null;
        }

        $this->logger->debug('received cleanup call on worker ['.$this->id.'], reschedule current processing job ['.$job['_id'].']', $context);

        $this->updateJob($job, JobInterface::STATUS_CANCELED);

        $events = $this->db->{$this->scheduler->getEventQueue()};
        $events->insertOne([
            'job' => $job['_id'],
            'worker' => $this->id,
            'status' => JobInterface::STATUS_CANCELED,
            'timestamp' => new UTCDateTime(),
        ]);

        // Re-add the canceled job with 'at' reset to 0.
        $options = $job['options'];
        $options['at'] = 0;
        $rescheduled = $this->scheduler->addJob($job['class'], $job['data'], $options);
        $result = $rescheduled->getId();

        $this->exit();

        return $result;
    }
264
265
    /**
     * Save local queue.
     *
     * Upserts every job held in the local queue into the job collection.
     * $setOnInsert is used, so a job document that still exists is left
     * untouched and only missing ones are written.
     *
     * The filter deliberately does not use the $isolated operator: it was
     * removed in MongoDB 4.0 (queries containing it are rejected) and it
     * never had an effect on a single-document write keyed by _id.
     *
     * @return $this
     */
    protected function saveState(): self
    {
        if ([] === $this->queue) {
            return $this;
        }

        $collection = $this->db->selectCollection($this->scheduler->getJobQueue());

        foreach ($this->queue as $job) {
            $collection->updateOne(
                ['_id' => $job['_id']],
                ['$setOnInsert' => $job],
                ['upsert' => true]
            );
        }

        return $this;
    }
280
281
    /**
     * Catch signals and cleanup.
     *
     * Enables asynchronous signal handling and routes SIGTERM and SIGINT to
     * cleanup() and SIGALRM to timeout().
     *
     * @return $this
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);

        $handlers = [
            SIGTERM => 'cleanup',
            SIGINT => 'cleanup',
            SIGALRM => 'timeout',
        ];

        foreach ($handlers as $signal => $method) {
            pcntl_signal($signal, [$this, $method]);
        }

        return $this;
    }
293
294
    /**
     * Queue job.
     *
     * Tries to claim the job for processing. A claimed job is processed right
     * away; a job that could not be claimed but is flagged as postponed is
     * remembered in the local queue instead.
     *
     * @return bool false if the job carries no status, true otherwise
     */
    protected function queueJob(array $job): bool
    {
        if (isset($job['status'])) {
            $collected = $this->collectJob($job, JobInterface::STATUS_PROCESSING);

            if (true === $collected) {
                $this->processJob($job);

                return true;
            }

            if (JobInterface::STATUS_POSTPONED === $job['status']) {
                $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
                    'category' => get_class($this),
                    'pm' => $this->process,
                ]);

                $id = (string) $job['_id'];
                $this->queue[$id] = $job;
            }

            return true;
        }

        return false;
    }
316
317
    /**
     * Collect (claim) a job.
     *
     * Moves the job from $from_status to $status, but only if the stored
     * document still has $from_status. When the target status is
     * STATUS_PROCESSING the start time and this worker's id are stored as
     * well. On success a matching event is written to the event queue.
     *
     * The filter does not use the $isolated operator: it was removed in
     * MongoDB 4.0 (queries containing it are rejected) and the write targets
     * a single document by _id, which is atomic on its own.
     *
     * @param array $job         job document, must contain '_id'
     * @param int   $status      status to set
     * @param mixed $from_status status the job must currently have
     *
     * @return bool true if exactly one document was modified, false otherwise
     */
    protected function collectJob(array $job, int $status, $from_status = JobInterface::STATUS_WAITING): bool
    {
        $set = [
            'status' => $status,
        ];

        if (JobInterface::STATUS_PROCESSING === $status) {
            $set['started'] = new UTCDateTime();
            $set['worker'] = $this->id;
        }

        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
            '_id' => $job['_id'],
            'status' => $from_status,
        ], [
            '$set' => $set,
        ]);

        $this->logger->debug('collect job ['.$job['_id'].'] with status ['.$from_status.']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        if (1 === $result->getModifiedCount()) {
            $this->logger->debug('job ['.$job['_id'].'] collected; update status to ['.$status.']', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $this->db->{$this->scheduler->getEventQueue()}->insertOne([
                'job' => $job['_id'],
                'worker' => $this->id,
                'status' => $status,
                'timestamp' => new UTCDateTime(),
            ]);

            return true;
        }

        // Nothing was modified: the stored job no longer has $from_status.
        $this->logger->debug('job ['.$job['_id'].'] is already collected with status ['.$status.']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        return false;
    }
367
368
    /**
     * Update job status.
     *
     * Sets the status of the job unconditionally. For every status greater
     * than or equal to STATUS_DONE the end time is stored too.
     *
     * The filter does not use the $isolated operator: it was removed in
     * MongoDB 4.0 (queries containing it are rejected) and the write targets
     * a single document by _id, which is atomic on its own.
     *
     * @param array $job    job document, must contain '_id'
     * @param int   $status status to set
     *
     * @return bool whether the write was acknowledged
     */
    protected function updateJob(array $job, int $status): bool
    {
        $set = [
            'status' => $status,
        ];

        if ($status >= JobInterface::STATUS_DONE) {
            $set['ended'] = new UTCDateTime();
        }

        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
            '_id' => $job['_id'],
        ], [
            '$set' => $set,
        ]);

        return $result->isAcknowledged();
    }
390
391
    /**
     * Check local queue for postponed jobs.
     *
     * For every locally queued job the job document is first upserted with
     * $setOnInsert, so it is only written when it no longer exists in the
     * job collection. Jobs whose 'at' time has been reached are then removed
     * from the local queue, claimed (postponed -> processing) and processed.
     *
     * The upsert filter does not use the $isolated operator: it was removed
     * in MongoDB 4.0 (queries containing it are rejected) and the write
     * targets a single document by _id, which is atomic on its own.
     *
     * @return bool always true
     */
    protected function processLocalQueue(): bool
    {
        $now = time();

        foreach ($this->queue as $key => $job) {
            $this->db->{$this->scheduler->getJobQueue()}->updateOne(
                ['_id' => $job['_id']],
                ['$setOnInsert' => $job],
                ['upsert' => true]
            );

            if ($job['options']['at'] <= $now) {
                $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
                    'category' => get_class($this),
                    'pm' => $this->process,
                ]);

                unset($this->queue[$key]);

                // Reset 'at' so processJob() executes the job instead of
                // postponing it again.
                $job['options']['at'] = 0;

                if (true === $this->collectJob($job, JobInterface::STATUS_PROCESSING, JobInterface::STATUS_POSTPONED)) {
                    $this->processJob($job);
                }
            }
        }

        return true;
    }
421
422
    /**
     * Process job.
     *
     * A job whose 'at' option lies in the future is flagged as postponed and
     * parked in the local queue. Otherwise the job is executed with an alarm
     * armed for options.timeout seconds (SIGALRM is handled by timeout()).
     *
     * Any Throwable raised by the job (exceptions as well as engine errors
     * such as TypeError) is treated as a job failure: the job is marked as
     * failed, a failure event including the error details is recorded and,
     * if options.retry is not 0, a retry is scheduled at
     * now + options.retry_interval.
     *
     * After execution (or a failure without retry) a job with
     * options.interval > 0 is re-added at now + interval, and a job with
     * options.interval <= -1 is re-added without an 'at' option.
     *
     * @return ObjectId id of the follow-up job if one was scheduled,
     *                  otherwise the id of the given job
     */
    protected function processJob(array $job): ObjectId
    {
        $now = time();

        if ($job['options']['at'] > $now) {
            $this->updateJob($job, JobInterface::STATUS_POSTPONED);
            $job['status'] = JobInterface::STATUS_POSTPONED;
            $this->queue[(string) $job['_id']] = $job;

            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['options']['at'].']', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            return $job['_id'];
        }

        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].'] on worker ['.$this->id.']', [
            'category' => get_class($this),
            'pm' => $this->process,
            'options' => $job['options'],
            'params' => $job['data'],
        ]);

        $this->current_job = $job;
        pcntl_alarm($job['options']['timeout']);

        try {
            $this->executeJob($job);
            $this->current_job = null;
        } catch (\Throwable $e) {
            // \Throwable rather than \Exception: an \Error thrown by the job
            // must not escape with the alarm still armed and the job stuck in
            // the processing state.
            pcntl_alarm(0);

            $this->logger->error('failed execute job ['.$job['_id'].'] of type ['.$job['class'].'] on worker ['.$this->id.']', [
                'category' => get_class($this),
                'pm' => $this->process,
                'exception' => $e,
            ]);

            $this->updateJob($job, JobInterface::STATUS_FAILED);
            $this->current_job = null;

            $this->db->{$this->scheduler->getEventQueue()}->insertOne([
                'job' => $job['_id'],
                'worker' => $this->id,
                'status' => JobInterface::STATUS_FAILED,
                'timestamp' => new UTCDateTime(),
                'exception' => [
                    'class' => get_class($e),
                    'message' => $e->getMessage(),
                    'file' => $e->getFile(),
                    'line' => $e->getLine(),
                    'code' => $e->getCode(),
                ],
            ]);

            if (0 !== $job['options']['retry']) {
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['options']['retry'].']', [
                    'category' => get_class($this),
                    'pm' => $this->process,
                ]);

                --$job['options']['retry'];
                $job['options']['at'] = time() + $job['options']['retry_interval'];
                $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

                return $job->getId();
            }
        }

        // Disarm the timeout alarm; a no-op if the catch block already did.
        pcntl_alarm(0);

        if ($job['options']['interval'] > 0) {
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['options']['interval'].'s]', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $job['options']['at'] = time() + $job['options']['interval'];
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

            return $job->getId();
        }

        if ($job['options']['interval'] <= -1) {
            $this->logger->debug('job ['.$job['_id'].'] has an endless interval', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            unset($job['options']['at']);
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

            return $job->getId();
        }

        return $job['_id'];
    }
522
523
    /**
     * Execute job.
     *
     * Resolves the job class (directly, or through the container when one
     * was injected), hands it the job data and id, runs it and finally marks
     * the job as done and records a matching event.
     *
     * @throws InvalidJobException if the job class does not exist or the
     *                             resolved instance is not a JobInterface
     *
     * @return bool whether the status update was acknowledged
     */
    protected function executeJob(array $job): bool
    {
        $class = $job['class'];

        if (!class_exists($class)) {
            throw new InvalidJobException('job class does not exists');
        }

        $instance = null === $this->container
            ? new $class()
            : $this->container->get($class);

        if (!$instance instanceof JobInterface) {
            throw new InvalidJobException('job must implement JobInterface');
        }

        $prepared = $instance
            ->setData($job['data'])
            ->setId($job['_id']);
        $prepared->start();

        $acknowledged = $this->updateJob($job, JobInterface::STATUS_DONE);

        $events = $this->db->{$this->scheduler->getEventQueue()};
        $events->insertOne([
            'job' => $job['_id'],
            'worker' => $this->id,
            'status' => JobInterface::STATUS_DONE,
            'timestamp' => new UTCDateTime(),
        ]);

        // Release the job instance before returning.
        unset($prepared, $instance);

        return $acknowledged;
    }
560
}
561