1 | <?php |
||
8 | class SocketIO extends AbstractIO |
||
9 | { |
||
10 | /** @var string */ |
||
11 | protected $host; |
||
12 | |||
13 | /** @var int */ |
||
14 | protected $port; |
||
15 | |||
16 | /** @var float */ |
||
17 | protected $timeout; |
||
18 | |||
19 | /** @var resource */ |
||
20 | private $sock; |
||
21 | |||
22 | /** @var bool */ |
||
23 | private $keepalive; |
||
24 | |||
/**
 * Stores the connection settings. No socket is created here; that happens in connect().
 *
 * @param string $host      address handed to socket_connect()
 * @param int    $port      port handed to socket_connect()
 * @param float  $timeout   send/receive timeout applied to the socket in connect()
 * @param bool   $keepalive when true, connect() calls enable_keepalive() after connecting
 */
public function __construct($host, $port, $timeout, $keepalive = false)
{
    // Connection behaviour flags first, then the endpoint itself.
    $this->keepalive = $keepalive;
    $this->timeout = $timeout;

    $this->port = $port;
    $this->host = $host;
}
|
38 | |||
/**
 * Creates the TCP socket, applies the send/receive timeouts and connects to the configured host/port.
 *
 * After a successful connect the socket is switched to blocking mode, TCP_NODELAY is set
 * and, if requested in the constructor, keepalive is enabled.
 *
 * @throws \PhpAmqpLib\Exception\AMQPIOException when the socket cannot be created or the connection fails
 */
public function connect()
{
    $sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
    if ($sock === false) {
        // Without this guard the failure would only surface later as warnings/type errors
        // from socket_set_option()/socket_connect() being handed `false`.
        $errno = socket_last_error();
        $this->sock = null;
        throw new AMQPIOException(sprintf(
            'Error creating socket (%s): %s',
            $errno,
            socket_strerror($errno)
        ), $errno);
    }
    $this->sock = $sock;

    // Float timeouts are opt-in via the AMQP_FLOAT_TIMEOUT_FOR_SOCKET constant;
    // otherwise the value is truncated to whole seconds (with a deprecation notice).
    $useFloat = defined('AMQP_FLOAT_TIMEOUT_FOR_SOCKET') && AMQP_FLOAT_TIMEOUT_FOR_SOCKET === true;
    if (!$useFloat && is_float($this->timeout)) {
        trigger_error(
            "Timeout value is implicitly cast to integer. Please define AMQP_FLOAT_TIMEOUT_FOR_SOCKET=true if you would like to use float value timeouts. This implicit cast will be eventually removed.",
            E_USER_DEPRECATED
        );
    }

    list($sec, $uSec) = $useFloat
        ? MiscHelper::splitSecondsMicroseconds($this->timeout)
        : array(intval($this->timeout), 0);
    socket_set_option($this->sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => $sec, 'usec' => $uSec));
    socket_set_option($this->sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => $sec, 'usec' => $uSec));

    if (!socket_connect($this->sock, $this->host, $this->port)) {
        $errno = socket_last_error($this->sock);
        $errstr = socket_strerror($errno);
        throw new AMQPIOException(sprintf(
            'Error Connecting to server (%s): %s',
            $errno,
            $errstr
        ), $errno);
    }

    socket_set_block($this->sock);
    socket_set_option($this->sock, SOL_TCP, TCP_NODELAY, 1);

    if ($this->keepalive) {
        $this->enable_keepalive();
    }
}
|
79 | |||
/**
 * Exposes the underlying socket handle.
 *
 * The value is null once close() has run, because close() resets the handle.
 *
 * @return resource
 */
public function getSocket()
{
    $socket = $this->sock;

    return $socket;
}
||
87 | |||
/**
 * Drops the current socket via close() and establishes a fresh one via connect().
 *
 * Any exception raised by connect() propagates to the caller.
 */
public function reconnect()
{
    // Tear down first so connect() always starts from a clean handle.
    $this->close();

    $this->connect();
}
|
96 | |||
/**
 * Reads exactly $n bytes from the socket, looping over partial reads.
 *
 * @param int $n number of bytes expected
 * @return mixed|string the $n bytes read
 * @throws \PhpAmqpLib\Exception\AMQPIOException when fewer than $n bytes could be read
 * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the socket is null
 */
public function read($n)
{
    // Null sockets are invalid; check BEFORE touching the socket so the caller gets
    // this exception rather than a warning/TypeError from socket_read(null, ...).
    if (is_null($this->sock)) {
        throw new AMQPRuntimeException(sprintf(
            'Socket was null! Last SocketError was: %s',
            socket_strerror(socket_last_error())
        ));
    }

    $res = '';
    $read = 0;

    // Only ask for the bytes still missing. This never issues a zero-length
    // socket_read(), which is rejected with a ValueError on PHP 8.
    while ($read < $n) {
        $buf = socket_read($this->sock, $n - $read);
        if ($buf === false || $buf === '') {
            // Error or no more data: stop and let the length check below report it.
            break;
        }

        // 'ASCII' forces a byte count regardless of mbstring settings.
        $read += mb_strlen($buf, 'ASCII');
        $res .= $buf;
    }

    if (mb_strlen($res, 'ASCII') != $n) {
        throw new AMQPIOException(sprintf(
            'Error reading data. Received %s instead of expected %s bytes',
            mb_strlen($res, 'ASCII'),
            $n
        ));
    }

    return $res;
}
||
133 | |||
/**
 * Sends the whole of $data over the socket, retrying with the unsent tail after a partial write.
 *
 * @param string $data bytes to send
 * @return mixed|void
 * @throws \PhpAmqpLib\Exception\AMQPIOException when socket_write() reports a failure
 * @throws \PhpAmqpLib\Exception\AMQPRuntimeException when the socket is null
 */
public function write($data)
{
    // Byte count still to be sent; 'ASCII' makes mb_* operate on raw bytes.
    $remaining = mb_strlen($data, 'ASCII');

    do {
        // A null socket cannot be written to.
        if (is_null($this->sock)) {
            throw new AMQPRuntimeException(sprintf(
                'Socket was null! Last SocketError was: %s',
                socket_strerror(socket_last_error())
            ));
        }

        $written = socket_write($this->sock, $data, $remaining);
        if ($written === false) {
            throw new AMQPIOException(sprintf(
                'Error sending data. Last SocketError: %s',
                socket_strerror(socket_last_error())
            ));
        }

        // Everything went out in this pass: done.
        if (!($written < $remaining)) {
            return;
        }

        // Partial write: keep only the tail that has not been sent yet.
        $tailLength = mb_strlen($data, 'ASCII') - $written;
        $data = mb_substr($data, $written, $tailLength, 'ASCII');
        $remaining -= $written;
    } while (true);
}
|
173 | |||
/**
 * Closes the socket if one is open and resets the handle to null.
 */
public function close()
{
    // PHP < 8 represents sockets as resources; PHP >= 8 returns \Socket objects,
    // for which is_resource() is false. Accept both so the socket is closed explicitly.
    if (is_resource($this->sock) || $this->sock instanceof \Socket) {
        socket_close($this->sock);
    }
    $this->sock = null;
}
|
181 | |||
182 | /** |
||
183 | * @param $sec |
||
184 | * @param $usec |
||
185 | * @return int|mixed |
||
186 | */ |
||
187 | public function select($sec, $usec) |
||
195 | |||
196 | /** |
||
197 | * @throws \PhpAmqpLib\Exception\AMQPIOException |
||
198 | */ |
||
199 | protected function enable_keepalive() |
||
200 | { |
||
201 | if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) { |
||
202 | throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined'); |
||
207 | } |
||
208 |