Completed
Push — master ( 08d105...08de01 )
by
unknown
18s
created

SocketIO   A

Complexity

Total Complexity 39

Size/Duplication

Total Lines 276
Duplicated Lines 6.88 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 73.02%

Importance

Changes 0
Metric Value
wmc 39
lcom 1
cbo 6
dl 19
loc 276
ccs 111
cts 152
cp 0.7302
rs 9.28
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 22 2
A connect() 0 35 4
A getSocket() 0 4 1
B read() 9 53 7
C write() 0 60 15
A close() 10 10 2
A do_select() 0 8 1
A select_write() 0 7 1
A enable_keepalive() 0 8 3
A error_handler() 0 10 2
A set_error_handler() 0 5 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
<?php
2
namespace PhpAmqpLib\Wire\IO;
3
4
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
5
use PhpAmqpLib\Exception\AMQPIOException;
6
use PhpAmqpLib\Exception\AMQPSocketException;
7
use PhpAmqpLib\Exception\AMQPTimeoutException;
8
use PhpAmqpLib\Helper\MiscHelper;
9
10
class SocketIO extends AbstractIO
11
{
12
    /** @var resource */
13
    private $sock;
14
15
    /**
16
     * @param string $host
17
     * @param int $port
18
     * @param int|float $read_timeout
19
     * @param bool $keepalive
20
     * @param int|float|null $write_timeout if null defaults to read timeout
21
     * @param int $heartbeat how often to send heartbeat. 0 means off
22
     */
23 114
    public function __construct($host, $port, $read_timeout = 130, $keepalive = false, $write_timeout = null, $heartbeat = 60)
24
    {
25 114
        $this->host = $host;
26 114
        $this->port = $port;
27 114
        $this->read_timeout = $read_timeout;
28 114
        $this->write_timeout = $write_timeout ?: $read_timeout;
29 114
        $this->heartbeat = $heartbeat;
30 114
        $this->initial_heartbeat = $heartbeat;
31 114
        $this->keepalive = $keepalive;
32 114
        $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();
33
34
        /*
35
            TODO FUTURE enable this check
36
            php-amqplib/php-amqplib#648, php-amqplib/php-amqplib#666
37
        if ($this->heartbeat !== 0 && ($this->read_timeout <= ($this->heartbeat * 2))) {
38
            throw new \InvalidArgumentException('read_timeout must be greater than 2x the heartbeat');
39
        }
40
        if ($this->heartbeat !== 0 && ($this->write_timeout <= ($this->heartbeat * 2))) {
41
            throw new \InvalidArgumentException('send_timeout must be greater than 2x the heartbeat');
42
        }
43
         */
44 114
    }
45
46
    /**
47
     * @inheritdoc
48
     */
49 114
    public function connect()
50
    {
51 114
        $this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
52
53 114
        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->write_timeout);
54 114
        socket_set_option($this->sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => $sec, 'usec' => $uSec));
55 114
        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
56 114
        socket_set_option($this->sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => $sec, 'usec' => $uSec));
57
58 114
        $this->set_error_handler();
59
        try {
60 114
            $connected = socket_connect($this->sock, $this->host, $this->port);
61 114
            $this->cleanup_error_handler();
62 60
        } catch (\ErrorException $e) {
63 6
            $connected = false;
64
        }
65 114
        if (!$connected) {
66 6
            $errno = socket_last_error($this->sock);
67 6
            $errstr = socket_strerror($errno);
68 6
            throw new AMQPIOException(sprintf(
69 6
                'Error Connecting to server (%s): %s',
70 5
                $errno,
71 2
                $errstr
72 5
            ), $errno);
73
        }
74
75 108
        socket_set_block($this->sock);
76 108
        socket_set_option($this->sock, SOL_TCP, TCP_NODELAY, 1);
77
78 108
        if ($this->keepalive) {
79 6
            $this->enable_keepalive();
80 3
        }
81
82 108
        $this->heartbeat = $this->initial_heartbeat;
83 108
    }
84
85
    /**
86
     * @inheritdoc
87
     */
88
    public function getSocket()
89
    {
90
        return $this->sock;
91
    }
92
93
    /**
94
     * @inheritdoc
95
     */
96 108
    public function read($len)
97
    {
98 108
        if (is_null($this->sock)) {
99 6
            throw new AMQPSocketException(sprintf(
100 6
                'Socket was null! Last SocketError was: %s',
101 6
                socket_strerror(socket_last_error())
102 3
            ));
103
        }
104
105 102
        $this->check_heartbeat();
106
107 102
        list($timeout_sec, $timeout_uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
108 102
        $read_start = microtime(true);
109 102
        $read = 0;
110 102
        $data = '';
111 102
        while ($read < $len) {
112 102
            $buffer = null;
113 102
            $result = socket_recv($this->sock, $buffer, $len - $read, 0);
114 102
            if ($result === 0) {
115
                // From linux recv() manual:
116
                // When a stream socket peer has performed an orderly shutdown,
117
                // the return value will be 0 (the traditional "end-of-file" return).
118
                // http://php.net/manual/en/function.socket-recv.php#47182
119
                $this->close();
120
                throw new AMQPConnectionClosedException('Broken pipe or closed connection');
121
            }
122
123 102 View Code Duplication
            if (empty($buffer)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
124
                $read_now = microtime(true);
125
                $t_read = $read_now - $read_start;
126
                if ($t_read > $this->read_timeout) {
127
                    throw new AMQPTimeoutException('Too many read attempts detected in SocketIO');
128
                }
129
                $this->select($timeout_sec, $timeout_uSec);
130
                continue;
131
            }
132
133 102
            $read += mb_strlen($buffer, 'ASCII');
134 102
            $data .= $buffer;
135 51
        }
136
137 102
        if (mb_strlen($data, 'ASCII') != $len) {
138
            throw new AMQPIOException(sprintf(
139
                'Error reading data. Received %s instead of expected %s bytes',
140
                mb_strlen($data, 'ASCII'),
141
                $len
142
            ));
143
        }
144
145 102
        $this->last_read = microtime(true);
146
147 102
        return $data;
148
    }
149
150
    /**
151
     * @inheritdoc
152
     */
153 114
    public function write($data)
154
    {
155 114
        $written = 0;
156 114
        $len = mb_strlen($data, 'ASCII');
157 114
        $write_start = microtime(true);
158
159 114
        while ($written < $len) {
160
            // Null sockets are invalid, throw exception
161 114
            if (is_null($this->sock)) {
162 6
                throw new AMQPSocketException(sprintf(
163 6
                    'Socket was null! Last SocketError was: %s',
164 6
                    socket_strerror(socket_last_error())
165 3
                ));
166
            }
167
168 108
            $this->set_error_handler();
169
            try {
170 108
                $buffer = mb_substr($data, $written, self::BUFFER_SIZE, 'ASCII');
171 108
                $result = socket_write($this->sock, $buffer, self::BUFFER_SIZE);
172 108
                $this->cleanup_error_handler();
173 54
            } catch (\ErrorException $e) {
174
                $code = socket_last_error($this->sock);
175
                switch ($code) {
176
                    case SOCKET_EPIPE:
177
                    case SOCKET_ENETDOWN:
178
                    case SOCKET_ENETUNREACH:
179
                    case SOCKET_ENETRESET:
180
                    case SOCKET_ECONNABORTED:
181
                    case SOCKET_ECONNRESET:
182
                    case SOCKET_ECONNREFUSED:
183
                    case SOCKET_ETIMEDOUT:
184
                        $this->close();
185
                        throw new AMQPConnectionClosedException(socket_strerror($code), $code, $e);
186
                    default:
187
                        throw new AMQPIOException(sprintf(
188
                            'Error sending data. Last SocketError: %s',
189
                            socket_strerror($code)
190
                        ), $code, $e);
191
                }
192
            }
193
194 108
            if ($result === false) {
195
                throw new AMQPIOException(sprintf(
196
                    'Error sending data. Last SocketError: %s',
197
                    socket_strerror(socket_last_error($this->sock))
198
                ));
199
            }
200
201 108
            $now = microtime(true);
202 108
            if ($result > 0) {
203 108
                $this->last_write = $write_start = $now;
204 108
                $written += $result;
205 54
            } else {
206 6
                if (($now - $write_start) > $this->write_timeout) {
207 6
                    throw AMQPTimeoutException::writeTimeout($this->write_timeout);
208
                }
209 6
                $this->select_write();
210
            }
211 54
        }
212 108
    }
213
214
    /**
215
     * @inheritdoc
216
     */
217 96 View Code Duplication
    public function close()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
218
    {
219 96
        $this->disableHeartbeat();
220 96
        if (is_resource($this->sock)) {
221 96
            socket_close($this->sock);
222 48
        }
223 96
        $this->sock = null;
224 96
        $this->last_read = null;
225 96
        $this->last_write = null;
226 96
    }
227
228
    /**
229
     * @inheritdoc
230
     */
231 102
    protected function do_select($sec, $usec)
232
    {
233 102
        $read = array($this->sock);
234 102
        $write = null;
235 102
        $except = null;
236
237 102
        return socket_select($read, $write, $except, $sec, $usec);
238
    }
239
240
    /**
241
     * @return int|bool
242
     */
243
    protected function select_write()
244
    {
245
        $read = $except = null;
246
        $write = array($this->sock);
247
248
        return socket_select($read, $write, $except, 0, 100000);
249
    }
250
251
    /**
252
     * @throws \PhpAmqpLib\Exception\AMQPIOException
253
     */
254 6
    protected function enable_keepalive()
255
    {
256 6
        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
257
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
258
        }
259
260 6
        socket_set_option($this->sock, SOL_SOCKET, SO_KEEPALIVE, 1);
261 6
    }
262
263
    /**
264
     * @inheritdoc
265
     */
266 18
    public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
267
    {
268
        // socket_select warning that it has been interrupted by a signal - EINTR
269 18
        if (false !== strrpos($errstr, socket_strerror(SOCKET_EINTR))) {
270
            // it's allowed while processing signals
271 12
            return;
272
        }
273
274 6
        parent::error_handler($errno, $errstr, $errfile, $errline, $errcontext);
275 6
    }
276
277
    /**
278
     * @inheritdoc
279
     */
280 114
    protected function set_error_handler()
281
    {
282 114
        parent::set_error_handler();
283 114
        socket_clear_error($this->sock);
284 114
    }
285
}
286