Completed
Branch dev (31fb20)
by Raffael
03:39
created

Worker::collectJob()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 41
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 23
CRAP Score 3

Importance

Changes 0
Metric Value
eloc 24
dl 0
loc 41
ccs 23
cts 23
cp 1
rs 9.536
c 0
b 0
f 0
cc 3
nc 4
nop 3
crap 3
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\BSON\UTCDateTime;
17
use MongoDB\Database;
18
use Psr\Container\ContainerInterface;
19
use Psr\Log\LoggerInterface;
20
use TaskScheduler\Exception\InvalidJobException;
21
22
class Worker extends AbstractHandler
23
{
24
    /**
25
     * Scheduler.
26
     *
27
     * @var Scheduler
28
     */
29
    protected $scheduler;
30
31
    /**
32
     * Database.
33
     *
34
     * @var Database
35
     */
36
    protected $db;
37
38
    /**
39
     * Logger.
40
     *
41
     * @var LoggerInterface
42
     */
43
    protected $logger;
44
45
    /**
46
     * Container.
47
     *
48
     * @var ContainerInterface
49
     */
50
    protected $container;
51
52
    /**
53
     * Local queue.
54
     *
55
     * @var array
56
     */
57
    protected $queue = [];
58
59
    /**
60
     * Current processing job.
61
     *
62
     * @var null|array
63
     */
64
    protected $current_job;
65
66
    /**
67
     * Process ID (fork posix pid).
68
     *
69
     * @var int
70
     */
71
    protected $process;
72
73
    /**
74
     * Jobs queue.
75
     *
76
     * @var MessageQueue
77
     */
78
    protected $jobs;
79
80
    /**
81
     * Worker ID.
82
     *
83
     * @var ObjectId
84
     */
85
    protected $id;
86
87
    /**
88
     * Init worker.
89
     */
90 28
    public function __construct(ObjectId $id, Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
91
    {
92 28
        $this->id = $id;
93 28
        $this->process = getmypid();
94 28
        $this->scheduler = $scheduler;
95 28
        $this->db = $db;
96 28
        $this->logger = $logger;
97 28
        $this->container = $container;
98 28
        $this->jobs = new MessageQueue($db, $scheduler->getJobQueue(), $scheduler->getJobQueueSize(), $logger);
99 28
    }
100
101
    /**
102
     * Handle worker timeout.
103
     */
104 5
    public function timeout(): ?ObjectId
105
    {
106 5
        if (null === $this->current_job) {
107 2
            $this->logger->debug('reached worker timeout signal, no job is currently processing, ignore it', [
108 2
                'category' => get_class($this),
109 2
                'pm' => $this->process,
110
            ]);
111
112 2
            return null;
113
        }
114
115 3
        $this->logger->debug('received timeout signal, reschedule current processing job ['.$this->current_job['_id'].']', [
116 3
            'category' => get_class($this),
117 3
            'pm' => $this->process,
118
        ]);
119
120 3
        $this->updateJob($this->current_job, JobInterface::STATUS_TIMEOUT);
121
122 3
        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
123 3
            'job' => $this->current_job['_id'],
124 3
            'worker' => $this->id,
125
            'status' => JobInterface::STATUS_TIMEOUT,
126 3
            'timestamp' => new UTCDateTime(),
127
        ]);
128
129 3
        $job = $this->current_job;
130
131 3
        if (0 !== $job['options']['retry']) {
132 1
            $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['options']['retry'].']', [
133 1
                'category' => get_class($this),
134 1
                'pm' => $this->process,
135
            ]);
136
137 1
            --$job['options']['retry'];
138 1
            $job['options']['at'] = time() + $job['options']['at'];
139 1
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
140
141 1
            return $job->getId();
142
        }
143 2
        if ($job['options']['interval'] > 0) {
144 1
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['options']['interval'].'s]', [
145 1
                'category' => get_class($this),
146 1
                'pm' => $this->process,
147
            ]);
148
149 1
            $job['options']['at'] = time() + $job['options']['interval'];
150 1
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
151
152 1
            return $job->getId();
153
        }
154 1
        if ($job['options']['interval'] <= -1) {
155
            $this->logger->debug('job ['.$job['_id'].'] has an endless interval', [
156
                'category' => get_class($this),
157
                'pm' => $this->process,
158
            ]);
159
160
            unset($job['options']['at']);
161
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
162
163
            return $job->getId();
164
        }
165
166 1
        $this->current_job = null;
167 1
        posix_kill($this->process, SIGTERM);
168
169 1
        return null;
170
    }
171
172
    /**
173
     * Start worker.
174
     */
175 17
    public function start(): void
176
    {
177 17
        $cursor = $this->jobs->getCursor([
178 17
            '$or' => [
179
                ['status' => JobInterface::STATUS_WAITING],
180
                ['status' => JobInterface::STATUS_POSTPONED],
181
            ],
182
        ]);
183
184 17
        $this->catchSignal();
185
186 17
        while ($this->loop()) {
187 17
            $this->processLocalQueue();
188
189 17
            if (null === $cursor->current()) {
190 1
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

190
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
191
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
192
                        'category' => get_class($this),
193
                        'pm' => $this->process,
194
                    ]);
195
196
                    $this->jobs->create();
197
198
                    $this->start();
199
200
                    break;
201
                }
202
203 1
                $this->jobs->next($cursor, function () {
204
                    $this->start();
205 1
                });
206
207 1
                continue;
208
            }
209
210 16
            $job = $cursor->current();
211 16
            $this->jobs->next($cursor, function () {
212
                $this->start();
213 16
            });
214
215 16
            $this->queueJob($job);
216
        }
217 17
    }
218
219
    /**
220
     * Cleanup and exit.
221
     */
222 3
    public function cleanup()
223
    {
224 3
        $this->saveState();
225
226 3
        if (null === $this->current_job) {
227 2
            $this->logger->debug('received cleanup call, no job is currently processing, exit now', [
228 2
                'category' => get_class($this),
229 2
                'pm' => $this->process,
230
            ]);
231
232 2
            return null;
233
        }
234
235 1
        $this->logger->debug('received cleanup call, reschedule current processing job ['.$this->current_job['_id'].']', [
236 1
            'category' => get_class($this),
237 1
            'pm' => $this->process,
238
        ]);
239
240 1
        $this->updateJob($this->current_job, JobInterface::STATUS_CANCELED);
241
242 1
        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
243 1
            'job' => $this->current_job['_id'],
244 1
            'worker' => $this->id,
245
            'status' => JobInterface::STATUS_CANCELED,
246 1
            'timestamp' => new UTCDateTime(),
247
        ]);
248
249 1
        $options = $this->current_job['options'];
250 1
        $options['at'] = 0;
251
252 1
        $result = $this->scheduler->addJob($this->current_job['class'], $this->current_job['data'], $options)->getId();
253 1
        $this->exit();
254
255 1
        return $result;
256
    }
257
258
    /**
259
     * Save local queue.
260
     */
261 3
    protected function saveState(): self
262
    {
263 3
        foreach ($this->queue as $key => $job) {
264
            try {
265 1
                $options = $job['options'];
266 1
                $options[Scheduler::OPTION_ID] = $job['_id'];
267 1
                $this->scheduler->addJob($job['class'], $job['data'], $options);
268 1
                unset($this->queue[$key]);
269
            } catch (\Exception $e) {
270
                $this->logger->warning('failed reschedule locally queued job ['.$job['_id'].']', [
271
                    'exception' => $e,
272 1
                    'category' => get_class($this),
273
                ]);
274
            }
275
        }
276
277 3
        return $this;
278
    }
279
280
    /**
281
     * Catch signals and cleanup.
282
     */
283 18
    protected function catchSignal(): self
284
    {
285 18
        pcntl_async_signals(true);
286 18
        pcntl_signal(SIGTERM, [$this, 'cleanup']);
287 18
        pcntl_signal(SIGINT, [$this, 'cleanup']);
288 18
        pcntl_signal(SIGALRM, [$this, 'timeout']);
289
290 18
        return $this;
291
    }
292
293
    /**
294
     * Queue job.
295
     */
296 16
    protected function queueJob(array $job): bool
297
    {
298 16
        if (!isset($job['status'])) {
299
            return false;
300
        }
301
302 16
        if (true === $this->collectJob($job, JobInterface::STATUS_PROCESSING)) {
303 16
            $this->processJob($job);
304 1
        } elseif (JobInterface::STATUS_POSTPONED === $job['status']) {
305 1
            $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
306 1
                'category' => get_class($this),
307 1
                'pm' => $this->process,
308
            ]);
309
310 1
            $this->queue[(string) $job['_id']] = $job;
311
        }
312
313 16
        return true;
314
    }
315
316
    /**
317
     * Update job status.
318
     */
319 18
    protected function collectJob(array $job, int $status, $from_status = JobInterface::STATUS_WAITING): bool
320
    {
321
        $set = [
322 18
             'status' => $status,
323
        ];
324
325 18
        if (JobInterface::STATUS_PROCESSING === $status) {
326 18
            $set['started'] = new UTCDateTime();
327 18
            $set['worker'] = $this->id;
328
        }
329
330 18
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
331 18
            '_id' => $job['_id'],
332 18
            'status' => $from_status,
333
            '$isolated' => true,
334
        ], [
335 18
            '$set' => $set,
336
        ]);
337
338 18
        if (1 === $result->getModifiedCount()) {
339 18
            $this->logger->debug('job ['.$job['_id'].'] updated to status ['.$status.']', [
340 18
                'category' => get_class($this),
341 18
                'pm' => $this->process,
342
            ]);
343
344 18
            $this->db->{$this->scheduler->getEventQueue()}->insertOne([
345 18
                'job' => $job['_id'],
346 18
                'worker' => $this->id,
347 18
                'status' => $status,
348 18
                'timestamp' => new UTCDateTime(),
349
            ]);
350
351 18
            return true;
352
        }
353
354 2
        $this->logger->debug('job ['.$job['_id'].'] is already collected with status ['.$status.']', [
355 2
            'category' => get_class($this),
356 2
            'pm' => $this->process,
357
        ]);
358
359 2
        return false;
360
    }
361
362
    /**
363
     * Update job status.
364
     */
365 21
    protected function updateJob(array $job, int $status): bool
366
    {
367
        $set = [
368 21
            'status' => $status,
369
        ];
370
371 21
        if ($status >= JobInterface::STATUS_DONE) {
372 17
            $set['ended'] = new UTCDateTime();
373
        }
374
375 21
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
376 21
            '_id' => $job['_id'],
377
            '$isolated' => true,
378
        ], [
379 21
            '$set' => $set,
380
        ]);
381
382 21
        return $result->isAcknowledged();
383
    }
384
385
    /**
386
     * Check local queue for postponed jobs.
387
     */
388 17
    protected function processLocalQueue(): bool
389
    {
390 17
        $now = new UTCDateTime();
391 17
        foreach ($this->queue as $key => $job) {
392 1
            if ($job['options']['at'] <= $now) {
393 1
                $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
394 1
                    'category' => get_class($this),
395 1
                    'pm' => $this->process,
396
                ]);
397
398 1
                unset($this->queue[$key]);
399 1
                $job['options']['at'] = null;
400
401 1
                if (true === $this->collectJob($job, JobInterface::STATUS_PROCESSING, JobInterface::STATUS_POSTPONED)) {
402 1
                    $this->processJob($job);
403
                }
404
            }
405
        }
406
407 17
        return true;
408
    }
409
410
    /**
411
     * Process job.
412
     */
413 16
    protected function processJob(array $job): ObjectId
414
    {
415 16
        $now = new UTCDateTime();
416
417 16
        if ($job['options']['at'] instanceof UTCDateTime && $job['options']['at'] > $now) {
418 4
            $this->updateJob($job, JobInterface::STATUS_POSTPONED);
419 4
            $this->queue[(string) $job['_id']] = $job;
420
421 4
            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['options']['at']->toDateTime()->format('c').']', [
422 4
                'category' => get_class($this),
423 4
                'pm' => $this->process,
424
            ]);
425
426 4
            return $job['_id'];
427
        }
428
429 13
        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
430 13
            'category' => get_class($this),
431 13
            'pm' => $this->process,
432 13
            'options' => $job['options'],
433 13
            'params' => $job['data'],
434
        ]);
435
436 13
        $this->current_job = $job;
437 13
        pcntl_alarm($job['options']['timeout']);
438
439
        try {
440 13
            $this->executeJob($job);
441 7
            $this->current_job = null;
442 6
        } catch (\Exception $e) {
443 6
            pcntl_alarm(0);
444
445 6
            $this->logger->error('failed execute job ['.$job['_id'].'] of type ['.$job['class'].']', [
446 6
                'category' => get_class($this),
447 6
                'pm' => $this->process,
448 6
                'exception' => $e,
449
            ]);
450
451 6
            $this->updateJob($job, JobInterface::STATUS_FAILED);
452 6
            $this->current_job = null;
453
454 6
            $this->db->{$this->scheduler->getEventQueue()}->insertOne([
455 6
                'job' => $job['_id'],
456 6
                'worker' => $this->id,
457
                'status' => JobInterface::STATUS_FAILED,
458 6
                'timestamp' => new UTCDateTime(),
459
                'exception' => [
460 6
                    'class' => get_class($e),
461 6
                    'message' => $e->getMessage(),
462 6
                    'file' => $e->getFile(),
463 6
                    'line' => $e->getLine(),
464 6
                    'code' => $e->getCode(),
465
                ],
466
            ]);
467
468 6
            if (0 !== $job['options']['retry']) {
469 4
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['options']['retry'].']', [
470 4
                    'category' => get_class($this),
471 4
                    'pm' => $this->process,
472
                ]);
473
474 4
                --$job['options']['retry'];
475 4
                $job['options']['at'] = time() + $job['options']['retry_interval'];
476 4
                $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
477
478 4
                return $job->getId();
479
            }
480
        }
481
482 10
        pcntl_alarm(0);
483
484 10
        if ($job['options']['interval'] > 0) {
485 1
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['options']['interval'].'s]', [
486 1
                'category' => get_class($this),
487 1
                'pm' => $this->process,
488
            ]);
489
490 1
            $job['options']['at'] = time() + $job['options']['interval'];
491 1
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
492
493 1
            return $job->getId();
494
        }
495 9
        if ($job['options']['interval'] <= -1) {
496 1
            $this->logger->debug('job ['.$job['_id'].'] has an endless interval', [
497 1
                'category' => get_class($this),
498 1
                'pm' => $this->process,
499
            ]);
500
501 1
            unset($job['options']['at']);
502 1
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
503
504 1
            return $job->getId();
505
        }
506
507 8
        return $job['_id'];
508
    }
509
510
    /**
511
     * Execute job.
512
     */
513 14
    protected function executeJob(array $job): bool
514
    {
515 14
        if (!class_exists($job['class'])) {
516 1
            throw new InvalidJobException('job class does not exists');
517
        }
518
519 13
        if (null === $this->container) {
520 12
            $instance = new $job['class']();
521
        } else {
522 1
            $instance = $this->container->get($job['class']);
523
        }
524
525 13
        if (!($instance instanceof JobInterface)) {
526
            throw new InvalidJobException('job must implement JobInterface');
527
        }
528
529
        $instance
530 13
            ->setData($job['data'])
531 13
            ->setId($job['_id'])
532 13
            ->start();
533
534 7
        $return = $this->updateJob($job, JobInterface::STATUS_DONE);
535
536 7
        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
537 7
            'job' => $job['_id'],
538 7
            'worker' => $this->id,
539
            'status' => JobInterface::STATUS_DONE,
540 7
            'timestamp' => new UTCDateTime(),
541
        ]);
542
543 7
        unset($instance);
544
545 7
        return $return;
546
    }
547
}
548