Test Failed
Push — develop ( aae594...b21277 )
by Brent
02:43
created

Manager::wait()   D

Complexity

Conditions 10
Paths 14

Size

Total Lines 43
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 26
nc 14
nop 1
dl 0
loc 43
rs 4.8196
c 0
b 0
f 0

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace Pageon\Pcntl;
4
5
use Brendt\Stitcher\Event\Event;
6
use Symfony\Component\EventDispatcher\EventDispatcher;
7
8
class Manager
9
{
10
    /**
11
     * @var EventDispatcher
12
     */
13
    private $eventDispatcher;
14
15
    private $children;
16
17
    public function __construct(EventDispatcher $eventDispatcher) {
18
        pcntl_signal_dispatch();
19
        $this->eventDispatcher = $eventDispatcher;
20
        $this->children = [];
21
22
        pcntl_signal(SIGCHLD, function () {
23
24
        });
25
    }
26
27
    public function async(Process $process) : Process {
28
        socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $sockets);
29
30
        list($parentSocket, $childSocket) = $sockets;
31
32
        if (($pid = pcntl_fork()) == 0) {
33
            socket_close($childSocket);
34
            $output = serialize($process->execute());
35
            socket_write($parentSocket, substr($output, 0, 4095));
36
            socket_close($parentSocket);
37
38
            exit;
39
        }
40
41
        socket_close($parentSocket);
42
43
        $process
44
            ->setStartTime(time())
45
            ->setPid($pid)
46
            ->setSocket($childSocket);
47
48
        return $process;
49
    }
50
51
    public function wait(ProcessCollection $processCollection) {
52
        $output = [];
53
        $passes = 1;
54
        $processes = $processCollection->toArray();
55
56
        while (count($processes)) {
57
            /** @var Process $process */
58
            foreach ($processes as $key => $process) {
59
                $processStatus = pcntl_waitpid($process->getPid(), $status, WNOHANG | WUNTRACED);
60
61
                if ($processStatus == $process->getPid()) {
62
                    $output[] = unserialize(socket_read($process->getSocket(), 4096));
63
                    socket_close($process->getSocket());
64
65
                    $success = $process->getSuccess();
66
                    if ($success) {
67
                        call_user_func_array($success, [$process]);
68
                    }
69
70
                    unset($processes[$key]);
71
                } else if ($processStatus == 0) {
72
                    if ($process->getStartTime() + $process->getMaxRunTime() < time() || pcntl_wifstopped($status)) {
73
                        if (!posix_kill($process->getPid(), SIGKILL)) {
74
                            throw new \Exception('Failed to kill ' . $process->getPid() . ': ' . posix_strerror(posix_get_last_error()), E_USER_WARNING);
75
                        }
76
77
                        unset($processes[$key]);
78
                    }
79
                } else {
80
                    trigger_error('Something went terribly wrong with process ' . $process->getPid(), E_USER_WARNING);
81
                }
82
            }
83
84
            if (!count($processes)) {
85
                break;
86
            }
87
88
            ++$passes;
89
            usleep(100000);
90
        }
91
92
        return $output;
93
    }
94
}
95