Passed
Push — master ( 7b7005...55ec05 )
by Brent
02:29
created

Manager   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 118
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 2

Importance

Changes 0
Metric Value
dl 0
loc 118
rs 10
c 0
b 0
f 0
wmc 15
lcom 1
cbo 2

5 Methods

Rating   Name   Duplication   Size   Complexity  
A async() 0 23 2
A wait() 0 17 3
B handleProcessStatus() 0 19 5
A handleProcessSuccess() 0 12 3
A handleProcessStop() 0 5 2
1
<?php
2
3
namespace Pageon\Pcntl;
4
5
/**
 * This Manager is used to create and wait for forked processes which can be executed in parallel.
 * It's a wrapper around `ext-pcntl`.
 *
 * Typical flow: call async() once per Process to fork a child that runs
 * Process::execute(), then call wait() with the collection of started
 * processes to block until each one has exited, been stopped, or exceeded
 * its maximum run time.
 *
 * Class Manager
 * @package Pageon\Pcntl
 */
class Manager
{
    /**
     * Pause, in microseconds, between two polling rounds in wait().
     */
    const POLL_INTERVAL = 100000;

    /**
     * Create an asynchronous process.
     *
     * Forks the current PHP process. The child runs `$process->execute()`,
     * writes the serialized return value to its end of a socket pair and exits.
     * The parent records the start time, the child's PID and the readable end
     * of the socket pair on the process and returns it immediately.
     *
     * @param Process $process The process to run asynchronously.
     *
     * @return Process The asynchronous process.
     * @throws \Exception When the socket pair cannot be created or the fork fails.
     */
    public function async(Process $process) : Process {
        $sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

        if ($sockets === false) {
            throw new \Exception('Could not create a socket pair for the process');
        }

        // The child writes to the first socket, the parent reads from the second.
        list($childEnd, $parentEnd) = $sockets;

        $pid = pcntl_fork();

        if ($pid === -1) {
            fclose($childEnd);
            fclose($parentEnd);

            throw new \Exception('Could not fork process: ' . pcntl_strerror(pcntl_get_last_error()));
        }

        if ($pid === 0) {
            // Child process: it must never return into the caller's code, so any
            // failure of execute() is logged and turned into a non-zero exit code.
            fclose($parentEnd);
            $exitCode = 0;

            try {
                fwrite($childEnd, serialize($process->execute()));
            } catch (\Throwable $throwable) {
                error_log((string) $throwable);
                $exitCode = 1;
            }

            fclose($childEnd);

            exit($exitCode);
        }

        // Parent process: only the reading end is needed from here on.
        fclose($childEnd);

        $process
            ->setStartTime(time())
            ->setPid($pid)
            ->setSocket($parentEnd);

        return $process;
    }

    /**
     * Wait for a collection of processes to finish.
     *
     * Polls every process without blocking until each one has either exited,
     * been stopped, or exceeded its maximum run time. Stopped and timed-out
     * processes are killed.
     *
     * NOTE: a child's output is only read after the child has exited, so a child
     * whose serialized output does not fit in the socket buffer will presumably
     * block on its write until it is killed for exceeding its maximum run time.
     *
     * @param ProcessCollection $processCollection
     *
     * @return array The unserialized output of every process that exited, indexed by
     *               the key the process has in the collection. Killed processes have
     *               no entry.
     * @throws \Exception When a process cannot be waited for or cannot be killed.
     */
    public function wait(ProcessCollection $processCollection) {
        $output = [];
        $processes = $processCollection->toArray();

        while (count($processes) > 0) {
            /** @var Process $process */
            foreach ($processes as $key => $process) {
                // $status receives the wait status; the return value only tells
                // which PID (if any) changed state. The two must be kept apart.
                $status = 0;
                $waitResult = pcntl_waitpid($process->getPid(), $status, WNOHANG | WUNTRACED);

                if ($this->handleProcessStatus($waitResult, $status, $process, $output, $key)) {
                    unset($processes[$key]);
                }
            }

            if (count($processes) > 0) {
                usleep(self::POLL_INTERVAL);
            }
        }

        return $output;
    }

    /**
     * Act on the result of one non-blocking `pcntl_waitpid()` call.
     *
     * @param int     $waitResult Return value of `pcntl_waitpid()`: the PID, 0 or -1.
     * @param int     $status     Wait status filled in by `pcntl_waitpid()`.
     * @param Process $process    The process that was polled.
     * @param array   $output     Receives the output of the process when it exited.
     * @param mixed   $key        Key of the process in the collection.
     *
     * @return bool True when the process is done and must not be polled again.
     * @throws \Exception When the process could not be waited for or killed.
     */
    private function handleProcessStatus($waitResult, $status, Process $process, array &$output, $key) : bool {
        if ($waitResult === (int) $process->getPid()) {
            // With WUNTRACED a state change can also mean "stopped by a signal";
            // such a child has produced no result and is killed instead.
            if (pcntl_wifstopped($status)) {
                $this->handleProcessStop($process);

                return true;
            }

            $output[$key] = $this->handleProcessSuccess($process);

            return true;
        }

        if ($waitResult === 0) {
            // Still running: only interfere once the maximum run time has passed.
            if ($process->getStartTime() + $process->getMaxRunTime() < time()) {
                $this->handleProcessStop($process);

                return true;
            }

            return false;
        }

        throw new \Exception("Could not reliably manage {$process->getPid()}");
    }

    /**
     * Handle a successful process.
     *
     * Reads everything the child wrote to the socket, closes the socket and
     * passes the unserialized output plus the process to the success callback
     * when the process has one.
     *
     * The payload is produced by the forked child of this very process, which is
     * why it is unserialized without restrictions.
     *
     * @param Process $process
     *
     * @return mixed The unserialized output; false when the child wrote nothing usable.
     */
    private function handleProcessSuccess(Process $process) {
        $socket = $process->getSocket();

        // The child has exited, so this reads up to end-of-file without waiting.
        $serialized = (string) stream_get_contents($socket);

        fclose($socket);

        $result = unserialize($serialized);

        if ($success = $process->getSuccess()) {
            call_user_func($success, $result, $process);
        }

        return $result;
    }

    /**
     * Handle a stopped process.
     *
     * Closes the socket of the process, kills the child with SIGKILL and reaps
     * it so that no zombie process is left behind.
     *
     * @param Process $process
     *
     * @throws \Exception When the kill signal could not be sent.
     */
    private function handleProcessStop(Process $process) {
        $pid = $process->getPid();
        $socket = $process->getSocket();

        if (is_resource($socket)) {
            fclose($socket);
        }

        if (!posix_kill($pid, SIGKILL)) {
            throw new \Exception('Failed to kill ' . $pid . ': ' . posix_strerror(posix_get_last_error()));
        }

        // SIGKILL cannot be ignored, so this blocking wait only collects the exit status.
        $status = 0;
        pcntl_waitpid($pid, $status);
    }
}
130