Completed
Branch dev (93c9bf)
by Raffael
04:08
created

Worker::timeout()   B

Complexity

Conditions 5
Paths 5

Size

Total Lines 66
Code Lines 40

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 34
CRAP Score 5.0843

Importance

Changes 0
Metric Value
eloc 40
dl 0
loc 66
ccs 34
cts 40
cp 0.85
rs 8.9688
c 0
b 0
f 0
cc 5
nc 5
nop 0
crap 5.0843

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\BSON\UTCDateTime;
17
use MongoDB\Database;
18
use Psr\Container\ContainerInterface;
19
use Psr\Log\LoggerInterface;
20
use TaskScheduler\Exception\InvalidJobException;
21
22
class Worker
23
{
24
    /**
25
     * Scheduler.
26
     *
27
     * @var Scheduler
28
     */
29
    protected $scheduler;
30
31
    /**
32
     * Database.
33
     *
34
     * @var Database
35
     */
36
    protected $db;
37
38
    /**
39
     * Logger.
40
     *
41
     * @var LoggerInterface
42
     */
43
    protected $logger;
44
45
    /**
46
     * Container.
47
     *
48
     * @var ContainerInterface
49
     */
50
    protected $container;
51
52
    /**
53
     * Local queue.
54
     *
55
     * @var array
56
     */
57
    protected $queue = [];
58
59
    /**
60
     * Current processing job.
61
     *
62
     * @var null|array
63
     */
64
    protected $current_job;
65
66
    /**
67
     * Process ID (fork posix pid).
68
     *
69
     * @var int
70
     */
71
    protected $process;
72
73
    /**
74
     * Jobs queue.
75
     *
76
     * @var MessageQueue
77
     */
78
    protected $jobs;
79
80
    /**
81
     * Worker ID.
82
     *
83
     * @var ObjectId
84
     */
85
    protected $id;
86
87
    /**
     * Create a worker bound to a scheduler, a database and a logger.
     *
     * Remembers the id of the current process (getmypid()) and opens a
     * MessageQueue on the scheduler's job queue.
     *
     * @param ObjectId                $id        worker id, written into every event this worker records
     * @param Scheduler               $scheduler source of queue names/sizes; also used to reschedule jobs
     * @param Database                $db        database that holds the job and event queues
     * @param LoggerInterface         $logger    logger used for all diagnostics
     * @param null|ContainerInterface $container optional container used to resolve job instances
     */
    public function __construct(ObjectId $id, Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
    {
        // Identity of this worker.
        $this->process = getmypid();
        $this->id = $id;

        // Collaborators.
        $this->container = $container;
        $this->logger = $logger;
        $this->db = $db;
        $this->scheduler = $scheduler;

        // Message queue over the scheduler's job collection.
        $queue_name = $scheduler->getJobQueue();
        $queue_size = $scheduler->getJobQueueSize();
        $this->jobs = new MessageQueue($db, $queue_name, $queue_size, $logger);
    }
100
101
    /**
     * Handle worker timeout.
     *
     * Registered as the SIGALRM handler by catchSignal(); the alarm itself is
     * armed by processJob() using the job's `timeout` option.
     *
     * If no job is in flight the signal is ignored. Otherwise the current job
     * is marked STATUS_TIMEOUT, a matching event is recorded, and the job is
     * rescheduled when it has retries left or a (positive or endless)
     * interval. Only when nothing gets rescheduled is the current job cleared
     * and SIGTERM sent to this worker's own process.
     *
     * @return null|ObjectId id of the newly scheduled job, or null if nothing was rescheduled
     */
    public function timeout(): ?ObjectId
    {
        if (null === $this->current_job) {
            $this->logger->debug('reached worker timeout signal, no job is currently processing, ignore it', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            return null;
        }

        $this->logger->debug('received timeout signal, reschedule current processing job ['.$this->current_job['_id'].']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        $this->updateJob($this->current_job, JobInterface::STATUS_TIMEOUT);

        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
            'job' => $this->current_job['_id'],
            'worker' => $this->id,
            'status' => JobInterface::STATUS_TIMEOUT,
            'timestamp' => new UTCDateTime(),
        ]);

        $job = $this->current_job;

        if (0 !== $job['options']['retry']) {
            $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['options']['retry'].']', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            --$job['options']['retry'];
            // Delay the retry by the configured retry interval, exactly like the
            // failure path in processJob(). The previous code added the old `at`
            // value to time(), which does not describe a delay at all.
            $job['options']['at'] = time() + $job['options']['retry_interval'];
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

            return $job->getId();
        }

        if ($job['options']['interval'] > 0) {
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['options']['interval'].'s]', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $job['options']['at'] = time() + $job['options']['interval'];
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

            return $job->getId();
        }

        if ($job['options']['interval'] <= -1) {
            $this->logger->debug('job ['.$job['_id'].'] has an endless interval', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            // No explicit start time: drop `at` before handing the job back to the scheduler.
            unset($job['options']['at']);
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

            return $job->getId();
        }

        // Nothing to reschedule: forget the job and ask this process to terminate.
        $this->current_job = null;
        posix_kill($this->process, SIGTERM);

        return null;
    }
171
172
    /**
     * Start worker.
     *
     * Opens a cursor over jobs in waiting or postponed status, installs the
     * signal handlers and then loops for as long as loop() returns true. Each
     * iteration first works through the local queue of postponed jobs, then
     * either queues the cursor's current job or, if there is none, advances
     * the cursor. A dead cursor causes the job queue to be re-created and
     * start() to be entered again.
     */
    public function start(): void
    {
        $cursor = $this->jobs->getCursor([
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
            ],
        ]);

        $this->catchSignal();

        // Passed to MessageQueue::next(); presumably invoked when the cursor
        // has to be re-opened — confirm against MessageQueue::next().
        $restart = function (): void {
            $this->start();
        };

        while ($this->loop()) {
            $this->processLocalQueue();

            $job = $cursor->current();

            if (null !== $job) {
                $this->jobs->next($cursor, $restart);
                $this->queueJob($job);

                continue;
            }

            // isDead() is not declared on the Iterator interface; this assumes the
            // inner iterator is a cursor type that provides it — TODO confirm.
            if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
                $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                    'category' => get_class($this),
                    'pm' => $this->process,
                ]);

                $this->jobs->create();

                $this->start();

                break;
            }

            $this->jobs->next($cursor, $restart);
        }
    }
218
219
    /**
     * Cleanup and exit.
     *
     * Registered by catchSignal() as the handler for SIGTERM and SIGINT.
     * Locally queued jobs are handed back to the scheduler first. If a job is
     * currently being processed it is marked STATUS_CANCELED, a matching event
     * is recorded, and the job is scheduled again with `at` reset to 0 before
     * exit() is called.
     *
     * @return null|ObjectId id of the rescheduled job, or null when no job was in flight
     */
    public function cleanup()
    {
        $this->saveState();

        $running = $this->current_job;
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        if (null === $running) {
            $this->logger->debug('received cleanup call, no job is currently processing, exit now', $context);

            return null;
        }

        $this->logger->debug('received cleanup call, reschedule current processing job ['.$running['_id'].']', $context);

        $this->updateJob($running, JobInterface::STATUS_CANCELED);

        $events = $this->db->{$this->scheduler->getEventQueue()};
        $events->insertOne([
            'job' => $running['_id'],
            'worker' => $this->id,
            'status' => JobInterface::STATUS_CANCELED,
            'timestamp' => new UTCDateTime(),
        ]);

        // Same options as before, but without a deferred start time.
        $options = $running['options'];
        $options['at'] = 0;

        $rescheduled = $this->scheduler->addJob($running['class'], $running['data'], $options);
        $result = $rescheduled->getId();

        $this->exit();

        return $result;
    }
257
258
    /**
     * Terminate the current process.
     *
     * Kept as a separate method so that tests can mock it instead of really
     * ending the process; cleanup() calls it as its last step.
     */
    protected function exit()
    {
        exit;
    }
265
266
    /**
     * Save local queue.
     *
     * Hands every locally queued job back to the scheduler, reusing the job's
     * own `_id` via Scheduler::OPTION_ID, and removes it from the local queue
     * on success. A failure for one job is logged as a warning and does not
     * stop the remaining jobs from being saved.
     *
     * @return self
     */
    protected function saveState(): self
    {
        foreach ($this->queue as $queue_key => $queued) {
            try {
                $reschedule_options = $queued['options'];
                $reschedule_options[Scheduler::OPTION_ID] = $queued['_id'];

                $this->scheduler->addJob($queued['class'], $queued['data'], $reschedule_options);

                // Only drop the entry once the scheduler has accepted it.
                unset($this->queue[$queue_key]);
            } catch (\Exception $error) {
                $this->logger->warning('failed reschedule locally queued job ['.$queued['_id'].']', [
                    'exception' => $error,
                    'category' => get_class($this),
                ]);
            }
        }

        return $this;
    }
287
288
    /**
     * Loop condition of start().
     *
     * Always true here; it exists as a method so that tests can mock it and
     * make the otherwise endless loop terminate.
     */
    protected function loop(): bool
    {
        // Keep looping.
        return true;
    }
295
296
    /**
     * Catch signals and cleanup.
     *
     * Enables asynchronous signal dispatch and installs the handlers:
     * SIGTERM and SIGINT run cleanup(), SIGALRM runs timeout().
     *
     * @return self
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);

        // Signal number => name of the handling method on this worker.
        $handlers = [
            SIGTERM => 'cleanup',
            SIGINT => 'cleanup',
            SIGALRM => 'timeout',
        ];

        foreach ($handlers as $signal => $method) {
            pcntl_signal($signal, [$this, $method]);
        }

        return $this;
    }
308
309
    /**
     * Queue job.
     *
     * Tries to collect the job for processing. If that succeeds the job is
     * processed right away; if it does not and the job is in postponed
     * status, the job is placed in the local queue instead.
     *
     * @param array $job job document read from the job queue
     *
     * @return bool false when the document has no `status` field, true otherwise
     */
    protected function queueJob(array $job): bool
    {
        if (!isset($job['status'])) {
            return false;
        }

        if (true === $this->collectJob($job, JobInterface::STATUS_PROCESSING)) {
            $this->processJob($job);

            return true;
        }

        if (JobInterface::STATUS_POSTPONED !== $job['status']) {
            // Not collectable and not postponed: nothing to do for this worker.
            return true;
        }

        $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        $local_key = (string) $job['_id'];
        $this->queue[$local_key] = $job;

        return true;
    }
331
332
    /**
     * Collect a job by moving it from one status to another.
     *
     * The update only matches while the job still has $from_status, so the
     * result tells the caller whether this worker was the one that changed it.
     * When the new status is STATUS_PROCESSING the start time and this
     * worker's id are stored as well. On success an event is written to the
     * event queue.
     *
     * @param array $job         job document; only `_id` is read
     * @param int   $status      status to set
     * @param mixed $from_status status the job must currently have (defaults to STATUS_WAITING)
     *
     * @return bool true if exactly one document was modified, false otherwise
     */
    protected function collectJob(array $job, int $status, $from_status = JobInterface::STATUS_WAITING): bool
    {
        $set = [
            'status' => $status,
        ];

        if (JobInterface::STATUS_PROCESSING === $status) {
            $set['started'] = new UTCDateTime();
            $set['worker'] = $this->id;
        }

        // The filter pins `_id`, so at most one document can match and the update
        // is a single-document operation. The former `$isolated` flag added
        // nothing here and is rejected by MongoDB 4.0+, where it was removed.
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
            '_id' => $job['_id'],
            'status' => $from_status,
        ], [
            '$set' => $set,
        ]);

        if (1 === $result->getModifiedCount()) {
            $this->logger->debug('job ['.$job['_id'].'] updated to status ['.$status.']', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $this->db->{$this->scheduler->getEventQueue()}->insertOne([
                'job' => $job['_id'],
                'worker' => $this->id,
                'status' => $status,
                'timestamp' => new UTCDateTime(),
            ]);

            return true;
        }

        $this->logger->debug('job ['.$job['_id'].'] is already collected with status ['.$status.']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        return false;
    }
377
378
    /**
     * Update job status.
     *
     * Sets the status unconditionally (no check of the current status). For
     * statuses numerically at or above STATUS_DONE an `ended` timestamp is
     * stored too.
     *
     * @param array $job    job document; only `_id` is read
     * @param int   $status status to set
     *
     * @return bool whether the write was acknowledged
     */
    protected function updateJob(array $job, int $status): bool
    {
        $set = [
            'status' => $status,
        ];

        if ($status >= JobInterface::STATUS_DONE) {
            $set['ended'] = new UTCDateTime();
        }

        // Filtering on `_id` alone matches at most one document, so the former
        // `$isolated` flag had no effect; it was removed in MongoDB 4.0 and is
        // rejected by newer servers.
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
            '_id' => $job['_id'],
        ], [
            '$set' => $set,
        ]);

        return $result->isAcknowledged();
    }
400
401
    /**
     * Check local queue for postponed jobs.
     *
     * Every locally queued job whose `at` option is not later than now is
     * removed from the local queue, has its `at` option cleared, and is
     * processed if it can be collected from postponed to processing status.
     *
     * @return bool always true
     */
    protected function processLocalQueue(): bool
    {
        $now = new UTCDateTime();

        foreach ($this->queue as $queue_key => $postponed) {
            if (!($postponed['options']['at'] <= $now)) {
                // Not due yet.
                continue;
            }

            $this->logger->info('postponed job ['.$postponed['_id'].'] ['.$postponed['class'].'] can now be executed', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            unset($this->queue[$queue_key]);

            // Clear the start time so processJob() does not postpone the job again.
            $postponed['options']['at'] = null;

            $collected = $this->collectJob($postponed, JobInterface::STATUS_PROCESSING, JobInterface::STATUS_POSTPONED);

            if (true === $collected) {
                $this->processJob($postponed);
            }
        }

        return true;
    }
425
426
    /**
     * Process job.
     *
     * A job whose `at` option is a UTCDateTime in the future is marked
     * postponed and parked in the local queue. Otherwise the job becomes the
     * current job, an alarm is armed with the job's `timeout` option and the
     * job is executed. If execution throws an Exception the job is marked
     * failed, an event including the exception details is recorded, and the
     * job is scheduled again when it has retries left. Afterwards the alarm
     * is disarmed and a job with a positive or endless interval is scheduled
     * again.
     *
     * @param array $job job document read from the job queue
     *
     * @return ObjectId id of the newly scheduled job if one was scheduled, otherwise the id of $job
     */
    protected function processJob(array $job): ObjectId
    {
        $now = new UTCDateTime();
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        if ($job['options']['at'] instanceof UTCDateTime && $job['options']['at'] > $now) {
            $this->updateJob($job, JobInterface::STATUS_POSTPONED);
            $this->queue[(string) $job['_id']] = $job;

            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['options']['at']->toDateTime()->format('c').']', $context);

            return $job['_id'];
        }

        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', $context + [
            'options' => $job['options'],
            'params' => $job['data'],
        ]);

        // From here on timeout() and cleanup() know which job is in flight.
        $this->current_job = $job;
        pcntl_alarm($job['options']['timeout']);

        try {
            $this->executeJob($job);
            $this->current_job = null;
        } catch (\Exception $failure) {
            pcntl_alarm(0);

            $this->logger->error('failed execute job ['.$job['_id'].'] of type ['.$job['class'].']', $context + [
                'exception' => $failure,
            ]);

            $this->updateJob($job, JobInterface::STATUS_FAILED);
            $this->current_job = null;

            $events = $this->db->{$this->scheduler->getEventQueue()};
            $events->insertOne([
                'job' => $job['_id'],
                'worker' => $this->id,
                'status' => JobInterface::STATUS_FAILED,
                'timestamp' => new UTCDateTime(),
                'exception' => [
                    'class' => get_class($failure),
                    'message' => $failure->getMessage(),
                    'file' => $failure->getFile(),
                    'line' => $failure->getLine(),
                    'code' => $failure->getCode(),
                ],
            ]);

            if (0 !== $job['options']['retry']) {
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['options']['retry'].']', $context);

                --$job['options']['retry'];
                $job['options']['at'] = time() + $job['options']['retry_interval'];

                return $this->scheduler->addJob($job['class'], $job['data'], $job['options'])->getId();
            }
        }

        pcntl_alarm(0);

        $interval = $job['options']['interval'];

        if ($interval > 0) {
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$interval.'s]', $context);

            $job['options']['at'] = time() + $interval;

            return $this->scheduler->addJob($job['class'], $job['data'], $job['options'])->getId();
        }

        if ($interval <= -1) {
            $this->logger->debug('job ['.$job['_id'].'] has an endless interval', $context);

            // No explicit start time for the next run.
            unset($job['options']['at']);

            return $this->scheduler->addJob($job['class'], $job['data'], $job['options'])->getId();
        }

        return $job['_id'];
    }
525
526
    /**
     * Execute job.
     *
     * Instantiates the job class (directly, or through the container when one
     * was injected), passes it the job data and id, and runs it. When the run
     * returns, the job is marked done and a matching event is recorded.
     *
     * @param array $job job document; `class`, `data` and `_id` are read
     *
     * @throws InvalidJobException if the class does not exist or does not implement JobInterface
     *
     * @return bool whether the status update to STATUS_DONE was acknowledged
     */
    protected function executeJob(array $job): bool
    {
        $class = $job['class'];

        if (!class_exists($class)) {
            throw new InvalidJobException('job class does not exists');
        }

        $instance = null === $this->container
            ? new $class()
            : $this->container->get($class);

        if (!$instance instanceof JobInterface) {
            throw new InvalidJobException('job must implement JobInterface');
        }

        $instance
            ->setData($job['data'])
            ->setId($job['_id'])
            ->start();

        $acknowledged = $this->updateJob($job, JobInterface::STATUS_DONE);

        $events = $this->db->{$this->scheduler->getEventQueue()};
        $events->insertOne([
            'job' => $job['_id'],
            'worker' => $this->id,
            'status' => JobInterface::STATUS_DONE,
            'timestamp' => new UTCDateTime(),
        ]);

        return $acknowledged;
    }
563
}
564