Completed
Pull Request — master (#345)
by
unknown
13:49
created

SocketIO   A

Complexity

Total Complexity 27

Size/Duplication

Total Lines 200
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Test Coverage

Coverage 53.13%

Importance

Changes 11
Bugs 4 Features 1
Metric Value
wmc 27
c 11
b 4
f 1
lcom 1
cbo 4
dl 0
loc 200
ccs 51
cts 96
cp 0.5313
rs 10

9 Methods

Rating   Name   Duplication   Size   Complexity  
A select() 0 8 1
A __construct() 0 7 1
C connect() 0 35 7
A getSocket() 0 4 1
A reconnect() 0 5 1
B read() 0 30 6
B write() 0 33 5
A close() 0 7 2
A enable_keepalive() 0 8 3
1
<?php
2
namespace PhpAmqpLib\Wire\IO;
3
4
use PhpAmqpLib\Exception\AMQPIOException;
5
use PhpAmqpLib\Exception\AMQPRuntimeException;
6
use PhpAmqpLib\Helper\MiscHelper;
7
8
class SocketIO extends AbstractIO
9
{
10
    /** @var string */
11
    protected $host;
12
13
    /** @var int */
14
    protected $port;
15
16
    /** @var float */
17
    protected $timeout;
18
19
    /** @var resource */
20
    private $sock;
21
22
    /** @var bool */
23
    private $keepalive;
24
25
    /**
26
     * @param string $host
27
     * @param int $port
28
     * @param float $timeout
29
     * @param bool $keepalive
30
     */
31 25
    public function __construct($host, $port, $timeout, $keepalive = false)
32
    {
33 25
        $this->host = $host;
34 25
        $this->port = $port;
35 25
        $this->timeout = $timeout;
36 25
        $this->keepalive = $keepalive;
37 25
    }
38
39
    /**
40
     * Sets up the socket connection
41
     *
42
     * @throws \Exception
43
     */
44 25
    public function connect()
45
    {
46 25
        $this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
47
48 25
        $useFloat = defined('AMQP_FLOAT_TIMEOUT_FOR_SOCKET') && AMQP_FLOAT_TIMEOUT_FOR_SOCKET === true;
49 25
        if (!$useFloat && is_float($this->timeout)) {
50
            trigger_error(
51
                "Timeout value is implicitly cast to integer. Please define AMQP_FLOAT_TIMEOUT_FOR_SOCKET=true if you would like to use float value timeouts. This implicit cast will be eventually removed.",
52
                E_USER_DEPRECATED
53
            );
54
        }
55
56 5
        list($sec, $uSec) = $useFloat
57 20
            ? MiscHelper::splitSecondsMicroseconds($this->timeout)
58 25
            : array(intval($this->timeout), 0);
59 25
        socket_set_option($this->sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => $sec, 'usec' => $uSec));
60 25
        socket_set_option($this->sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => $sec, 'usec' => $uSec));
61
62 25
        if (!socket_connect($this->sock, $this->host, $this->port)) {
63
            $errno = socket_last_error($this->sock);
64
            $errstr = socket_strerror($errno);
65
            throw new AMQPIOException(sprintf(
66
                'Error Connecting to server (%s): %s',
67
                $errno,
68
                $errstr
69
            ), $errno);
70
        }
71
72 25
        socket_set_block($this->sock);
73 25
        socket_set_option($this->sock, SOL_TCP, TCP_NODELAY, 1);
74
75 25
        if ($this->keepalive) {
76
            $this->enable_keepalive();
77
        }
78 25
    }
79
80
    /**
81
     * @return resource
82
     */
83
    public function getSocket()
84
    {
85
        return $this->sock;
86
    }
87
88
    /**
89
     * Reconnects the socket
90
     */
91 10
    public function reconnect()
92
    {
93 10
        $this->close();
94 10
        $this->connect();
95 10
    }
96
97
    /**
98
     * @param $n
99
     * @return mixed|string
100
     * @throws \PhpAmqpLib\Exception\AMQPIOException
101
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
102
     */
103 25
    public function read($n)
104
    {
105 25
        $res = '';
106 25
        $read = 0;
107
108 25
        $buf = socket_read($this->sock, $n);
109 25
        while ($read < $n && $buf !== '' && $buf !== false) {
110
            // Null sockets are invalid, throw exception
111 25
            if (is_null($this->sock)) {
112
                throw new AMQPRuntimeException(sprintf(
113
                    'Socket was null! Last SocketError was: %s',
114
                    socket_strerror(socket_last_error())
115
                ));
116
            }
117
118 25
            $read += mb_strlen($buf, 'ASCII');
119 25
            $res .= $buf;
120 25
            $buf = socket_read($this->sock, $n - $read);
121 20
        }
122
123 25
        if (mb_strlen($res, 'ASCII') != $n) {
124
            throw new AMQPIOException(sprintf(
125
                'Error reading data. Received %s instead of expected %s bytes',
126
                mb_strlen($res, 'ASCII'),
127
                $n
128
            ));
129
        }
130
131 25
        return $res;
132
    }
133
134
    /**
135
     * @param $data
136
     * @return mixed|void
137
     * @throws \PhpAmqpLib\Exception\AMQPIOException
138
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
139
     */
140 25
    public function write($data)
141
    {
142 25
        $len = mb_strlen($data, 'ASCII');
143
144 25
        while (true) {
145
            // Null sockets are invalid, throw exception
146 25
            if (is_null($this->sock)) {
147
                throw new AMQPRuntimeException(sprintf(
148
                    'Socket was null! Last SocketError was: %s',
149
                    socket_strerror(socket_last_error())
150
                ));
151
            }
152
153 25
            $sent = socket_write($this->sock, $data, $len);
154 25
            if ($sent === false) {
155
                throw new AMQPIOException(sprintf(
156
                    'Error sending data. Last SocketError: %s',
157
                    socket_strerror(socket_last_error())
158
                ));
159
            }
160
161
            // Check if the entire message has been sent
162 25
            if ($sent < $len) {
163
                // If not sent the entire message.
164
                // Get the part of the message that has not yet been sent as message
165
                $data = mb_substr($data, $sent, mb_strlen($data, 'ASCII') - $sent, 'ASCII');
166
                // Get the length of the not sent part
167
                $len -= $sent;
168
            } else {
169 25
                break;
170
            }
171
        }
172 25
    }
173
174 25
    public function close()
175
    {
176 25
        if (is_resource($this->sock)) {
177 25
            socket_close($this->sock);
178 20
        }
179 25
        $this->sock = null;
180 25
    }
181
182
    /**
183
     * @param $sec
184
     * @param $usec
185
     * @return int|mixed
186
     */
187
    public function select($sec, $usec)
188
    {
189
        $read = array($this->sock);
190
        $write = null;
191
        $except = null;
192
193
        return socket_select($read, $write, $except, $sec, $usec);
194
    }
195
196
    /**
197
     * @throws \PhpAmqpLib\Exception\AMQPIOException
198
     */
199
    protected function enable_keepalive()
200
    {
201
        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
202
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
203
        }
204
205
        socket_set_option($this->sock, SOL_SOCKET, SO_KEEPALIVE, 1);
206
    }
207
}
208