Completed
Push — master ( 9b90e7...a2b649 )
by John
17:01 queued 14:42
created

SocketIO   A

Complexity

Total Complexity 23

Size/Duplication

Total Lines 189
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 3

Test Coverage

Coverage 53.49%

Importance

Changes 5
Bugs 3 Features 0
Metric Value
wmc 23
c 5
b 3
f 0
lcom 1
cbo 3
dl 0
loc 189
ccs 46
cts 86
cp 0.5349
rs 10

9 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 7 1
B connect() 0 24 3
A getSocket() 0 4 1
A reconnect() 0 5 1
A close() 0 7 2
A enable_keepalive() 0 8 3
B read() 0 30 6
B write() 0 33 5
A select() 0 8 1
1
<?php
2
namespace PhpAmqpLib\Wire\IO;
3
4
use PhpAmqpLib\Exception\AMQPIOException;
5
use PhpAmqpLib\Exception\AMQPRuntimeException;
6
7
class SocketIO extends AbstractIO
8
{
9
    /** @var string */
10
    protected $host;
11
12
    /** @var int */
13
    protected $port;
14
15
    /** @var int */
16
    protected $timeout;
17
18
    /** @var resource */
19
    private $sock;
20
21
    /** @var bool */
22
    private $keepalive;
23
24
    /**
25
     * @param string $host
26
     * @param int $port
27
     * @param int $timeout
28
     * @param bool $keepalive
29
     */
30 18
    public function __construct($host, $port, $timeout, $keepalive = false)
31
    {
32 18
        $this->host = $host;
33 18
        $this->port = $port;
34 18
        $this->timeout = $timeout;
35 18
        $this->keepalive = $keepalive;
36 18
    }
37
38
    /**
39
     * Sets up the socket connection
40
     *
41
     * @throws \Exception
42
     */
43 18
    public function connect()
44
    {
45 18
        $this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
46
47 18
        socket_set_option($this->sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => $this->timeout, 'usec' => 0));
48 18
        socket_set_option($this->sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => $this->timeout, 'usec' => 0));
49
50 18
        if (!socket_connect($this->sock, $this->host, $this->port)) {
51
            $errno = socket_last_error($this->sock);
52
            $errstr = socket_strerror($errno);
53
            throw new AMQPIOException(sprintf(
54
                'Error Connecting to server (%s): %s',
55
                $errno,
56
                $errstr
57
            ), $errno);
58
        }
59
60 18
        socket_set_block($this->sock);
61 18
        socket_set_option($this->sock, SOL_TCP, TCP_NODELAY, 1);
62
63 18
        if ($this->keepalive) {
64
            $this->enable_keepalive();
65
        }
66 18
    }
67
68
    /**
69
     * @return resource
70
     */
71
    public function getSocket()
72
    {
73
        return $this->sock;
74
    }
75
76
    /**
77
     * Reconnects the socket
78
     */
79 12
    public function reconnect()
80
    {
81 12
        $this->close();
82 12
        $this->connect();
83 12
    }
84
85
    /**
86
     * @param $n
87
     * @return mixed|string
88
     * @throws \PhpAmqpLib\Exception\AMQPIOException
89
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
90
     */
91 18
    public function read($n)
92
    {
93 18
        $res = '';
94 18
        $read = 0;
95
96 18
        $buf = socket_read($this->sock, $n);
97 18
        while ($read < $n && $buf !== '' && $buf !== false) {
98
            // Null sockets are invalid, throw exception
99 18
            if (is_null($this->sock)) {
100
                throw new AMQPRuntimeException(sprintf(
101
                    'Socket was null! Last SocketError was: %s',
102
                    socket_strerror(socket_last_error())
103
                ));
104
            }
105
106 18
            $read += mb_strlen($buf, 'ASCII');
107 18
            $res .= $buf;
108 18
            $buf = socket_read($this->sock, $n - $read);
109 12
        }
110
111 18
        if (mb_strlen($res, 'ASCII') != $n) {
112
            throw new AMQPIOException(sprintf(
113
                'Error reading data. Received %s instead of expected %s bytes',
114
                mb_strlen($res, 'ASCII'),
115
                $n
116
            ));
117
        }
118
119 18
        return $res;
120
    }
121
122
    /**
123
     * @param $data
124
     * @return mixed|void
125
     * @throws \PhpAmqpLib\Exception\AMQPIOException
126
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
127
     */
128 18
    public function write($data)
129
    {
130 18
        $len = mb_strlen($data, 'ASCII');
131
132 18
        while (true) {
133
            // Null sockets are invalid, throw exception
134 18
            if (is_null($this->sock)) {
135
                throw new AMQPRuntimeException(sprintf(
136
                    'Socket was null! Last SocketError was: %s',
137
                    socket_strerror(socket_last_error())
138
                ));
139
            }
140
141 18
            $sent = socket_write($this->sock, $data, $len);
142 18
            if ($sent === false) {
143
                throw new AMQPIOException(sprintf(
144
                    'Error sending data. Last SocketError: %s',
145
                    socket_strerror(socket_last_error())
146
                ));
147
            }
148
149
            // Check if the entire message has been sent
150 18
            if ($sent < $len) {
151
                // If not sent the entire message.
152
                // Get the part of the message that has not yet been sent as message
153
                $data = mb_substr($data, $sent, mb_strlen($data, 'ASCII') - $sent, 'ASCII');
154
                // Get the length of the not sent part
155
                $len -= $sent;
156
            } else {
157 18
                break;
158
            }
159
        }
160 18
    }
161
162 18
    public function close()
163
    {
164 18
        if (is_resource($this->sock)) {
165 18
            socket_close($this->sock);
166 12
        }
167 18
        $this->sock = null;
168 18
    }
169
170
    /**
171
     * @param $sec
172
     * @param $usec
173
     * @return int|mixed
174
     */
175
    public function select($sec, $usec)
176
    {
177
        $read = array($this->sock);
178
        $write = null;
179
        $except = null;
180
181
        return socket_select($read, $write, $except, $sec, $usec);
182
    }
183
184
    /**
185
     * @throws \PhpAmqpLib\Exception\AMQPIOException
186
     */
187
    protected function enable_keepalive()
188
    {
189
        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
190
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
191
        }
192
193
        socket_set_option($this->sock, SOL_SOCKET, SO_KEEPALIVE, 1);
194
    }
195
}
196