Passed
Branch dev (b82c9a)
by Raffael
05:11
created

Worker::executeJob()   A

Complexity

Conditions 4
Paths 5

Size

Total Lines 30
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 4.0039

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 30
c 0
b 0
f 0
ccs 15
cts 16
cp 0.9375
rs 9.6666
cc 4
nc 5
nop 1
crap 4.0039
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\BSON\UTCDateTime;
17
use MongoDB\Database;
18
use Psr\Container\ContainerInterface;
19
use Psr\Log\LoggerInterface;
20
use TaskScheduler\Exception\InvalidJobException;
21
22
class Worker
23
{
24
    /**
25
     * Scheduler.
26
     *
27
     * @var Scheduler
28
     */
29
    protected $scheduler;
30
31
    /**
32
     * Database.
33
     *
34
     * @var Database
35
     */
36
    protected $db;
37
38
    /**
39
     * Logger.
40
     *
41
     * @var LoggerInterface
42
     */
43
    protected $logger;
44
45
    /**
46
     * Container.
47
     *
48
     * @var ContainerInterface
49
     */
50
    protected $container;
51
52
    /**
53
     * Local queue.
54
     *
55
     * @var array
56
     */
57
    protected $queue = [];
58
59
    /**
60
     * Current processing job.
61
     *
62
     * @var null|array
63
     */
64
    protected $current_job;
65
66
    /**
67
     * Process ID (fork posix pid).
68
     *
69
     * @var string
70
     */
71
    protected $process;
72
73
    /**
74
     * Jobs queue.
75
     *
76
     * @var MessageQueue
77
     */
78
    protected $jobs;
79
80
    /**
81
     * Init queue.
82
     *
83
     * @param ContainerInterface $container
84
     */
85 20
    public function __construct(Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
86
    {
87 20
        $this->process = (string) getmypid();
88 20
        $this->scheduler = $scheduler;
89 20
        $this->db = $db;
90 20
        $this->logger = $logger;
91 20
        $this->container = $container;
92 20
        $this->jobs = new MessageQueue($db, $scheduler->getJobQueue(), $scheduler->getJobQueueSize(), $logger);
93 20
    }
94
95
    /**
96
     * Start worker.
97
     */
98
    public function start()
99
    {
100
        $this->main();
101
    }
102
103
    /**
104
     * Cleanup and exit.
105
     */
106
    public function cleanup(int $sig)
107
    {
108
        $this->handleSignal($sig);
109
        exit();
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
110
    }
111
112
    /**
113
     * Start worker.
114
     */
115
    protected function main(): void
116
    {
117
        $cursor = $this->jobs->getCursor();
118
        $this->catchSignal();
119
120
        while (true) {
121
            $this->processLocalQueue();
122
123
            if (null === $cursor->current()) {
124
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

124
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
125
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
126
                        'category' => get_class($this),
127
                        'pm' => $this->process,
128
                    ]);
129
130
                    $this->jobs->create();
131
132
                    $this->main();
133
134
                    break;
135
                }
136
137
                $this->jobs->next($cursor, function () {
138
                    $this->main();
139
                });
140
141
                continue;
142
            }
143
144
            $job = $cursor->current();
145
            $this->jobs->next($cursor, function () {
146
                $this->main();
147
            });
148
149
            $this->queueJob($job);
150
        }
151
    }
152
153
    /**
154
     * Catch signals and cleanup.
155
     *
156
     * @return Queue
157
     */
158 1
    protected function catchSignal(): self
159
    {
160 1
        pcntl_async_signals(true);
161 1
        pcntl_signal(SIGTERM, [$this, 'cleanup']);
162 1
        pcntl_signal(SIGINT, [$this, 'cleanup']);
163
164 1
        return $this;
0 ignored issues
show
Bug Best Practice introduced by
The expression return $this returns the type TaskScheduler\Worker which is incompatible with the documented return type TaskScheduler\Queue.
Loading history...
165
    }
166
167
    /**
168
     * Cleanup and exit.
169
     *
170
     *
171
     * @return Process
172
     */
173 2
    protected function handleSignal(int $sig): ?Process
174
    {
175 2
        if (null === $this->current_job) {
176 2
            $this->logger->debug('received signal ['.$sig.'], no job is currently processing, exit now', [
177 2
                'category' => get_class($this),
178 2
                'pm' => $this->process,
179
            ]);
180
181 2
            return null;
182
        }
183
184
        $this->logger->debug('received signal ['.$sig.'], reschedule current processing job ['.$this->current_job['_id'].']', [
185
            'category' => get_class($this),
186
            'pm' => $this->process,
187
        ]);
188
189
        $this->updateJob($this->current_job, JobInterface::STATUS_CANCELED);
190
191
        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
192
            'job' => $this->current_job['_id'],
193
            'status' => JobInterface::STATUS_CANCELED,
194
            'timestamp' => new UTCDateTime(),
195
        ]);
196
197
        return $this->scheduler->addJob($this->current_job['class'], $this->current_job['data'], [
198
            Scheduler::OPTION_AT => $this->current_job['retry_interval'],
199
            Scheduler::OPTION_INTERVAL => $this->current_job['interval'],
200
            Scheduler::OPTION_RETRY => --$this->current_job['retry'],
201
            Scheduler::OPTION_RETRY_INTERVAL => $this->current_job['retry_interval'],
202
            Scheduler::OPTION_IGNORE_MAX_CHILDREN => $this->current_job['ignore_max_children'],
203
        ]);
204
    }
205
206
    /**
207
     * Queue job.
208
     */
209
    protected function queueJob(array $job): bool
210
    {
211
        if (!isset($job['status'])) {
212
            return false;
213
        }
214
215
        if (true === $this->collectJob($job, JobInterface::STATUS_PROCESSING)) {
216
            $this->processJob($job);
217
        } elseif (JobInterface::STATUS_POSTPONED === $job['status']) {
218
            $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
219
                'category' => get_class($this),
220
                'pm' => $this->process,
221
            ]);
222
223
            $this->queue[] = $job;
224
        }
225
226
        return true;
227
    }
228
229
    /**
230
     * Update job status.
231
     */
232 4
    protected function collectJob(array $job, int $status, $from_status = JobInterface::STATUS_WAITING): bool
233
    {
234
        $set = [
235 4
             'status' => $status,
236
        ];
237
238
        //isset($job['started']) required due compatibility between 1.x and 2.x
239 4
        if (JobInterface::STATUS_PROCESSING === $status && isset($job['started'])) {
240 4
            $set['started'] = new UTCDateTime();
241
        }
242
243 4
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
244 4
            '_id' => $job['_id'],
245 4
            'status' => $from_status,
246
            '$isolated' => true,
247
        ], [
248 4
            '$set' => $set,
249
        ]);
250
251 4
        if (1 === $result->getModifiedCount()) {
252 4
            $this->logger->debug('job ['.$job['_id'].'] updated to status ['.$status.']', [
253 4
                'category' => get_class($this),
254 4
                'pm' => $this->process,
255
            ]);
256
257 4
            $this->db->{$this->scheduler->getEventQueue()}->insertOne([
258 4
                'job' => $job['_id'],
259 4
                'status' => $status,
260 4
                'timestamp' => new UTCDateTime(),
261
            ]);
262
263 4
            return true;
264
        }
265
266 1
        $this->logger->debug('job ['.$job['_id'].'] is already collected with status ['.$status.']', [
267 1
            'category' => get_class($this),
268 1
            'pm' => $this->process,
269
        ]);
270
271 1
        return false;
272
    }
273
274
    /**
275
     * Update job status.
276
     */
277 13
    protected function updateJob(array $job, int $status): bool
278
    {
279
        $set = [
280 13
            'status' => $status,
281
        ];
282
283
        //isset($job['ended']) required due compatibility between 1.x and 2.x
284 13
        if ($status >= JobInterface::STATUS_DONE && isset($job['ended'])) {
285 10
            $set['ended'] = new UTCDateTime();
286
        }
287
288 13
        $result = $this->db->{$this->scheduler->getJobQueue()}->updateMany([
289 13
            '_id' => $job['_id'],
290
            '$isolated' => true,
291
        ], [
292 13
            '$set' => $set,
293
        ]);
294
295 13
        return $result->isAcknowledged();
296
    }
297
298
    /**
299
     * Check local queue for postponed jobs.
300
     */
301 3
    protected function processLocalQueue(): bool
302
    {
303 3
        $now = new UTCDateTime();
304 3
        foreach ($this->queue as $key => $job) {
305 3
            if ($job['at'] <= $now) {
306 2
                $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', [
307 2
                    'category' => get_class($this),
308 2
                    'pm' => $this->process,
309
                ]);
310
311 2
                unset($this->queue[$key]);
312 2
                $job['at'] = null;
313
314 2
                if (true === $this->collectJob($job, JobInterface::STATUS_PROCESSING, JobInterface::STATUS_POSTPONED)) {
315 3
                    $this->processJob($job);
316
                }
317
            }
318
        }
319
320 3
        return true;
321
    }
322
323
    /**
324
     * Process job.
325
     */
326 8
    protected function processJob(array $job): ObjectId
327
    {
328 8
        if ($job['at'] instanceof UTCDateTime) {
329 1
            $this->updateJob($job, JobInterface::STATUS_POSTPONED);
330 1
            $this->queue[] = $job;
331
332 1
            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['at']->toDateTime()->format('c').']', [
333 1
                'category' => get_class($this),
334 1
                'pm' => $this->process,
335
            ]);
336
337 1
            return $job['_id'];
338
        }
339
340 7
        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
341 7
            'category' => get_class($this),
342 7
            'pm' => $this->process,
343 7
            'params' => $job['data'],
344
        ]);
345
346 7
        $this->current_job = $job;
347
348
        try {
349 7
            $this->executeJob($job);
350 2
            $this->current_job = null;
351 5
        } catch (\Exception $e) {
352 5
            $this->logger->error('failed execute job ['.$job['_id'].']', [
353 5
                'category' => get_class($this),
354 5
                'pm' => $this->process,
355 5
                'exception' => $e,
356
            ]);
357
358 5
            $this->updateJob($job, JobInterface::STATUS_FAILED);
359 5
            $this->current_job = null;
360
361 5
            $this->db->{$this->scheduler->getEventQueue()}->insertOne([
362 5
                'job' => $job['_id'],
363
                'status' => JobInterface::STATUS_FAILED,
364 5
                'timestamp' => new UTCDateTime(),
365 5
                'data' => serialize($e),
366
            ]);
367
368 5
            if ($job['retry'] >= 0) {
369 1
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['retry'].']', [
370 1
                    'category' => get_class($this),
371 1
                    'pm' => $this->process,
372
                ]);
373
374 1
                $job = $this->scheduler->addJob($job['class'], $job['data'], [
375 1
                    Scheduler::OPTION_AT => time() + $job['retry_interval'],
376 1
                    Scheduler::OPTION_INTERVAL => $job['interval'],
377 1
                    Scheduler::OPTION_RETRY => --$job['retry'],
378 1
                    Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
379 1
                    Scheduler::OPTION_IGNORE_MAX_CHILDREN => $job['ignore_max_children'],
380
                ]);
381
382 1
                return $job->getId();
383
            }
384
        }
385
386 6
        if ($job['interval'] >= 0) {
387 1
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['interval'].'s]', [
388 1
                'category' => get_class($this),
389 1
                'pm' => $this->process,
390
            ]);
391
392 1
            $job = $this->scheduler->addJob($job['class'], $job['data'], [
393 1
                Scheduler::OPTION_AT => time() + $job['interval'],
394 1
                Scheduler::OPTION_INTERVAL => $job['interval'],
395 1
                Scheduler::OPTION_RETRY => $job['retry'],
396 1
                Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
397 1
                Scheduler::OPTION_IGNORE_MAX_CHILDREN => $job['ignore_max_children'],
398
            ]);
399
400 1
            return $job->getId();
401
        }
402
403 5
        return $job['_id'];
404
    }
405
406
    /**
407
     * Execute job.
408
     */
409 12
    protected function executeJob(array $job): bool
410
    {
411 12
        if (!class_exists($job['class'])) {
412 3
            throw new InvalidJobException('job class does not exists');
413
        }
414
415 9
        if (null === $this->container) {
416 8
            $instance = new $job['class']();
417
        } else {
418 1
            $instance = $this->container->get($job['class']);
419
        }
420
421 9
        if (!($instance instanceof JobInterface)) {
422
            throw new InvalidJobException('job must implement JobInterface');
423
        }
424
425
        $result = $instance
0 ignored issues
show
Unused Code introduced by
The assignment to $result is dead and can be removed.
Loading history...
426 9
            ->setData($job['data'])
427 9
            ->setId($job['_id'])
428 9
            ->start();
429
430 5
        $return = $this->updateJob($job, JobInterface::STATUS_DONE);
431
432 5
        $this->db->{$this->scheduler->getEventQueue()}->insertOne([
433 5
            'job' => $job['_id'],
434
            'status' => JobInterface::STATUS_DONE,
435 5
            'timestamp' => new UTCDateTime(),
436
        ]);
437
438 5
        return $return;
439
    }
440
}
441