Passed
Push — develop ( c945b6...180be4 )
by Brent
03:06
created

Manager::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nc 1
nop 1
dl 0
loc 9
rs 9.6666
c 0
b 0
f 0
1
<?php
2
3
namespace Pageon\Pcntl;
4
5
use Symfony\Component\EventDispatcher\EventDispatcher;
6
7
/**
8
 * This Manager is used to create and wait for forked processes which can be executed in parallel.
9
 * It's a wrapper around `ext-pcntl`.
10
 *
11
 * Class Manager
12
 * @package Pageon\Pcntl
13
 */
14
class Manager
15
{
16
    /**
17
     * Create an asynchronous process.
18
     *
19
     * @param Process $process The process to run asynchronous.
20
     *
21
     * @return Process The asynchronous process.
22
     */
23
    public function async(Process $process) : Process {
24
        socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $sockets);
25
26
        list($parentSocket, $childSocket) = $sockets;
27
28
        if (($pid = pcntl_fork()) == 0) {
29
            socket_close($childSocket);
30
            $output = serialize($process->execute());
31
            socket_write($parentSocket, substr($output, 0, 4095));
32
            socket_close($parentSocket);
33
34
            exit;
35
        }
36
37
        socket_close($parentSocket);
38
39
        $process
40
            ->setStartTime(time())
41
            ->setPid($pid)
42
            ->setSocket($childSocket);
43
44
        return $process;
45
    }
46
47
    /**
48
     * Wait for a collection of processes to finish.
49
     *
50
     * @param ProcessCollection $processCollection
51
     *
52
     * @return array
53
     * @throws \Exception
54
     */
55
    public function wait(ProcessCollection $processCollection) {
56
        $output = [];
57
        $processes = $processCollection->toArray();
58
59
        do {
60
            usleep(100000);
61
62
            /** @var Process $process */
63
            foreach ($processes as $key => $process) {
64
                $processStatus = pcntl_waitpid($process->getPid(), $status, WNOHANG | WUNTRACED);
65
66
                switch ($processStatus) {
67
                    case $process->getPid():
68
                        $this->handleProcessSuccess($process);
69
                        unset($processes[$key]);
70
71
                        break;
72
                    case 0:
73
                        if ($process->getStartTime() + $process->getMaxRunTime() < time() || pcntl_wifstopped($status)) {
74
                            $this->handleProcessStop($process);
75
76
                            unset($processes[$key]);
77
                        }
78
79
                        break;
80
                    default:
81
                        throw new \Exception("Could not reliably manage {$process->getPid()}");
82
                }
83
            }
84
        } while (count($processes));
85
86
        return $output;
87
    }
88
89
    /**
90
     * Handle a successful process.
91
     *
92
     * @param Process $process
93
     */
94
    private function handleProcessSuccess(Process $process) {
95
        $output[] = unserialize(socket_read($process->getSocket(), 4096));
0 ignored issues
show
Coding Style Comprehensibility introduced by
$output was never initialized. Although not strictly required by PHP, it is generally a good practice to add $output = array(); before regardless.

Adding an explicit array definition is generally preferable to implicit array definition as it guarantees a stable state of the code.

Let’s take a look at an example:

foreach ($collection as $item) {
    $myArray['foo'] = $item->getFoo();

    if ($item->hasBar()) {
        $myArray['bar'] = $item->getBar();
    }

    // do something with $myArray
}

As you can see in this example, the array $myArray is initialized the first time when the foreach loop is entered. You can also see that the value of the bar key is only written conditionally; thus, its value might result from a previous iteration.

This might or might not be intended. To make your intention clear, your code more readible and to avoid accidental bugs, we recommend to add an explicit initialization $myArray = array() either outside or inside the foreach loop.

Loading history...
96
        socket_close($process->getSocket());
97
98
        $success = $process->getSuccess();
99
        if ($success) {
100
            call_user_func_array($success, [$process]);
101
        }
102
    }
103
104
    /**
105
     * Handle a stopped process.
106
     *
107
     * @param Process $process
108
     *
109
     * @throws \Exception
110
     */
111
    private function handleProcessStop(Process $process) {
112
        if (!posix_kill($process->getPid(), SIGKILL)) {
113
            throw new \Exception('Failed to kill ' . $process->getPid() . ': ' . posix_strerror(posix_get_last_error()));
114
        }
115
    }
116
}
117