1
|
|
|
<?php namespace Comodojo\Daemon\Socket; |
2
|
|
|
|
3
|
|
|
use \Comodojo\Daemon\Process; |
4
|
|
|
use \Comodojo\Daemon\Events\SocketEvent; |
5
|
|
|
use \Comodojo\Foundation\Events\EventsTrait; |
6
|
|
|
use \Comodojo\Foundation\Logging\LoggerTrait; |
7
|
|
|
use \Comodojo\Foundation\Events\Manager as EventsManager; |
8
|
|
|
use \Comodojo\Foundation\Validation\DataFilter; |
9
|
|
|
use \Comodojo\RpcServer\RpcServer; |
10
|
|
|
use \Psr\Log\LoggerInterface; |
11
|
|
|
use \Comodojo\Exception\SocketException; |
12
|
|
|
use \Exception; |
13
|
|
|
|
14
|
|
|
/** |
15
|
|
|
* @package Comodojo Daemon |
16
|
|
|
* @author Marco Giovinazzi <[email protected]> |
17
|
|
|
* @license MIT |
18
|
|
|
* |
19
|
|
|
* LICENSE: |
20
|
|
|
* |
21
|
|
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
22
|
|
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
23
|
|
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
24
|
|
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
25
|
|
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
26
|
|
|
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
27
|
|
|
* THE SOFTWARE. |
28
|
|
|
*/ |
29
|
|
|
|
30
|
|
|
class Server extends AbstractSocket { |
31
|
|
|
|
32
|
|
|
use EventsTrait; |
33
|
|
|
use LoggerTrait; |
34
|
|
|
|
35
|
|
|
const DEFAULT_TIMEOUT = 10; |
36
|
|
|
|
37
|
|
|
const DEFAULT_MAX_CLIENTS = 10; |
38
|
|
|
|
39
|
|
|
private $active = false; |
40
|
|
|
|
41
|
|
|
private $process; |
42
|
|
|
|
43
|
|
|
private $timeout; |
44
|
|
|
|
45
|
|
|
private $connections = []; |
46
|
|
|
|
47
|
|
|
// protected $commands; |
48
|
|
|
protected $rpc_server; |
49
|
|
|
|
50
|
|
|
protected $max_connections; |
51
|
|
|
|
52
|
|
|
public function __construct( |
53
|
|
|
$handler, |
54
|
|
|
LoggerInterface $logger, |
55
|
|
|
EventsManager $events, |
56
|
|
|
Process $process, |
57
|
|
|
$read_buffer = null, |
58
|
|
|
$timeout = null, |
59
|
|
|
$max_connections = null |
60
|
|
|
) { |
61
|
|
|
|
62
|
|
|
parent::__construct($handler, $read_buffer); |
63
|
|
|
|
64
|
|
|
$this->logger = $logger; |
65
|
|
|
$this->events = $events; |
66
|
|
|
$this->process = $process; |
67
|
|
|
|
68
|
|
|
$this->timeout = is_null($timeout) |
69
|
|
|
? self::DEFAULT_TIMEOUT |
70
|
|
|
: DataFilter::filterInteger($timeout, 0, 600, self::DEFAULT_TIMEOUT); |
71
|
|
|
|
72
|
|
|
$this->max_connections = is_null($max_connections) |
73
|
|
|
? self::DEFAULT_MAX_CLIENTS |
74
|
|
|
: DataFilter::filterInteger($max_connections, 1, 1024, self::DEFAULT_MAX_CLIENTS); |
75
|
|
|
|
76
|
|
|
// $this->commands = new Commands(); |
|
|
|
|
77
|
|
|
$this->rpc_server = new RpcServer(RpcServer::XMLRPC); |
78
|
|
|
|
79
|
|
|
MethodsInjector::inject($this->rpc_server, $process); |
|
|
|
|
80
|
|
|
|
81
|
|
|
} |
82
|
|
|
|
83
|
|
|
public static function create( |
84
|
|
|
$handler, |
85
|
|
|
LoggerInterface $logger, |
86
|
|
|
EventsManager $events, |
87
|
|
|
Process $process, |
88
|
|
|
$read_buffer = null, |
89
|
|
|
$timeout = null |
90
|
|
|
) { |
91
|
|
|
|
92
|
|
|
$socket = new Server($handler, $logger, $events, $process, $read_buffer, $timeout); |
93
|
|
|
|
94
|
|
|
return $socket->connect(); |
95
|
|
|
|
96
|
|
|
} |
97
|
|
|
|
98
|
|
|
// public function getCommands() { |
|
|
|
|
99
|
|
|
// |
100
|
|
|
// return $this->commands; |
|
|
|
|
101
|
|
|
// |
102
|
|
|
// } |
103
|
|
|
|
104
|
|
|
public function getRpcServer() { |
105
|
|
|
|
106
|
|
|
return $this->rpc_server; |
107
|
|
|
|
108
|
|
|
} |
109
|
|
|
|
110
|
|
|
public function connect() { |
111
|
|
|
|
112
|
|
|
$this->socket = @socket_create( |
113
|
|
|
$this->socket_domain, |
114
|
|
|
$this->socket_type, |
115
|
|
|
$this->socket_protocol |
116
|
|
|
); |
117
|
|
|
|
118
|
|
|
if ( $this->socket === false ) { |
119
|
|
|
$error = self::getSocketError(); |
120
|
|
|
throw new SocketException("Socket unavailable: $error"); |
121
|
|
|
} |
122
|
|
|
|
123
|
|
|
$bind = @socket_bind( |
124
|
|
|
$this->socket, |
125
|
|
|
$this->socket_resource, |
126
|
|
|
$this->socket_port |
127
|
|
|
); |
128
|
|
|
|
129
|
|
View Code Duplication |
if ( $bind === false ) { |
|
|
|
|
130
|
|
|
$error = self::getSocketError($this->socket); |
131
|
|
|
throw new SocketException("Cannot bind socket: $error"); |
132
|
|
|
} |
133
|
|
|
|
134
|
|
|
socket_set_nonblock($this->socket); |
135
|
|
|
|
136
|
|
|
return $this; |
137
|
|
|
|
138
|
|
|
} |
139
|
|
|
|
140
|
|
|
public function close() { |
141
|
|
|
|
142
|
|
|
$this->stop(); |
143
|
|
|
|
144
|
|
|
@socket_shutdown($this->socket, 2); |
|
|
|
|
145
|
|
|
|
146
|
|
|
$this->clean(); |
147
|
|
|
|
148
|
|
|
} |
149
|
|
|
|
150
|
|
|
public function listen() { |
151
|
|
|
|
152
|
|
|
$listen = socket_listen($this->socket); |
153
|
|
|
|
154
|
|
View Code Duplication |
if ( $listen === false ) { |
|
|
|
|
155
|
|
|
$error = self::getSocketError($this->socket); |
156
|
|
|
throw new SocketException("Cannot put socket in listening mode: $error"); |
157
|
|
|
} |
158
|
|
|
|
159
|
|
|
$this->logger->debug("Socket listening on ".$this->handler); |
160
|
|
|
|
161
|
|
|
$this->active = true; |
162
|
|
|
|
163
|
|
|
try { |
164
|
|
|
|
165
|
|
|
do { |
166
|
|
|
$this->loop(); |
167
|
|
|
} while ($this->active); |
168
|
|
|
|
169
|
|
|
} catch (Exception $e) { |
170
|
|
|
|
171
|
|
|
$this->close(); |
172
|
|
|
|
173
|
|
|
throw $e; |
174
|
|
|
|
175
|
|
|
} |
176
|
|
|
|
177
|
|
|
} |
178
|
|
|
|
179
|
|
|
public function stop() { |
180
|
|
|
|
181
|
|
|
$this->active = false; |
182
|
|
|
|
183
|
|
|
} |
184
|
|
|
|
185
|
|
|
public function clean() { |
186
|
|
|
|
187
|
|
|
if ( $this->socket_domain == AF_UNIX && file_exists($this->socket_resource) ) { |
188
|
|
|
unlink($this->socket_resource); |
189
|
|
|
} |
190
|
|
|
|
191
|
|
|
} |
192
|
|
|
|
193
|
|
|
protected function loop() { |
194
|
|
|
|
195
|
|
|
$sockets[0] = $this->socket; |
|
|
|
|
196
|
|
|
|
197
|
|
|
$sockets = array_merge($sockets, array_map(function($connection) { |
198
|
|
|
return $connection->getSocket(); |
199
|
|
|
}, $this->connections)); |
200
|
|
|
|
201
|
|
|
// for ($i = 0; $i < $this->max_connections; $i++) { |
|
|
|
|
202
|
|
|
// if (isset($this->connections[$i])) { |
|
|
|
|
203
|
|
|
// $sockets[$i + 1] = $this->connections[$i]->getSocket(); |
|
|
|
|
204
|
|
|
// } |
205
|
|
|
// } |
206
|
|
|
|
207
|
|
|
$select = @socket_select($sockets, $write, $except, $this->timeout); |
208
|
|
|
|
209
|
|
|
if ($select === false) { |
210
|
|
|
|
211
|
|
|
if ( $this->checkSocketError() && $this->active ) { |
212
|
|
|
$this->logger->debug("Socket reset due to incoming signal"); |
213
|
|
|
pcntl_signal_dispatch(); |
214
|
|
|
return; |
215
|
|
|
} |
216
|
|
|
|
217
|
|
|
$socket_error_message = self::getSocketError($this->socket); |
218
|
|
|
|
219
|
|
|
throw new SocketException("Error selecting socket: $socket_error_message"); |
220
|
|
|
|
221
|
|
|
} |
222
|
|
|
|
223
|
|
|
if( $select < 1 ) { |
224
|
|
|
return; |
225
|
|
|
} |
226
|
|
|
|
227
|
|
|
if( in_array($this->socket, $sockets) ) { |
228
|
|
|
|
229
|
|
|
for ($i=0; $i < $select; $i++) { |
230
|
|
|
|
231
|
|
|
if ( empty($this->connections[$i]) ) { |
232
|
|
|
|
233
|
|
|
try { |
234
|
|
|
|
235
|
|
|
$this->logger->debug("New incoming connection ($i)"); |
236
|
|
|
|
237
|
|
|
$this->connections[$i] = new Connection($this->socket, $i); |
238
|
|
|
|
239
|
|
|
$this->open($this->connections[$i], 'connected'); |
240
|
|
|
|
241
|
|
|
} catch (SocketException $se) { |
242
|
|
|
|
243
|
|
|
$this->logger->warning("Error accepting client: ".$se->getMessage()); |
244
|
|
|
|
245
|
|
|
} |
246
|
|
|
|
247
|
|
|
unset($sockets[$i]); |
248
|
|
|
|
249
|
|
|
} |
250
|
|
|
|
251
|
|
|
} |
252
|
|
|
|
253
|
|
|
// for ($i=0; $i < $this->max_connections; $i++) { |
|
|
|
|
254
|
|
|
// |
255
|
|
|
// if ( empty($this->connections[$i]) ) { |
|
|
|
|
256
|
|
|
// |
257
|
|
|
// try { |
258
|
|
|
// |
259
|
|
|
// $this->logger->info("New incoming connection ($i)"); |
|
|
|
|
260
|
|
|
// |
261
|
|
|
// $this->connections[$i] = new Connection($this->socket, $i); |
|
|
|
|
262
|
|
|
// |
263
|
|
|
// $this->open($this->connections[$i], 'connected'); |
|
|
|
|
264
|
|
|
// |
265
|
|
|
// } catch (SocketException $se) { |
|
|
|
|
266
|
|
|
// |
267
|
|
|
// $this->logger->warning("Error accepting client: ".$se->getMessage()); |
|
|
|
|
268
|
|
|
// |
269
|
|
|
// } |
270
|
|
|
// |
271
|
|
|
// unset($sockets[$i]); |
|
|
|
|
272
|
|
|
// |
273
|
|
|
// } |
274
|
|
|
// |
275
|
|
|
// } |
276
|
|
|
|
277
|
|
|
} |
278
|
|
|
|
279
|
|
|
for ($i = 0; $i < $this->max_connections; $i++) { |
280
|
|
|
|
281
|
|
|
if (isset($this->connections[$i])) { |
282
|
|
|
|
283
|
|
|
$client = $this->connections[$i]; |
284
|
|
|
|
285
|
|
|
if (in_array($client->getSocket(), $sockets)) { |
286
|
|
|
|
287
|
|
|
$message = $this->read($client); |
288
|
|
|
|
289
|
|
|
if ($message === null) { |
290
|
|
|
// if ($message == null) { |
|
|
|
|
291
|
|
|
$this->hangup($client); |
292
|
|
|
} else if ( $message === false ) { |
293
|
|
|
continue; |
294
|
|
|
} else { |
295
|
|
|
$output = $this->serve($message); |
296
|
|
|
$this->write($client, $output); |
297
|
|
|
} |
298
|
|
|
|
299
|
|
|
} |
300
|
|
|
|
301
|
|
|
} |
302
|
|
|
|
303
|
|
|
} |
304
|
|
|
|
305
|
|
|
} |
306
|
|
|
|
307
|
|
|
private function write(Connection $connection, AbstractMessage $message) { |
308
|
|
|
|
309
|
|
|
$socket = $connection->getSocket(); |
310
|
|
|
$datagram = $message->serialize()."\r\n"; |
311
|
|
|
|
312
|
|
|
return @socket_write($socket, $datagram, strlen($datagram)); |
313
|
|
|
|
314
|
|
|
} |
315
|
|
|
|
316
|
|
|
private function read(Connection $connection) { |
317
|
|
|
|
318
|
|
|
$datagram = ''; |
319
|
|
|
$socket = $connection->getSocket(); |
320
|
|
|
|
321
|
|
View Code Duplication |
while (true) { |
|
|
|
|
322
|
|
|
$recv = @socket_read($socket, $this->read_buffer, PHP_NORMAL_READ); |
323
|
|
|
// if ( $recv === false ) break; |
|
|
|
|
324
|
|
|
// if ( $recv === 0 ) return null; |
|
|
|
|
325
|
|
|
if ( $recv === false ) return null; |
326
|
|
|
$datagram .= $recv; |
327
|
|
|
if (empty($recv) || strstr($recv, PHP_EOL)) break; |
328
|
|
|
} |
329
|
|
|
|
330
|
|
|
$datagram = trim($datagram); |
331
|
|
|
|
332
|
|
|
if ( !empty($datagram) && $datagram !== false) { |
333
|
|
|
|
334
|
|
|
$message = new Request(); |
335
|
|
|
|
336
|
|
|
$message->unserialize($datagram); |
337
|
|
|
|
338
|
|
|
return $message; |
339
|
|
|
|
340
|
|
|
} |
341
|
|
|
|
342
|
|
|
return false; |
343
|
|
|
|
344
|
|
|
} |
345
|
|
|
|
346
|
|
|
private function serve(Request $request) { |
347
|
|
|
|
348
|
|
|
$response = new Response(); |
349
|
|
|
|
350
|
|
|
if ( $request->content_type == 'application/json') { |
|
|
|
|
351
|
|
|
$this->rpc_server->setProtocol(RpcServer::JSONRPC); |
352
|
|
|
} |
353
|
|
|
|
354
|
|
|
try { |
355
|
|
|
|
356
|
|
|
$response->message = $this->rpc_server |
|
|
|
|
357
|
|
|
->setPayload($request->message) |
|
|
|
|
358
|
|
|
->serve(); |
359
|
|
|
|
360
|
|
|
$response->status = true; |
|
|
|
|
361
|
|
|
|
362
|
|
|
} catch (Exception $e) { |
363
|
|
|
|
364
|
|
|
$response->message = $e->getMessage(); |
|
|
|
|
365
|
|
|
$response->status = false; |
|
|
|
|
366
|
|
|
|
367
|
|
|
} |
368
|
|
|
|
369
|
|
|
// if ( $this->commands->has($request->command) ) { |
|
|
|
|
370
|
|
|
// |
371
|
|
|
// $callable = $this->commands->get($request->command); |
|
|
|
|
372
|
|
|
// |
373
|
|
|
// try { |
374
|
|
|
// |
375
|
|
|
// $response->message = call_user_func( |
376
|
|
|
// $callable, |
377
|
|
|
// $this->process, |
378
|
|
|
// $request->payload |
379
|
|
|
// ); |
380
|
|
|
// |
381
|
|
|
// $response->status = true; |
|
|
|
|
382
|
|
|
// |
383
|
|
|
// } catch (Exception $e) { |
|
|
|
|
384
|
|
|
// |
385
|
|
|
// $response->status = false; |
|
|
|
|
386
|
|
|
// |
387
|
|
|
// $response->message = $e->getMessage(); |
|
|
|
|
388
|
|
|
// |
389
|
|
|
// } |
390
|
|
|
// |
391
|
|
|
// return $response; |
392
|
|
|
// |
393
|
|
|
// } |
394
|
|
|
// |
395
|
|
|
// $response->status = false; |
|
|
|
|
396
|
|
|
// |
397
|
|
|
// $response->message = "Unknown command"; |
|
|
|
|
398
|
|
|
|
399
|
|
|
return $response; |
400
|
|
|
|
401
|
|
|
} |
402
|
|
|
|
403
|
|
|
private function open(Connection $client, $status) { |
404
|
|
|
|
405
|
|
|
$idx = $client->getIndex(); |
406
|
|
|
|
407
|
|
|
$this->logger->debug("Opening connection ($idx), sending greeter"); |
408
|
|
|
|
409
|
|
|
$this->events->emit( new SocketEvent('client.connect', $this->process) ); |
410
|
|
|
|
411
|
|
|
$message = new Greeter(); |
412
|
|
|
|
413
|
|
|
$message->status = $status; |
|
|
|
|
414
|
|
|
|
415
|
|
|
$this->write($client, $message); |
416
|
|
|
|
417
|
|
|
} |
418
|
|
|
|
419
|
|
|
private function hangup(Connection $connection) { |
420
|
|
|
|
421
|
|
|
$index = $connection->getIndex(); |
422
|
|
|
|
423
|
|
|
$this->logger->debug("Client hangup ($index)"); |
424
|
|
|
|
425
|
|
|
$this->connections[$index]->destroy(); |
426
|
|
|
unset($this->connections[$index]); |
427
|
|
|
|
428
|
|
|
$this->events->emit( new SocketEvent('client.hangup', $this->process) ); |
429
|
|
|
|
430
|
|
|
} |
431
|
|
|
|
432
|
|
|
private function checkSocketError() { |
433
|
|
|
|
434
|
|
|
// this method is taken as-is from symphony ProcessPipes |
435
|
|
|
$lastError = error_get_last(); |
436
|
|
|
// stream_select returns false when the `select` system call is interrupted by an incoming signal |
437
|
|
|
return isset($lastError['message']) && false !== stripos($lastError['message'], 'interrupted system call'); |
438
|
|
|
|
439
|
|
|
} |
440
|
|
|
|
441
|
|
|
} |
442
|
|
|
|
Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.
The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.
This check looks for comments that seem to be mostly valid code and reports them.