Completed
Pull Request — master (#345)
by
unknown
26:56
created

SocketIO   A

Complexity

Total Complexity 27

Size/Duplication

Total Lines 199
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Test Coverage

Coverage 54.02%

Importance

Changes 9
Bugs 3 Features 1
Metric Value
wmc 27
c 9
b 3
f 1
lcom 1
cbo 4
dl 0
loc 199
ccs 47
cts 87
cp 0.5402
rs 10

9 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 7 1
C connect() 0 34 7
A getSocket() 0 4 1
A reconnect() 0 5 1
B read() 0 30 6
B write() 0 33 5
A close() 0 7 2
A select() 0 8 1
A enable_keepalive() 0 8 3
1
<?php
2
namespace PhpAmqpLib\Wire\IO;
3
4
use PhpAmqpLib\Exception\AMQPIOException;
5
use PhpAmqpLib\Exception\AMQPRuntimeException;
6
use PhpAmqpLib\Helper\MiscHelper;
7
8
/**
 * AMQP wire I/O implemented on top of PHP's sockets extension (socket_* functions).
 *
 * The instance owns a single TCP socket that is created by connect(), used by
 * read()/write()/select(), and released by close().
 */
class SocketIO extends AbstractIO
{
    /** @var string Host passed to socket_connect() */
    protected $host;

    /** @var int Port passed to socket_connect() */
    protected $port;

    /** @var float Send/receive timeout in seconds, applied in connect() */
    protected $timeout;

    /** @var resource|\Socket|null Socket handle; null while not connected */
    private $sock;

    /** @var bool Whether connect() should enable SO_KEEPALIVE */
    private $keepalive;

    /**
     * Stores the connection settings; no socket is opened until connect() is called.
     *
     * @param string $host
     * @param int $port
     * @param float $timeout
     * @param bool $keepalive
     */
    public function __construct($host, $port, $timeout, $keepalive = false)
    {
        $this->host = $host;
        $this->port = $port;
        $this->timeout = $timeout;
        $this->keepalive = $keepalive;
    }

    /**
     * Sets up the socket connection
     *
     * Creates a TCP/IPv4 socket, applies the configured timeout to both the
     * receive and the send direction, connects to host:port, switches the
     * socket to blocking mode with TCP_NODELAY, and finally enables keepalive
     * when it was requested in the constructor.
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOException if the socket cannot be created or connected
     * @throws \Exception
     */
    public function connect()
    {
        $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if ($socket === false) {
            // socket_create() signals failure by returning false; never keep
            // that value around, otherwise the null checks in read()/write()
            // would not recognise the socket as unusable.
            $this->sock = null;
            $errno = socket_last_error();
            throw new AMQPIOException(sprintf(
                'Error creating socket (%s): %s',
                $errno,
                socket_strerror($errno)
            ), $errno);
        }
        $this->sock = $socket;

        $useFloat = defined('AMQP_FLOAT_TIMEOUT_FOR_SOCKET') && AMQP_FLOAT_TIMEOUT_FOR_SOCKET === true;
        if (!$useFloat && is_float($this->timeout)) {
            trigger_error(
                "Timeout value is implicitly cast to integer. Please define AMQP_FLOAT_TIMEOUT_FOR_SOCKET=true if you would like to use float value timeouts. This implicit cast will be eventually removed.",
                E_USER_DEPRECATED
            );
        }

        // Without the opt-in constant the fractional part of the timeout is dropped.
        list($sec, $uSec) = $useFloat
            ? MiscHelper::splitSecondsMicroseconds($this->timeout)
            : [intval($this->timeout), 0];
        $timeout = ['sec' => $sec, 'usec' => $uSec];
        socket_set_option($this->sock, SOL_SOCKET, SO_RCVTIMEO, $timeout);
        socket_set_option($this->sock, SOL_SOCKET, SO_SNDTIMEO, $timeout);

        if (!socket_connect($this->sock, $this->host, $this->port)) {
            $errno = socket_last_error($this->sock);
            $errstr = socket_strerror($errno);
            throw new AMQPIOException(sprintf(
                'Error Connecting to server (%s): %s',
                $errno,
                $errstr
            ), $errno);
        }

        socket_set_block($this->sock);
        socket_set_option($this->sock, SOL_TCP, TCP_NODELAY, 1);

        if ($this->keepalive) {
            $this->enable_keepalive();
        }
    }

    /**
     * Returns the underlying socket handle.
     *
     * @return resource|\Socket|null null when the socket has not been created or was closed
     */
    public function getSocket()
    {
        return $this->sock;
    }

    /**
     * Reconnects the socket
     *
     * Closes the current socket (if any) and runs connect() again.
     */
    public function reconnect()
    {
        $this->close();
        $this->connect();
    }

    /**
     * Reads exactly $n bytes from the socket.
     *
     * socket_read() may return fewer bytes than requested, so the call is
     * repeated until $n bytes were collected or the read yields an empty
     * string or false.
     *
     * @param int $n number of bytes to read
     * @return mixed|string the $n bytes that were read
     * @throws \PhpAmqpLib\Exception\AMQPIOException if fewer than $n bytes could be read
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException if the socket is null
     */
    public function read($n)
    {
        // Null sockets are invalid, throw exception. This has to happen before
        // the first socket_read() call, which must not be handed a null socket.
        if (is_null($this->sock)) {
            throw new AMQPRuntimeException(sprintf(
                'Socket was null! Last SocketError was: %s',
                socket_strerror(socket_last_error())
            ));
        }

        $res = '';
        $read = 0;

        $buf = socket_read($this->sock, $n);
        while ($read < $n && $buf !== '' && $buf !== false) {
            // The 'ASCII' encoding makes mb_strlen() count bytes, not characters.
            $read += mb_strlen($buf, 'ASCII');
            $res .= $buf;
            $buf = socket_read($this->sock, $n - $read);
        }

        $received = mb_strlen($res, 'ASCII');
        if ($received != $n) {
            throw new AMQPIOException(sprintf(
                'Error reading data. Received %s instead of expected %s bytes',
                $received,
                $n
            ));
        }

        return $res;
    }

    /**
     * Writes the whole of $data to the socket.
     *
     * socket_write() may send only part of the buffer, so the unsent
     * remainder is written again until everything has been sent.
     *
     * @param string $data bytes to send
     * @return mixed|void
     * @throws \PhpAmqpLib\Exception\AMQPIOException if socket_write() fails
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException if the socket is null
     */
    public function write($data)
    {
        // Null sockets are invalid, throw exception
        if (is_null($this->sock)) {
            throw new AMQPRuntimeException(sprintf(
                'Socket was null! Last SocketError was: %s',
                socket_strerror(socket_last_error())
            ));
        }

        // The 'ASCII' encoding makes the mb_* functions operate on bytes.
        $len = mb_strlen($data, 'ASCII');

        while (true) {
            $sent = socket_write($this->sock, $data, $len);
            if ($sent === false) {
                // Ask for the error of this particular socket rather than the
                // most recent error of any socket function.
                throw new AMQPIOException(sprintf(
                    'Error sending data. Last SocketError: %s',
                    socket_strerror(socket_last_error($this->sock))
                ));
            }

            // Check if the entire message has been sent
            if ($sent >= $len) {
                break;
            }

            // Keep only the part of the message that has not been sent yet
            // ($len always equals the byte length of $data at this point).
            $data = mb_substr($data, $sent, $len - $sent, 'ASCII');
            $len -= $sent;
        }
    }

    /**
     * Closes the socket, if one is open, and forgets the handle.
     *
     * Safe to call when no socket is open.
     */
    public function close()
    {
        // Before PHP 8.0 sockets are resources; since 8.0 they are \Socket
        // objects, for which is_resource() returns false.
        if (is_resource($this->sock) || $this->sock instanceof \Socket) {
            socket_close($this->sock);
        }
        $this->sock = null;
    }

    /**
     * Waits until the socket becomes readable or the timeout expires.
     *
     * @param int $sec seconds part of the timeout handed to socket_select()
     * @param int $usec microseconds part of the timeout handed to socket_select()
     * @return int|mixed the return value of socket_select()
     */
    public function select($sec, $usec)
    {
        // socket_select() takes its arrays by reference, so they need to be variables.
        $read = [$this->sock];
        $write = null;
        $except = null;

        return socket_select($read, $write, $except, $sec, $usec);
    }

    /**
     * Turns on SO_KEEPALIVE for the current socket.
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOException if the required socket constants are not defined
     */
    protected function enable_keepalive()
    {
        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
        }

        socket_set_option($this->sock, SOL_SOCKET, SO_KEEPALIVE, 1);
    }
}
207