Completed
Pull Request — master (#345)
by
unknown
40:28 queued 18:28
created

SocketIO   A

Complexity

Total Complexity 27

Size/Duplication

Total Lines 198
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Test Coverage

Coverage 54.02%

Importance

Changes 10
Bugs 4 Features 1
Metric Value
wmc 27
c 10
b 4
f 1
lcom 1
cbo 4
dl 0
loc 198
ccs 47
cts 87
cp 0.5402
rs 10

9 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 7 1
C connect() 0 33 7
A getSocket() 0 4 1
A reconnect() 0 5 1
B read() 0 30 6
B write() 0 33 5
A close() 0 7 2
A select() 0 8 1
A enable_keepalive() 0 8 3
1
<?php
2
namespace PhpAmqpLib\Wire\IO;
3
4
use PhpAmqpLib\Exception\AMQPIOException;
5
use PhpAmqpLib\Exception\AMQPRuntimeException;
6
use PhpAmqpLib\Helper\MiscHelper;
7
8
class SocketIO extends AbstractIO
9
{
10
    /** @var string */
11
    protected $host;
12
13
    /** @var int */
14
    protected $port;
15
16
    /** @var float */
17
    protected $timeout;
18
19
    /** @var resource */
20
    private $sock;
21
22
    /** @var bool */
23
    private $keepalive;
24
25
    /**
26
     * @param string $host
27
     * @param int $port
28
     * @param float $timeout
29
     * @param bool $keepalive
30 25
     */
31
    public function __construct($host, $port, $timeout, $keepalive = false)
32 25
    {
33 25
        $this->host = $host;
34 25
        $this->port = $port;
35 25
        $this->timeout = $timeout;
36 25
        $this->keepalive = $keepalive;
37
    }
38
39
    /**
40
     * Sets up the socket connection
41
     *
42
     * @throws \Exception
43 25
     */
44
    public function connect()
45 25
    {
46
        $this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
47 25
48 25
        $useFloat = defined('AMQP_FLOAT_TIMEOUT_FOR_SOCKET') && AMQP_FLOAT_TIMEOUT_FOR_SOCKET === true;
49
        if (!$useFloat && is_float($this->timeout)) {
50 25
            trigger_error(
51
                "Timeout value is implicitly cast to integer. Please define AMQP_FLOAT_TIMEOUT_FOR_SOCKET=true if you would like to use float value timeouts. This implicit cast will be eventually removed.",
52
                E_USER_DEPRECATED
53
            );
54
        }
55
56
        list($sec, $uSec) = $useFloat ? MiscHelper::splitSecondsMicroseconds($this->timeout) : [intval($this->timeout), 0];
57
        socket_set_option($this->sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => $sec, 'usec' => $uSec));
58
        socket_set_option($this->sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => $sec, 'usec' => $uSec));
59
60 25
        if (!socket_connect($this->sock, $this->host, $this->port)) {
61 25
            $errno = socket_last_error($this->sock);
62
            $errstr = socket_strerror($errno);
63 25
            throw new AMQPIOException(sprintf(
64
                'Error Connecting to server (%s): %s',
65
                $errno,
66 25
                $errstr
67
            ), $errno);
68
        }
69
70
        socket_set_block($this->sock);
71
        socket_set_option($this->sock, SOL_TCP, TCP_NODELAY, 1);
72
73
        if ($this->keepalive) {
74
            $this->enable_keepalive();
75
        }
76
    }
77
78
    /**
79 10
     * @return resource
80
     */
81 10
    public function getSocket()
82 10
    {
83 10
        return $this->sock;
84
    }
85
86
    /**
87
     * Reconnects the socket
88
     */
89
    public function reconnect()
90
    {
91 25
        $this->close();
92
        $this->connect();
93 25
    }
94 25
95
    /**
96 25
     * @param $n
97 25
     * @return mixed|string
98
     * @throws \PhpAmqpLib\Exception\AMQPIOException
99 25
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
100
     */
101
    public function read($n)
102
    {
103
        $res = '';
104
        $read = 0;
105
106 25
        $buf = socket_read($this->sock, $n);
107 25
        while ($read < $n && $buf !== '' && $buf !== false) {
108 25
            // Null sockets are invalid, throw exception
109 20
            if (is_null($this->sock)) {
110
                throw new AMQPRuntimeException(sprintf(
111 25
                    'Socket was null! Last SocketError was: %s',
112
                    socket_strerror(socket_last_error())
113
                ));
114
            }
115
116
            $read += mb_strlen($buf, 'ASCII');
117
            $res .= $buf;
118
            $buf = socket_read($this->sock, $n - $read);
119 25
        }
120
121
        if (mb_strlen($res, 'ASCII') != $n) {
122
            throw new AMQPIOException(sprintf(
123
                'Error reading data. Received %s instead of expected %s bytes',
124
                mb_strlen($res, 'ASCII'),
125
                $n
126
            ));
127
        }
128 25
129
        return $res;
130 25
    }
131
132 25
    /**
133
     * @param $data
134 25
     * @return mixed|void
135
     * @throws \PhpAmqpLib\Exception\AMQPIOException
136
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
137
     */
138
    public function write($data)
139
    {
140
        $len = mb_strlen($data, 'ASCII');
141 25
142 25
        while (true) {
143
            // Null sockets are invalid, throw exception
144
            if (is_null($this->sock)) {
145
                throw new AMQPRuntimeException(sprintf(
146
                    'Socket was null! Last SocketError was: %s',
147
                    socket_strerror(socket_last_error())
148
                ));
149
            }
150 25
151
            $sent = socket_write($this->sock, $data, $len);
152
            if ($sent === false) {
153
                throw new AMQPIOException(sprintf(
154
                    'Error sending data. Last SocketError: %s',
155
                    socket_strerror(socket_last_error())
156
                ));
157 25
            }
158
159
            // Check if the entire message has been sent
160 25
            if ($sent < $len) {
161
                // If not sent the entire message.
162 25
                // Get the part of the message that has not yet been sent as message
163
                $data = mb_substr($data, $sent, mb_strlen($data, 'ASCII') - $sent, 'ASCII');
164 25
                // Get the length of the not sent part
165 25
                $len -= $sent;
166 20
            } else {
167 25
                break;
168 25
            }
169
        }
170
    }
171
172
    public function close()
173
    {
174
        if (is_resource($this->sock)) {
175
            socket_close($this->sock);
176
        }
177
        $this->sock = null;
178
    }
179
180
    /**
181
     * @param $sec
182
     * @param $usec
183
     * @return int|mixed
184
     */
185
    public function select($sec, $usec)
186
    {
187
        $read = array($this->sock);
188
        $write = null;
189
        $except = null;
190
191
        return socket_select($read, $write, $except, $sec, $usec);
192
    }
193
194
    /**
195
     * @throws \PhpAmqpLib\Exception\AMQPIOException
196 4
     */
197
    protected function enable_keepalive()
198
    {
199
        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
200
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
201
        }
202
203
        socket_set_option($this->sock, SOL_SOCKET, SO_KEEPALIVE, 1);
204
    }
205
}
206