1
|
|
|
<?php |
2
|
|
|
/** |
3
|
|
|
* Dead simple, high performance, drop-in bridge to Golang RPC with zero dependencies |
4
|
|
|
* |
5
|
|
|
* @author Wolfy-J |
6
|
|
|
*/ |
7
|
|
|
|
8
|
|
|
namespace Spiral\Goridge; |
9
|
|
|
|
10
|
|
|
use Spiral\Goridge\Exceptions\TransportException; |
11
|
|
|
use Spiral\Goridge\Exceptions\PrefixException; |
12
|
|
|
use Spiral\Goridge\Exceptions\RelayException; |
13
|
|
|
|
14
|
|
|
/**
 * Communicates with remote server/client over bi-directional socket using byte payload:
 *
 * [ prefix       ][ payload                               ]
 * [ 1+8 bytes    ][ message length                        ]
 *
 * prefix:
 * [ flag       ][ message length, unsigned int 64bits, LittleEndian ]
 *
 * The connection is opened lazily: send() and receiveSync() call connect() before
 * touching the socket, and the destructor closes it if it is still open.
 */
class SocketRelay implements RelayInterface
{
    /**
     * Supported socket types.
     *
     * SOCK_TPC selects a TCP (AF_INET) socket; the misspelled name is part of the
     * public API and is therefore kept as is.
     */
    const SOCK_TPC  = 0;
    const SOCK_UNIX = 1;

    /** @var string Host name / IP address (TCP) or filesystem path (UNIX socket). */
    private $address;

    /** @var int|null Always null for UNIX sockets. */
    private $port;

    /** @var int One of the SOCK_* constants. */
    private $type;

    /** @var resource|\Socket|null Connected socket handle, null while disconnected. */
    private $socket;

    /**
     * Example:
     * $relay = new SocketRelay("localhost", 7000);
     * $relay = new SocketRelay("/tmp/rpc.sock", null, SocketRelay::SOCK_UNIX);
     *
     * @param string   $address Localhost, ip address or hostname (path for UNIX sockets).
     * @param int|null $port    Required for TCP sockets, ignored for UNIX sockets.
     * @param int      $type    Default: SOCK_TPC
     *
     * @throws Exceptions\InvalidArgumentException When the port is missing for a TCP socket
     *                                             or the socket type is unknown.
     */
    public function __construct(string $address, int $port = null, int $type = self::SOCK_TPC)
    {
        switch ($type) {
            case self::SOCK_TPC:
                if ($port === null) {
                    throw new Exceptions\InvalidArgumentException(sprintf(
                        "no port given for TPC socket on '%s'",
                        $address
                    ));
                }
                break;
            case self::SOCK_UNIX:
                // A port has no meaning for a UNIX socket, drop whatever was given.
                $port = null;
                break;
            default:
                throw new Exceptions\InvalidArgumentException(sprintf(
                    "undefined connection type %s on '%s'",
                    $type,
                    $address
                ));
        }

        $this->address = $address;
        $this->port = $port;
        $this->type = $type;
    }

    /**
     * {@inheritdoc}
     *
     * Writes the 9 byte prefix (flags + payload length) followed by the payload itself.
     * Nothing but the prefix is written when the PAYLOAD_NONE flag is set.
     *
     * @throws TransportException When a non-empty payload is combined with PAYLOAD_NONE
     *                            or when the data can not be written to the socket.
     */
    public function send($payload, int $flags = null): self
    {
        $this->connect();

        // Normalize optional arguments so the byte arithmetic below is explicit.
        $flags = (int)$flags;
        if ($payload === null) {
            $payload = '';
        }

        $size = strlen($payload);
        $withoutBody = ($flags & self::PAYLOAD_NONE) !== 0;
        if ($withoutBody && $size !== 0) {
            throw new TransportException("unable to send payload with PAYLOAD_NONE flag");
        }

        // C: unsigned char (flags), P: unsigned 64 bit little endian (length).
        $this->write(pack('CP', $flags, $size));

        if (!$withoutBody) {
            $this->write($payload);
        }

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Reads the prefix first and then exactly as many payload bytes as the prefix
     * announces. Returns null when the announced payload length is zero.
     *
     * @throws PrefixException    When the prefix can not be read.
     * @throws TransportException When the connection fails or is closed before the
     *                            whole payload has been received.
     */
    public function receiveSync(int &$flags = null)
    {
        $this->connect();

        $prefix = $this->fetchPrefix();
        $flags = $prefix['flags'];
        $result = null;

        //Add ability to write to stream in a future
        $pending = $prefix['size'];
        while ($pending > 0) {
            $chunk = null;
            $received = socket_recv(
                $this->socket,
                $chunk,
                min(self::BUFFER_SIZE, $pending),
                MSG_WAITALL
            );

            // false: socket error, 0: peer closed the connection. Neither makes
            // progress, so bail out instead of looping forever.
            if ($received === false || $received === 0) {
                throw new TransportException(sprintf(
                    "unable to read payload from socket: %s",
                    $received === 0
                        ? 'connection closed by peer'
                        : socket_strerror(socket_last_error($this->socket))
                ));
            }

            $result .= $chunk;
            $pending -= $received;
        }

        return $result;
    }

    /**
     * @return string
     */
    public function getAddress(): string
    {
        return $this->address;
    }

    /**
     * @return int|null
     */
    public function getPort()
    {
        return $this->port;
    }

    /**
     * @return int
     */
    public function getType(): int
    {
        return $this->type;
    }

    /**
     * @return bool True while a connected socket handle is held.
     */
    public function isConnected(): bool
    {
        return $this->socket !== null;
    }

    /**
     * Ensure socket connection. Returns true if socket successfully connected
     * or have already been connected.
     *
     * The socket handle is only stored once the connection succeeded, so a failed
     * attempt leaves the relay in disconnected state and can be retried.
     *
     * @return bool
     *
     * @throws RelayException
     * @throws \Error When sockets are used in unsupported environment.
     */
    public function connect(): bool
    {
        if ($this->isConnected()) {
            return true;
        }

        $socket = $this->createSocket();
        try {
            // The port argument is ignored for UNIX sockets.
            if (socket_connect($socket, $this->address, (int)$this->port) === false) {
                throw new RelayException(socket_strerror(socket_last_error($socket)));
            }
        } catch (\Exception $e) {
            // Do not leak the half-open handle.
            socket_close($socket);

            throw new RelayException("unable to establish connection {$this}", 0, $e);
        }

        $this->socket = $socket;

        return true;
    }

    /**
     * Close connection.
     *
     * @throws RelayException When the socket is not connected.
     */
    public function close()
    {
        if (!$this->isConnected()) {
            throw new RelayException("unable to close socket '{$this}', socket already closed");
        }

        socket_close($this->socket);
        $this->socket = null;
    }

    /**
     * Destruct connection and disconnect.
     */
    public function __destruct()
    {
        if ($this->isConnected()) {
            $this->close();
        }
    }

    /**
     * @return string Connection URI, "tcp://address:port" or "unix://path".
     */
    public function __toString(): string
    {
        if ($this->type === self::SOCK_TPC) {
            return "tcp://{$this->address}:{$this->port}";
        }

        return "unix://{$this->address}";
    }

    /**
     * Reads and decodes the 9 byte message prefix.
     *
     * @return array Prefix ['flags' => int, 'size' => int]
     *
     * @throws PrefixException
     */
    private function fetchPrefix(): array
    {
        $prefixBody = null;
        $prefixLength = socket_recv($this->socket, $prefixBody, 9, MSG_WAITALL);

        // socket_recv() returns false (and leaves the buffer null) on failure and a
        // shorter length when the peer closed the connection mid-prefix.
        if ($prefixLength !== 9 || !is_string($prefixBody)) {
            throw new PrefixException(sprintf(
                "unable to read prefix from socket: %s",
                socket_strerror(socket_last_error($this->socket))
            ));
        }

        $result = unpack("Cflags/Psize", $prefixBody);
        if (!is_array($result)) {
            throw new PrefixException("invalid prefix");
        }

        return $result;
    }

    /**
     * Writes the whole string to the socket, retrying after partial writes.
     *
     * @param string $data
     *
     * @throws TransportException When the socket refuses to accept the data.
     */
    private function write(string $data)
    {
        $pending = strlen($data);
        while ($pending > 0) {
            $sent = socket_send($this->socket, $data, $pending, 0);
            if ($sent === false || $sent === 0) {
                throw new TransportException(sprintf(
                    "unable to write to socket: %s",
                    socket_strerror(socket_last_error($this->socket))
                ));
            }

            $pending -= $sent;
            if ($pending > 0) {
                // Partial write: continue with the unsent tail.
                $data = substr($data, $sent);
            }
        }
    }

    /**
     * Creates an unconnected stream socket matching the configured type.
     *
     * @return resource|\Socket
     *
     * @throws RelayException When the socket can not be created.
     * @throws \Error         When a UNIX socket is requested on Windows.
     */
    private function createSocket()
    {
        if ($this->type === self::SOCK_UNIX) {
            if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
                throw new \Error("socket {$this} unavailable on Windows");
            }

            // Protocol 0 lets the system pick the default protocol for AF_UNIX.
            $socket = socket_create(AF_UNIX, SOCK_STREAM, 0);
        } else {
            $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        }

        if ($socket === false) {
            throw new RelayException(sprintf(
                "unable to create socket %s: %s",
                $this,
                socket_strerror(socket_last_error())
            ));
        }

        return $socket;
    }
}
268
|
|
|
|
This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.
If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is WordPress.
In this case you can add the `@ignore` PhpDoc annotation to the duplicate definition and it will be ignored.