Completed
Branch dev (c58bbf)
by Raffael
03:36
created

Worker::queueJob()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 18
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 4.0119

Importance

Changes 0
Metric Value
eloc 10
dl 0
loc 18
ccs 10
cts 11
cp 0.9091
rs 9.9332
c 0
b 0
f 0
cc 4
nc 4
nop 1
crap 4.0119
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\BSON\UTCDateTime;
17
use MongoDB\Database;
18
use Psr\Container\ContainerInterface;
19
use Psr\Log\LoggerInterface;
20
use TaskScheduler\Exception\InvalidJobException;
21
22
class Worker
23
{
24
    /**
25
     * Scheduler.
26
     *
27
     * @var Scheduler
28
     */
29
    protected $scheduler;
30
31
    /**
32
     * Database.
33
     *
34
     * @var Database
35
     */
36
    protected $db;
37
38
    /**
39
     * Logger.
40
     *
41
     * @var LoggerInterface
42
     */
43
    protected $logger;
44
45
    /**
46
     * Container.
47
     *
48
     * @var ContainerInterface
49
     */
50
    protected $container;
51
52
    /**
53
     * Local queue.
54
     *
55
     * @var array
56
     */
57
    protected $queue = [];
58
59
    /**
60
     * Current processing job.
61
     *
62
     * @var null|array
63
     */
64
    protected $current_job;
65
66
    /**
67
     * Process ID (fork posix pid).
68
     *
69
     * @var int
70
     */
71
    protected $process;
72
73
    /**
74
     * Jobs queue.
75
     *
76
     * @var MessageQueue
77
     */
78
    protected $jobs;
79
80
    /**
81
     * Init worker.
82
     */
83 22
    public function __construct(Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
84
    {
85 22
        $this->process = getmypid();
86 22
        $this->scheduler = $scheduler;
87 22
        $this->db = $db;
88 22
        $this->logger = $logger;
89 22
        $this->container = $container;
90 22
        $this->jobs = new MessageQueue($db, $scheduler->getJobQueue(), $scheduler->getJobQueueSize(), $logger);
91 22
    }
92
93
    /**
94
     * Cleanup and exit.
95
     */
96
    public function cleanup(int $sig)
97
    {
98
        $this->terminate($sig);
99
        exit();
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
100
    }
101
102
    /**
103
     * Handle worker timeout.
104
     */
105 4
    public function timeout(): ?ObjectId
106
    {
107 4
        if (null === $this->current_job) {
108 1
            $this->logger->debug('reached worker timeout signal, no job is currently processing, ignore it', [
109 1
                'category' => get_class($this),
110 1
                'pm' => $this->process,
111
            ]);
112
113 1
            return null;
114
        }
115
116 3
        $this->logger->debug('received timeout signal, reschedule current processing job ['.$this->current_job['_id'].']', [
117 3
            'category' => get_class($this),
118 3
            'pm' => $this->process,
119
        ]);
120
121 3
        $this->updateJob($this->current_job, JobInterface::STATUS_TIMEOUT);
122
123 3
        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
124 3
            'job' => $this->current_job['_id'],
125
            'status' => JobInterface::STATUS_TIMEOUT,
126 3
            'timestamp' => new UTCDateTime(),
127
        ]);
128
129 3
        $job = $this->current_job;
130
131 3
        if ($job['options']['retry'] >= 0) {
132 1
            $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['options']['retry'].']', [
133 1
                'category' => get_class($this),
134 1
                'pm' => $this->process,
135
            ]);
136
137 1
            --$job['options']['retry'];
138 1
            $job['options']['at'] = time() + $job['options']['at'];
139 1
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
140
141 1
            return $job->getId();
142
        }
143
144 2
        if ($job['options']['interval'] >= 0) {
145 1
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['options']['interval'].'s]', [
146 1
                'category' => get_class($this),
147 1
                'pm' => $this->process,
148
            ]);
149
150 1
            $job['options']['at'] = time() + $job['options']['interval'];
151 1
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
152
153 1
            return $job->getId();
154
        }
155
156 1
        $this->current_job = null;
157 1
        posix_kill($this->process, SIGTERM);
158
159 1
        return null;
160
    }
161
162
    /**
163
     * Start worker.
164
     */
165 11
    public function start(): void
166
    {
167 11
        $cursor = $this->jobs->getCursor([
168 11
            '$or' => [
169
                ['status' => JobInterface::STATUS_WAITING],
170
                ['status' => JobInterface::STATUS_POSTPONED],
171
            ],
172
        ]);
173
174 11
        $this->catchSignal();
175
176 11
        while ($this->loop()) {
177 11
            $this->processLocalQueue();
178
179 11
            if (null === $cursor->current()) {
180 1
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

180
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
181
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
182
                        'category' => get_class($this),
183
                        'pm' => $this->process,
184
                    ]);
185
186
                    $this->jobs->create();
187
188
                    $this->start();
189
190
                    break;
191
                }
192
193 1
                $this->jobs->next($cursor, function () {
194
                    $this->start();
195 1
                });
196
197 1
                continue;
198
            }
199
200 10
            $job = $cursor->current();
201 10
            $this->jobs->next($cursor, function () {
202
                $this->start();
203 10
            });
204
205 10
            $this->queueJob($job);
206
        }
207 11
    }
208
209
    /**
210
     * This method may seem useless but is actually very useful to mock the loop.
211
     */
212
    protected function loop(): bool
213
    {
214
        return true;
215
    }
216
217
    /**
218
     * Catch signals and cleanup.
219
     */
220 12
    protected function catchSignal(): self
221
    {
222 12
        pcntl_async_signals(true);
223 12
        pcntl_signal(SIGTERM, [$this, 'cleanup']);
224 12
        pcntl_signal(SIGINT, [$this, 'cleanup']);
225 12
        pcntl_signal(SIGALRM, [$this, 'timeout']);
226
227 12
        return $this;
228
    }
229
230
    /**
231
     * Cleanup and exit.
232
     */
233 2
    protected function terminate(int $sig): ?ObjectId
234
    {
235 2
        if (null === $this->current_job) {
236 1
            $this->logger->debug('received signal ['.$sig.'], no job is currently processing, exit now', [
237 1
                'category' => get_class($this),
238 1
                'pm' => $this->process,
239
            ]);
240
241 1
            return null;
242
        }
243
244 1
        $this->logger->debug('received signal ['.$sig.'], reschedule current processing job ['.$this->current_job['_id'].']', [
245 1
            'category' => get_class($this),
246 1
            'pm' => $this->process,
247
        ]);
248
249 1
        $this->updateJob($this->current_job, JobInterface::STATUS_CANCELED);
250
251 1
        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
252 1
            'job' => $this->current_job['_id'],
253
            'status' => JobInterface::STATUS_CANCELED,
254 1
            'timestamp' => new UTCDateTime(),
255
        ]);
256
257 1
        $this->current_job['options']['at'] = time() + $this->current_job['options']['retry_interval'];
258
259 1
        return $this->scheduler->addJob($this->current_job['class'], $this->current_job['data'], $this->current_job['options'])->getId();
260
    }
261
262
    /**
263
     * Queue job.
264
     */
265 10
    protected function queueJob(array $job): bool
266
    {
267 10
        if (!isset($job['status'])) {
268
            return false;
269
        }
270
271 10
        if (true === $this->collectJob($job, JobInterface::STATUS_PROCESSING)) {
272 10
            $this->processJob($job);
273 1
        } elseif (JobInterface::STATUS_POSTPONED === $job['status']) {
274 1
            $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
275 1
                'category' => get_class($this),
276 1
                'pm' => $this->process,
277
            ]);
278
279 1
            $this->queue[(string) $job['_id']] = $job;
280
        }
281
282 10
        return true;
283
    }
284
285
    /**
286
     * Update job status.
287
     */
288 12
    protected function collectJob(array $job, int $status, $from_status = JobInterface::STATUS_WAITING): bool
289
    {
290
        $set = [
291 12
             'status' => $status,
292
        ];
293
294 12
        if (JobInterface::STATUS_PROCESSING === $status) {
295 12
            $set['started'] = new UTCDateTime();
296
        }
297
298 12
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
299 12
            '_id' => $job['_id'],
300 12
            'status' => $from_status,
301
            '$isolated' => true,
302
        ], [
303 12
            '$set' => $set,
304
        ]);
305
306 12
        if (1 === $result->getModifiedCount()) {
307 12
            $this->logger->debug('job ['.$job['_id'].'] updated to status ['.$status.']', [
308 12
                'category' => get_class($this),
309 12
                'pm' => $this->process,
310
            ]);
311
312 12
            $this->db->{$this->scheduler->getEventQueue()}->insertOne([
313 12
                'job' => $job['_id'],
314 12
                'status' => $status,
315 12
                'timestamp' => new UTCDateTime(),
316
            ]);
317
318 12
            return true;
319
        }
320
321 2
        $this->logger->debug('job ['.$job['_id'].'] is already collected with status ['.$status.']', [
322 2
            'category' => get_class($this),
323 2
            'pm' => $this->process,
324
        ]);
325
326 2
        return false;
327
    }
328
329
    /**
330
     * Update job status.
331
     */
332 15
    protected function updateJob(array $job, int $status): bool
333
    {
334
        $set = [
335 15
            'status' => $status,
336
        ];
337
338 15
        if ($status >= JobInterface::STATUS_DONE) {
339 13
            $set['ended'] = new UTCDateTime();
340
        }
341
342 15
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
343 15
            '_id' => $job['_id'],
344
            '$isolated' => true,
345
        ], [
346 15
            '$set' => $set,
347
        ]);
348
349 15
        return $result->isAcknowledged();
350
    }
351
352
    /**
353
     * Check local queue for postponed jobs.
354
     */
355 11
    protected function processLocalQueue(): bool
356
    {
357 11
        $now = new UTCDateTime();
358 11
        foreach ($this->queue as $key => $job) {
359 1
            if ($job['options']['at'] <= $now) {
360 1
                $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
361 1
                    'category' => get_class($this),
362 1
                    'pm' => $this->process,
363
                ]);
364
365 1
                unset($this->queue[$key]);
366 1
                $job['options']['at'] = null;
367
368 1
                if (true === $this->collectJob($job, JobInterface::STATUS_PROCESSING, JobInterface::STATUS_POSTPONED)) {
369 1
                    $this->processJob($job);
370
                }
371
            }
372
        }
373
374 11
        return true;
375
    }
376
377
    /**
378
     * Process job.
379
     */
380 10
    protected function processJob(array $job): ObjectId
381
    {
382 10
        $now = new UTCDateTime();
383
384 10
        if ($job['options']['at'] instanceof UTCDateTime && $job['options']['at'] > $now) {
385 2
            $this->updateJob($job, JobInterface::STATUS_POSTPONED);
386 2
            $this->queue[(string) $job['_id']] = $job;
387
388 2
            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['options']['at']->toDateTime()->format('c').']', [
389 2
                'category' => get_class($this),
390 2
                'pm' => $this->process,
391
            ]);
392
393 2
            return $job['_id'];
394
        }
395
396 9
        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
397 9
            'category' => get_class($this),
398 9
            'pm' => $this->process,
399 9
            'options' => $job['options'],
400 9
            'params' => $job['data'],
401
        ]);
402
403 9
        $this->current_job = $job;
404 9
        pcntl_alarm($job['options']['timeout']);
405
406
        try {
407 9
            $this->executeJob($job);
408 6
            $this->current_job = null;
409 3
        } catch (\Exception $e) {
410 3
            pcntl_alarm(0);
411
412 3
            $this->logger->error('failed execute job ['.$job['_id'].']', [
413 3
                'category' => get_class($this),
414 3
                'pm' => $this->process,
415 3
                'exception' => $e,
416
            ]);
417
418 3
            $this->updateJob($job, JobInterface::STATUS_FAILED);
419 3
            $this->current_job = null;
420
421 3
            $this->db->{$this->scheduler->getEventQueue()}->insertOne([
422 3
                'job' => $job['_id'],
423
                'status' => JobInterface::STATUS_FAILED,
424 3
                'timestamp' => new UTCDateTime(),
425
                'exception' => [
426 3
                    'class' => get_class($e),
427 3
                    'message' => $e->getMessage(),
428 3
                    'file' => $e->getFile(),
429 3
                    'line' => $e->getLine(),
430 3
                    'code' => $e->getCode(),
431 3
                    'trace' => $e->getTrace(),
432
                ],
433
            ]);
434
435 3
            if ($job['options']['retry'] >= 0) {
436 1
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['options']['retry'].']', [
437 1
                    'category' => get_class($this),
438 1
                    'pm' => $this->process,
439
                ]);
440
441 1
                --$job['options']['retry'];
442 1
                $job['options']['at'] = time() + $job['options']['at'];
443 1
                $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
444
445 1
                return $job->getId();
446
            }
447
        }
448
449 8
        pcntl_alarm(0);
450
451 8
        if ($job['options']['interval'] >= 0) {
452 1
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['options']['interval'].'s]', [
453 1
                'category' => get_class($this),
454 1
                'pm' => $this->process,
455
            ]);
456
457 1
            $job['options']['at'] = time() + $job['options']['interval'];
458 1
            $job = $this->scheduler->addJob($job['class'], $job['data'], $job['options']);
459
460 1
            return $job->getId();
461
        }
462
463 7
        return $job['_id'];
464
    }
465
466
    /**
467
     * Execute job.
468
     */
469 10
    protected function executeJob(array $job): bool
470
    {
471 10
        if (!class_exists($job['class'])) {
472 1
            throw new InvalidJobException('job class does not exists');
473
        }
474
475 9
        if (null === $this->container) {
476 8
            $instance = new $job['class']();
477
        } else {
478 1
            $instance = $this->container->get($job['class']);
479
        }
480
481 9
        if (!($instance instanceof JobInterface)) {
482
            throw new InvalidJobException('job must implement JobInterface');
483
        }
484
485
        $result = $instance
0 ignored issues
show
Unused Code introduced by
The assignment to $result is dead and can be removed.
Loading history...
486 9
            ->setData($job['data'])
487 9
            ->setId($job['_id'])
488 9
            ->start();
489
490 6
        $return = $this->updateJob($job, JobInterface::STATUS_DONE);
491
492 6
        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
493 6
            'job' => $job['_id'],
494
            'status' => JobInterface::STATUS_DONE,
495 6
            'timestamp' => new UTCDateTime(),
496
        ]);
497
498 6
        return $return;
499
    }
500
}
501