Complex classes like StreamIO often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes. You can also have a look at the cohesion graph to spot any unconnected or weakly connected components.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
While breaking up the class, it is a good idea to analyze how other classes use StreamIO, and based on these observations, apply Extract Interface, too.
1 | <?php |
||
10 | class StreamIO extends AbstractIO |
||
11 | { |
||
12 | /** @var string */ |
||
13 | protected $protocol; |
||
14 | |||
15 | /** @var string */ |
||
16 | protected $host; |
||
17 | |||
18 | /** @var int */ |
||
19 | protected $port; |
||
20 | |||
21 | /** @var int */ |
||
22 | protected $connection_timeout; |
||
23 | |||
24 | /** @var int */ |
||
25 | protected $read_write_timeout; |
||
26 | |||
27 | /** @var resource */ |
||
28 | protected $context; |
||
29 | |||
30 | /** @var bool */ |
||
31 | protected $keepalive; |
||
32 | |||
33 | /** @var int */ |
||
34 | protected $heartbeat; |
||
35 | |||
36 | /** @var float */ |
||
37 | protected $last_read; |
||
38 | |||
39 | /** @var float */ |
||
40 | protected $last_write; |
||
41 | |||
42 | /** @var array */ |
||
43 | protected $last_error; |
||
44 | |||
45 | /** @var resource */ |
||
46 | private $sock; |
||
47 | |||
48 | /** @var bool */ |
||
49 | private $canSelectNull; |
||
50 | |||
51 | /** @var bool */ |
||
52 | private $canDispatchPcntlSignal; |
||
53 | |||
/**
 * Stores the connection settings; the socket itself is only opened by connect().
 *
 * Without a context the protocol stays 'tcp' and a default stream context is created.
 * When a context is passed in, the protocol is switched to 'ssl'.
 *
 * @param string $host
 * @param int $port
 * @param int $connection_timeout
 * @param int $read_write_timeout
 * @param resource|null $context
 * @param bool $keepalive
 * @param int $heartbeat
 */
public function __construct(
    $host,
    $port,
    $connection_timeout,
    $read_write_timeout,
    $context = null,
    $keepalive = false,
    $heartbeat = 0
) {
    // Defaults and caller-supplied settings are stored before the pcntl probe runs.
    $this->protocol = 'tcp';
    $this->host = $host;
    $this->port = $port;
    $this->connection_timeout = $connection_timeout;
    $this->read_write_timeout = $read_write_timeout;
    $this->context = $context;
    $this->keepalive = $keepalive;
    $this->heartbeat = $heartbeat;
    $this->canSelectNull = true;
    $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();

    if (null === $this->context) {
        $this->context = stream_context_create();

        return;
    }

    $this->protocol = 'ssl';

    // php bugs 41631 & 65137 prevent select null from working on ssl streams
    if (PHP_VERSION_ID < 50436) {
        $this->canSelectNull = false;
    }
}
|
93 | |||
94 | /** |
||
95 | * @return bool |
||
96 | */ |
||
97 | 40 | private function isPcntlSignalEnabled() |
|
103 | |||
/**
 * Opens the stream socket to "<protocol>://<host>:<port>" and configures it.
 *
 * After connecting, the read/write timeout is applied, the blocking mode is chosen
 * depending on whether pcntl signals can be dispatched, and keepalive is enabled
 * when it was requested in the constructor.
 *
 * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the socket cannot be opened or has no peer name
 * @throws \PhpAmqpLib\Exception\AMQPIOException when the read/write timeout cannot be set
 * @throws \Exception
 */
public function connect()
{
    $errno = null;
    $errstr = null;

    $remote = sprintf('%s://%s:%s', $this->protocol, $this->host, $this->port);

    // Warnings raised while connecting are routed through error_handler().
    set_error_handler(array($this, 'error_handler'));
    $this->sock = stream_socket_client(
        $remote,
        $errno,
        $errstr,
        $this->connection_timeout,
        STREAM_CLIENT_CONNECT,
        $this->context
    );
    restore_error_handler();

    if ($this->sock === false) {
        $message = sprintf('Error Connecting to server(%s): %s ', $errno, $errstr);

        throw new AMQPRuntimeException($message, $errno);
    }

    if (stream_socket_get_name($this->sock, true) === false) {
        throw new AMQPRuntimeException(sprintf('Connection refused: %s ', $remote));
    }

    list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->read_write_timeout);
    $timeoutApplied = stream_set_timeout($this->sock, $sec, $uSec);
    if (!$timeoutApplied) {
        throw new AMQPIOException('Timeout could not be set');
    }

    // php cannot capture signals while streams are blocking
    if (!$this->canDispatchPcntlSignal) {
        stream_set_blocking($this->sock, 1);
    } else {
        stream_set_blocking($this->sock, 0);
        stream_set_write_buffer($this->sock, 0);
        if (function_exists('stream_set_read_buffer')) {
            stream_set_read_buffer($this->sock, 0);
        }
    }

    if ($this->keepalive) {
        $this->enable_keepalive();
    }
}
|
174 | |||
/**
 * Reconnects the socket: closes the current one (if any) and then calls connect() again.
 */
public function reconnect()
{
    // Drop the existing socket first so connect() starts from a clean state.
    $this->close();

    $this->connect();
}
|
183 | |||
/**
 * Reads exactly $len bytes from the socket, looping until enough data has arrived.
 *
 * The heartbeat check runs before every read attempt. When a read yields no data and
 * pcntl signals can be dispatched, the loop waits (select or a short sleep) and
 * dispatches pending signals before retrying.
 *
 * @param int $len number of bytes to read
 * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
 * @throws \PhpAmqpLib\Exception\AMQPIOException
 * @return mixed|string
 */
public function read($len)
{
    $data = '';
    $received = 0;

    while ($received < $len) {
        $this->check_heartbeat();

        $usable = is_resource($this->sock) && !feof($this->sock);
        if (!$usable) {
            throw new AMQPRuntimeException('Broken pipe or closed connection');
        }

        set_error_handler(array($this, 'error_handler'));
        $chunk = fread($this->sock, ($len - $received));
        restore_error_handler();

        if (false === $chunk) {
            throw new AMQPRuntimeException('Error receiving data');
        }

        if ('' !== $chunk) {
            // 'ASCII' makes mb_strlen count bytes rather than multibyte characters.
            $received += mb_strlen($chunk, 'ASCII');
            $data .= $chunk;
            continue;
        }

        // Nothing was read: without signal dispatching simply try again.
        if (!$this->canDispatchPcntlSignal) {
            continue;
        }

        // prevent cpu from being consumed while waiting
        if ($this->canSelectNull) {
            $this->select(null, null);
        } else {
            usleep(100000);
        }
        pcntl_signal_dispatch();
    }

    $total = mb_strlen($data, 'ASCII');
    if ($total !== $len) {
        throw new AMQPRuntimeException(
            sprintf('Error reading data. Received %s instead of expected %s bytes', $total, $len)
        );
    }

    $this->last_read = microtime(true);

    return $data;
}
||
240 | |||
/**
 * Writes the whole of $data to the socket, in chunks of at most 8192 bytes.
 *
 * On success the time of the write is recorded in $last_write.
 *
 * @param string $data
 * @return mixed|void
 * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
 * @throws \PhpAmqpLib\Exception\AMQPTimeoutException
 */
public function write($data)
{
    $total = mb_strlen($data, 'ASCII');
    $written = 0;

    while ($written < $total) {
        if (!is_resource($this->sock)) {
            throw new AMQPRuntimeException('Broken pipe or closed connection');
        }

        set_error_handler(array($this, 'error_handler'));
        // OpenSSL's C library function SSL_write() can balk on buffers > 8192
        // bytes in length, so we're limiting the write size here. On both TLS
        // and plaintext connections, the write loop will continue until the
        // buffer has been fully written.
        // This behavior has been observed in OpenSSL dating back to at least
        // September 2002:
        // http://comments.gmane.org/gmane.comp.encryption.openssl.user/4361
        $sent = fwrite($this->sock, $data, 8192);
        restore_error_handler();

        if (false === $sent) {
            throw new AMQPRuntimeException('Error sending data');
        }

        if (0 === $sent && feof($this->sock)) {
            throw new AMQPRuntimeException('Broken pipe or closed connection');
        }

        if ($this->timed_out()) {
            throw new AMQPTimeoutException('Error sending data. Socket connection timed out');
        }

        $written += $sent;

        if ($sent > 0) {
            // Drop the bytes that were just sent; the rest is retried on the next pass.
            $remaining = mb_strlen($data, 'ASCII') - $sent;
            $data = mb_substr($data, $sent, $remaining, 'ASCII');
        }
    }

    $this->last_write = microtime(true);
}
|
290 | |||
/**
 * Internal error handler for stream and socket calls.
 *
 * Every error is remembered in $last_error. Two messages are tolerated and make the
 * handler return null; anything else restores the previous handler and is raised
 * as an \ErrorException.
 *
 * @param int $errno
 * @param string $errstr
 * @param string $errfile
 * @param int $errline
 * @param array $errcontext
 * @return null
 * @throws \ErrorException
 */
public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
{
    $this->last_error = compact('errno', 'errstr', 'errfile', 'errline', 'errcontext');

    $ignorable = array(
        // fwrite notice that the stream isn't ready; it's allowed to retry
        'Resource temporarily unavailable',
        // stream_select warning that it has been interrupted by a signal;
        // it's allowed while processing signals
        'Interrupted system call',
    );

    foreach ($ignorable as $needle) {
        if (strstr($errstr, $needle)) {
            return null;
        }
    }

    // The caller's own restore_error_handler() is skipped once we throw, so undo it here.
    restore_error_handler();

    // raise all other issues to exceptions
    throw new \ErrorException($errstr, 0, $errno, $errfile, $errline);
}
||
323 | |||
/**
 * Heartbeat logic: check connection health here.
 *
 * Does nothing unless a heartbeat interval is set and both a read and a write have
 * already been timestamped. Otherwise it reconnects when the last read is older than
 * twice the interval, and sends a heartbeat when the last write is older than half
 * the interval.
 */
protected function check_heartbeat()
{
    // ignore unless heartbeat interval is set
    if ($this->heartbeat === 0 || !$this->last_read || !$this->last_write) {
        return;
    }

    $now = microtime(true);
    $sinceRead = round($now - $this->last_read);
    $sinceWrite = round($now - $this->last_write);

    // server has gone away
    if ($sinceRead > ($this->heartbeat * 2)) {
        $this->reconnect();
    }

    // time for client to send a heartbeat
    if ($sinceWrite > ($this->heartbeat / 2)) {
        $this->write_heartbeat();
    }
}
|
346 | |||
/**
 * Sends a heartbeat message: builds the packet with AMQPWriter and pushes it through write().
 */
protected function write_heartbeat()
{
    $frame = new AMQPWriter();

    // NOTE(review): presumably frame type, channel, payload size and frame-end
    // marker of an AMQP heartbeat frame - confirm against the protocol spec.
    $frame->write_octet(8);
    $frame->write_short(0);
    $frame->write_long(0);
    $frame->write_octet(0xCE);

    $payload = $frame->getvalue();
    $this->write($payload);
}
||
359 | |||
/**
 * Closes the socket if it is still an open resource and clears the stored handle.
 */
public function close()
{
    $isOpen = is_resource($this->sock);
    if ($isOpen) {
        fclose($this->sock);
    }

    // Always forget the handle, whether or not it was still open.
    $this->sock = null;
}
|
367 | |||
368 | /** |
||
369 | * @return resource |
||
370 | */ |
||
371 | public function get_socket() |
||
375 | |||
376 | /** |
||
377 | * @return resource |
||
378 | */ |
||
379 | public function getSocket() |
||
383 | |||
/**
 * Waits on the socket with stream_select(), watching it for readability only.
 *
 * Warnings raised by stream_select() go through error_handler() for the duration
 * of the call, so an interruption by a signal does not escalate to an exception.
 *
 * @param int|null $sec seconds part of the timeout, passed straight to stream_select()
 * @param int|null $usec microseconds part of the timeout, passed straight to stream_select()
 * @return int|false whatever stream_select() returned
 */
public function select($sec, $usec)
{
    // stream_select() takes its arrays by reference, so they must be variables.
    $read = array($this->sock);
    $write = null;
    $except = null;

    set_error_handler(array($this, 'error_handler'));
    $result = stream_select($read, $write, $except, $sec, $usec);
    restore_error_handler();

    return $result;
}
||
402 | |||
/**
 * Reports whether the socket has timed out, according to its stream metadata.
 *
 * @return mixed the 'timed_out' entry of stream_get_meta_data()
 */
protected function timed_out()
{
    // get status of socket to determine whether or not it has timed out
    $meta = stream_get_meta_data($this->sock);
    $timedOut = $meta['timed_out'];

    return $timedOut;
}
||
413 | |||
414 | /** |
||
415 | * @throws \PhpAmqpLib\Exception\AMQPIOException |
||
416 | */ |
||
417 | protected function enable_keepalive() |
||
430 | } |
||
431 |
This check looks for variable assignments that are either overwritten by other assignments or where the variable is not used subsequently.
Both the `$myVar` assignment in line 1 and the `$higher` assignment in line 2 are dead. The first because `$myVar` is never used, and the second because `$higher` is always overwritten on every possible execution path.