Completed
Branch dev (11269e)
by Raffael
04:12
created

Worker::handleSignal()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 24
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2.9765

Importance

Changes 0
Metric Value
eloc 15
dl 0
loc 24
ccs 6
cts 16
cp 0.375
rs 9.7666
c 0
b 0
f 0
cc 2
nc 2
nop 1
crap 2.9765
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\BSON\UTCDateTime;
17
use MongoDB\Database;
18
use Psr\Container\ContainerInterface;
19
use Psr\Log\LoggerInterface;
20
21
class Worker extends AbstractQueue
22
{
23
    /**
24
     * Local queue.
25
     *
26
     * @var array
27
     */
28
    protected $queue = [];
29
30
    /**
31
     * Current processing job.
32
     *
33
     * @var null|array
34
     */
35
    protected $current_job;
36
37
    /**
     * Create a worker bound to a scheduler, a database and a logger.
     *
     * The worker remembers the current process id (as a string) and works on
     * the collection name reported by the scheduler.
     *
     * @param Scheduler               $scheduler
     * @param Database                $db
     * @param LoggerInterface         $logger
     * @param null|ContainerInterface $container optional container, used by executeJob() to obtain job instances
     */
    public function __construct(Scheduler $scheduler, Database $db, LoggerInterface $logger, ?ContainerInterface $container = null)
    {
        $this->scheduler = $scheduler;
        $this->collection_name = $scheduler->getCollection();
        $this->db = $db;
        $this->logger = $logger;
        $this->container = $container;
        $this->process = (string) getmypid();
    }
54
55
    /**
     * Public entry point of the worker: hands control over to the main loop.
     */
    public function start()
    {
        $this->main();
    }
62
63
    /**
     * Signal callback: let handleSignal() deal with the job in progress, then
     * terminate the process.
     *
     * catchSignal() registers this method for SIGTERM and SIGINT.
     *
     * @param int $sig number of the received signal
     */
    public function cleanup(int $sig)
    {
        $this->handleSignal($sig);

        // Terminating the process is the whole point of this signal handler.
        exit();
    }
73
74
    /**
     * Main loop of the worker.
     *
     * Installs the signal handlers once, then loops forever: postponed jobs
     * from the local queue are checked first, afterwards the next job is read
     * from the queue cursor and handed to queueJob().
     *
     * If the cursor turns out to be dead, the queue is recreated and a fresh
     * cursor is fetched inside the same loop. The method never returns
     * normally; the process ends through cleanup().
     */
    protected function main()
    {
        $cursor = $this->getCursor();
        $this->catchSignal();

        while (true) {
            $this->processLocalQueue();

            $job = $cursor->current();

            if (null === $job) {
                // NOTE(review): getInnerIterator() is only typed as Iterator;
                // isDead() is presumably provided by the MongoDB cursor
                // behind getCursor() - confirm.
                if ($cursor->getInnerIterator()->isDead()) {
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                        'pm' => $this->process,
                    ]);

                    $this->createQueue();

                    // Restart on a fresh cursor without recursing, so the call
                    // stack does not grow every time the cursor dies.
                    $cursor = $this->getCursor();

                    continue;
                }

                $this->retrieveNextJob($cursor);

                continue;
            }

            $this->retrieveNextJob($cursor);
            $this->queueJob($job);
        }
    }
107
108
    /**
     * Enable asynchronous signal dispatching and route SIGTERM and SIGINT to
     * cleanup().
     *
     * @return Worker this instance
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);

        foreach ([SIGTERM, SIGINT] as $signal) {
            pcntl_signal($signal, [$this, 'cleanup']);
        }

        return $this;
    }
121
122
    /**
     * React to a termination signal.
     *
     * Without a job in progress this only writes a log entry. Otherwise the
     * current job is marked as canceled and an equivalent job is added to the
     * scheduler, with the retry counter decremented by one. This method does
     * not terminate the process itself; cleanup() does that after calling it.
     *
     * @param int $sig number of the received signal
     *
     * @return null|ObjectId id of the rescheduled job, or null if no job was being processed
     */
    protected function handleSignal(int $sig): ?ObjectId
    {
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        if (null === $this->current_job) {
            $this->logger->debug('received signal ['.$sig.'], no job is currently processing, exit now', $context);

            return null;
        }

        $this->logger->debug('received signal ['.$sig.'], reschedule current processing job ['.$this->current_job['_id'].']', $context);

        $this->updateJob($this->current_job, JobInterface::STATUS_CANCELED);

        // NOTE(review): OPTION_AT receives the raw retry_interval here, while
        // processJob() passes time() + retry_interval when it reschedules a
        // failed job - confirm that this difference is intended.
        $options = [
            Scheduler::OPTION_AT => $this->current_job['retry_interval'],
            Scheduler::OPTION_INTERVAL => $this->current_job['interval'],
            Scheduler::OPTION_RETRY => --$this->current_job['retry'],
            Scheduler::OPTION_RETRY_INTERVAL => $this->current_job['retry_interval'],
            Scheduler::OPTION_IGNORE_MAX_CHILDREN => $this->current_job['ignore_max_children'],
        ];

        return $this->scheduler->addJob($this->current_job['class'], $this->current_job['data'], $options);
    }
155
156
    /**
     * Try to claim a job taken from the cursor.
     *
     * A job that could be collected is processed right away. A job that could
     * not be collected but carries the postponed status is remembered in the
     * local queue instead. Any other job is ignored.
     *
     * @param array $job
     *
     * @return bool always true
     */
    protected function queueJob(array $job): bool
    {
        $collected = $this->collectJob($job, JobInterface::STATUS_PROCESSING);

        if (true === $collected) {
            $this->processJob($job);

            return true;
        }

        if (JobInterface::STATUS_POSTPONED !== $job['status']) {
            return true;
        }

        $this->logger->debug('found postponed job ['.$job['_id'].'] to requeue', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        $this->queue[] = $job;

        return true;
    }
176
177
    /**
     * Collect a job by moving it from one status to another.
     *
     * The update filter matches the job id together with $from_status, so a
     * job whose status has already been changed (for instance by another
     * worker) is not modified and the method returns false.
     *
     * @param array $job
     * @param int   $status      status to set
     * @param mixed $from_status status the job must currently have
     *
     * @return bool true if exactly one document was modified
     */
    protected function collectJob(array $job, int $status, $from_status = JobInterface::STATUS_WAITING): bool
    {
        $set = [
            'status' => $status,
        ];

        //isset($job['started']) required due compatibility between 1.x and 2.x
        if (JobInterface::STATUS_PROCESSING === $status && isset($job['started'])) {
            $set['started'] = new UTCDateTime();
        }

        // The filter selects a single document by _id, and single-document
        // writes are atomic. The '$isolated' operator only concerned
        // multi-document writes and was removed in MongoDB 4.0, so it is not
        // part of the filter.
        $result = $this->db->{$this->collection_name}->updateMany([
            '_id' => $job['_id'],
            'status' => $from_status,
        ], [
            '$set' => $set,
        ]);

        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        if (1 === $result->getModifiedCount()) {
            $this->logger->debug('job ['.$job['_id'].'] updated to status ['.$status.']', $context);

            return true;
        }

        $this->logger->debug('job ['.$job['_id'].'] is already collected with status ['.$status.']', $context);

        return false;
    }
221
222
    /**
     * Set the status of a job regardless of its current status.
     *
     * For a status at or above JobInterface::STATUS_DONE the 'ended'
     * timestamp is refreshed as well, provided the job has an 'ended' field.
     *
     * @param array $job
     * @param int   $status status to set
     *
     * @return bool whether the write was acknowledged
     */
    protected function updateJob(array $job, int $status): bool
    {
        $set = [
            'status' => $status,
        ];

        //isset($job['ended']) required due compatibility between 1.x and 2.x
        if ($status >= JobInterface::STATUS_DONE && isset($job['ended'])) {
            $set['ended'] = new UTCDateTime();
        }

        // The filter selects a single document by _id, and single-document
        // writes are atomic. The '$isolated' operator only concerned
        // multi-document writes and was removed in MongoDB 4.0, so it is not
        // part of the filter.
        $result = $this->db->{$this->collection_name}->updateMany([
            '_id' => $job['_id'],
        ], [
            '$set' => $set,
        ]);

        return $result->isAcknowledged();
    }
250
251
    /**
     * Run the postponed jobs of the local queue whose time has come.
     *
     * A due job is removed from the local queue, its 'at' value is cleared
     * and, if it can be collected from the postponed status, it is processed.
     *
     * @return bool always true
     */
    protected function processLocalQueue(): bool
    {
        $now = new UTCDateTime();
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        foreach ($this->queue as $key => $job) {
            if (!($job['at'] <= $now)) {
                continue;
            }

            $this->logger->info('postponed job ['.$job['_id'].'] ['.$job['class'].'] can now be executed', $context);

            unset($this->queue[$key]);

            // Cleared so that processJob() does not postpone the job again.
            $job['at'] = null;

            $collected = $this->collectJob($job, JobInterface::STATUS_PROCESSING, JobInterface::STATUS_POSTPONED);

            if (true === $collected) {
                $this->processJob($job);
            }
        }

        return true;
    }
277
278
    /**
     * Process a collected job.
     *
     * A job with an 'at' timestamp is marked as postponed and put into the
     * local queue. Any other job is executed. If the execution throws, the
     * job is marked as failed and, while its retry counter is not negative,
     * added again with a decremented counter. A job with a non-negative
     * interval is added again for its next run.
     *
     * @param array $job
     *
     * @return ObjectId id of the job added as retry or next interval run, otherwise the id of the given job
     */
    protected function processJob(array $job): ObjectId
    {
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        if ($job['at'] instanceof UTCDateTime) {
            $this->updateJob($job, JobInterface::STATUS_POSTPONED);
            $this->queue[] = $job;

            $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['at']->toDateTime()->format('c').']', $context);

            return $job['_id'];
        }

        $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
            'category' => get_class($this),
            'pm' => $this->process,
            'params' => $job['data'],
        ]);

        // Remembered so that handleSignal() can reschedule the job if the
        // worker is asked to stop while the job is running.
        $this->current_job = $job;

        try {
            $this->executeJob($job);
            $this->current_job = null;
        } catch (\Throwable $e) {
            // \Throwable instead of \Exception: an \Error raised by a job
            // (for example a TypeError) must mark the job as failed as well
            // instead of terminating the worker.
            $this->logger->error('failed execute job ['.$job['_id'].']', [
                'category' => get_class($this),
                'pm' => $this->process,
                'exception' => $e,
            ]);

            $this->updateJob($job, JobInterface::STATUS_FAILED);
            $this->current_job = null;

            if ($job['retry'] >= 0) {
                $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['retry'].']', $context);

                return $this->scheduler->addJob($job['class'], $job['data'], [
                    Scheduler::OPTION_AT => time() + $job['retry_interval'],
                    Scheduler::OPTION_INTERVAL => $job['interval'],
                    Scheduler::OPTION_RETRY => --$job['retry'],
                    Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
                    Scheduler::OPTION_IGNORE_MAX_CHILDREN => $job['ignore_max_children'],
                ]);
            }
        }

        if ($job['interval'] >= 0) {
            $this->logger->debug('job ['.$job['_id'].'] has an interval of ['.$job['interval'].'s]', $context);

            return $this->scheduler->addJob($job['class'], $job['data'], [
                Scheduler::OPTION_AT => time() + $job['interval'],
                Scheduler::OPTION_INTERVAL => $job['interval'],
                Scheduler::OPTION_RETRY => $job['retry'],
                Scheduler::OPTION_RETRY_INTERVAL => $job['retry_interval'],
                Scheduler::OPTION_IGNORE_MAX_CHILDREN => $job['ignore_max_children'],
            ]);
        }

        return $job['_id'];
    }
353
354
    /**
     * Instantiate the job class and run it.
     *
     * The instance is created directly when no container is set, otherwise it
     * is requested from the container. After the job has run, its status is
     * set to done.
     *
     * @param array $job
     *
     * @throws Exception\InvalidJob if the class does not exist or is not a JobInterface
     *
     * @return bool result of the final status update
     */
    protected function executeJob(array $job): bool
    {
        $class = $job['class'];

        if (!class_exists($class)) {
            throw new Exception\InvalidJob('job class does not exists');
        }

        $instance = null === $this->container
            ? new $class()
            : $this->container->get($class);

        if (!($instance instanceof JobInterface)) {
            throw new Exception\InvalidJob('job must implement JobInterface');
        }

        // Kept as one chain: each call is made on the return value of the
        // previous one.
        $instance
            ->setData($job['data'])
            ->setId($job['_id'])
            ->start();

        return $this->updateJob($job, JobInterface::STATUS_DONE);
    }
384
}
385