Worker::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 2
Bugs 0 Features 0
Metric Value
eloc 7
c 2
b 0
f 0
dl 0
loc 9
ccs 8
cts 8
cp 1
rs 10
cc 1
nc 1
nop 5
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\BSON\UTCDateTime;
17
use MongoDB\Database;
18
use Psr\Container\ContainerInterface;
19
use Psr\Log\LoggerInterface;
20
use TaskScheduler\Exception\InvalidJobException;
21
22
class Worker
23
{
24
    use InjectTrait;
25
26
    /**
27
     * Scheduler.
28
     *
29
     * @var Scheduler
30
     */
31
    protected $scheduler;
32
33
    /**
34
     * Database.
35
     *
36
     * @var Database
37
     */
38
    protected $db;
39
40
    /**
41
     * Logger.
42
     *
43
     * @var LoggerInterface
44
     */
45
    protected $logger;
46
47
    /**
48
     * Container.
49
     *
50
     * @var ContainerInterface
51
     */
52
    protected $container;
53
54
    /**
55
     * Local queue.
56
     *
57
     * @var array
58
     */
59
    protected $queue = [];
60
61
    /**
62
     * Current processing job.
63
     *
64
     * @var null|array
65
     */
66
    protected $current_job;
67
68
    /**
69
     * Process ID (fork posix pid).
70
     *
71
     * @var int
72
     */
73
    protected $process;
74
75
    /**
76
     * Jobs queue.
77
     *
78
     * @var MessageQueue
79
     */
80
    protected $jobs;
81
82
    /**
83
     * Worker ID.
84
     *
85
     * @var ObjectId
86
     */
87
    protected $id;
88
89
    /**
90
     * Init worker.
91
     */
92 29
    public function __construct(ObjectId $id, Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
93
    {
94 29
        $this->id = $id;
95 29
        $this->process = getmypid();
96 29
        $this->scheduler = $scheduler;
97 29
        $this->db = $db;
98 29
        $this->logger = $logger;
99 29
        $this->container = $container;
100 29
        $this->jobs = new MessageQueue($db, $scheduler->getJobQueue(), $scheduler->getJobQueueSize(), $logger);
101 29
    }
102
103
    /**
104
     * Handle worker timeout.
105
     */
106 5
    public function timeout(): ?ObjectId
107
    {
108 5
        if (null === $this->current_job) {
109 2
            $this->logger->debug('reached worker timeout signal, no job is currently processing, ignore it', [
110 2
                'category' => get_class($this),
111 2
                'pm' => $this->process,
112
            ]);
113
114 2
            return null;
115
        }
116
117 3
        $this->logger->debug('received timeout signal, reschedule current processing job ['.$this->current_job['_id'].']', [
118 3
            'category' => get_class($this),
119 3
            'pm' => $this->process,
120
        ]);
121
122 3
        $this->updateJob($this->current_job, JobInterface::STATUS_TIMEOUT);
123
124 3
        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
125 3
            'job' => $this->current_job['_id'],
126 3
            'worker' => $this->id,
127
            'status' => JobInterface::STATUS_TIMEOUT,
128 3
            'timestamp' => new UTCDateTime(),
129
        ]);
130
131 3
        $job = $this->current_job;
132
133 3
        if (0 !== $job['options']['retry']) {
134 1
            $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['options']['retry'].']', [
135 1
                'category' => get_class($this),
136 1
                'pm' => $this->process,
137
            ]);
138
139 1
            --$job['options']['retry'];
140 1
            $job['options']['at'] = time() + $job['options']['retry_interval'];
141 1
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
142
143 1
            return $job->getId();
144
        }
145 2
        if ($job['options']['interval'] > 0) {
146 1
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['options']['interval'].'s]', [
147 1
                'category' => get_class($this),
148 1
                'pm' => $this->process,
149
            ]);
150
151 1
            $job['options']['at'] = time() + $job['options']['interval'];
152 1
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
153
154 1
            return $job->getId();
155
        }
156 1
        if ($job['options']['interval'] <= -1) {
157
            $this->logger->debug('job ['.$job['_id'].'] has an endless interval', [
158
                'category' => get_class($this),
159
                'pm' => $this->process,
160
            ]);
161
162
            unset($job['options']['at']);
163
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
164
165
            return $job->getId();
166
        }
167
168 1
        $this->current_job = null;
169 1
        posix_kill($this->process, SIGTERM);
170
171 1
        return null;
172
    }
173
174
    /**
175
     * Start worker.
176
     */
177 11
    public function processAll(): void
178
    {
179 11
        $this->logger->info('start job listener', [
180 11
            'category' => get_class($this),
181
        ]);
182
183 11
        $cursor = $this->jobs->getCursor([
184 11
            'options.force_spawn' => false,
185
            '$or' => [
186
                ['status' => JobInterface::STATUS_WAITING],
187
                ['status' => JobInterface::STATUS_POSTPONED],
188
            ],
189
        ]);
190
191 11
        $this->catchSignal();
192
193 11
        while ($this->loop()) {
194 11
            $this->processLocalQueue();
195
196 11
            if (null === $cursor->current()) {
197 2
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

197
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
198
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
199
                        'category' => get_class($this),
200
                        'pm' => $this->process,
201
                    ]);
202
203
                    $this->jobs->create();
204
205
                    $this->processAll();
206
207
                    break;
208
                }
209
210 2
                $this->jobs->next($cursor, function () {
211
                    $this->processAll();
212 2
                });
213
214 2
                continue;
215
            }
216
217 10
            $job = $cursor->current();
218
219 10
            $this->logger->debug('found job ['.$job['_id'].'] in queue with status ['.$job['status'].']', [
220 10
                'category' => get_class($this),
221
            ]);
222
223 10
            $this->jobs->next($cursor, function () {
224
                $this->processAll();
225 10
            });
226
227 10
            $this->queueJob($job);
228
        }
229 11
    }
230
231
    /**
232
     * Process one.
233
     */
234 7
    public function processOne(ObjectId $id): void
235
    {
236 7
        $this->catchSignal();
237
238 7
        $this->logger->debug('process job ['.$id.'] and exit', [
239 7
            'category' => get_class($this),
240
        ]);
241
242
        try {
243 7
            $job = $this->scheduler->getJob($id)->toArray();
244 7
            $this->queueJob($job);
245
        } catch (\Exception $e) {
246
            $this->logger->error('failed process job ['.$id.']', [
247
                'category' => get_class($this),
248
                'exception' => $e,
249
            ]);
250
        }
251 7
    }
252
253
    /**
254
     * Cleanup and exit.
255
     */
256 3
    public function cleanup()
257
    {
258 3
        $this->saveState();
259
260 3
        if (null === $this->current_job) {
261 2
            $this->logger->debug('received cleanup call on worker ['.$this->id.'], no job is currently processing, exit now', [
262 2
                'category' => get_class($this),
263 2
                'pm' => $this->process,
264
            ]);
265
266 2
            $this->exit();
267
268 2
            return null;
269
        }
270
271 1
        $this->logger->debug('received cleanup call on worker ['.$this->id.'], reschedule current processing job ['.$this->current_job['_id'].']', [
272 1
            'category' => get_class($this),
273 1
            'pm' => $this->process,
274
        ]);
275
276 1
        $this->updateJob($this->current_job, JobInterface::STATUS_CANCELED);
277
278 1
        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
279 1
            'job' => $this->current_job['_id'],
280 1
            'worker' => $this->id,
281
            'status' => JobInterface::STATUS_CANCELED,
282 1
            'timestamp' => new UTCDateTime(),
283
        ]);
284
285 1
        $options = $this->current_job['options'];
286 1
        $options['at'] = 0;
287
288 1
        $result = $this->scheduler->addJob($this->current_job['class'], $this->current_job['data'], $options)->getId();
289 1
        $this->exit();
290
291 1
        return $result;
292
    }
293
294
    /**
295
     * Save local queue.
296
     */
297 3
    protected function saveState(): self
298
    {
299 3
        foreach ($this->queue as $key => $job) {
300 1
            $this->db->selectCollection($this->scheduler->getJobQueue())->updateOne(
301 1
                ['_id' => $job['_id'], '$isolated' => true],
302 1
                ['$setOnInsert' => $job],
303 1
                ['upsert' => true]
304
            );
305
        }
306
307 3
        return $this;
308
    }
309
310
    /**
311
     * Catch signals and cleanup.
312
     */
313 19
    protected function catchSignal(): self
314
    {
315 19
        pcntl_async_signals(true);
316 19
        pcntl_signal(SIGTERM, [$this, 'cleanup']);
317 19
        pcntl_signal(SIGINT, [$this, 'cleanup']);
318 19
        pcntl_signal(SIGALRM, [$this, 'timeout']);
319
320 19
        return $this;
321
    }
322
323
    /**
324
     * Queue job.
325
     */
326 17
    protected function queueJob(array $job): bool
327
    {
328 17
        if (!isset($job['status'])) {
329
            return false;
330
        }
331
332 17
        if (true === $this->collectJob($job, JobInterface::STATUS_PROCESSING)) {
333 17
            $this->processJob($job);
334 1
        } elseif (JobInterface::STATUS_POSTPONED === $job['status']) {
335 1
            $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
336 1
                'category' => get_class($this),
337 1
                'pm' => $this->process,
338
            ]);
339
340 1
            $this->queue[(string) $job['_id']] = $job;
341
        }
342
343 17
        return true;
344
    }
345
346
    /**
347
     * Update job status.
348
     */
349 19
    protected function collectJob(array $job, int $status, $from_status = JobInterface::STATUS_WAITING): bool
350
    {
351
        $set = [
352 19
             'status' => $status,
353
        ];
354
355 19
        if (JobInterface::STATUS_PROCESSING === $status) {
356 19
            $set['started'] = new UTCDateTime();
357 19
            $set['worker'] = $this->id;
358
        }
359
360 19
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
361 19
            '_id' => $job['_id'],
362 19
            'status' => $from_status,
363
            '$isolated' => true,
364
        ], [
365 19
            '$set' => $set,
366
        ]);
367
368 19
        $this->logger->debug('collect job ['.$job['_id'].'] with status ['.$from_status.']', [
369 19
            'category' => get_class($this),
370 19
            'pm' => $this->process,
371
        ]);
372
373 19
        if (1 === $result->getModifiedCount()) {
374 19
            $this->logger->debug('job ['.$job['_id'].'] collected; update status to ['.$status.']', [
375 19
                'category' => get_class($this),
376 19
                'pm' => $this->process,
377
            ]);
378
379 19
            $this->db->{$this->scheduler->getEventQueue()}->insertOne([
380 19
                'job' => $job['_id'],
381 19
                'worker' => $this->id,
382 19
                'status' => $status,
383 19
                'timestamp' => new UTCDateTime(),
384
            ]);
385
386 19
            return true;
387
        }
388
389 2
        $this->logger->debug('job ['.$job['_id'].'] is already collected with status ['.$job['status'].']', [
390 2
            'category' => get_class($this),
391 2
            'pm' => $this->process,
392
        ]);
393
394 2
        return false;
395
    }
396
397
    /**
398
     * Update job status.
399
     */
400 22
    protected function updateJob(array $job, int $status): bool
401
    {
402
        $set = [
403 22
            'status' => $status,
404
        ];
405
406 22
        if ($status >= JobInterface::STATUS_DONE) {
407 18
            $set['ended'] = new UTCDateTime();
408
        }
409
410 22
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
411 22
            '_id' => $job['_id'],
412
            '$isolated' => true,
413
        ], [
414 22
            '$set' => $set,
415
        ]);
416
417 22
        return $result->isAcknowledged();
418
    }
419
420
    /**
421
     * Check local queue for postponed jobs.
422
     */
423 11
    protected function processLocalQueue(): bool
424
    {
425 11
        $now = time();
426 11
        foreach ($this->queue as $key => $job) {
427 2
            $this->db->{$this->scheduler->getJobQueue()}->updateOne(
428 2
                ['_id' => $job['_id'], '$isolated' => true],
429 2
                ['$setOnInsert' => $job],
430 2
                ['upsert' => true]
431
            );
432
433 2
            if ($job['options']['at'] <= $now) {
434 2
                $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
435 2
                    'category' => get_class($this),
436 2
                    'pm' => $this->process,
437
                ]);
438
439 2
                unset($this->queue[$key]);
440 2
                $job['options']['at'] = 0;
441
442 2
                if (true === $this->collectJob($job, JobInterface::STATUS_PROCESSING, JobInterface::STATUS_POSTPONED)) {
443 2
                    $this->processJob($job);
444
                }
445
            }
446
        }
447
448 11
        return true;
449
    }
450
451
    /**
452
     * Process job.
453
     */
454 17
    protected function processJob(array $job): ObjectId
455
    {
456 17
        $now = time();
457
458 17
        if ($job['options']['at'] > $now) {
459 5
            $this->updateJob($job, JobInterface::STATUS_POSTPONED);
460 5
            $job['status'] = JobInterface::STATUS_POSTPONED;
461 5
            $this->queue[(string) $job['_id']] = $job;
462
463 5
            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['options']['at'].']', [
464 5
                'category' => get_class($this),
465 5
                'pm' => $this->process,
466
            ]);
467
468 5
            return $job['_id'];
469
        }
470
471 14
        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].'] on worker ['.$this->id.']', [
472 14
            'category' => get_class($this),
473 14
            'pm' => $this->process,
474 14
            'options' => $job['options'],
475 14
            'params' => $job['data'],
476
        ]);
477
478 14
        $this->current_job = $job;
479 14
        pcntl_alarm($job['options']['timeout']);
480
481
        try {
482 14
            $this->executeJob($job);
483 8
            $this->current_job = null;
484 6
        } catch (\Throwable $e) {
485 6
            pcntl_alarm(0);
486
487 6
            $this->logger->error('failed execute job ['.$job['_id'].'] of type ['.$job['class'].'] on worker ['.$this->id.']', [
488 6
                'category' => get_class($this),
489 6
                'pm' => $this->process,
490 6
                'exception' => $e,
491
            ]);
492
493 6
            $this->updateJob($job, JobInterface::STATUS_FAILED);
494 6
            $this->current_job = null;
495
496 6
            $this->db->{$this->scheduler->getEventQueue()}->insertOne([
497 6
                'job' => $job['_id'],
498 6
                'worker' => $this->id,
499
                'status' => JobInterface::STATUS_FAILED,
500 6
                'timestamp' => new UTCDateTime(),
501
                'exception' => [
502 6
                    'class' => get_class($e),
503 6
                    'message' => $e->getMessage(),
504 6
                    'file' => $e->getFile(),
505 6
                    'line' => $e->getLine(),
506 6
                    'code' => $e->getCode(),
507
                ],
508
            ]);
509
510 6
            if (0 !== $job['options']['retry']) {
511 4
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['options']['retry'].']', [
512 4
                    'category' => get_class($this),
513 4
                    'pm' => $this->process,
514
                ]);
515
516 4
                --$job['options']['retry'];
517 4
                $job['options']['at'] = time() + $job['options']['retry_interval'];
518 4
                $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
519
520 4
                return $job->getId();
521
            }
522
        }
523
524 11
        pcntl_alarm(0);
525
526 11
        if ($job['options']['interval'] > 0) {
527 1
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['options']['interval'].'s]', [
528 1
                'category' => get_class($this),
529 1
                'pm' => $this->process,
530
            ]);
531
532 1
            $job['options']['at'] = time() + $job['options']['interval'];
533 1
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
534
535 1
            return $job->getId();
536
        }
537 10
        if ($job['options']['interval'] <= -1) {
538 1
            $this->logger->debug('job ['.$job['_id'].'] has an endless interval', [
539 1
                'category' => get_class($this),
540 1
                'pm' => $this->process,
541
            ]);
542
543 1
            unset($job['options']['at']);
544 1
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
545
546 1
            return $job->getId();
547
        }
548
549 9
        return $job['_id'];
550
    }
551
552
    /**
553
     * Execute job.
554
     */
555 15
    protected function executeJob(array $job): bool
556
    {
557 15
        if (!class_exists($job['class'])) {
558 1
            throw new InvalidJobException('job class does not exists');
559
        }
560
561 14
        if (null === $this->container) {
562 13
            $instance = new $job['class']();
563
        } else {
564 1
            $instance = $this->container->get($job['class']);
565
        }
566
567 14
        if (!($instance instanceof JobInterface)) {
568
            throw new InvalidJobException('job must implement JobInterface');
569
        }
570
571
        $instance
572 14
            ->setData($job['data'])
573 14
            ->setId($job['_id'])
574 14
            ->start();
575
576 8
        $return = $this->updateJob($job, JobInterface::STATUS_DONE);
577
578 8
        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
579 8
            'job' => $job['_id'],
580 8
            'worker' => $this->id,
581
            'status' => JobInterface::STATUS_DONE,
582 8
            'timestamp' => new UTCDateTime(),
583
        ]);
584
585 8
        unset($instance);
586
587 8
        return $return;
588
    }
589
}
590