Passed
Branch dev (7112df)
by Raffael
03:08
created

Worker::saveState()   A

Complexity

Conditions 5
Paths 10

Size

Total Lines 21
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 6.4222

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 21
ccs 8
cts 13
cp 0.6153
rs 9.5222
c 0
b 0
f 0
cc 5
nc 10
nop 0
crap 6.4222
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\BSON\UTCDateTime;
17
use MongoDB\Database;
18
use MongoDB\Driver\Exception\BulkWriteException;
19
use Psr\Container\ContainerInterface;
20
use Psr\Log\LoggerInterface;
21
use TaskScheduler\Exception\InvalidJobException;
22
23
class Worker extends AbstractHandler
24
{
25
    /**
26
     * Scheduler.
27
     *
28
     * @var Scheduler
29
     */
30
    protected $scheduler;
31
32
    /**
33
     * Database.
34
     *
35
     * @var Database
36
     */
37
    protected $db;
38
39
    /**
40
     * Logger.
41
     *
42
     * @var LoggerInterface
43
     */
44
    protected $logger;
45
46
    /**
47
     * Container.
48
     *
49
     * @var null|ContainerInterface
50
     */
51
    protected $container;
52
53
    /**
54
     * Local queue.
55
     *
56
     * @var array
57
     */
58
    protected $queue = [];
59
60
    /**
61
     * Current processing job.
62
     *
63
     * @var null|array
64
     */
65
    protected $current_job;
66
67
    /**
68
     * Process ID (fork posix pid).
69
     *
70
     * @var int
71
     */
72
    protected $process;
73
74
    /**
75
     * Jobs queue.
76
     *
77
     * @var MessageQueue
78
     */
79
    protected $jobs;
80
81
    /**
82
     * Worker ID.
83
     *
84
     * @var ObjectId
85
     */
86
    protected $id;
87
88
    /**
     * Create a worker bound to the current process.
     *
     * Stores the injected collaborators, remembers the PID reported by
     * getmypid() and opens a MessageQueue on the scheduler's job queue.
     *
     * @param ObjectId                $id        worker id, written to job and event records
     * @param Scheduler               $scheduler supplies queue names/sizes and re-adds jobs
     * @param Database                $db        database holding the job and event queues
     * @param LoggerInterface         $logger    logger used by the worker and its message queue
     * @param null|ContainerInterface $container optional container used to resolve job instances
     */
    public function __construct(ObjectId $id, Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
    {
        $job_queue = $scheduler->getJobQueue();
        $job_queue_size = $scheduler->getJobQueueSize();

        $this->container = $container;
        $this->logger = $logger;
        $this->db = $db;
        $this->scheduler = $scheduler;
        $this->id = $id;
        $this->process = getmypid();
        $this->jobs = new MessageQueue($db, $job_queue, $job_queue_size, $logger);
    }
101
102
    /**
     * Handle worker timeout.
     *
     * Registered as the SIGALRM handler by catchSignal(). If a job is being
     * processed it is marked as STATUS_TIMEOUT, a matching event is written
     * and the job is re-added to the scheduler when it still has retries left
     * or defines an interval. Only when nothing is rescheduled is the current
     * job cleared and SIGTERM sent to this worker process.
     *
     * @return null|ObjectId id of the rescheduled job, or null if no job was
     *                       running or nothing was rescheduled
     */
    public function timeout(): ?ObjectId
    {
        if (null === $this->current_job) {
            $this->logger->debug('reached worker timeout signal, no job is currently processing, ignore it', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            return null;
        }

        $this->logger->debug('received timeout signal, reschedule current processing job ['.$this->current_job['_id'].']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        $this->updateJob($this->current_job, JobInterface::STATUS_TIMEOUT);

        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
            'job' => $this->current_job['_id'],
            'worker' => $this->id,
            'status' => JobInterface::STATUS_TIMEOUT,
            'timestamp' => new UTCDateTime(),
        ]);

        $job = $this->current_job;

        if (0 !== $job['options']['retry']) {
            $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['options']['retry'].']', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            --$job['options']['retry'];
            // The next attempt is due "retry_interval" seconds from now, the same
            // rule processJob() applies to failed jobs. Adding the previous
            // absolute "at" timestamp to time() would push the retry far into
            // the future.
            $job['options']['at'] = time() + $job['options']['retry_interval'];
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

            return $job->getId();
        }

        if ($job['options']['interval'] > 0) {
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['options']['interval'].'s]', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $job['options']['at'] = time() + $job['options']['interval'];
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

            return $job->getId();
        }

        if ($job['options']['interval'] <= -1) {
            $this->logger->debug('job ['.$job['_id'].'] has an endless interval', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            // No explicit "at": the scheduler decides when the job runs next.
            unset($job['options']['at']);
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

            return $job->getId();
        }

        // Nothing to reschedule: forget the job and ask this process to terminate.
        $this->current_job = null;
        posix_kill($this->process, SIGTERM);

        return null;
    }
172
173
    /**
     * Start worker.
     *
     * Opens a cursor on waiting and postponed jobs, installs the signal
     * handlers and then, for as long as loop() allows, first works off the
     * local queue of postponed jobs and then hands the job under the cursor
     * (if any) to queueJob(). A dead cursor causes the job queue to be
     * (re)created and the worker to be started again.
     */
    public function start(): void
    {
        $filter = [
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
            ],
        ];
        $cursor = $this->jobs->getCursor($filter);

        // Callback handed to MessageQueue::next(); when invoked it starts the
        // worker loop over again.
        $restart = function () {
            $this->start();
        };

        $this->catchSignal();

        while ($this->loop()) {
            $this->processLocalQueue();

            if (null !== $cursor->current()) {
                $job = $cursor->current();
                $this->jobs->next($cursor, $restart);
                $this->queueJob($job);

                continue;
            }

            // Nothing under the cursor: either the cursor died or there is
            // simply no new job yet.
            if ($cursor->getInnerIterator()->isDead()) {
                $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                    'category' => get_class($this),
                    'pm' => $this->process,
                ]);

                $this->jobs->create();
                $this->start();

                break;
            }

            $this->jobs->next($cursor, $restart);
        }
    }
219
220
    /**
     * Cleanup and exit.
     *
     * Registered as the SIGTERM/SIGINT handler by catchSignal(). Persists the
     * local queue, then exits. A job that is still being processed is marked
     * as STATUS_CANCELED, logged to the event queue and re-added to the
     * scheduler for immediate execution before exiting.
     *
     * @return null|ObjectId id of the re-added job, or null if no job was running
     */
    public function cleanup()
    {
        $this->saveState();

        $running = $this->current_job;
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        if (null === $running) {
            $this->logger->debug('received cleanup call, no job is currently processing, exit now', $context);
            $this->exit();

            return null;
        }

        $this->logger->debug('received cleanup call, reschedule current processing job ['.$running['_id'].']', $context);

        $this->updateJob($running, JobInterface::STATUS_CANCELED);

        $event = [
            'job' => $running['_id'],
            'worker' => $this->id,
            'status' => JobInterface::STATUS_CANCELED,
            'timestamp' => new UTCDateTime(),
        ];
        $this->db->{$this->scheduler->getEventQueue()}->insertOne($event);

        // "at" = 0 makes the re-added copy due immediately.
        $options = $running['options'];
        $options['at'] = 0;

        $rescheduled = $this->scheduler->addJob($running['class'], $running['data'], $options);
        $result = $rescheduled->getId();
        $this->exit();

        return $result;
    }
260
261
    /**
     * Save local queue.
     *
     * Re-adds every locally queued job to the scheduler under its original
     * id and drops it from the local queue once that succeeded. A failed
     * re-add leaves the entry in the local queue; it is logged unless it is
     * a duplicate key write error (code 11000).
     *
     * @return self
     */
    protected function saveState(): self
    {
        foreach ($this->queue as $index => $queued) {
            try {
                $options = $queued['options'];
                $options[Scheduler::OPTION_ID] = $queued['_id'];

                $this->scheduler->addJob($queued['class'], $queued['data'], $options);
                unset($this->queue[$index]);
            } catch (\Exception $e) {
                $duplicate = $e instanceof BulkWriteException && 11000 === $e->getCode();

                if (!$duplicate) {
                    $this->logger->error('failed reschedule locally queued job ['.$queued['_id'].']', [
                        'exception' => $e,
                        'category' => get_class($this),
                    ]);
                }
            }
        }

        return $this;
    }
286
287
    /**
     * Catch signals and cleanup.
     *
     * Enables asynchronous signal dispatching and routes SIGTERM and SIGINT
     * to cleanup() and SIGALRM to timeout().
     *
     * @return self
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);

        $handlers = [
            SIGTERM => 'cleanup',
            SIGINT => 'cleanup',
            SIGALRM => 'timeout',
        ];

        foreach ($handlers as $signal => $method) {
            pcntl_signal($signal, [$this, $method]);
        }

        return $this;
    }
299
300
    /**
     * Queue job.
     *
     * Tries to claim the job for this worker and processes it on success.
     * A job that could not be claimed but carries STATUS_POSTPONED is kept in
     * the local queue so processLocalQueue() can pick it up later.
     *
     * @param array $job job record as read from the job queue
     *
     * @return bool false if the record has no "status" field, true otherwise
     */
    protected function queueJob(array $job): bool
    {
        if (!isset($job['status'])) {
            return false;
        }

        $collected = $this->collectJob($job, JobInterface::STATUS_PROCESSING);

        if (true === $collected) {
            $this->processJob($job);

            return true;
        }

        if (JobInterface::STATUS_POSTPONED !== $job['status']) {
            return true;
        }

        $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        $this->queue[(string) $job['_id']] = $job;

        return true;
    }
322
323
    /**
     * Collect job.
     *
     * Claims a job by moving it from $from_status to $status in one
     * conditional update. Because the filter requires the current status,
     * the update modifies the document only if the job is still in
     * $from_status. On success a matching event is written to the event
     * queue.
     *
     * @param array $job         job record (its "_id" is used as filter)
     * @param int   $status      status to set; STATUS_PROCESSING additionally
     *                           records the start time and this worker's id
     * @param int   $from_status status the job must currently have
     *
     * @return bool true if exactly one job document was modified
     */
    protected function collectJob(array $job, int $status, $from_status = JobInterface::STATUS_WAITING): bool
    {
        $set = [
            'status' => $status,
        ];

        if (JobInterface::STATUS_PROCESSING === $status) {
            $set['started'] = new UTCDateTime();
            $set['worker'] = $this->id;
        }

        // The filter deliberately has no "$isolated" flag: a write to a single
        // document is already atomic, and MongoDB removed the operator in 4.0.
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
            '_id' => $job['_id'],
            'status' => $from_status,
        ], [
            '$set' => $set,
        ]);

        if (1 === $result->getModifiedCount()) {
            $this->logger->debug('job ['.$job['_id'].'] updated to status ['.$status.']', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $this->db->{$this->scheduler->getEventQueue()}->insertOne([
                'job' => $job['_id'],
                'worker' => $this->id,
                'status' => $status,
                'timestamp' => new UTCDateTime(),
            ]);

            return true;
        }

        $this->logger->debug('job ['.$job['_id'].'] is already collected with status ['.$status.']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        return false;
    }
368
369
    /**
     * Update job status.
     *
     * Unconditionally sets the status of the job identified by its "_id".
     * Statuses greater than or equal to STATUS_DONE also record the end time.
     *
     * @param array $job    job record (its "_id" is used as filter)
     * @param int   $status status to set
     *
     * @return bool whether the write was acknowledged
     */
    protected function updateJob(array $job, int $status): bool
    {
        $set = [
            'status' => $status,
        ];

        if ($status >= JobInterface::STATUS_DONE) {
            $set['ended'] = new UTCDateTime();
        }

        // The filter deliberately has no "$isolated" flag: a write to a single
        // document is already atomic, and MongoDB removed the operator in 4.0.
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
            '_id' => $job['_id'],
        ], [
            '$set' => $set,
        ]);

        return $result->isAcknowledged();
    }
391
392
    /**
     * Check local queue for postponed jobs.
     *
     * Every locally queued job whose "at" time has been reached is removed
     * from the local queue and, if it can be moved from STATUS_POSTPONED to
     * STATUS_PROCESSING, executed right away.
     *
     * @return bool always true
     */
    protected function processLocalQueue(): bool
    {
        $now = time();

        foreach ($this->queue as $key => $job) {
            if ($job['options']['at'] > $now) {
                // not due yet
                continue;
            }

            $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            unset($this->queue[$key]);

            // Clear "at" so processJob() does not postpone the job again.
            $job['options']['at'] = 0;

            $collected = $this->collectJob($job, JobInterface::STATUS_PROCESSING, JobInterface::STATUS_POSTPONED);

            if (true === $collected) {
                $this->processJob($job);
            }
        }

        return true;
    }
416
417
    /**
     * Process job.
     *
     * A job scheduled for the future is marked as STATUS_POSTPONED and parked
     * in the local queue. Otherwise the job is executed under an alarm of
     * "timeout" seconds. A job that throws is marked as STATUS_FAILED, logged
     * to the event queue and re-added if it has retries left. Afterwards a
     * job with an interval is re-added for its next run.
     *
     * @param array $job job record as read from the job queue
     *
     * @return ObjectId id of the re-added job if one was scheduled, otherwise
     *                  the id of the given job
     */
    protected function processJob(array $job): ObjectId
    {
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        if ($job['options']['at'] > time()) {
            $this->updateJob($job, JobInterface::STATUS_POSTPONED);
            $this->queue[(string) $job['_id']] = $job;

            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['options']['at'].']', $context);

            return $job['_id'];
        }

        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', $context + [
            'options' => $job['options'],
            'params' => $job['data'],
        ]);

        // From here on timeout() and cleanup() treat this job as running.
        $this->current_job = $job;
        pcntl_alarm($job['options']['timeout']);

        try {
            $this->executeJob($job);
            $this->current_job = null;
        } catch (\Exception $e) {
            // Cancel the pending alarm before doing the failure bookkeeping.
            pcntl_alarm(0);

            $this->logger->error('failed execute job ['.$job['_id'].'] of type ['.$job['class'].']', $context + [
                'exception' => $e,
            ]);

            $this->updateJob($job, JobInterface::STATUS_FAILED);
            $this->current_job = null;

            $event = [
                'job' => $job['_id'],
                'worker' => $this->id,
                'status' => JobInterface::STATUS_FAILED,
                'timestamp' => new UTCDateTime(),
                'exception' => [
                    'class' => get_class($e),
                    'message' => $e->getMessage(),
                    'file' => $e->getFile(),
                    'line' => $e->getLine(),
                    'code' => $e->getCode(),
                ],
            ];
            $this->db->{$this->scheduler->getEventQueue()}->insertOne($event);

            if (0 !== $job['options']['retry']) {
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['options']['retry'].']', $context);

                --$job['options']['retry'];
                $job['options']['at'] = time() + $job['options']['retry_interval'];
                $retried = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

                return $retried->getId();
            }
        }

        pcntl_alarm(0);

        if ($job['options']['interval'] > 0) {
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['options']['interval'].'s]', $context);

            $job['options']['at'] = time() + $job['options']['interval'];
            $next = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

            return $next->getId();
        } elseif ($job['options']['interval'] <= -1) {
            $this->logger->debug('job ['.$job['_id'].'] has an endless interval', $context);

            // No explicit "at": the scheduler decides when the job runs next.
            unset($job['options']['at']);
            $next = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

            return $next->getId();
        }

        return $job['_id'];
    }
516
517
    /**
     * Execute job.
     *
     * Resolves the job class (directly, or through the container if one was
     * injected), passes the job data and id to the instance and starts it.
     * Afterwards the job is marked as STATUS_DONE and a matching event is
     * written to the event queue.
     *
     * @param array $job job record as read from the job queue
     *
     * @throws InvalidJobException if the class does not exist or the resolved
     *                             instance is not a JobInterface
     *
     * @return bool result of the status update to STATUS_DONE
     */
    protected function executeJob(array $job): bool
    {
        $class = $job['class'];

        if (!class_exists($class)) {
            throw new InvalidJobException('job class does not exists');
        }

        $instance = null === $this->container
            ? new $class()
            : $this->container->get($class);

        if (!($instance instanceof JobInterface)) {
            throw new InvalidJobException('job must implement JobInterface');
        }

        // Each step is called on the return value of the previous one, exactly
        // as a fluent call chain would.
        $with_data = $instance->setData($job['data']);
        $with_id = $with_data->setId($job['_id']);
        $with_id->start();
        unset($with_data, $with_id);

        $return = $this->updateJob($job, JobInterface::STATUS_DONE);

        $event = [
            'job' => $job['_id'],
            'worker' => $this->id,
            'status' => JobInterface::STATUS_DONE,
            'timestamp' => new UTCDateTime(),
        ];
        $this->db->{$this->scheduler->getEventQueue()}->insertOne($event);

        unset($instance);

        return $return;
    }
554
}
555