Completed
Branch dev (c58bbf)
by Raffael
03:36
created

Queue::process()   A

Complexity

Conditions 2
Paths 3

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 3.1852

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 13
ccs 3
cts 9
cp 0.3333
rs 9.9666
c 0
b 0
f 0
cc 2
nc 3
nop 0
crap 3.1852
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\Database;
16
use Psr\Log\LoggerInterface;
17
use TaskScheduler\Exception\InvalidArgumentException;
18
use TaskScheduler\Exception\QueueRuntimeException;
19
20
class Queue
21
{
22
    /**
23
     * Process identifier.
24
     */
25
    public const MAIN_PROCESS = 'main';
26
27
    /**
28
     * Queue options.
29
     */
30
    public const OPTION_PM = 'pm';
31
    public const OPTION_MAX_CHILDREN = 'max_children';
32
    public const OPTION_MIN_CHILDREN = 'min_children';
33
34
    /**
35
     * Process handling.
36
     */
37
    public const PM_DYNAMIC = 'dynamic';
38
    public const PM_STATIC = 'static';
39
    public const PM_ONDEMAND = 'ondemand';
40
41
    /**
42
     * Process management.
43
     *
44
     * @var string
45
     */
46
    protected $pm = self::PM_DYNAMIC;
47
48
    /**
49
     * Scheduler.
50
     *
51
     * @var Scheduler
52
     */
53
    protected $scheduler;
54
55
    /**
56
     * Database.
57
     *
58
     * @var Database
59
     */
60
    protected $db;
61
62
    /**
63
     * Logger.
64
     *
65
     * @var LoggerInterface
66
     */
67
    protected $logger;
68
69
    /**
70
     * Max children.
71
     *
72
     * @var int
73
     */
74
    protected $max_children = 2;
75
76
    /**
77
     * Min children.
78
     *
79
     * @var int
80
     */
81
    protected $min_children = 1;
82
83
    /**
84
     * Forks.
85
     *
86
     * @var array
87
     */
88
    protected $forks = [];
89
90
    /**
91
     * Worker factory.
92
     *
93
     * @var WorkerFactoryInterface
94
     */
95
    protected $factory;
96
97
    /**
98
     * Jobs queue.
99
     *
100
     * @var MessageQueue
101
     */
102
    protected $jobs;
103
104
    /**
105
     * Events queue.
106
     *
107
     * @var MessageQueue
108
     */
109
    protected $events;
110
111
    /**
112
     * Main process name.
113
     *
114
     * @var string
115
     */
116
    protected $process;
117
118
    /**
119
     * Init queue.
120
     */
121 13
    public function __construct(Scheduler $scheduler, Database $db, WorkerFactoryInterface $factory, LoggerInterface $logger, array $config = [])
122
    {
123 13
        $this->scheduler = $scheduler;
124 13
        $this->db = $db;
125 13
        $this->logger = $logger;
126 13
        $this->setOptions($config);
127 13
        $this->process = self::MAIN_PROCESS;
128 13
        $this->factory = $factory;
129
130 13
        $this->jobs = new MessageQueue($db, $scheduler->getJobQueue(), $scheduler->getJobQueueSize(), $logger);
131 13
        $this->events = new MessageQueue($db, $scheduler->getEventQueue(), $scheduler->getEventQueueSize(), $logger);
132 13
    }
133
134
    /**
135
     * Set options.
136
     *
137
     *
138
     * @return Queue
139
     */
140 13
    public function setOptions(array $config = []): self
141
    {
142 13
        foreach ($config as $option => $value) {
143
            switch ($option) {
144 10
                case self::OPTION_MAX_CHILDREN:
145 9
                case self::OPTION_MIN_CHILDREN:
146 6
                    if (!is_int($value)) {
147 2
                        throw new InvalidArgumentException($option.' needs to be an integer');
148
                    }
149
150 4
                    $this->{$option} = $value;
151
152 4
                break;
153 5
                case self::OPTION_PM:
154 4
                    if (!defined('self::PM_'.strtoupper($value))) {
155 1
                        throw new InvalidArgumentException($value.' is not a valid process handling type (static, dynamic, ondemand)');
156
                    }
157
158 3
                    $this->{$option} = $value;
159
160 3
                break;
161
                default:
162 7
                    throw new InvalidArgumentException('invalid option '.$option.' given');
163
            }
164
        }
165
166 13
        if ($this->min_children > $this->max_children) {
167 1
            throw new InvalidArgumentException('option min_children must not be greater than option max_children');
168
        }
169
170 13
        return $this;
171
    }
172
173
    /**
174
     * Startup (blocking process).
175
     */
176 8
    public function process(): void
177
    {
178
        try {
179 8
            $this->spawnInitialWorkers();
180 8
            $this->main();
181
        } catch (Exception $e) {
182
            $this->logger->error('main() throw an exception, cleanup and exit', [
183
                'class' => get_class($this),
184
                'exception' => $e,
185
                'pm' => $this->process,
186
            ]);
187
188
            $this->cleanup(SIGTERM);
189
        }
190 8
    }
191
192
    /**
193
     * Cleanup and exit.
194
     */
195
    public function cleanup(int $sig)
196
    {
197
        $this->handleSignal($sig);
198
        exit();
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
199
    }
200
201
    /**
202
     * Wait for child and terminate.
203
     */
204 6
    public function exitChild(int $sig, array $pid): self
205
    {
206 6
        $this->logger->debug('child process ['.$pid['pid'].'] exit with ['.$sig.']', [
207 6
            'category' => get_class($this),
208 6
            'pm' => $this->process,
209
        ]);
210
211 6
        pcntl_waitpid($pid['pid'], $status, WNOHANG | WUNTRACED);
212
213 6
        if (isset($this->forks[$pid['pid']])) {
214
            unset($this->forks[$pid['pid']]);
215
        }
216
217 6
        return $this;
218
    }
219
220
    /**
221
     * Count children.
222
     */
223 8
    public function count(): int
224
    {
225 8
        return count($this->forks);
226
    }
227
228
    /**
229
     * Start initial workers.
230
     */
231 8
    protected function spawnInitialWorkers()
232
    {
233 8
        $this->logger->debug('spawn initial ['.$this->min_children.'] child processes', [
234 8
            'category' => get_class($this),
235 8
            'pm' => $this->process,
236
        ]);
237
238 8
        if (self::PM_DYNAMIC === $this->pm || self::PM_STATIC === $this->pm) {
239 6
            for ($i = $this->count(); $i < $this->min_children; ++$i) {
240 6
                $this->spawnWorker();
241
            }
242
        }
243 8
    }
244
245
    /**
246
     * Start worker.
247
     *
248
     * @see https://github.com/mongodb/mongo-php-driver/issues/828
249
     * @see https://github.com/mongodb/mongo-php-driver/issues/174
250
     */
251 7
    protected function spawnWorker()
252
    {
253 7
        $pid = pcntl_fork();
254
255 7
        if (-1 === $pid) {
256
            throw new QueueRuntimeException('failed to spawn new worker');
257
        }
258
259 7
        $this->forks[] = $pid;
260
261 7
        if (!$pid) {
262
            $worker = $this->factory->build()->start();
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $worker is correct as $this->factory->build()->start() targeting TaskScheduler\Worker::start() seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
Unused Code introduced by
The assignment to $worker is dead and can be removed.
Loading history...
263
            exit();
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
264
        }
265
266 7
        $this->logger->debug('spawn worker process ['.$pid.']', [
267 7
            'category' => get_class($this),
268 7
            'pm' => $this->process,
269
        ]);
270
271 7
        return $pid;
272
    }
273
274
    /**
275
     * Get forks (array of pid's).
276
     *
277
     * @return int[]
278
     */
279 1
    protected function getForks(): array
280
    {
281 1
        return $this->forks;
282
    }
283
284
    /**
285
     * This method may seem useless but is actually very useful to mock the loop.
286
     */
287
    protected function loop(): bool
288
    {
289
        return true;
290
    }
291
292
    /**
293
     * Fork handling, blocking process.
294
     */
295 8
    protected function main(): void
296
    {
297 8
        $cursor = $this->jobs->getCursor([
298 8
            '$or' => [
299
                ['status' => JobInterface::STATUS_WAITING],
300
                ['status' => JobInterface::STATUS_POSTPONED],
301
            ],
302
        ]);
303
304 8
        $this->catchSignal();
305
306 8
        while ($this->loop()) {
307 8
            if (null === $cursor->current()) {
308 5
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

308
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
309
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
310
                        'category' => get_class($this),
311
                        'pm' => $this->process,
312
                    ]);
313
314
                    $this->jobs->create();
315
316
                    $this->main();
317
318
                    break;
319
                }
320
321 5
                $this->jobs->next($cursor, function () {
322
                    $this->main();
323 5
                });
324
325 5
                continue;
326
            }
327
328 3
            $job = $cursor->current();
329 3
            $this->jobs->next($cursor, function () {
330
                $this->main();
331 3
            });
332
333 3
            if ($this->count() < $this->max_children && self::PM_STATIC !== $this->pm) {
334 3
                $this->logger->debug('max_children ['.$this->max_children.'] processes not reached ['.$this->count().'], spawn new worker', [
335 3
                    'category' => get_class($this),
336 3
                    'pm' => $this->process,
337
                ]);
338
339 3
                $this->spawnWorker();
340 2
            } elseif (true === $job['options'][Scheduler::OPTION_IGNORE_MAX_CHILDREN]) {
341 1
                $this->logger->debug('job ['.$job['_id'].'] deployed with ignore_max_children, spawn new worker', [
342 1
                    'category' => get_class($this),
343 1
                    'pm' => $this->process,
344
                ]);
345
346 1
                $this->spawnWorker();
347
            } else {
348 1
                $this->logger->debug('max children ['.$this->max_children.'] reached for job ['.$job['_id'].'], do not spawn new worker', [
349 1
                    'category' => get_class($this),
350 1
                    'pm' => $this->process,
351
                ]);
352
            }
353
        }
354 8
    }
355
356
    /**
357
     * Catch signals and cleanup.
358
     *
359
     * @return Queue
360
     */
361 8
    protected function catchSignal(): self
362
    {
363 8
        pcntl_async_signals(true);
364 8
        pcntl_signal(SIGTERM, [$this, 'cleanup']);
365 8
        pcntl_signal(SIGINT, [$this, 'cleanup']);
366 8
        pcntl_signal(SIGCHLD, [$this, 'exitChild']);
367
368 8
        return $this;
369
    }
370
371
    /**
372
     * Cleanup.
373
     */
374 1
    protected function handleSignal(int $sig): void
375
    {
376 1
        $this->logger->debug('received signal ['.$sig.']', [
377 1
            'category' => get_class($this),
378 1
            'pm' => $this->process,
379
        ]);
380
381 1
        foreach ($this->getForks() as $key => $pid) {
382 1
            $this->logger->debug('forward signal ['.$sig.'] to child process ['.$pid.']', [
383 1
                'category' => get_class($this),
384 1
                'pm' => $this->process,
385
            ]);
386
387 1
            posix_kill($pid, $sig);
388
        }
389 1
    }
390
}
391