Passed
Branch dev (b82c9a)
by Raffael
05:11
created

Queue::startWorker()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 19
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 3.1406

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 19
ccs 9
cts 12
cp 0.75
rs 9.9
c 0
b 0
f 0
cc 3
nc 3
nop 1
crap 3.1406
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\Database;
16
use Psr\Log\LoggerInterface;
17
use TaskScheduler\Exception\InvalidArgumentException;
18
use TaskScheduler\Exception\QueueRuntimeException;
19
20
class Queue
21
{
22
    /**
23
     * Process identifier.
24
     */
25
    public const MAIN_PROCESS = 'main';
26
27
    /**
28
     * Queue options.
29
     */
30
    public const OPTION_PM = 'pm';
31
    public const OPTION_MAX_CHILDREN = 'max_children';
32
    public const OPTION_MIN_CHILDREN = 'min_children';
33
34
    /**
35
     * Process handling.
36
     */
37
    public const PM_DYNAMIC = 'dynamic';
38
    public const PM_STATIC = 'static';
39
    public const PM_ONDEMAND = 'ondemand';
40
41
    /**
42
     * Process management.
43
     *
44
     * @var string
45
     */
46
    protected $pm = self::PM_DYNAMIC;
47
48
    /**
49
     * Scheduler.
50
     *
51
     * @var Scheduler
52
     */
53
    protected $scheduler;
54
55
    /**
56
     * Database.
57
     *
58
     * @var Database
59
     */
60
    protected $db;
61
62
    /**
63
     * Logger.
64
     *
65
     * @var LoggerInterface
66
     */
67
    protected $logger;
68
69
    /**
70
     * Max children.
71
     *
72
     * @var int
73
     */
74
    protected $max_children = 2;
75
76
    /**
77
     * Min children.
78
     *
79
     * @var int
80
     */
81
    protected $min_children = 1;
82
83
    /**
84
     * Forks.
85
     *
86
     * @var array
87
     */
88
    protected $forks = [];
89
90
    /**
91
     * Worker factory.
92
     *
93
     * @var WorkerFactoryInterface
94
     */
95
    protected $factory;
96
97
    /**
98
     * Jobs queue.
99
     *
100
     * @var MessageQueue
101
     */
102
    protected $jobs;
103
104
    /**
105
     * Events queue.
106
     *
107
     * @var MessageQueue
108
     */
109
    protected $events;
110
111
    /**
112
     * Main process name.
113
     *
114
     * @var string
115
     */
116
    protected $process;
117
118
    /**
     * Create the queue manager.
     *
     * Keeps references to the collaborators, applies $config through
     * setOptions() and opens the job and the event message queue with the
     * names and sizes reported by the scheduler.
     *
     * @param Scheduler              $scheduler provides the queue names and sizes
     * @param Database               $db        database handed to both message queues
     * @param WorkerFactoryInterface $factory   builds the worker inside a forked child
     * @param LoggerInterface        $logger    logger shared with both message queues
     * @param array                  $config    options, see setOptions()
     *
     * @throws InvalidArgumentException if setOptions() rejects $config
     */
    public function __construct(Scheduler $scheduler, Database $db, WorkerFactoryInterface $factory, LoggerInterface $logger, array $config = [])
    {
        // Collaborators first: setOptions() runs with scheduler, db and logger already in place.
        $this->scheduler = $scheduler;
        $this->db = $db;
        $this->logger = $logger;

        $this->setOptions($config);

        $this->process = self::MAIN_PROCESS;
        $this->factory = $factory;

        // Job queue.
        $jobQueueName = $scheduler->getJobQueue();
        $jobQueueSize = $scheduler->getJobQueueSize();
        $this->jobs = new MessageQueue($db, $jobQueueName, $jobQueueSize, $logger);

        // Event queue.
        $eventQueueName = $scheduler->getEventQueue();
        $eventQueueSize = $scheduler->getEventQueueSize();
        $this->events = new MessageQueue($db, $eventQueueName, $eventQueueSize, $logger);
    }
133
134
    /**
     * Set options.
     *
     * Supported keys:
     *  - self::OPTION_MAX_CHILDREN, self::OPTION_MIN_CHILDREN: integers
     *  - self::OPTION_PM: one of the PM_* values (static, dynamic, ondemand),
     *    matched case-insensitively and stored as the canonical lower-case value
     *    so that later comparisons against the PM_* constants succeed.
     *
     * All values are validated first and written to the instance only once the
     * complete set is valid; a rejected call leaves the current options untouched.
     *
     * @param array $config option name => value
     *
     * @throws InvalidArgumentException on an unknown option, a value of the wrong
     *                                  type, an unknown process handling type or
     *                                  when min_children exceeds max_children
     *
     * @return Queue
     */
    public function setOptions(array $config = []): self
    {
        $children = [
            self::OPTION_MAX_CHILDREN => $this->max_children,
            self::OPTION_MIN_CHILDREN => $this->min_children,
        ];
        $pm = $this->pm;

        foreach ($config as $option => $value) {
            // Strict comparison: a loose switch lets integer keys match string cases on PHP 7.
            if (self::OPTION_MAX_CHILDREN === $option || self::OPTION_MIN_CHILDREN === $option) {
                if (!is_int($value)) {
                    throw new InvalidArgumentException($option.' needs to be an integer');
                }

                $children[$option] = $value;
            } elseif (self::OPTION_PM === $option) {
                $normalized = is_string($value) ? strtolower($value) : null;

                if (!in_array($normalized, [self::PM_DYNAMIC, self::PM_STATIC, self::PM_ONDEMAND], true)) {
                    $given = is_string($value) ? $value : gettype($value);

                    throw new InvalidArgumentException($given.' is not a valid process handling type (static, dynamic, ondemand)');
                }

                $pm = $normalized;
            } else {
                throw new InvalidArgumentException('invalid option '.$option.' given');
            }
        }

        if ($children[self::OPTION_MIN_CHILDREN] > $children[self::OPTION_MAX_CHILDREN]) {
            throw new InvalidArgumentException('option min_children must not be greater than option max_children');
        }

        // Everything is valid: commit.
        $this->max_children = $children[self::OPTION_MAX_CHILDREN];
        $this->min_children = $children[self::OPTION_MIN_CHILDREN];
        $this->pm = $pm;

        return $this;
    }
172
173
    /**
     * Startup (blocking process).
     *
     * Spawns the initial workers and enters the main loop. Any exception
     * raised on the way is logged and then answered with cleanup(SIGTERM),
     * which forwards the signal to the recorded children and ends the process.
     */
    public function process(): void
    {
        try {
            $this->spawnInitialWorkers();
            $this->main();
        } catch (\Exception $e) {
            // Leave a trace of the failure before shutting down; without it the reason is lost.
            $this->logger->error('queue process failed, terminate: '.$e->getMessage(), [
                'category' => get_class($this),
                'pm' => $this->process,
                'exception' => $e,
            ]);

            $this->cleanup(SIGTERM);
        }
    }
185
186
    /**
     * Cleanup and exit.
     *
     * Registered by catchSignal() as handler for SIGTERM and SIGINT and called
     * by process() after a failure. Passes the signal on via handleSignal()
     * and then terminates the current process.
     *
     * @param int $sig signal number to forward to the child processes
     */
    public function cleanup(int $sig)
    {
        $this->handleSignal($sig);

        // Static analysis flags exit as "not recommended"; terminating the
        // process is the whole purpose of this handler.
        exit;
    }
194
195
    /**
     * Wait for child and terminate.
     *
     * Registered by catchSignal() as SIGCHLD handler. Collects the status of the
     * child named in the signal info (non-blocking) and removes its pid from
     * the list of forks.
     *
     * $this->forks is a plain list whose values are the pids, so the entry has
     * to be located by value; a lookup by key never matches.
     *
     * @param int   $sig signal number
     * @param array $pid signal info passed to the handler, the 'pid' entry is read
     *
     * @return Queue
     */
    public function exitChild(int $sig, array $pid): self
    {
        $child = $pid['pid'];

        pcntl_waitpid($child, $status, WNOHANG | WUNTRACED);

        foreach (array_keys($this->forks, $child, true) as $key) {
            unset($this->forks[$key]);
        }

        return $this;
    }
211
212
    /**
     * Start initial workers.
     *
     * Logs the configured min_children value and, when process management is
     * dynamic or static, spawns that many workers. Any other mode spawns
     * nothing here.
     */
    protected function spawnInitialWorkers()
    {
        $this->logger->debug('spawn initial ['.$this->min_children.'] child processes', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        $prefork = in_array($this->pm, [self::PM_DYNAMIC, self::PM_STATIC], true);

        if (!$prefork) {
            return;
        }

        $spawned = 0;

        while ($spawned < $this->min_children) {
            $this->spawnWorker();
            ++$spawned;
        }
    }
228
229
    /**
     * Start worker.
     *
     * Forks the current process. The child builds a worker through the
     * factory, runs it and exits; the parent records the child's pid and
     * returns it.
     *
     * The pid is recorded only after the fork result has been checked, so that
     * neither -1 (fork failed) nor 0 (we are the child) ends up in the list of
     * forks that later receives forwarded signals.
     *
     * @see https://github.com/mongodb/mongo-php-driver/issues/828
     * @see https://github.com/mongodb/mongo-php-driver/issues/174
     *
     * @param null|array $job currently unused, kept for signature compatibility
     *
     * @throws QueueRuntimeException if the process could not be forked
     *
     * @return int pid of the spawned child (parent process only)
     */
    protected function spawnWorker(?array $job = null)
    {
        $pid = pcntl_fork();

        if (-1 === $pid) {
            throw new QueueRuntimeException('failed to spawn new worker');
        }

        if (0 === $pid) {
            // Child process: run the worker, then leave. The result of start()
            // is not needed; exit keeps the child out of the parent's main loop.
            $this->factory->build()->start();
            exit();
        }

        // Parent process: remember the child.
        $this->forks[] = $pid;

        $this->logger->debug('spawn worker process ['.$pid.']', [
            'category' => get_class($this),
            'pm' => $this->process,
        ]);

        return $pid;
    }
255
256
    /**
     * Get forks (array of pid's).
     *
     * Returns the pids recorded by spawnWorker().
     *
     * @return int[]
     */
    protected function getForks(): array
    {
        $pids = $this->forks;

        return $pids;
    }
265
266
    /**
     * Fork handling, blocking process.
     *
     * Installs the signal handlers and then loops over the job queue cursor:
     *  - no current job and the cursor is dead: the job queue is (re)created
     *    and main() starts over
     *  - no current job otherwise: the cursor is advanced
     *  - a job is available: the cursor is advanced and a worker is spawned if
     *    the max_children limit is not reached (never in static mode) or if the
     *    job carries the ignore_max_children flag
     */
    protected function main(): void
    {
        $cursor = $this->jobs->getCursor();
        $this->catchSignal();

        // Callback handed to MessageQueue::next(); it re-enters main().
        $restart = function () {
            $this->main();
        };

        while (true) {
            $context = [
                'category' => get_class($this),
                'pm' => $this->process,
            ];

            if (null === $cursor->current()) {
                // NOTE(review): getInnerIterator() is typed as Iterator, isDead() is
                // presumably provided by the MongoDB cursor behind it – confirm.
                if ($cursor->getInnerIterator()->isDead()) {
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', $context);

                    $this->jobs->create();

                    $this->main();

                    return;
                }

                $this->jobs->next($cursor, $restart);

                continue;
            }

            $job = $cursor->current();
            $this->jobs->next($cursor, $restart);

            $belowLimit = count($this->forks) < $this->max_children && self::PM_STATIC !== $this->pm;

            if ($belowLimit) {
                $this->logger->debug('max_children ['.$this->max_children.'] processes not reached ['.count($this->forks).'], spawn new worker', $context);

                $this->spawnWorker();

                continue;
            }

            $ignoreLimit = isset($job[Scheduler::OPTION_IGNORE_MAX_CHILDREN]) && true === $job[Scheduler::OPTION_IGNORE_MAX_CHILDREN];

            if ($ignoreLimit) {
                $this->logger->debug('job ['.$job['_id'].'] deployed with ignore_max_children, spawn new worker', $context);

                $this->spawnWorker($job);

                continue;
            }

            $this->logger->debug('max children ['.$this->max_children.'] reached for job ['.$job['_id'].'], do not spawn new worker', $context);
        }
    }
323
324
    /**
     * Catch signals and cleanup.
     *
     * Enables asynchronous signal dispatching and registers cleanup() for
     * SIGTERM and SIGINT and exitChild() for SIGCHLD.
     *
     * @return Queue
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);

        // signal => name of the handling method on this object
        $handlers = [
            SIGTERM => 'cleanup',
            SIGINT => 'cleanup',
            SIGCHLD => 'exitChild',
        ];

        foreach ($handlers as $signal => $method) {
            pcntl_signal($signal, [$this, $method]);
        }

        return $this;
    }
338
339
    /**
     * Cleanup.
     *
     * Logs the received signal and forwards it to every recorded child process.
     *
     * Entries that are not a positive pid are skipped: posix_kill() does not
     * address a single process for 0 or negative values (0 targets the own
     * process group, -1 every process the caller may signal), so forwarding
     * such an entry would hit unrelated processes.
     *
     * @param int $sig signal number to forward
     */
    protected function handleSignal(int $sig): void
    {
        $context = [
            'category' => get_class($this),
            'pm' => $this->process,
        ];

        $this->logger->debug('received signal ['.$sig.']', $context);

        foreach ($this->getForks() as $pid) {
            if ($pid <= 0) {
                continue;
            }

            $this->logger->debug('forward signal ['.$sig.'] to child process ['.$pid.']', $context);

            posix_kill($pid, $sig);
        }
    }
358
}
359