Completed
Push — master ( c3ed14...f41031 )
by
unknown
13s
created

SocketIO::check_heartbeat()   B

Complexity

Conditions 6
Paths 5

Size

Total Lines 19
Code Lines 9

Duplication

Lines 19
Ratio 100 %

Code Coverage

Tests 3
CRAP Score 22.384

Importance

Changes 0
Metric Value
dl 19
loc 19
ccs 3
cts 13
cp 0.2308
rs 8.8571
c 0
b 0
f 0
cc 6
eloc 9
nc 5
nop 0
crap 22.384
1
<?php
2
namespace PhpAmqpLib\Wire\IO;
3
4
use PhpAmqpLib\Exception\AMQPIOException;
5
use PhpAmqpLib\Exception\AMQPRuntimeException;
6
use PhpAmqpLib\Helper\MiscHelper;
7
use PhpAmqpLib\Wire\AMQPWriter;
8
9
/**
 * Blocking TCP transport built on the PHP sockets extension.
 *
 * Tracks the time of the last successful read and write so that, when a
 * heartbeat interval is configured, read() can detect a silent peer and
 * emit heartbeat frames while the client itself is idle.
 */
class SocketIO extends AbstractIO
{
    /** @var string Address handed to socket_connect() */
    protected $host;

    /** @var int Port handed to socket_connect() */
    protected $port;

    /** @var float Send timeout in seconds, applied as SO_SNDTIMEO */
    protected $send_timeout;

    /** @var float Receive timeout in seconds, applied as SO_RCVTIMEO */
    protected $read_timeout;

    /** @var int Heartbeat interval in seconds; 0 disables the heartbeat logic */
    protected $heartbeat;

    /** @var float|null microtime(true) of the last completed read(); null after close() */
    protected $last_read;

    /** @var float|null microtime(true) of the last completed write(); null after close() */
    protected $last_write;

    /** @var resource|null Underlying socket; null while disconnected */
    private $sock;

    /** @var bool Whether connect() should enable SO_KEEPALIVE */
    private $keepalive;

    /**
     * Stores the connection settings; no socket is opened until connect().
     *
     * @param string $host
     * @param int $port
     * @param float $read_timeout
     * @param bool $keepalive
     * @param float|null $write_timeout any empty value (null, 0, ...) falls back to $read_timeout
     * @param int $heartbeat how often to send heartbeat. 0 means off
     */
    public function __construct($host, $port, $read_timeout, $keepalive = false, $write_timeout = null, $heartbeat = 0)
    {
        $this->host = $host;
        $this->port = $port;
        $this->read_timeout = $read_timeout;
        $this->send_timeout = $write_timeout ?: $read_timeout;
        $this->heartbeat = $heartbeat;
        $this->keepalive = $keepalive;
    }

    /**
     * Sets up the socket connection
     *
     * Creates the socket, applies the send/receive timeouts, connects, then
     * switches to blocking mode with TCP_NODELAY (and keepalive if requested).
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOException if the socket cannot be created or connected
     * @throws \Exception
     */
    public function connect()
    {
        $this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if ($this->sock === false) {
            // Keep the "null means disconnected" invariant that read() and
            // write() rely on instead of leaving `false` behind.
            $this->sock = null;
            $errno = socket_last_error();
            throw new AMQPIOException(sprintf(
                'Error creating socket (%s): %s',
                $errno,
                socket_strerror($errno)
            ), $errno);
        }

        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->send_timeout);
        socket_set_option($this->sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => $sec, 'usec' => $uSec));
        list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
        socket_set_option($this->sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => $sec, 'usec' => $uSec));

        if (!socket_connect($this->sock, $this->host, $this->port)) {
            $errno = socket_last_error($this->sock);
            $errstr = socket_strerror($errno);
            throw new AMQPIOException(sprintf(
                'Error Connecting to server (%s): %s',
                $errno,
                $errstr
            ), $errno);
        }

        socket_set_block($this->sock);
        socket_set_option($this->sock, SOL_TCP, TCP_NODELAY, 1);

        if ($this->keepalive) {
            $this->enable_keepalive();
        }
    }

    /**
     * Returns the underlying socket, or null while disconnected.
     *
     * @return resource
     */
    public function getSocket()
    {
        return $this->sock;
    }

    /**
     * Reconnects the socket
     *
     * Note that close() also clears the last read/write timestamps.
     */
    public function reconnect()
    {
        $this->close();
        $this->connect();
    }

    /**
     * Reads exactly $n bytes from the socket.
     *
     * @param int $n number of bytes to read
     * @return mixed|string
     * @throws \PhpAmqpLib\Exception\AMQPIOException if fewer than $n bytes could be read
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException if the socket is not connected
     */
    public function read($n)
    {
        if (is_null($this->sock)) {
            throw new AMQPRuntimeException(sprintf(
                'Socket was null! Last SocketError was: %s',
                socket_strerror(socket_last_error())
            ));
        }

        $res = '';
        $read = 0;
        $buf = socket_read($this->sock, $n);
        // Keep pulling until $n bytes are collected; an empty string or
        // false from socket_read() ends the loop early.
        while ($read < $n && $buf !== '' && $buf !== false) {
            $this->check_heartbeat();

            // 'ASCII' forces a byte count even for binary payloads.
            $read += mb_strlen($buf, 'ASCII');
            $res .= $buf;
            $buf = socket_read($this->sock, $n - $read);
        }

        if (mb_strlen($res, 'ASCII') != $n) {
            throw new AMQPIOException(sprintf(
                'Error reading data. Received %s instead of expected %s bytes',
                mb_strlen($res, 'ASCII'),
                $n
            ));
        }

        $this->last_read = microtime(true);

        return $res;
    }

    /**
     * Writes all of $data to the socket, retrying after partial writes.
     *
     * @param string $data
     * @return void
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOException if socket_write() fails
     * @throws \PhpAmqpLib\Exception\AMQPRuntimeException if the socket is not connected
     */
    public function write($data)
    {
        // $len always holds the byte length of what is still left in $data.
        $len = mb_strlen($data, 'ASCII');

        while (true) {
            // Null sockets are invalid, throw exception
            if (is_null($this->sock)) {
                throw new AMQPRuntimeException(sprintf(
                    'Socket was null! Last SocketError was: %s',
                    socket_strerror(socket_last_error())
                ));
            }

            $sent = socket_write($this->sock, $data, $len);
            if ($sent === false) {
                throw new AMQPIOException(sprintf(
                    'Error sending data. Last SocketError: %s',
                    socket_strerror(socket_last_error())
                ));
            }

            // Whole remainder went out: done.
            if ($sent >= $len) {
                break;
            }

            // Partial write: drop the bytes already sent and go again.
            $len -= $sent;
            $data = mb_substr($data, $sent, $len, 'ASCII');
        }

        $this->last_write = microtime(true);
    }

    /**
     * Closes the socket (if open) and clears the read/write timestamps.
     */
    public function close()
    {
        // PHP >= 8.0 represents sockets as \Socket objects, for which
        // is_resource() is false; accept both so the socket is closed
        // explicitly on every PHP version.
        if (is_resource($this->sock) || $this->sock instanceof \Socket) {
            socket_close($this->sock);
        }
        $this->sock = null;
        $this->last_read = null;
        $this->last_write = null;
    }

    /**
     * Waits until the socket becomes readable or the timeout elapses.
     *
     * @param int $sec
     * @param int $usec
     * @return int|mixed the result of socket_select()
     */
    public function select($sec, $usec)
    {
        $read = array($this->sock);
        $write = null;
        $except = null;

        return socket_select($read, $write, $except, $sec, $usec);
    }

    /**
     * Turns on SO_KEEPALIVE for the current socket.
     *
     * @throws \PhpAmqpLib\Exception\AMQPIOException if the required constants are not defined
     */
    protected function enable_keepalive()
    {
        if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
            throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
        }

        socket_set_option($this->sock, SOL_SOCKET, SO_KEEPALIVE, 1);
    }

    /**
     * Heartbeat logic: check connection health here
     *
     * Does nothing unless a positive heartbeat interval is configured and
     * both a read and a write have already completed on this connection.
     */
    protected function check_heartbeat()
    {
        // `> 0` (rather than `!== 0`) also treats "0", 0.0, null and negative
        // values as "off", so a loosely typed setting cannot enable the logic
        // with a zero interval.
        if (!($this->heartbeat > 0) || !$this->last_read || !$this->last_write) {
            return;
        }

        $now = microtime(true);
        $t_read = round($now - $this->last_read);
        $t_write = round($now - $this->last_write);

        // server has gone away
        if (($this->heartbeat * 2) < $t_read) {
            $this->reconnect();

            // $t_write was measured on the connection that was just dropped,
            // and close() has reset the timestamps; do not push a heartbeat
            // frame onto the brand-new socket based on that stale value.
            return;
        }

        // time for client to send a heartbeat
        if (($this->heartbeat / 2) < $t_write) {
            $this->write_heartbeat();
        }
    }

    /**
     * Sends a heartbeat message
     *
     * The frame is: type octet 8, channel 0, payload size 0, then the
     * 0xCE frame-end octet.
     */
    protected function write_heartbeat()
    {
        $pkt = new AMQPWriter();
        $pkt->write_octet(8);
        $pkt->write_short(0);
        $pkt->write_long(0);
        $pkt->write_octet(0xCE);
        $this->write($pkt->getvalue());
    }
}
258