SocketIO::write()   F
last analyzed

Complexity

Conditions 17
Paths 574

Size

Total Lines 68
Code Lines 51

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 24
CRAP Score 37.5961

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 17
eloc 51
c 1
b 0
f 0
nc 574
nop 1
dl 0
loc 68
ccs 24
cts 41
cp 0.5854
crap 37.5961
rs 1.6416

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace PhpAmqpLib\Wire\IO;
4
5
use PhpAmqpLib\Connection\AMQPConnectionConfig;
6
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
7
use PhpAmqpLib\Exception\AMQPIOException;
8
use PhpAmqpLib\Exception\AMQPSocketException;
9
use PhpAmqpLib\Exception\AMQPTimeoutException;
10
use PhpAmqpLib\Helper\MiscHelper;
11
use PhpAmqpLib\Helper\SocketConstants;
12
13
class SocketIO extends AbstractIO
14
{
15
    /** @var null|resource|\Socket */
16
    private $sock;
17
18
    /**
19
     * @param string $host
20
     * @param int $port
21
     * @param int|float $read_timeout
22
     * @param bool $keepalive
23
     * @param int|float|null $write_timeout if null defaults to read timeout
24
     * @param int $heartbeat how often to send heartbeat. 0 means off
25 23
     * @param null|AMQPConnectionConfig $config
26
     */
27
    public function __construct(
28
        $host,
29
        $port,
30
        $read_timeout = 3,
31
        $keepalive = false,
32
        $write_timeout = null,
33 23
        $heartbeat = 0,
34 23
        ?AMQPConnectionConfig $config = null
35 23
    ) {
36 23
        $this->config = $config;
37 23
        $this->host = str_replace(['[', ']'], '', $host);
38 23
        $this->port = $port;
39 23
        $this->read_timeout = (float)$read_timeout;
40 23
        $this->write_timeout = (float)($write_timeout ?: $read_timeout);
41
        $this->heartbeat = $heartbeat;
42
        $this->initial_heartbeat = $heartbeat;
43
        $this->keepalive = $keepalive;
44
        $this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();
45
46
        /*
47
            TODO FUTURE enable this check
48
            php-amqplib/php-amqplib#648, php-amqplib/php-amqplib#666
49
        if ($this->heartbeat !== 0 && ($this->read_timeout <= ($this->heartbeat * 2))) {
50
            throw new \InvalidArgumentException('read_timeout must be greater than 2x the heartbeat');
51
        }
52
        if ($this->heartbeat !== 0 && ($this->write_timeout <= ($this->heartbeat * 2))) {
53
            throw new \InvalidArgumentException('send_timeout must be greater than 2x the heartbeat');
54
        }
55
         */
56
    }
57 22
58
    /**
59 22
     * @inheritdoc
60
     */
61 22
    public function connect()
62 22
    {
63 22
        $this->sock = socket_create(!$this->isIpv6() ? AF_INET : AF_INET6, SOCK_STREAM, SOL_TCP);
64 22
65
        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->write_timeout);
66 22
        socket_set_option($this->sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => $sec, 'usec' => $uSec));
67
        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
68 22
        socket_set_option($this->sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => $sec, 'usec' => $uSec));
69 22
70 1
        $this->setErrorHandler();
71 1
        try {
72 22
            $connected = socket_connect($this->sock, $this->host, $this->port);
73 22
            $this->throwOnError();
74
        } catch (\ErrorException $e) {
75 22
            $connected = false;
76 1
        } finally {
77 1
            $this->restoreErrorHandler();
78 1
        }
79
        if (!$connected) {
80
            $errno = socket_last_error($this->sock);
81
            $errstr = socket_strerror($errno);
82
            throw new AMQPIOException(sprintf(
83
                'Error Connecting to server (%s): %s',
84
                $errno,
85 21
                $errstr
86 21
            ), $errno);
87
        }
88 21
89 1
        socket_set_block($this->sock);
90
        socket_set_option($this->sock, SOL_TCP, TCP_NODELAY, 1);
91
        if ($this->config && $this->config->getSendBufferSize() > 0) {
92 21
            socket_set_option($this->sock, SOL_SOCKET, SO_SNDBUF, $this->config->getSendBufferSize());
93
        }
94
95
        if ($this->keepalive) {
96
            $this->enable_keepalive();
97
        }
98
99
        $this->heartbeat = $this->initial_heartbeat;
100
    }
101
102
    /**
103
     * @deprecated
104
     * @return null|resource|\Socket
105
     */
106 21
    public function getSocket()
107
    {
108 21
        return $this->sock;
109 1
    }
110
111 1
    /**
112
     * @inheritdoc
113
     */
114
    public function read($len)
115 20
    {
116
        if (is_null($this->sock)) {
117 20
            throw new AMQPSocketException(sprintf(
118 20
                'Socket was null! Last SocketError was: %s',
119 20
                socket_strerror(socket_last_error())
120 20
            ));
121 20
        }
122 20
123 20
        $this->check_heartbeat();
124 20
125
        list($timeout_sec, $timeout_uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
126
        $read_start = microtime(true);
127
        $read = 0;
128
        $data = '';
129
        while ($read < $len) {
130
            $buffer = null;
131
            $result = socket_recv($this->sock, $buffer, $len - $read, 0);
132
            if ($result === 0) {
133 20
                // From linux recv() manual:
134
                // When a stream socket peer has performed an orderly shutdown,
135
                // the return value will be 0 (the traditional "end-of-file" return).
136
                // http://php.net/manual/en/function.socket-recv.php#47182
137
                $this->close();
138
                throw new AMQPConnectionClosedException('Broken pipe or closed connection');
139
            }
140
141
            if (empty($buffer)) {
142
                $read_now = microtime(true);
143 20
                $t_read = $read_now - $read_start;
144 20
                if ($t_read > $this->read_timeout) {
145
                    throw new AMQPTimeoutException('Too many read attempts detected in SocketIO');
146
                }
147 20
                $this->select($timeout_sec, $timeout_uSec);
148
                continue;
149
            }
150
151
            $read += mb_strlen($buffer, 'ASCII');
152
            $data .= $buffer;
153
        }
154
155 20
        if (mb_strlen($data, 'ASCII') !== $len) {
156
            throw new AMQPIOException(sprintf(
157 20
                'Error reading data. Received %s instead of expected %s bytes',
158
                mb_strlen($data, 'ASCII'),
159
                $len
160
            ));
161
        }
162
163 26
        $this->last_read = microtime(true);
0 ignored issues
show
Documentation Bug introduced by
It seems like microtime(true) can also be of type string. However, the property $last_read is declared as type double|integer. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
164
165
        return $data;
166 26
    }
167 1
168
    /**
169 1
     * @inheritdoc
170
     */
171
    public function write($data)
172
    {
173 25
        // Null sockets are invalid, throw exception
174
        if (is_null($this->sock)) {
175 25
            throw new AMQPSocketException(sprintf(
176 25
                'Socket was null! Last SocketError was: %s',
177 25
                socket_strerror(socket_last_error())
178
            ));
179 25
        }
180 25
181
        $this->checkBrokerHeartbeat();
182 25
183 25
        $written = 0;
184 25
        $len = mb_strlen($data, 'ASCII');
185 25
        $write_start = microtime(true);
186
187
        while ($written < $len) {
188
            $this->setErrorHandler();
189
            try {
190
                $result = 0;
191
                if ($this->select_write()) {
192
                    // if data is smaller than buffer - no need to cut part of it
193
                    if ($len <= self::BUFFER_SIZE) {
194
                        $buffer = $data;
195
                    } else {
196
                        $buffer = mb_substr($data, $written, self::BUFFER_SIZE, 'ASCII');
197
                    }
198
                    $result = socket_write($this->sock, $buffer);
199
                }
200
                $this->throwOnError();
201
            } catch (\ErrorException $e) {
202
                $code = socket_last_error($this->sock);
203
                $constants = SocketConstants::getInstance();
204
                switch ($code) {
205
                    case $constants->SOCKET_EPIPE:
206 25
                    case $constants->SOCKET_ENETDOWN:
207 25
                    case $constants->SOCKET_ENETUNREACH:
208
                    case $constants->SOCKET_ENETRESET:
209
                    case $constants->SOCKET_ECONNABORTED:
210 25
                    case $constants->SOCKET_ECONNRESET:
211
                    case $constants->SOCKET_ECONNREFUSED:
212
                    case $constants->SOCKET_ETIMEDOUT:
213
                        $this->close();
214
                        throw new AMQPConnectionClosedException(socket_strerror($code), $code, $e);
215
                    default:
216
                        throw new AMQPIOException(sprintf(
217 25
                            'Error sending data. Last SocketError: %s',
218 25
                            socket_strerror($code)
219 25
                        ), $code, $e);
220 25
                }
221
            } finally {
222 1
                $this->restoreErrorHandler();
223 1
            }
224
225
            if ($result === false) {
226
                throw new AMQPIOException(sprintf(
227
                    'Error sending data. Last SocketError: %s',
228
                    socket_strerror(socket_last_error($this->sock))
229
                ));
230
            }
231
232 19
            $now = microtime(true);
233
            if ($result > 0) {
234 19
                $this->last_write = $write_start = $now;
0 ignored issues
show
Documentation Bug introduced by
It seems like $write_start = $now can also be of type string. However, the property $last_write is declared as type double|integer. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
235 19
                $written += $result;
236 19
            } else {
237
                if (($now - $write_start) > $this->write_timeout) {
238 19
                    throw AMQPTimeoutException::writeTimeout($this->write_timeout);
239 19
                }
240 19
            }
241
        }
242
    }
243
244
    /**
245
     * @inheritdoc
246 22
     */
247
    public function close()
248 22
    {
249 1
        $this->disableHeartbeat();
250 1
        if (is_resource($this->sock) || is_a($this->sock, \Socket::class)) {
251
            socket_close($this->sock);
252
        }
253 21
        $this->sock = null;
254 21
        $this->last_read = 0;
255 21
        $this->last_write = 0;
256
    }
257 21
258
    /**
259
     * @inheritdoc
260
     */
261
    protected function do_select(?int $sec, int $usec)
262
    {
263 20
        if (!is_resource($this->sock) && !is_a($this->sock, \Socket::class)) {
264
            $this->sock = null;
265 20
            throw new AMQPConnectionClosedException('Broken pipe or closed connection', 0);
266 20
        }
267
268 20
        $read = array($this->sock);
269
        $write = null;
270
        $except = null;
271
272
        return socket_select($read, $write, $except, $sec, $usec);
0 ignored issues
show
Bug introduced by
$write of type null is incompatible with the type array expected by parameter $write of socket_select(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

272
        return socket_select($read, /** @scrutinizer ignore-type */ $write, $except, $sec, $usec);
Loading history...
Bug introduced by
$except of type null is incompatible with the type array expected by parameter $except of socket_select(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

272
        return socket_select($read, $write, /** @scrutinizer ignore-type */ $except, $sec, $usec);
Loading history...
273
    }
274 1
275
    /**
276 1
     * @return int|bool
277
     */
278
    protected function select_write()
279
    {
280 1
        $read = $except = null;
281
        $write = array($this->sock);
282
283
        return socket_select($read, $write, $except, 0, 100000);
0 ignored issues
show
Bug introduced by
$except of type null is incompatible with the type array expected by parameter $except of socket_select(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

283
        return socket_select($read, $write, /** @scrutinizer ignore-type */ $except, 0, 100000);
Loading history...
Bug introduced by
$read of type null is incompatible with the type array expected by parameter $read of socket_select(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

283
        return socket_select(/** @scrutinizer ignore-type */ $read, $write, $except, 0, 100000);
Loading history...
284
    }
285
286 4
    /**
287
     * @throws \PhpAmqpLib\Exception\AMQPIOException
288 4
     */
289
    protected function enable_keepalive(): void
290 4
    {
291
        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
292 2
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
293
        }
294
295 2
        socket_set_option($this->sock, SOL_SOCKET, SO_KEEPALIVE, 1);
296
    }
297
298
    /**
299
     * @inheritdoc
300
     */
301 23
    public function error_handler($errno, $errstr, $errfile, $errline): void
302
    {
303 23
        $constants = SocketConstants::getInstance();
304 23
        // socket_select warning that it has been interrupted by a signal - EINTR
305
        if (isset($constants->SOCKET_EINTR) && false !== strrpos($errstr, socket_strerror($constants->SOCKET_EINTR))) {
306
            // it's allowed while processing signals
307
            return;
308
        }
309
310
        parent::error_handler($errno, $errstr, $errfile, $errline);
311
    }
312
313
    /**
314
     * @inheritdoc
315
     */
316
    protected function setErrorHandler(): void
317
    {
318
        parent::setErrorHandler();
319
        socket_clear_error($this->sock);
320
    }
321
322
    private function isIpv6(): bool
323
    {
324
        $ipv6 = filter_var($this->host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6);
325
326
        if ($ipv6 !== false || checkdnsrr($this->host, 'AAAA')) {
327
            return true;
328
        }
329
330
        return false;
331
    }
332
}
333