Test Failed
Push — develop ( be1b61...2191b6 )
by Brent
03:08
created

Manager::handleProcessStop()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 3
nc 2
nop 1
dl 0
loc 5
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
namespace Pageon\Pcntl;
4
5
/**
6
 * This Manager is used to create and wait for forked processes which can be executed in parallel.
7
 * It's a wrapper around `ext-pcntl`.
8
 *
9
 * Class Manager
10
 * @package Pageon\Pcntl
11
 */
12
class Manager
13
{
14
    /**
15
     * Create an asynchronous process.
16
     *
17
     * @param Process $process The process to run asynchronous.
18
     *
19
     * @return Process The asynchronous process.
20
     */
21
    public function async(Process $process) : Process {
22
        $sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
23
24
        list($parentSocket, $childSocket) = $sockets;
25
26
        if (($pid = pcntl_fork()) == 0) {
27
            fclose($childSocket);
28
            $output = serialize($process->execute());
29
            fwrite($parentSocket, $output);
30
            fclose($parentSocket);
31
32
            exit;
33
        }
34
35
        fclose($parentSocket);
36
37
        $process
38
            ->setStartTime(time())
39
            ->setPid($pid)
40
            ->setSocket($childSocket);
41
42
        return $process;
43
    }
44
45
    /**
46
     * Wait for a collection of processes to finish.
47
     *
48
     * @param ProcessCollection $processCollection
49
     *
50
     * @return array
51
     * @throws \Exception
52
     */
53
    public function wait(ProcessCollection $processCollection) {
54
        $output = [];
55
        $processes = $processCollection->toArray();
56
57
        do {
58
            usleep(100000);
59
60
            /** @var Process $process */
61
            foreach ($processes as $key => $process) {
62
                $processStatus = pcntl_waitpid($process->getPid(), $processStatus, WNOHANG | WUNTRACED);
63
64
                $this->handleProcessStatus($processStatus, $process, $processes, $key);
65
            }
66
        } while (count($processes));
67
68
        return $output;
69
    }
70
71
    private function handleProcessStatus($processStatus, Process $process, array &$processes, $key) {
72
        switch ($processStatus) {
73
            case $process->getPid():
74
                $this->handleProcessSuccess($process);
75
                unset($processes[$key]);
76
77
                break;
78
            case 0:
79
                if ($process->getStartTime() + $process->getMaxRunTime() < time() || pcntl_wifstopped($processStatus)) {
80
                    $this->handleProcessStop($process);
81
82
                    unset($processes[$key]);
83
                }
84
85
                break;
86
            default:
87
                throw new \Exception("Could not reliably manage {$process->getPid()}");
88
        }
89
    }
90
91
    /**
92
     * Handle a successful process.
93
     *
94
     * @param Process $process
95
     */
96
    private function handleProcessSuccess(Process $process) {
97
        $output = '';
98
        while (!feof($process->getSocket())) {
99
            $output .= fgets($process->getSocket());
100
        }
101
102
        fclose($process->getSocket());
103
104
        if ($success = $process->getSuccess()) {
105
            call_user_func_array($success, [unserialize($output), $process]);
106
        }
107
    }
108
109
    /**
110
     * Handle a stopped process.
111
     *
112
     * @param Process $process
113
     *
114
     * @throws \Exception
115
     */
116
    private function handleProcessStop(Process $process) {
117
        if (!posix_kill($process->getPid(), SIGKILL)) {
118
            throw new \Exception('Failed to kill ' . $process->getPid() . ': ' . posix_strerror(posix_get_last_error()));
119
        }
120
    }
121
}
122