Passed
Branch dev (b1b11b)
by Raffael
03:32
created

Worker::executeJob()   A

Complexity

Conditions 4
Paths 5

Size

Total Lines 33
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 4.0027

Importance

Changes 0
Metric Value
eloc 20
dl 0
loc 33
ccs 17
cts 18
cp 0.9444
rs 9.6
c 0
b 0
f 0
cc 4
nc 5
nop 1
crap 4.0027
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\BSON\UTCDateTime;
17
use MongoDB\Database;
18
use Psr\Container\ContainerInterface;
19
use Psr\Log\LoggerInterface;
20
use TaskScheduler\Exception\InvalidJobException;
21
22
class Worker
23
{
24
    /**
25
     * Scheduler.
26
     *
27
     * @var Scheduler
28
     */
29
    protected $scheduler;
30
31
    /**
32
     * Database.
33
     *
34
     * @var Database
35
     */
36
    protected $db;
37
38
    /**
39
     * Logger.
40
     *
41
     * @var LoggerInterface
42
     */
43
    protected $logger;
44
45
    /**
46
     * Container.
47
     *
48
     * @var ContainerInterface
49
     */
50
    protected $container;
51
52
    /**
53
     * Local queue.
54
     *
55
     * @var array
56
     */
57
    protected $queue = [];
58
59
    /**
60
     * Current processing job.
61
     *
62
     * @var null|array
63
     */
64
    protected $current_job;
65
66
    /**
67
     * Process ID (fork posix pid).
68
     *
69
     * @var int
70
     */
71
    protected $process;
72
73
    /**
74
     * Jobs queue.
75
     *
76
     * @var MessageQueue
77
     */
78
    protected $jobs;
79
80
    /**
81
     * Worker ID.
82
     *
83
     * @var ObjectId
84
     */
85
    protected $id;
86
87
    /**
     * Create a worker bound to the current process.
     *
     * Stores the injected collaborators, records the PID of the running
     * process and creates the message queue object used to read jobs.
     *
     * @param ObjectId                $id        Identifier of this worker
     * @param Scheduler               $scheduler Supplies the job queue name and size
     * @param Database                $db        MongoDB database handle
     * @param LoggerInterface         $logger    Logger, also handed to the message queue
     * @param null|ContainerInterface $container Optional container, stored as given
     */
    public function __construct(ObjectId $id, Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
    {
        $queue_name = $scheduler->getJobQueue();
        $queue_size = $scheduler->getJobQueueSize();

        $this->jobs = new MessageQueue($db, $queue_name, $queue_size, $logger);
        $this->container = $container;
        $this->logger = $logger;
        $this->db = $db;
        $this->scheduler = $scheduler;
        $this->process = getmypid();
        $this->id = $id;
    }
100
101
    /**
     * Signal handler: reschedule whatever is in flight, then end the process.
     *
     * @param int $sig Number of the received signal
     */
    public function cleanup(int $sig)
    {
        $this->terminate($sig);

        // Ending the process here is deliberate: catchSignal() installs this
        // method as the SIGTERM/SIGINT handler. Static analysers flag exit as
        // risky outside of a CLI/scripting context.
        exit(0);
    }
109
110
    /**
     * Handle a worker timeout (installed as the SIGALRM handler).
     *
     * When a job is being processed it is marked as timed out and a timeout
     * event is recorded. The job is then rescheduled if its options allow a
     * retry (`retry >= 0`) or define an interval (`interval >= 0`). Only when
     * neither applies is the current job cleared and SIGTERM sent to this
     * process.
     *
     * NOTE(review): in the retry and interval branches the current job is not
     * cleared and the process is not signalled - confirm this is intended.
     *
     * @return null|ObjectId ID of the rescheduled job, or null if nothing was rescheduled
     */
    public function timeout(): ?ObjectId
    {
        if (null === $this->current_job) {
            $this->logger->debug('reached worker timeout signal, no job is currently processing, ignore it', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            return null;
        }

        $this->logger->debug('received timeout signal, reschedule current processing job ['.$this->current_job['_id'].']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        $this->updateJob($this->current_job, JobInterface::STATUS_TIMEOUT);

        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
            'job' => $this->current_job['_id'],
            'worker' => $this->id,
            'status' => JobInterface::STATUS_TIMEOUT,
            'timestamp' => new UTCDateTime(),
        ]);

        // Work on a copy so the rescheduling options do not alter the stored job.
        $job = $this->current_job;

        if ($job['options']['retry'] >= 0) {
            $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['options']['retry'].']', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            --$job['options']['retry'];
            // The retry is due one retry interval from now, exactly as in
            // processJob(); the previous `at` value is not a delay.
            $job['options']['at'] = time() + $job['options']['retry_interval'];
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

            return $job->getId();
        }

        if ($job['options']['interval'] >= 0) {
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['options']['interval'].'s]', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $job['options']['at'] = time() + $job['options']['interval'];
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

            return $job->getId();
        }

        $this->current_job = null;
        posix_kill($this->process, SIGTERM);

        return null;
    }
170
171
    /**
     * Run the worker loop.
     *
     * Opens a cursor over jobs whose status is waiting or postponed, installs
     * the signal handlers and then, for as long as loop() returns true, first
     * works through the local queue of postponed jobs and then hands the
     * cursor's current job (if any) to queueJob().
     *
     * If the cursor has no current job and reports itself dead, the job queue
     * is recreated and the worker restarts itself.
     */
    public function start(): void
    {
        $cursor = $this->jobs->getCursor([
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
            ],
        ]);

        $this->catchSignal();

        // Handed to MessageQueue::next(); presumably invoked when the cursor
        // cannot be advanced - confirm against MessageQueue.
        $restart = function () {
            $this->start();
        };

        while ($this->loop()) {
            $this->processLocalQueue();

            $job = $cursor->current();

            if (null !== $job) {
                $this->jobs->next($cursor, $restart);
                $this->queueJob($job);

                continue;
            }

            // isDead() is not declared on the generic Iterator type, hence the
            // static-analysis annotation.
            if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
                $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                    'category' => get_class($this),
                    'pm' => $this->process,
                ]);

                $this->jobs->create();

                $this->start();

                break;
            }

            $this->jobs->next($cursor, $restart);
        }
    }
217
218
    /**
     * Tell start() whether to run another iteration.
     *
     * Always true here. It is a separate method so that the otherwise endless
     * loop can be mocked.
     */
    protected function loop(): bool
    {
        return true;
    }
225
226
    /**
     * Install the worker's signal handlers.
     *
     * Enables asynchronous signal dispatch, then routes SIGTERM and SIGINT to
     * cleanup() and SIGALRM to timeout().
     *
     * @return self This worker
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);

        $handlers = [
            SIGTERM => 'cleanup',
            SIGINT => 'cleanup',
            SIGALRM => 'timeout',
        ];

        foreach ($handlers as $signal => $method) {
            pcntl_signal($signal, [$this, $method]);
        }

        return $this;
    }
238
239
    /**
     * Cancel the job in flight and put it back on the queue.
     *
     * Does nothing but log when no job is being processed. Otherwise the
     * current job is marked as canceled, a cancel event is recorded and the
     * job is added again with `at` reset to 0. This method does not exit by
     * itself; cleanup() does that after calling it.
     *
     * @param int $sig Number of the received signal (used for logging only)
     *
     * @return null|ObjectId ID of the re-added job, or null if no job was running
     */
    protected function terminate(int $sig): ?ObjectId
    {
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        $running = $this->current_job;

        if (null === $running) {
            $this->logger->debug('received signal ['.$sig.'], no job is currently processing, exit now', $context);

            return null;
        }

        $this->logger->debug('received signal ['.$sig.'], reschedule current processing job ['.$running['_id'].']', $context);

        $this->updateJob($running, JobInterface::STATUS_CANCELED);

        $events = $this->db->{$this->scheduler->getEventQueue()};
        $events->insertOne([
            'job' => $running['_id'],
            'worker' => $this->id,
            'status' => JobInterface::STATUS_CANCELED,
            'timestamp' => new UTCDateTime(),
        ]);

        // The reset is written to the property itself, not to the local copy.
        $this->current_job['options']['at'] = 0;

        $requeued = $this->scheduler->addJob(
            $this->current_job['class'],
            $this->current_job['data'],
            $this->current_job['options']
        );

        return $requeued->getId();
    }
271
272
    /**
     * Take a job that came off the cursor.
     *
     * A job this worker manages to collect is processed straight away. A job
     * that could not be collected but carries the postponed status is kept in
     * the local queue instead.
     *
     * @param array $job Job document
     *
     * @return bool False when the document has no status, true otherwise
     */
    protected function queueJob(array $job): bool
    {
        if (!isset($job['status'])) {
            return false;
        }

        if (true === $this->collectJob($job, JobInterface::STATUS_PROCESSING)) {
            $this->processJob($job);

            return true;
        }

        if (JobInterface::STATUS_POSTPONED === $job['status']) {
            $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $key = (string) $job['_id'];
            $this->queue[$key] = $job;
        }

        return true;
    }
294
295
    /**
     * Claim a job by moving it from one status to another.
     *
     * The update only matches while the job still has `$from_status`. Because
     * an update of a single document is atomic in MongoDB, no extra isolation
     * operator is needed. When the target status is "processing", the start
     * time and this worker's ID are stored as well. A status event is recorded
     * on success.
     *
     * @param array $job         Job document; only `_id` is read
     * @param int   $status      Status to set
     * @param int   $from_status Status the job must currently have
     *
     * @return bool True if exactly one document was modified, false otherwise
     */
    protected function collectJob(array $job, int $status, $from_status = JobInterface::STATUS_WAITING): bool
    {
        $set = [
            'status' => $status,
        ];

        if (JobInterface::STATUS_PROCESSING === $status) {
            $set['started'] = new UTCDateTime();
            $set['worker'] = $this->id;
        }

        // No `$isolated` in the filter: the operator was removed in
        // MongoDB 4.0 and such queries are rejected there.
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateOne([
            '_id' => $job['_id'],
            'status' => $from_status,
        ], [
            '$set' => $set,
        ]);

        if (1 === $result->getModifiedCount()) {
            $this->logger->debug('job ['.$job['_id'].'] updated to status ['.$status.']', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            $this->db->{$this->scheduler->getEventQueue()}->insertOne([
                'job' => $job['_id'],
                'worker' => $this->id,
                'status' => $status,
                'timestamp' => new UTCDateTime(),
            ]);

            return true;
        }

        // Nothing was modified: the job no longer has $from_status (the
        // message below logs the requested target status).
        $this->logger->debug('job ['.$job['_id'].'] is already collected with status ['.$status.']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        return false;
    }
340
341
    /**
     * Set the status of a job unconditionally.
     *
     * For any status at or above JobInterface::STATUS_DONE the end time is
     * stored as well.
     *
     * @param array $job    Job document; only `_id` is read
     * @param int   $status Status to set
     *
     * @return bool Whether the write was acknowledged
     */
    protected function updateJob(array $job, int $status): bool
    {
        $set = [
            'status' => $status,
        ];

        if ($status >= JobInterface::STATUS_DONE) {
            $set['ended'] = new UTCDateTime();
        }

        // No `$isolated` in the filter: the operator was removed in
        // MongoDB 4.0, and an update of one document by _id is atomic anyway.
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateOne([
            '_id' => $job['_id'],
        ], [
            '$set' => $set,
        ]);

        return $result->isAcknowledged();
    }
363
364
    /**
     * Run the locally queued postponed jobs that have become due.
     *
     * A job whose `at` option is not later than the current time is removed
     * from the local queue, has `at` cleared and is processed, provided it can
     * still be collected from the postponed status.
     *
     * @return bool Always true
     */
    protected function processLocalQueue(): bool
    {
        $now = new UTCDateTime();

        foreach ($this->queue as $id => $postponed) {
            if (!($postponed['options']['at'] <= $now)) {
                continue;
            }

            $this->logger->info('postponed job ['.$postponed['_id'].'] ['.$postponed['class'].'] can now be executed', [
                'category' => get_class($this),
                'pm' => $this->process,
            ]);

            unset($this->queue[$id]);
            // Cleared so that processJob() does not postpone the job again.
            $postponed['options']['at'] = null;

            $collected = $this->collectJob($postponed, JobInterface::STATUS_PROCESSING, JobInterface::STATUS_POSTPONED);

            if (true === $collected) {
                $this->processJob($postponed);
            }
        }

        return true;
    }
388
389
    /**
     * Process a collected job: postpone it, or execute it and reschedule.
     *
     * - A job whose `at` option is a UTCDateTime in the future is marked as
     *   postponed and parked in the local queue.
     * - Otherwise the job is executed under an alarm of `options.timeout`
     *   seconds.
     * - If execution throws an \Exception, the job is marked as failed, a
     *   failure event with the exception details is recorded, and the job is
     *   re-added after `retry_interval` seconds when `retry >= 0`.
     * - Afterwards a job with `interval >= 0` is re-added `interval` seconds
     *   from now. This also applies to a failed job that was not retried.
     *
     * NOTE(review): only \Exception is caught; an \Error raised by a job
     * propagates to the caller - confirm this is intended.
     *
     * @param array $job Job document
     *
     * @return ObjectId ID of the re-added job if one was created, otherwise the job's own ID
     */
    protected function processJob(array $job): ObjectId
    {
        $log_context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        $current_time = new UTCDateTime();
        $scheduled_at = $job['options']['at'];

        if ($scheduled_at instanceof UTCDateTime && $scheduled_at > $current_time) {
            $this->updateJob($job, JobInterface::STATUS_POSTPONED);
            $this->queue[(string) $job['_id']] = $job;

            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$scheduled_at->toDateTime()->format('c').']', $log_context);

            return $job['_id'];
        }

        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', $log_context + [
            'options' => $job['options'],
            'params' => $job['data'],
        ]);

        // current_job is what the timeout/terminate handlers act on while the
        // job runs.
        $this->current_job = $job;
        pcntl_alarm($job['options']['timeout']);

        try {
            $this->executeJob($job);
            $this->current_job = null;
        } catch (\Exception $exception) {
            // Disarm the alarm before doing the failure bookkeeping.
            pcntl_alarm(0);

            $this->logger->error('failed execute job ['.$job['_id'].']', $log_context + [
                'exception' => $exception,
            ]);

            $this->updateJob($job, JobInterface::STATUS_FAILED);
            $this->current_job = null;

            $events = $this->db->{$this->scheduler->getEventQueue()};
            $events->insertOne([
                'job' => $job['_id'],
                'worker' => $this->id,
                'status' => JobInterface::STATUS_FAILED,
                'timestamp' => new UTCDateTime(),
                'exception' => [
                    'class' => get_class($exception),
                    'message' => $exception->getMessage(),
                    'file' => $exception->getFile(),
                    'line' => $exception->getLine(),
                    'code' => $exception->getCode(),
                    'trace' => $exception->getTrace(),
                ],
            ]);

            if ($job['options']['retry'] >= 0) {
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['options']['retry'].']', $log_context);

                --$job['options']['retry'];
                $job['options']['at'] = time() + $job['options']['retry_interval'];
                $retried = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

                return $retried->getId();
            }
        }

        pcntl_alarm(0);

        if ($job['options']['interval'] >= 0) {
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['options']['interval'].'s]', $log_context);

            $job['options']['at'] = time() + $job['options']['interval'];
            $repeated = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);

            return $repeated->getId();
        }

        return $job['_id'];
    }
478
479
    /**
     * Instantiate the job class and run it.
     *
     * The instance comes from the container when one was injected, otherwise
     * it is created with `new`. After the job's start() returns, the job is
     * marked as done and a done event is recorded.
     *
     * @param array $job Job document; `class`, `data` and `_id` are read
     *
     * @throws InvalidJobException If the class does not exist or the instance is not a JobInterface
     *
     * @return bool Result of updating the job status to done
     */
    protected function executeJob(array $job): bool
    {
        $class = $job['class'];

        if (!class_exists($class)) {
            throw new InvalidJobException('job class does not exists');
        }

        $instance = null === $this->container
            ? new $class()
            : $this->container->get($class);

        if (!($instance instanceof JobInterface)) {
            throw new InvalidJobException('job must implement JobInterface');
        }

        $instance->setData($job['data'])->setId($job['_id'])->start();

        $acknowledged = $this->updateJob($job, JobInterface::STATUS_DONE);

        $events = $this->db->{$this->scheduler->getEventQueue()};
        $events->insertOne([
            'job' => $job['_id'],
            'worker' => $this->id,
            'status' => JobInterface::STATUS_DONE,
            'timestamp' => new UTCDateTime(),
        ]);

        unset($instance);

        return $acknowledged;
    }
516
}
517