Complex classes like StreamIO often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
While breaking up the class, it is a good idea to analyze how other classes use StreamIO, and based on these observations, apply Extract Interface, too.
1 | <?php |
||
10 | class StreamIO extends AbstractIO |
||
11 | { |
||
12 | /** @var string */ |
||
13 | protected $protocol; |
||
14 | |||
15 | /** @var string */ |
||
16 | protected $host; |
||
17 | |||
18 | /** @var int */ |
||
19 | protected $port; |
||
20 | |||
21 | /** @var int */ |
||
22 | protected $connection_timeout; |
||
23 | |||
24 | /** @var int */ |
||
25 | protected $read_write_timeout; |
||
26 | |||
27 | /** @var resource */ |
||
28 | protected $context; |
||
29 | |||
30 | /** @var bool */ |
||
31 | protected $keepalive; |
||
32 | |||
33 | /** @var int */ |
||
34 | protected $heartbeat; |
||
35 | |||
36 | /** @var float */ |
||
37 | protected $last_read; |
||
38 | |||
39 | /** @var float */ |
||
40 | protected $last_write; |
||
41 | |||
42 | /** @var array */ |
||
43 | protected $last_error; |
||
44 | |||
45 | /** @var resource */ |
||
46 | private $sock; |
||
47 | |||
48 | /** @var bool */ |
||
49 | private $canSelectNull; |
||
50 | |||
51 | /** @var bool */ |
||
52 | private $canDispatchPcntlSignal; |
||
53 | |||
54 | /** |
||
55 | * @param string $host |
||
56 | * @param int $port |
||
57 | * @param int $connection_timeout |
||
58 | * @param int $read_write_timeout |
||
59 | * @param null $context |
||
60 | * @param bool $keepalive |
||
61 | * @param int $heartbeat |
||
62 | */ |
||
63 | 48 | public function __construct( |
|
93 | |||
94 | /** |
||
95 | * @return bool |
||
96 | */ |
||
97 | 48 | private function isPcntlSignalEnabled() |
|
103 | |||
104 | /** |
||
105 | * Sets up the stream connection |
||
106 | * |
||
107 | * @throws \PhpAmqpLib\Exception\AMQPRuntimeException |
||
108 | * @throws \Exception |
||
109 | */ |
||
110 | 48 | public function connect() |
|
111 | { |
||
112 | 48 | $errstr = $errno = null; |
|
113 | |||
114 | 48 | $remote = sprintf( |
|
115 | 48 | '%s://%s:%s', |
|
116 | 48 | $this->protocol, |
|
117 | 48 | $this->host, |
|
118 | 48 | $this->port |
|
119 | 32 | ); |
|
120 | |||
121 | 48 | set_error_handler(array($this, 'error_handler')); |
|
122 | |||
123 | 48 | $this->sock = stream_socket_client( |
|
124 | 32 | $remote, |
|
125 | 32 | $errno, |
|
126 | 32 | $errstr, |
|
127 | 48 | $this->connection_timeout, |
|
128 | 48 | STREAM_CLIENT_CONNECT, |
|
129 | 48 | $this->context |
|
130 | 32 | ); |
|
131 | |||
132 | 48 | restore_error_handler(); |
|
133 | |||
134 | 48 | if (false === $this->sock) { |
|
135 | throw new AMQPRuntimeException( |
||
136 | sprintf( |
||
137 | 'Error Connecting to server(%s): %s ', |
||
138 | $errno, |
||
139 | $errstr |
||
140 | ), |
||
141 | $errno |
||
142 | ); |
||
143 | } |
||
144 | |||
145 | 48 | if (false === stream_socket_get_name($this->sock, true)) { |
|
146 | throw new AMQPRuntimeException( |
||
147 | sprintf( |
||
148 | 'Connection refused: %s ', |
||
149 | $remote |
||
150 | ) |
||
151 | ); |
||
152 | } |
||
153 | |||
154 | 48 | list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->read_write_timeout); |
|
155 | 48 | if (!stream_set_timeout($this->sock, $sec, $uSec)) { |
|
156 | throw new AMQPIOException('Timeout could not be set'); |
||
157 | } |
||
158 | |||
159 | // php cannot capture signals while streams are blocking |
||
160 | 48 | if ($this->canDispatchPcntlSignal) { |
|
161 | stream_set_blocking($this->sock, 0); |
||
162 | stream_set_write_buffer($this->sock, 0); |
||
163 | if (function_exists('stream_set_read_buffer')) { |
||
164 | stream_set_read_buffer($this->sock, 0); |
||
165 | } |
||
166 | } else { |
||
167 | 48 | stream_set_blocking($this->sock, 1); |
|
168 | } |
||
169 | |||
170 | 48 | if ($this->keepalive) { |
|
171 | $this->enable_keepalive(); |
||
172 | } |
||
173 | 48 | } |
|
174 | |||
175 | /** |
||
176 | * Reconnects the socket |
||
177 | */ |
||
178 | 12 | public function reconnect() |
|
183 | |||
184 | /** |
||
185 | * @param $len |
||
186 | * @throws \PhpAmqpLib\Exception\AMQPIOException |
||
187 | * @return mixed|string |
||
188 | */ |
||
189 | 48 | public function read($len) |
|
190 | { |
||
191 | 48 | $read = 0; |
|
192 | 48 | $data = ''; |
|
193 | |||
194 | 48 | while ($read < $len) { |
|
195 | 48 | $this->check_heartbeat(); |
|
196 | |||
197 | 48 | if (!is_resource($this->sock) || feof($this->sock)) { |
|
198 | throw new AMQPRuntimeException('Broken pipe or closed connection'); |
||
199 | } |
||
200 | |||
201 | 48 | set_error_handler(array($this, 'error_handler')); |
|
202 | 48 | $buffer = fread($this->sock, ($len - $read)); |
|
203 | 48 | restore_error_handler(); |
|
204 | |||
205 | 48 | if ($buffer === false) { |
|
206 | throw new AMQPRuntimeException('Error receiving data'); |
||
207 | } |
||
208 | |||
209 | 48 | if ($buffer === '') { |
|
210 | if ($this->canDispatchPcntlSignal) { |
||
211 | // prevent cpu from being consumed while waiting |
||
212 | if ($this->canSelectNull) { |
||
213 | $this->select(null, null); |
||
214 | pcntl_signal_dispatch(); |
||
215 | } else { |
||
216 | usleep(100000); |
||
217 | pcntl_signal_dispatch(); |
||
218 | } |
||
219 | } |
||
220 | continue; |
||
221 | } |
||
222 | |||
223 | 48 | $read += mb_strlen($buffer, 'ASCII'); |
|
224 | 48 | $data .= $buffer; |
|
225 | 32 | } |
|
226 | |||
227 | 48 | if (mb_strlen($data, 'ASCII') !== $len) { |
|
228 | throw new AMQPRuntimeException( |
||
229 | sprintf( |
||
230 | 'Error reading data. Received %s instead of expected %s bytes', |
||
231 | mb_strlen($data, 'ASCII'), |
||
232 | $len |
||
233 | ) |
||
234 | ); |
||
235 | } |
||
236 | |||
237 | 48 | $this->last_read = microtime(true); |
|
238 | 48 | return $data; |
|
239 | } |
||
240 | |||
241 | /** |
||
242 | * @param $data |
||
243 | * @return mixed|void |
||
244 | * @throws \PhpAmqpLib\Exception\AMQPRuntimeException |
||
245 | * @throws \PhpAmqpLib\Exception\AMQPTimeoutException |
||
246 | */ |
||
247 | 48 | public function write($data) |
|
248 | { |
||
249 | 48 | $written = 0; |
|
250 | 48 | $len = mb_strlen($data, 'ASCII'); |
|
251 | |||
252 | 48 | while ($written < $len) { |
|
253 | |||
254 | 48 | if (!is_resource($this->sock)) { |
|
255 | throw new AMQPRuntimeException('Broken pipe or closed connection'); |
||
256 | } |
||
257 | |||
258 | 48 | set_error_handler(array($this, 'error_handler')); |
|
259 | // OpenSSL's C library function SSL_write() can balk on buffers > 8192 |
||
260 | // bytes in length, so we're limiting the write size here. On both TLS |
||
261 | // and plaintext connections, the write loop will continue until the |
||
262 | // buffer has been fully written. |
||
263 | // This behavior has been observed in OpenSSL dating back to at least |
||
264 | // September 2002: |
||
265 | // http://comments.gmane.org/gmane.comp.encryption.openssl.user/4361 |
||
266 | 48 | $buffer = fwrite($this->sock, $data, 8192); |
|
267 | 48 | restore_error_handler(); |
|
268 | |||
269 | 48 | if ($buffer === false) { |
|
270 | throw new AMQPRuntimeException('Error sending data'); |
||
271 | } |
||
272 | |||
273 | 48 | if ($buffer === 0 && feof($this->sock)) { |
|
274 | throw new AMQPRuntimeException('Broken pipe or closed connection'); |
||
275 | } |
||
276 | |||
277 | 48 | if ($this->timed_out()) { |
|
278 | throw new AMQPTimeoutException('Error sending data. Socket connection timed out'); |
||
279 | } |
||
280 | |||
281 | 48 | $written += $buffer; |
|
282 | |||
283 | 48 | if ($buffer > 0) { |
|
284 | 48 | $data = mb_substr($data, $buffer, mb_strlen($data, 'ASCII') - $buffer, 'ASCII'); |
|
285 | 32 | } |
|
286 | 32 | } |
|
287 | |||
288 | 48 | $this->last_write = microtime(true); |
|
289 | 48 | } |
|
290 | |||
291 | /** |
||
292 | * Internal error handler to deal with stream and socket errors that need to be ignored |
||
293 | * |
||
294 | * @param int $errno |
||
295 | * @param string $errstr |
||
296 | * @param string $errfile |
||
297 | * @param int $errline |
||
298 | * @param array $errcontext |
||
299 | * @return null |
||
300 | * @throws \ErrorException |
||
301 | */ |
||
302 | public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null) |
||
323 | |||
324 | /** |
||
325 | * Heartbeat logic: check connection health here |
||
326 | */ |
||
327 | 48 | protected function check_heartbeat() |
|
346 | |||
347 | /** |
||
348 | * Sends a heartbeat message |
||
349 | */ |
||
350 | protected function write_heartbeat() |
||
359 | |||
360 | 48 | public function close() |
|
361 | { |
||
362 | 48 | if (is_resource($this->sock)) { |
|
363 | 48 | fclose($this->sock); |
|
364 | 32 | } |
|
365 | 48 | $this->sock = null; |
|
366 | 48 | } |
|
367 | |||
368 | /** |
||
369 | * @return resource |
||
370 | */ |
||
371 | public function get_socket() |
||
372 | { |
||
373 | return $this->sock; |
||
374 | } |
||
375 | |||
376 | /** |
||
377 | * @return resource |
||
378 | */ |
||
379 | public function getSocket() |
||
383 | |||
384 | /** |
||
385 | * @param $sec |
||
386 | * @param $usec |
||
387 | * @return int|mixed |
||
388 | */ |
||
389 | public function select($sec, $usec) |
||
390 | { |
||
391 | $read = array($this->sock); |
||
392 | $write = null; |
||
393 | $except = null; |
||
394 | $result = false; |
||
|
|||
395 | |||
396 | set_error_handler(array($this, 'error_handler')); |
||
397 | $result = stream_select($read, $write, $except, $sec, $usec); |
||
398 | restore_error_handler(); |
||
399 | |||
400 | return $result; |
||
401 | } |
||
402 | |||
403 | /** |
||
404 | * @return mixed |
||
405 | */ |
||
406 | 48 | protected function timed_out() |
|
407 | { |
||
408 | // get status of socket to determine whether or not it has timed out |
||
409 | 48 | $info = stream_get_meta_data($this->sock); |
|
410 | |||
411 | 48 | return $info['timed_out']; |
|
412 | } |
||
413 | |||
414 | /** |
||
415 | * @throws \PhpAmqpLib\Exception\AMQPIOException |
||
416 | */ |
||
417 | protected function enable_keepalive() |
||
430 | |||
431 | 4 | /** |
|
432 | * @return $this |
||
433 | */ |
||
434 | public function disableHeartbeat() |
||
435 | { |
||
436 | $this->heartbeat = 0; |
||
437 | |||
440 | } |
||
441 |
This check looks for variable assignements that are either overwritten by other assignments or where the variable is not used subsequently.
Both the
$myVar
assignment in line 1 and the$higher
assignment in line 2 are dead. The first because$myVar
is never used and the second because$higher
is always overwritten for every possible time line.