Test Failed
Push — master ( c6d925...ba9c61 )
by Raffael
03:12 queued 14s
created

WorkerManager   C

Complexity

Total Complexity 56

Size/Duplication

Total Lines 458
Duplicated Lines 0 %

Test Coverage

Coverage 59.88%

Importance

Changes 0
Metric Value
wmc 56
eloc 178
dl 0
loc 458
ccs 100
cts 167
cp 0.5988
rs 5.5199
c 0
b 0
f 0

14 Methods

Rating   Name   Duplication   Size   Complexity  
A getForks() 0 3 1
A cleanup() 0 15 2
B setOptions() 0 31 8
A spawnInitialWorkers() 0 9 4
B handleJob() 0 57 8
A processLocalQueue() 0 14 5
B handleEvent() 0 50 9
A spawnWorker() 0 31 4
A exitWorker() 0 19 4
A __construct() 0 5 1
A count() 0 3 1
A process() 0 6 1
A catchSignal() 0 8 1
B main() 0 28 7

How to fix   Complexity   

Complex Class

Complex classes like WorkerManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use WorkerManager, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\ObjectId;
16
use MongoDB\Database;
17
use Psr\Log\LoggerInterface;
18
use TaskScheduler\Exception\InvalidArgumentException;
19
use TaskScheduler\Exception\SpawnForkException;
20
21
class WorkerManager
22
{
23
    use InjectTrait;
24
25
    /**
26
     * Queue options.
27
     */
28
    public const OPTION_PM = 'pm';
29
    public const OPTION_MAX_CHILDREN = 'max_children';
30
    public const OPTION_MIN_CHILDREN = 'min_children';
31
32
    /**
33
     * Process handling.
34
     */
35
    public const PM_DYNAMIC = 'dynamic';
36
    public const PM_STATIC = 'static';
37
    public const PM_ONDEMAND = 'ondemand';
38
39
    /**
40
     * Fork handler actions.
41
     */
42
    public const TYPE_JOB = 1;
43
    public const TYPE_EVENT = 2;
44
45
    /**
46
     * Process management.
47
     *
48
     * @var string
49
     */
50
    protected $pm = self::PM_DYNAMIC;
51
52
    /**
53
     * Scheduler.
54
     *
55
     * @var Scheduler
56
     */
57
    protected $scheduler;
58
59
    /**
60
     * Database.
61
     *
62
     * @var Database
63
     */
64
    protected $db;
65
66
    /**
67
     * Logger.
68
     *
69
     * @var LoggerInterface
70
     */
71
    protected $logger;
72
73
    /**
74
     * Max children.
75
     *
76
     * @var int
77
     */
78
    protected $max_children = 2;
79
80
    /**
81
     * Min children.
82
     *
83
     * @var int
84
     */
85
    protected $min_children = 1;
86
87
    /**
88
     * Forks.
89
     *
90
     * @var array
91
     */
92
    protected $forks = [];
93
94
    /**
95
     * Worker/Job mapping.
96
     *
97
     * @var array
98
     */
99
    protected $job_map = [];
100
101
    /**
102
     * Queue (Communication between TaskScheduler\Queue and TaskScheduler\WorkerManager).
103
     *
104
     * @var resource
105
     */
106
    protected $queue;
107
108
    /**
109
     * Hold queue.
110
     *
111
     * @var array
112
     */
113
    protected $onhold = [];
114
115
    /**
116
     * Worker factory
117
     *
118
     * @var WorkerFactoryInterface
119
     */
120
    protected $factory;
121
122
    /**
123
     * Init queue.
124
     */
125 13
    public function __construct(WorkerFactoryInterface $factory, LoggerInterface $logger, array $config = [])
126
    {
127 13
        $this->logger = $logger;
128 13
        $this->setOptions($config);
129 13
        $this->factory = $factory;
130 13
    }
131
132
    /**
133
     * Set options.
134
     */
135 13
    public function setOptions(array $config = []): self
136
    {
137 13
        foreach ($config as $option => $value) {
138
            switch ($option) {
139 12
                case self::OPTION_MAX_CHILDREN:
140 11
                case self::OPTION_MIN_CHILDREN:
141 8
                    if (!is_int($value)) {
142 2
                        throw new InvalidArgumentException($option.' needs to be an integer');
143
                    }
144
145 6
                    $this->{$option} = $value;
146
147 6
                break;
148 7
                case self::OPTION_PM:
149 6
                    if (!defined('self::PM_'.strtoupper($value))) {
150 1
                        throw new InvalidArgumentException($value.' is not a valid process handling type (static, dynamic, ondemand)');
151
                    }
152
153 5
                    $this->{$option} = $value;
154
155 5
                break;
156
                default:
157 9
                    throw new InvalidArgumentException('invalid option '.$option.' given');
158
            }
159
        }
160
161 13
        if ($this->min_children > $this->max_children) {
162 1
            throw new InvalidArgumentException('option min_children must not be greater than option max_children');
163
        }
164
165 13
        return $this;
166
    }
167
168
    /**
169
     * Startup (blocking process).
170
     */
171 8
    public function process(): void
172
    {
173 8
        $this->queue = msg_get_queue(ftok(__DIR__.DIRECTORY_SEPARATOR.'Queue.php', 't'));
174 8
        $this->catchSignal();
175 8
        $this->spawnInitialWorkers();
176 8
        $this->main();
177 8
    }
178
179
    /**
180
     * Wait for child and terminate.
181
     */
182 7
    public function exitWorker(int $sig, array $pid): self
183
    {
184 7
        $this->logger->debug('worker ['.$pid['pid'].'] exit with ['.$sig.']', [
185 7
            'category' => get_class($this),
186
        ]);
187
188 7
        pcntl_waitpid($pid['pid'], $status, WNOHANG | WUNTRACED);
189
190 7
        foreach ($this->forks as $id => $process) {
191 7
            if ($process === $pid['pid']) {
192 4
                unset($this->forks[$id]);
193
194 4
                if (isset($this->job_map[$id])) {
195 7
                    unset($this->job_map[$id]);
196
                }
197
            }
198
        }
199
200 7
        return $this;
201
    }
202
203
    /**
204
     * Count children.
205
     */
206 8
    public function count(): int
207
    {
208 8
        return count($this->forks);
209
    }
210
211
    /**
212
     * Cleanup.
213
     */
214
    public function cleanup(int $sig): void
215
    {
216
        $this->logger->debug('received signal ['.$sig.']', [
217
            'category' => get_class($this),
218
        ]);
219
220
        foreach ($this->getForks() as $id => $pid) {
221
            $this->logger->debug('forward signal ['.$sig.'] to worker ['.$id.'] running with pid ['.$pid.']', [
222
                'category' => get_class($this),
223
            ]);
224
225
            posix_kill($pid, $sig);
226
        }
227
228
        $this->exit();
229
    }
230
231
    /**
232
     * Start initial workers.
233
     */
234 8
    protected function spawnInitialWorkers()
235
    {
236 8
        $this->logger->debug('spawn initial ['.$this->min_children.'] workers', [
237 8
            'category' => get_class($this),
238
        ]);
239
240 8
        if (self::PM_DYNAMIC === $this->pm || self::PM_STATIC === $this->pm) {
241 6
            for ($i = $this->count(); $i < $this->min_children; ++$i) {
242 5
                $this->spawnWorker();
243
            }
244
        }
245 8
    }
246
247
    /**
248
     * Start worker.
249
     *
250
     * @see https://github.com/mongodb/mongo-php-driver/issues/828
251
     * @see https://github.com/mongodb/mongo-php-driver/issues/174
252
     */
253 7
    protected function spawnWorker(?ObjectId $job = null)
254
    {
255 7
        $this->logger->debug('spawn new worker', [
256 7
            'category' => get_class($this),
257
        ]);
258
259 7
        $id = new ObjectId();
260 7
        $pid = pcntl_fork();
261
262 7
        if (-1 === $pid) {
263
            throw new SpawnForkException('failed to spawn new worker');
264
        }
265
266 7
        if (!$pid) {
267
            $worker = $this->factory->buildWorker($id);
268
269
            if (null === $job) {
270
                $worker->processAll();
271
            } else {
272
                $worker->processOne($job);
273
            }
274
275
            exit();
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
276
        }
277
278 7
        $this->forks[(string) $id] = $pid;
279 7
        $this->logger->debug('spawned worker ['.$id.'] with pid ['.$pid.']', [
280 7
            'category' => get_class($this),
281
        ]);
282
283 7
        return $pid;
284
    }
285
286
    /**
287
     * Get forks (array of pid's).
288
     */
289 1
    protected function getForks(): array
290
    {
291 1
        return $this->forks;
292
    }
293
294
    /**
295
     * Main.
296
     */
297 8
    protected function main(): void
298
    {
299 8
        while ($this->loop()) {
300 8
            if (count($this->onhold) > 0 || !$this->loop()) {
301 8
                $wait = MSG_IPC_NOWAIT;
302 8
                usleep(200);
303 8
                $this->processLocalQueue();
304
            } else {
305 1
                $wait = 0;
306
            }
307
308 8
            if (msg_receive($this->queue, 0, $type, 16384, $msg, true, $wait)) {
309 3
                $this->logger->debug('received systemv message type ['.$type.']', [
310 3
                    'category' => get_class($this),
311
                ]);
312
313
                switch ($type) {
314 3
                    case self::TYPE_JOB:
315 3
                        $this->handleJob($msg);
316
317 3
                    break;
318
                    case self::TYPE_EVENT:
319
                        $this->handleEvent($msg);
320
321
                    break;
322
                    default:
323
                        $this->logger->warning('received unknown systemv message type ['.$type.']', [
324
                            'category' => get_class($this),
325
                        ]);
326
                }
327
            }
328
        }
329 8
    }
330
331
    /**
332
     * Handle events.
333
     */
334
    protected function handleEvent(array $event): self
335
    {
336
        $this->logger->debug('handle event ['.$event['status'].'] for job ['.$event['job'].']', [
337
            'category' => get_class($this),
338
        ]);
339
340
        switch ($event['status']) {
341
            case JobInterface::STATUS_PROCESSING:
342
                $this->job_map[(string) $event['worker']] = $event['job'];
343
344
                return $this;
345
            case JobInterface::STATUS_DONE:
346
            case JobInterface::STATUS_FAILED:
347
            case JobInterface::STATUS_TIMEOUT:
348
                $worker = array_search((string) $event['job'], $this->job_map);
349
                if (false === $worker) {
350
                    return $this;
351
                }
352
353
                unset($this->job_map[$worker]);
354
355
                return $this;
356
357
            break;
0 ignored issues
show
Unused Code introduced by
break is not strictly necessary here and could be removed.

The break statement is not necessary if it is preceded for example by a return statement:

switch ($x) {
    case 1:
        return 'foo';
        break; // This break is not necessary and can be left off.
}

If you would like to keep this construct to be consistent with other case statements, you can safely mark this issue as a false-positive.

Loading history...
358
            case JobInterface::STATUS_CANCELED:
359
                $worker = array_search($event['job'], $this->job_map);
360
                if (false === $worker) {
361
                    return $this;
362
                }
363
364
                $this->logger->debug('received cancel event for job ['.$event['job'].'] running on worker ['.$worker.']', [
365
                    'category' => get_class($this),
366
                ]);
367
368
                if (isset($this->forks[(string) $worker])) {
369
                    $this->logger->debug('found running worker ['.$worker.'] on this queue node, terminate it now', [
370
                        'category' => get_class($this),
371
                    ]);
372
373
                    unset($this->job_map[(string) $worker]);
374
                    posix_kill($this->forks[(string) $worker], SIGKILL);
375
                }
376
377
                return $this;
378
            default:
379
                $this->logger->warning('received event ['.$event['_id'].'] with unknown status ['.$event['status'].']', [
380
                    'category' => get_class($this),
381
                ]);
382
383
                return $this;
384
        }
385
    }
386
387
    /**
388
     * Process onhold (only used if pm === ondemand or for postponed FORCE_SPAWN jobs).
389
     */
390 8
    protected function processLocalQueue(): self
391
    {
392 8
        foreach ($this->onhold as $id => $job) {
393
            if ($job['options']['at'] <= time() && ($this->count() < $this->max_children || true === $job['options']['force_spawn'])) {
394
                $this->logger->debug('hold ondemand job ['.$id.'] may no be executed', [
395
                    'category' => get_class($this),
396
                ]);
397
398
                unset($this->onhold[$id]);
399
                $this->spawnWorker($job['_id']);
400
            }
401
        }
402
403 8
        return $this;
404
    }
405
406
    /**
407
     * Handle job.
408
     */
409 3
    protected function handleJob(array $job): self
410
    {
411 3
        if (true === $job['options'][Scheduler::OPTION_FORCE_SPAWN]) {
412 1
            if ($job['options']['at'] > time()) {
413
                $this->logger->debug('found postponed job ['.$job['_id'].'] with force_spawn, keep in local queue', [
414
                    'category' => get_class($this),
415
                ]);
416
417
                $this->onhold[(string) $job['_id']] = $job;
418
419
                return $this;
420
            }
421
422 1
            $this->logger->debug('job ['.$job['_id'].'] deployed with force_spawn, spawn new worker', [
423 1
                'category' => get_class($this),
424
            ]);
425
426 1
            $this->spawnWorker($job['_id']);
427
428 1
            return $this;
429
        }
430
431 2
        if (self::PM_ONDEMAND === $this->pm) {
432 1
            if ($job['options']['at'] > time()) {
433
                $this->logger->debug('found ondemand postponed job ['.$job['_id'].'], keep in local queue', [
434
                    'category' => get_class($this),
435
                ]);
436
437
                $this->onhold[(string) $job['_id']] = $job;
438
439
                return $this;
440
            }
441
442 1
            if ($this->count() < $this->max_children) {
443 1
                $this->spawnWorker($job['_id']);
444
            } else {
445
                $this->onhold[(string) $job['_id']] = $job;
446
            }
447
448 1
            return $this;
449
        }
450
451 1
        if ($this->count() < $this->max_children && self::PM_DYNAMIC === $this->pm) {
452 1
            $this->logger->debug('max_children ['.$this->max_children.'] workers not reached ['.$this->count().'], spawn new worker', [
453 1
                'category' => get_class($this),
454
            ]);
455
456 1
            $this->spawnWorker();
457
458 1
            return $this;
459
        }
460
461
        $this->logger->debug('max children ['.$this->max_children.'] reached for job ['.$job['_id'].'], do not spawn new worker', [
462
            'category' => get_class($this),
463
        ]);
464
465
        return $this;
466
    }
467
468
    /**
469
     * Catch signals and cleanup.
470
     */
471 8
    protected function catchSignal(): self
472
    {
473 8
        pcntl_async_signals(true);
474 8
        pcntl_signal(SIGTERM, [$this, 'cleanup']);
475 8
        pcntl_signal(SIGINT, [$this, 'cleanup']);
476 8
        pcntl_signal(SIGCHLD, [$this, 'exitWorker']);
477
478 8
        return $this;
479
    }
480
}
481