Completed
Branch dev (58fe22)
by Raffael
02:55
created

Worker   A

Complexity

Total Complexity 37

Size/Duplication

Total Lines 488
Duplicated Lines 0 %

Test Coverage

Coverage 92.96%

Importance

Changes 0
Metric Value
wmc 37
eloc 220
dl 0
loc 488
ccs 198
cts 213
cp 0.9296
rs 9.44
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
A loop() 0 3 1
A collectJob() 0 40 3
A __construct() 0 9 1
A catchSignal() 0 8 1
A terminate() 0 27 2
A processLocalQueue() 0 20 4
A timeout() 0 55 4
A cleanup() 0 4 1
A start() 0 41 4
A updateJob() 0 18 2
A executeJob() 0 32 4
B processJob() 0 84 6
A queueJob() 0 18 4
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\BSON\UTCDateTime;
17
use MongoDB\Database;
18
use Psr\Container\ContainerInterface;
19
use Psr\Log\LoggerInterface;
20
use TaskScheduler\Exception\InvalidJobException;
21
22
class Worker
23
{
24
    /**
25
     * Scheduler.
26
     *
27
     * @var Scheduler
28
     */
29
    protected $scheduler;
30
31
    /**
32
     * Database.
33
     *
34
     * @var Database
35
     */
36
    protected $db;
37
38
    /**
39
     * Logger.
40
     *
41
     * @var LoggerInterface
42
     */
43
    protected $logger;
44
45
    /**
46
     * Container.
47
     *
48
     * @var ContainerInterface
49
     */
50
    protected $container;
51
52
    /**
53
     * Local queue.
54
     *
55
     * @var array
56
     */
57
    protected $queue = [];
58
59
    /**
60
     * Current processing job.
61
     *
62
     * @var null|array
63
     */
64
    protected $current_job;
65
66
    /**
67
     * Process ID (fork posix pid).
68
     *
69
     * @var int
70
     */
71
    protected $process;
72
73
    /**
74
     * Jobs queue.
75
     *
76
     * @var MessageQueue
77
     */
78
    protected $jobs;
79
80
    /**
81
     * Worker ID.
82
     *
83
     * @var ObjectId
84
     */
85
    protected $id;
86
87
    /**
88
     * Init worker.
89
     */
90 22
    public function __construct(ObjectId $id, Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
91
    {
92 22
        $this->id = $id;
93 22
        $this->process = getmypid();
94 22
        $this->scheduler = $scheduler;
95 22
        $this->db = $db;
96 22
        $this->logger = $logger;
97 22
        $this->container = $container;
98 22
        $this->jobs = new MessageQueue($db, $scheduler->getJobQueue(), $scheduler->getJobQueueSize(), $logger);
99 22
    }
100
101
    /**
102
     * Cleanup and exit.
103
     */
104
    public function cleanup(int $sig)
105
    {
106
        $this->terminate($sig);
107
        exit();
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
108
    }
109
110
    /**
111
     * Handle worker timeout.
112
     */
113 4
    public function timeout(): ?ObjectId
114
    {
115 4
        if (null === $this->current_job) {
116 1
            $this->logger->debug('reached worker timeout signal, no job is currently processing, ignore it', [
117 1
                'category' => get_class($this),
118 1
                'pm' => $this->process,
119
            ]);
120
121 1
            return null;
122
        }
123
124 3
        $this->logger->debug('received timeout signal, reschedule current processing job ['.$this->current_job['_id'].']', [
125 3
            'category' => get_class($this),
126 3
            'pm' => $this->process,
127
        ]);
128
129 3
        $this->updateJob($this->current_job, JobInterface::STATUS_TIMEOUT);
130
131 3
        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
132 3
            'job' => $this->current_job['_id'],
133
            'status' => JobInterface::STATUS_TIMEOUT,
134 3
            'timestamp' => new UTCDateTime(),
135
        ]);
136
137 3
        $job = $this->current_job;
138
139 3
        if ($job['options']['retry'] >= 0) {
140 1
            $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['options']['retry'].']', [
141 1
                'category' => get_class($this),
142 1
                'pm' => $this->process,
143
            ]);
144
145 1
            --$job['options']['retry'];
146 1
            $job['options']['at'] = time() + $job['options']['at'];
147 1
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
148
149 1
            return $job->getId();
150
        }
151
152 2
        if ($job['options']['interval'] >= 0) {
153 1
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['options']['interval'].'s]', [
154 1
                'category' => get_class($this),
155 1
                'pm' => $this->process,
156
            ]);
157
158 1
            $job['options']['at'] = time() + $job['options']['interval'];
159 1
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
160
161 1
            return $job->getId();
162
        }
163
164 1
        $this->current_job = null;
165 1
        posix_kill($this->process, SIGTERM);
166
167 1
        return null;
168
    }
169
170
    /**
171
     * Start worker.
172
     */
173 11
    public function start(): void
174
    {
175 11
        $cursor = $this->jobs->getCursor([
176 11
            '$or' => [
177
                ['status' => JobInterface::STATUS_WAITING],
178
                ['status' => JobInterface::STATUS_POSTPONED],
179
            ],
180
        ]);
181
182 11
        $this->catchSignal();
183
184 11
        while ($this->loop()) {
185 11
            $this->processLocalQueue();
186
187 11
            if (null === $cursor->current()) {
188 1
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

188
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
189
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
190
                        'category' => get_class($this),
191
                        'pm' => $this->process,
192
                    ]);
193
194
                    $this->jobs->create();
195
196
                    $this->start();
197
198
                    break;
199
                }
200
201 1
                $this->jobs->next($cursor, function () {
202
                    $this->start();
203 1
                });
204
205 1
                continue;
206
            }
207
208 10
            $job = $cursor->current();
209 10
            $this->jobs->next($cursor, function () {
210
                $this->start();
211 10
            });
212
213 10
            $this->queueJob($job);
214
        }
215 11
    }
216
217
    /**
218
     * This method may seem useless but is actually very useful to mock the loop.
219
     */
220
    protected function loop(): bool
221
    {
222
        return true;
223
    }
224
225
    /**
226
     * Catch signals and cleanup.
227
     */
228 12
    protected function catchSignal(): self
229
    {
230 12
        pcntl_async_signals(true);
231 12
        pcntl_signal(SIGTERM, [$this, 'cleanup']);
232 12
        pcntl_signal(SIGINT, [$this, 'cleanup']);
233 12
        pcntl_signal(SIGALRM, [$this, 'timeout']);
234
235 12
        return $this;
236
    }
237
238
    /**
239
     * Cleanup and exit.
240
     */
241 2
    protected function terminate(int $sig): ?ObjectId
242
    {
243 2
        if (null === $this->current_job) {
244 1
            $this->logger->debug('received signal ['.$sig.'], no job is currently processing, exit now', [
245 1
                'category' => get_class($this),
246 1
                'pm' => $this->process,
247
            ]);
248
249 1
            return null;
250
        }
251
252 1
        $this->logger->debug('received signal ['.$sig.'], reschedule current processing job ['.$this->current_job['_id'].']', [
253 1
            'category' => get_class($this),
254 1
            'pm' => $this->process,
255
        ]);
256
257 1
        $this->updateJob($this->current_job, JobInterface::STATUS_CANCELED);
258
259 1
        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
260 1
            'job' => $this->current_job['_id'],
261
            'status' => JobInterface::STATUS_CANCELED,
262 1
            'timestamp' => new UTCDateTime(),
263
        ]);
264
265 1
        $this->current_job['options']['at'] = 0;
266
267 1
        return $this->scheduler->addJob($this->current_job['class'], $this->current_job['data'], $this->current_job['options'])->getId();
268
    }
269
270
    /**
271
     * Queue job.
272
     */
273 10
    protected function queueJob(array $job): bool
274
    {
275 10
        if (!isset($job['status'])) {
276
            return false;
277
        }
278
279 10
        if (true === $this->collectJob($job, JobInterface::STATUS_PROCESSING)) {
280 10
            $this->processJob($job);
281 1
        } elseif (JobInterface::STATUS_POSTPONED === $job['status']) {
282 1
            $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
283 1
                'category' => get_class($this),
284 1
                'pm' => $this->process,
285
            ]);
286
287 1
            $this->queue[(string) $job['_id']] = $job;
288
        }
289
290 10
        return true;
291
    }
292
293
    /**
294
     * Update job status.
295
     */
296 12
    protected function collectJob(array $job, int $status, $from_status = JobInterface::STATUS_WAITING): bool
297
    {
298
        $set = [
299 12
             'status' => $status,
300
        ];
301
302 12
        if (JobInterface::STATUS_PROCESSING === $status) {
303 12
            $set['started'] = new UTCDateTime();
304 12
            $set['worker'] = $this->id;
305
        }
306
307 12
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
308 12
            '_id' => $job['_id'],
309 12
            'status' => $from_status,
310
            '$isolated' => true,
311
        ], [
312 12
            '$set' => $set,
313
        ]);
314
315 12
        if (1 === $result->getModifiedCount()) {
316 12
            $this->logger->debug('job ['.$job['_id'].'] updated to status ['.$status.']', [
317 12
                'category' => get_class($this),
318 12
                'pm' => $this->process,
319
            ]);
320
321 12
            $this->db->{$this->scheduler->getEventQueue()}->insertOne([
322 12
                'job' => $job['_id'],
323 12
                'status' => $status,
324 12
                'timestamp' => new UTCDateTime(),
325
            ]);
326
327 12
            return true;
328
        }
329
330 2
        $this->logger->debug('job ['.$job['_id'].'] is already collected with status ['.$status.']', [
331 2
            'category' => get_class($this),
332 2
            'pm' => $this->process,
333
        ]);
334
335 2
        return false;
336
    }
337
338
    /**
339
     * Update job status.
340
     */
341 15
    protected function updateJob(array $job, int $status): bool
342
    {
343
        $set = [
344 15
            'status' => $status,
345
        ];
346
347 15
        if ($status >= JobInterface::STATUS_DONE) {
348 13
            $set['ended'] = new UTCDateTime();
349
        }
350
351 15
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
352 15
            '_id' => $job['_id'],
353
            '$isolated' => true,
354
        ], [
355 15
            '$set' => $set,
356
        ]);
357
358 15
        return $result->isAcknowledged();
359
    }
360
361
    /**
362
     * Check local queue for postponed jobs.
363
     */
364 11
    protected function processLocalQueue(): bool
365
    {
366 11
        $now = new UTCDateTime();
367 11
        foreach ($this->queue as $key => $job) {
368 1
            if ($job['options']['at'] <= $now) {
369 1
                $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
370 1
                    'category' => get_class($this),
371 1
                    'pm' => $this->process,
372
                ]);
373
374 1
                unset($this->queue[$key]);
375 1
                $job['options']['at'] = null;
376
377 1
                if (true === $this->collectJob($job, JobInterface::STATUS_PROCESSING, JobInterface::STATUS_POSTPONED)) {
378 1
                    $this->processJob($job);
379
                }
380
            }
381
        }
382
383 11
        return true;
384
    }
385
386
    /**
387
     * Process job.
388
     */
389 10
    protected function processJob(array $job): ObjectId
390
    {
391 10
        $now = new UTCDateTime();
392
393 10
        if ($job['options']['at'] instanceof UTCDateTime && $job['options']['at'] > $now) {
394 2
            $this->updateJob($job, JobInterface::STATUS_POSTPONED);
395 2
            $this->queue[(string) $job['_id']] = $job;
396
397 2
            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['options']['at']->toDateTime()->format('c').']', [
398 2
                'category' => get_class($this),
399 2
                'pm' => $this->process,
400
            ]);
401
402 2
            return $job['_id'];
403
        }
404
405 9
        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
406 9
            'category' => get_class($this),
407 9
            'pm' => $this->process,
408 9
            'options' => $job['options'],
409 9
            'params' => $job['data'],
410
        ]);
411
412 9
        $this->current_job = $job;
413 9
        pcntl_alarm($job['options']['timeout']);
414
415
        try {
416 9
            $this->executeJob($job);
417 6
            $this->current_job = null;
418 3
        } catch (\Exception $e) {
419 3
            pcntl_alarm(0);
420
421 3
            $this->logger->error('failed execute job ['.$job['_id'].']', [
422 3
                'category' => get_class($this),
423 3
                'pm' => $this->process,
424 3
                'exception' => $e,
425
            ]);
426
427 3
            $this->updateJob($job, JobInterface::STATUS_FAILED);
428 3
            $this->current_job = null;
429
430 3
            $this->db->{$this->scheduler->getEventQueue()}->insertOne([
431 3
                'job' => $job['_id'],
432
                'status' => JobInterface::STATUS_FAILED,
433 3
                'timestamp' => new UTCDateTime(),
434
                'exception' => [
435 3
                    'class' => get_class($e),
436 3
                    'message' => $e->getMessage(),
437 3
                    'file' => $e->getFile(),
438 3
                    'line' => $e->getLine(),
439 3
                    'code' => $e->getCode(),
440 3
                    'trace' => $e->getTrace(),
441
                ],
442
            ]);
443
444 3
            if ($job['options']['retry'] >= 0) {
445 1
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['options']['retry'].']', [
446 1
                    'category' => get_class($this),
447 1
                    'pm' => $this->process,
448
                ]);
449
450 1
                --$job['options']['retry'];
451 1
                $job['options']['at'] = time() + $job['options']['retry_interval'];
452 1
                $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
453
454 1
                return $job->getId();
455
            }
456
        }
457
458 8
        pcntl_alarm(0);
459
460 8
        if ($job['options']['interval'] >= 0) {
461 1
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['options']['interval'].'s]', [
462 1
                'category' => get_class($this),
463 1
                'pm' => $this->process,
464
            ]);
465
466 1
            $job['options']['at'] = time() + $job['options']['interval'];
467 1
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
468
469 1
            return $job->getId();
470
        }
471
472 7
        return $job['_id'];
473
    }
474
475
    /**
476
     * Execute job.
477
     */
478 10
    protected function executeJob(array $job): bool
479
    {
480 10
        if (!class_exists($job['class'])) {
481 1
            throw new InvalidJobException('job class does not exists');
482
        }
483
484 9
        if (null === $this->container) {
485 8
            $instance = new $job['class']();
486
        } else {
487 1
            $instance = $this->container->get($job['class']);
488
        }
489
490 9
        if (!($instance instanceof JobInterface)) {
491
            throw new InvalidJobException('job must implement JobInterface');
492
        }
493
494
        $instance
495 9
            ->setData($job['data'])
496 9
            ->setId($job['_id'])
497 9
            ->start();
498
499 6
        $return = $this->updateJob($job, JobInterface::STATUS_DONE);
500
501 6
        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
502 6
            'job' => $job['_id'],
503
            'status' => JobInterface::STATUS_DONE,
504 6
            'timestamp' => new UTCDateTime(),
505
        ]);
506
507 6
        unset($instance);
508
509 6
        return $return;
510
    }
511
}
512