Passed
Push — develop ( 63565e...e47965 )
by Brent
07:18
created

Manager::wait()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 18
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 10
nc 2
nop 1
dl 0
loc 18
ccs 9
cts 9
cp 1
crap 3
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
namespace Pageon\Pcntl;
4
5
/**
 * This Manager is used to create and wait for forked processes which can be executed in parallel.
 * It's a wrapper around `ext-pcntl`.
 *
 * Class Manager
 * @package Pageon\Pcntl
 */
class Manager
{
    /**
     * Create an asynchronous process.
     *
     * A UNIX stream socket pair is opened and the current process is forked. The child runs
     * `$process->execute()`, writes the serialized result to its end of the pair and exits.
     * The parent stores the start time, the child's PID and the readable end of the pair on
     * the given process and returns it.
     *
     * NOTE(review): the parent only reads the socket once the child has exited (see wait()),
     * so a result larger than the socket buffer presumably blocks the child's write until the
     * process times out - confirm whether large results need to be supported.
     *
     * @param Process $process The process to run asynchronous.
     *
     * @return Process The asynchronous process.
     * @throws \Exception When the socket pair cannot be created or the fork fails.
     */
    public function async(Process $process) : Process
    {
        $sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

        if ($sockets === false) {
            throw new \Exception('Could not create a socket pair for the process');
        }

        // The child writes to the first socket, the parent reads from the second one.
        list($writeSocket, $readSocket) = $sockets;

        $pid = pcntl_fork();

        if ($pid === -1) {
            // No child was created: release both ends instead of tracking a bogus PID.
            fclose($writeSocket);
            fclose($readSocket);

            throw new \Exception('Could not fork a new process');
        }

        if ($pid === 0) {
            // Child: it never reads, so drop the parent's end, send the result and stop.
            fclose($readSocket);
            fwrite($writeSocket, serialize($process->execute()));
            fclose($writeSocket);

            exit;
        }

        // Parent: it never writes; closing this end lets the read hit EOF once the child is done.
        fclose($writeSocket);

        $process
            ->setStartTime(time())
            ->setPid($pid)
            ->setSocket($readSocket);

        return $process;
    }

    /**
     * Wait for a collection of processes to finish.
     *
     * Every 100 milliseconds each remaining process is polled without blocking. A process is
     * dropped from the set once it has finished, has been found stopped, or has exceeded its
     * maximum run time; the method returns when no process is left.
     *
     * @param ProcessCollection $processCollection
     *
     * @return array Always an empty array; results are handed to each process' success callback.
     * @throws \Exception When a process cannot be polled or cannot be killed.
     */
    public function wait(ProcessCollection $processCollection)
    {
        $processes = $processCollection->toArray();

        do {
            usleep(100000);

            /** @var Process $process */
            foreach ($processes as $key => $process) {
                // Keep the wait status separate from the return value: the status is needed
                // afterwards to tell a stopped child apart from one that has terminated.
                $status = 0;
                $waitResult = pcntl_waitpid($process->getPid(), $status, WNOHANG | WUNTRACED);

                $this->handleProcessStatus($waitResult, $process, $processes, $key, $status);
            }
        } while (count($processes));

        return [];
    }

    /**
     * Act on the outcome of a non-blocking `pcntl_waitpid()` call for one process.
     *
     * @param int     $waitResult Return value of `pcntl_waitpid()`: the child's PID when its state
     *                            changed, 0 when it is still running, anything else is an error.
     * @param Process $process    The process that was polled.
     * @param array   $processes  Set of processes still being waited for; the entry at `$key` is
     *                            removed once the process no longer needs polling.
     * @param mixed   $key        Key of `$process` within `$processes`.
     * @param int     $status     Wait status filled in by `pcntl_waitpid()`.
     *
     * @throws \Exception When the wait result is neither 0 nor the PID of the process.
     */
    private function handleProcessStatus($waitResult, Process $process, array &$processes, $key, $status = 0)
    {
        $pid = (int) $process->getPid();
        $waitResult = (int) $waitResult;

        if ($waitResult === $pid) {
            if (pcntl_wifstopped($status)) {
                // Reported because of WUNTRACED: the child is stopped, not finished, so its
                // socket would never reach EOF. Get rid of it instead of reading from it.
                $this->handleProcessStop($process);
            } else {
                $this->handleProcessSuccess($process);
            }

            unset($processes[$key]);

            return;
        }

        if ($waitResult === 0) {
            // Still running: only intervene when it has outlived its maximum run time.
            if ($process->getStartTime() + $process->getMaxRunTime() < time()) {
                $this->handleProcessStop($process);

                unset($processes[$key]);
            }

            return;
        }

        throw new \Exception("Could not reliably manage {$process->getPid()}");
    }

    /**
     * Handle a successful process.
     *
     * Reads everything the child wrote to the socket, closes the socket and, when the process
     * has a success callback, calls it with the unserialized output and the process itself.
     *
     * @param Process $process
     */
    private function handleProcessSuccess(Process $process)
    {
        $socket = $process->getSocket();

        // Reads until EOF; a failed read is treated as empty output.
        $output = (string) stream_get_contents($socket);

        fclose($socket);

        if ($success = $process->getSuccess()) {
            // The payload was serialized by our own forked child in async().
            $success(unserialize($output), $process);
        }
    }

    /**
     * Handle a stopped process.
     *
     * Sends SIGKILL to the process, reaps it so no zombie is left behind and closes the socket
     * it would have reported its result on.
     *
     * @param Process $process
     *
     * @throws \Exception When the process could not be killed.
     */
    private function handleProcessStop(Process $process)
    {
        $pid = $process->getPid();

        if (!posix_kill($pid, SIGKILL)) {
            throw new \Exception('Failed to kill ' . $pid . ': ' . posix_strerror(posix_get_last_error()));
        }

        // Collect the exit status of the killed child; the caller stops polling it after this.
        $status = 0;
        pcntl_waitpid($pid, $status);

        $socket = $process->getSocket();

        if (is_resource($socket)) {
            fclose($socket);
        }
    }
}
135