Completed
Push — master ( 8c06ff...a2f0f3 )
by Julian
02:34
created

Worker::updateStateFromAvailableOutput()   B

Complexity

Conditions 6
Paths 11

Size

Total Lines 26
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 6.0073

Importance

Changes 0
Metric Value
dl 0
loc 26
ccs 16
cts 17
cp 0.9412
rs 8.439
c 0
b 0
f 0
cc 6
eloc 16
nc 11
nop 0
crap 6.0073
1
<?php
2
3
declare(strict_types=1);
4
5
namespace ParaTest\Runners\PHPUnit;
6
7
use Exception;
8
9
class Worker
10
{
11
    private static $descriptorspec = [
12
       0 => ['pipe', 'r'],
13
       1 => ['pipe', 'w'],
14
       2 => ['pipe', 'w'],
15
    ];
16
    private $proc;
17
    private $pipes;
18
    private $inExecution = 0;
19
    private $isRunning = false;
20
    private $exitCode = null;
21
    private $commands = [];
22
    private $chunks = '';
23
    private $alreadyReadOutput = '';
24
    /**
25
     * @var ExecutableTest
26
     */
27
    private $currentlyExecuting;
28
29 5
    public function start(string $wrapperBinary, $token = 1, $uniqueToken = null)
30
    {
31 5
        $bin = 'PARATEST=1 ';
32 5
        if (is_numeric($token)) {
33 5
            $bin .= "TEST_TOKEN=$token ";
34
        }
35 5
        if ($uniqueToken) {
36
            $bin .= "UNIQUE_TEST_TOKEN=$uniqueToken ";
37
        }
38 5
        $bin .= PHP_BINARY . " \"$wrapperBinary\"";
39 5
        $pipes = [];
40 5
        $this->proc = proc_open($bin, self::$descriptorspec, $pipes);
41 5
        $this->pipes = $pipes;
42 5
        $this->isRunning = true;
43 5
    }
44
45
    public function stdout()
46
    {
47
        return $this->pipes[1];
48
    }
49
50 4
    public function execute(string $testCmd)
51
    {
52 4
        $this->checkStarted();
53 4
        $this->commands[] = $testCmd;
54 4
        fwrite($this->pipes[0], $testCmd . "\n");
55 4
        ++$this->inExecution;
56 4
    }
57
58
    public function assign(ExecutableTest $test, string $phpunit, array $phpunitOptions)
59
    {
60
        if ($this->currentlyExecuting !== null) {
61
            throw new Exception('Worker already has a test assigned - did you forget to call reset()?');
62
        }
63
        $this->currentlyExecuting = $test;
64
        $this->execute($test->command($phpunit, $phpunitOptions));
65
    }
66
67
    public function printFeedback(ResultPrinter $printer)
68
    {
69
        if ($this->currentlyExecuting !== null) {
70
            $printer->printFeedback($this->currentlyExecuting);
71
        }
72
    }
73
74
    public function reset()
75
    {
76
        $this->currentlyExecuting = null;
77
    }
78
79 6
    public function isStarted(): bool
80
    {
81 6
        return $this->proc !== null && $this->pipes !== null;
82
    }
83
84 4
    private function checkStarted()
85
    {
86 4
        if (!$this->isStarted()) {
87
            throw new \RuntimeException('You have to start the Worker first!');
88
        }
89 4
    }
90
91 3
    public function stop()
92
    {
93 3
        fwrite($this->pipes[0], "EXIT\n");
94 3
        fclose($this->pipes[0]);
95 3
    }
96
97
    /**
98
     * This is an utility function for tests.
99
     * Refactor or write it only in the test case.
100
     */
101 2
    public function waitForFinishedJob()
102
    {
103 2
        if ($this->inExecution === 0) {
104
            return;
105
        }
106 2
        $tellsUsItHasFinished = false;
107 2
        stream_set_blocking($this->pipes[1], true);
108 2
        while ($line = fgets($this->pipes[1])) {
109 2
            if (strstr($line, "FINISHED\n")) {
110 2
                $tellsUsItHasFinished = true;
111 2
                --$this->inExecution;
112 2
                break;
113
            }
114
        }
115 2
        if (!$tellsUsItHasFinished) {
116
            throw new \RuntimeException('The Worker terminated without finishing the job.');
117
        }
118 2
    }
119
120 1
    public function isFree(): bool
121
    {
122 1
        $this->checkNotCrashed();
123 1
        $this->updateStateFromAvailableOutput();
124
125 1
        return $this->inExecution === 0;
126
    }
127
128
    /**
129
     * @deprecated
130
     * This function consumes a lot of CPU while waiting for
131
     * the worker to finish. Use it only in testing paratest
132
     * itself.
133
     */
134 3
    public function waitForStop()
135
    {
136 3
        $status = proc_get_status($this->proc);
137 3
        while ($status['running']) {
138 3
            $status = proc_get_status($this->proc);
139 3
            $this->setExitCode($status);
140
        }
141 3
    }
142
143
    public function getCoverageFileName()
144
    {
145
        if ($this->currentlyExecuting !== null) {
146
            return $this->currentlyExecuting->getCoverageFileName();
147
        }
148
    }
149
150 4
    private function setExitCode(array $status)
151
    {
152 4
        if (!$status['running']) {
153 3
            if ($this->exitCode === null) {
154 3
                $this->exitCode = $status['exitcode'];
155
            }
156
        }
157 4
    }
158
159 1
    public function isRunning(): bool
160
    {
161 1
        $this->checkNotCrashed();
162 1
        $this->updateStateFromAvailableOutput();
163
164 1
        return $this->isRunning;
165
    }
166
167 3
    public function isCrashed(): bool
168
    {
169 3
        if (!$this->isStarted()) {
170 1
            return false;
171
        }
172 3
        $status = proc_get_status($this->proc);
173
174 3
        $this->updateStateFromAvailableOutput();
175 3
        if (!$this->isRunning) {
176 2
            return false;
177
        }
178
179 2
        $this->setExitCode($status);
180 2
        if ($this->exitCode === null) {
181 2
            return false;
182
        }
183
184
        return $this->exitCode !== 0;
185
    }
186
187 2
    private function checkNotCrashed()
188
    {
189 2
        if ($this->isCrashed()) {
190
            throw new \RuntimeException(
191
                'This worker has crashed. Last executed command: ' . end($this->commands) . PHP_EOL
192
                . 'Output:' . PHP_EOL
193
                . '----------------------' . PHP_EOL
194
                . $this->alreadyReadOutput . PHP_EOL
195
                . '----------------------' . PHP_EOL
196
                . $this->readAllStderr()
197
            );
198
        }
199 2
    }
200
201
    private function readAllStderr()
202
    {
203
        return stream_get_contents($this->pipes[2]);
204
    }
205
206
    /**
207
     * Have to read even incomplete lines to play nice with stream_select()
208
     * Otherwise it would continue to non-block because there are bytes to be read,
209
     * but fgets() won't pick them up.
210
     */
211 3
    private function updateStateFromAvailableOutput()
212
    {
213 3
        if (isset($this->pipes[1])) {
214 2
            stream_set_blocking($this->pipes[1], false);
215 2
            while ($chunk = fread($this->pipes[1], 4096)) {
216 1
                $this->chunks .= $chunk;
217 1
                $this->alreadyReadOutput .= $chunk;
218
            }
219 2
            $lines = explode("\n", $this->chunks);
220
            // last element is not a complete line,
221
            // becomes part of a line completed later
222 2
            $this->chunks = $lines[count($lines) - 1];
223 2
            unset($lines[count($lines) - 1]);
224
            // delivering complete lines to this Worker
225 2
            foreach ($lines as $line) {
226 1
                $line .= "\n";
227 1
                if (strstr($line, "FINISHED\n")) {
228
                    --$this->inExecution;
229
                }
230 1
                if (strstr($line, "EXITED\n")) {
231 1
                    $this->isRunning = false;
232
                }
233
            }
234 2
            stream_set_blocking($this->pipes[1], true);
235
        }
236 3
    }
237
}
238