Completed
Pull Request — master (#345)
by
unknown
45:15 queued 09:00
created

SocketIO::connect()   C

Complexity

Conditions 7
Paths 24

Size

Total Lines 35
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 15.4039

Importance

Changes 7
Bugs 3 Features 0
Metric Value
c 7
b 3
f 0
dl 0
loc 35
ccs 8
cts 18
cp 0.4444
rs 6.7273
cc 7
eloc 24
nc 24
nop 0
crap 15.4039
1
<?php
2
namespace PhpAmqpLib\Wire\IO;
3
4
use PhpAmqpLib\Exception\AMQPIOException;
5
use PhpAmqpLib\Exception\AMQPRuntimeException;
6
use PhpAmqpLib\Helper\MiscHelper;
7
8
class SocketIO extends AbstractIO
9
{
10
    /** @var string */
11
    protected $host;
12
13
    /** @var int */
14
    protected $port;
15
16
    /** @var float */
17
    protected $timeout;
18
19
    /** @var resource */
20
    private $sock;
21
22
    /** @var bool */
23
    private $keepalive;
24
25
    /**
26
     * @param string $host
27
     * @param int $port
28
     * @param float $timeout
29
     * @param bool $keepalive
30 25
     */
31
    public function __construct($host, $port, $timeout, $keepalive = false)
32 25
    {
33 25
        $this->host = $host;
34 25
        $this->port = $port;
35 25
        $this->timeout = $timeout;
36 25
        $this->keepalive = $keepalive;
37
    }
38
39
    /**
40
     * Sets up the socket connection
41
     *
42
     * @throws \Exception
43 25
     */
44
    public function connect()
45 25
    {
46
        $this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
47 25
48 25
        $useFloat = defined('AMQP_FLOAT_TIMEOUT_FOR_SOCKET') && AMQP_FLOAT_TIMEOUT_FOR_SOCKET === true;
49
        if (!$useFloat && is_float($this->timeout)) {
50 25
            trigger_error(
51
                "Timeout value is implicitly cast to integer. Please define AMQP_FLOAT_TIMEOUT_FOR_SOCKET=true if you would like to use float value timeouts. This implicit cast will be eventually removed.",
52
                E_USER_DEPRECATED
53
            );
54
        }
55
56
        list($sec, $uSec) = $useFloat
57
            ? MiscHelper::splitSecondsMicroseconds($this->timeout)
58
            : array(intval($this->timeout), 0);
59
        socket_set_option($this->sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => $sec, 'usec' => $uSec));
60 25
        socket_set_option($this->sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => $sec, 'usec' => $uSec));
61 25
62
        if (!socket_connect($this->sock, $this->host, $this->port)) {
63 25
            $errno = socket_last_error($this->sock);
64
            $errstr = socket_strerror($errno);
65
            throw new AMQPIOException(sprintf(
66 25
                'Error Connecting to server (%s): %s',
67
                $errno,
68
                $errstr
69
            ), $errno);
70
        }
71
72
        socket_set_block($this->sock);
73
        socket_set_option($this->sock, SOL_TCP, TCP_NODELAY, 1);
74
75
        if ($this->keepalive) {
76
            $this->enable_keepalive();
77
        }
78
    }
79 10
80
    /**
81 10
     * @return resource
82 10
     */
83 10
    public function getSocket()
84
    {
85
        return $this->sock;
86
    }
87
88
    /**
89
     * Reconnects the socket
90
     */
91 25
    public function reconnect()
92
    {
93 25
        $this->close();
94 25
        $this->connect();
95
    }
96 25
97 25
    /**
98
     * @param $n
99 25
     * @return mixed|string
100
     * @throws \PhpAmqpLib\Exception\AMQPIOException
101
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
102
     */
103
    public function read($n)
104
    {
105
        $res = '';
106 25
        $read = 0;
107 25
108 25
        $buf = socket_read($this->sock, $n);
109 20
        while ($read < $n && $buf !== '' && $buf !== false) {
110
            // Null sockets are invalid, throw exception
111 25
            if (is_null($this->sock)) {
112
                throw new AMQPRuntimeException(sprintf(
113
                    'Socket was null! Last SocketError was: %s',
114
                    socket_strerror(socket_last_error())
115
                ));
116
            }
117
118
            $read += mb_strlen($buf, 'ASCII');
119 25
            $res .= $buf;
120
            $buf = socket_read($this->sock, $n - $read);
121
        }
122
123
        if (mb_strlen($res, 'ASCII') != $n) {
124
            throw new AMQPIOException(sprintf(
125
                'Error reading data. Received %s instead of expected %s bytes',
126
                mb_strlen($res, 'ASCII'),
127
                $n
128 25
            ));
129
        }
130 25
131
        return $res;
132 25
    }
133
134 25
    /**
135
     * @param $data
136
     * @return mixed|void
137
     * @throws \PhpAmqpLib\Exception\AMQPIOException
138
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException
139
     */
140
    public function write($data)
141 25
    {
142 25
        $len = mb_strlen($data, 'ASCII');
143
144
        while (true) {
145
            // Null sockets are invalid, throw exception
146
            if (is_null($this->sock)) {
147
                throw new AMQPRuntimeException(sprintf(
148
                    'Socket was null! Last SocketError was: %s',
149
                    socket_strerror(socket_last_error())
150 25
                ));
151
            }
152
153
            $sent = socket_write($this->sock, $data, $len);
154
            if ($sent === false) {
155
                throw new AMQPIOException(sprintf(
156
                    'Error sending data. Last SocketError: %s',
157 25
                    socket_strerror(socket_last_error())
158
                ));
159
            }
160 25
161
            // Check if the entire message has been sent
162 25
            if ($sent < $len) {
163
                // If not sent the entire message.
164 25
                // Get the part of the message that has not yet been sent as message
165 25
                $data = mb_substr($data, $sent, mb_strlen($data, 'ASCII') - $sent, 'ASCII');
166 20
                // Get the length of the not sent part
167 25
                $len -= $sent;
168 25
            } else {
169
                break;
170
            }
171
        }
172
    }
173
174
    public function close()
175
    {
176
        if (is_resource($this->sock)) {
177
            socket_close($this->sock);
178
        }
179
        $this->sock = null;
180
    }
181
182
    /**
183
     * @param $sec
184
     * @param $usec
185
     * @return int|mixed
186
     */
187
    public function select($sec, $usec)
188
    {
189
        $read = array($this->sock);
190
        $write = null;
191
        $except = null;
192
193
        return socket_select($read, $write, $except, $sec, $usec);
194
    }
195
196 4
    /**
197
     * @throws \PhpAmqpLib\Exception\AMQPIOException
198
     */
199
    protected function enable_keepalive()
200
    {
201
        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
202
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
203
        }
204
205
        socket_set_option($this->sock, SOL_SOCKET, SO_KEEPALIVE, 1);
206
    }
207
}
208