Server::read()   B
last analyzed

Complexity

Conditions 7
Paths 7

Size

Total Lines 28
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 56

Importance

Changes 0
Metric Value
cc 7
eloc 13
nc 7
nop 1
dl 0
loc 28
ccs 0
cts 16
cp 0
crap 56
rs 8.8333
c 0
b 0
f 0
1
<?php namespace Comodojo\Daemon\Socket;
2
3
use \Comodojo\Daemon\Process;
4
use \Comodojo\Daemon\Events\SocketEvent;
5
use \Comodojo\Foundation\Events\EventsTrait;
6
use \Comodojo\Foundation\Logging\LoggerTrait;
7
use \Comodojo\Foundation\Events\Manager as EventsManager;
8
use \Comodojo\Foundation\Validation\DataFilter;
9
use \Comodojo\RpcServer\RpcServer;
10
use \Psr\Log\LoggerInterface;
11
use \Comodojo\Exception\SocketException;
12
use \Exception;
13
14
/**
15
 * @package     Comodojo Daemon
16
 * @author      Marco Giovinazzi <[email protected]>
17
 * @license     MIT
18
 *
19
 * LICENSE:
20
 *
21
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27
 * THE SOFTWARE.
28
 */
29
30
class Server extends AbstractSocket {
31
32
    use EventsTrait;
33
    use LoggerTrait;
34
35
    /*
36
     * Socket default timeout
37
     * @var int
38
     */
39
    const DEFAULT_TIMEOUT = 10;
40
41
    /*
42
     * Socket default max client
43
     * @var int
44
     */
45
    const DEFAULT_MAX_CLIENTS = 10;
46
47
    private $active = false;
48
49
    private $process;
50
51
    private $timeout;
52
53
    private $connections = [];
54
55
    protected $rpc_server;
56
57
    protected $max_connections;
58
59
    /*
60
     * Class constructor
61
     *
62
     * @param int
63
     */
64
    public function __construct(
65
        $handler,
66
        LoggerInterface $logger,
67
        EventsManager $events,
68
        Process $process,
69
        $read_buffer = null,
70
        $timeout = null,
71
        $max_connections = null
72
    ) {
73
74
        parent::__construct($handler, $read_buffer);
75
76
        $this->logger = $logger;
77
        $this->events = $events;
78
        $this->process = $process;
79
80
        $this->timeout = is_null($timeout)
81
            ? self::DEFAULT_TIMEOUT
82
            : DataFilter::filterInteger($timeout, 0, 600, self::DEFAULT_TIMEOUT);
0 ignored issues
show
Bug introduced by
self::DEFAULT_TIMEOUT of type integer is incompatible with the type array expected by parameter $default of Comodojo\Foundation\Vali...Filter::filterInteger(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

82
            : DataFilter::filterInteger($timeout, 0, 600, /** @scrutinizer ignore-type */ self::DEFAULT_TIMEOUT);
Loading history...
Bug introduced by
600 of type integer is incompatible with the type array expected by parameter $max of Comodojo\Foundation\Vali...Filter::filterInteger(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

82
            : DataFilter::filterInteger($timeout, 0, /** @scrutinizer ignore-type */ 600, self::DEFAULT_TIMEOUT);
Loading history...
Bug introduced by
0 of type integer is incompatible with the type array expected by parameter $min of Comodojo\Foundation\Vali...Filter::filterInteger(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

82
            : DataFilter::filterInteger($timeout, /** @scrutinizer ignore-type */ 0, 600, self::DEFAULT_TIMEOUT);
Loading history...
83
84
        $this->max_connections = is_null($max_connections)
85
            ? self::DEFAULT_MAX_CLIENTS
86
            : DataFilter::filterInteger($max_connections, 1, 1024, self::DEFAULT_MAX_CLIENTS);
87
88
        $this->rpc_server = new RpcServer(RpcServer::XMLRPC, $logger);
89
90
        MethodsInjector::inject($this->rpc_server, $process);
91
92
    }
93
94
    public static function create(
95
        $handler,
96
        LoggerInterface $logger,
97
        EventsManager $events,
98
        Process $process,
99
        $read_buffer = null,
100
        $timeout = null
101
    ) {
102
103
        $socket = new Server($handler, $logger, $events, $process, $read_buffer, $timeout);
104
105
        return $socket->connect();
106
107
    }
108
109
    public function getRpcServer() {
110
111
        return $this->rpc_server;
112
113
    }
114
115
    public function connect() {
116
117
        $this->socket = @socket_create(
118
            $this->socket_domain,
119
            $this->socket_type,
120
            $this->socket_protocol
121
        );
122
123
        if ( $this->socket === false ) {
124
            $error = self::getSocketError();
125
            throw new SocketException("Socket unavailable: $error");
126
        }
127
128
        $bind = @socket_bind(
129
            $this->socket,
130
            $this->socket_resource,
131
            $this->socket_port
132
        );
133
134
        if ( $bind === false ) {
135
            $error = self::getSocketError($this->socket);
136
            throw new SocketException("Cannot bind socket: $error");
137
        }
138
139
        socket_set_nonblock($this->socket);
140
141
        return $this;
142
143
    }
144
145
    public function close() {
146
147
        $this->stop();
148
149
        @socket_shutdown($this->socket, 2);
0 ignored issues
show
Security Best Practice introduced by
It seems like you do not handle an error condition for socket_shutdown(). This can introduce security issues, and is generally not recommended. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unhandled  annotation

149
        /** @scrutinizer ignore-unhandled */ @socket_shutdown($this->socket, 2);

If you suppress an error, we recommend checking for the error condition explicitly:

// For example instead of
@mkdir($dir);

// Better use
if (@mkdir($dir) === false) {
    throw new \RuntimeException('The directory '.$dir.' could not be created.');
}
Loading history...
150
151
        $this->clean();
152
153
    }
154
155
    public function listen() {
156
157
        $listen = socket_listen($this->socket);
158
159
        if ( $listen === false ) {
160
            $error = self::getSocketError($this->socket);
161
            throw new SocketException("Cannot put socket in listening mode: $error");
162
        }
163
164
        $this->logger->debug("Socket listening on ".$this->handler);
165
166
        $this->active = true;
167
168
        try {
169
170
            do {
171
                $this->loop();
172
            } while ( $this->active );
173
174
        } catch (Exception $e) {
175
176
            $this->close();
177
178
            throw $e;
179
180
        }
181
182
    }
183
184
    public function stop() {
185
186
        $this->active = false;
187
188
    }
189
190
    public function clean() {
191
192
        if ( $this->socket_domain == AF_UNIX && file_exists($this->socket_resource) ) {
193
            unlink($this->socket_resource);
194
        }
195
196
    }
197
198
    protected function loop() {
199
200
        $sockets[0] = $this->socket;
0 ignored issues
show
Comprehensibility Best Practice introduced by
$sockets was never initialized. Although not strictly required by PHP, it is generally a good practice to add $sockets = array(); before regardless.
Loading history...
201
202
        $sockets = array_merge($sockets, array_map(function($connection) {
203
            return $connection->getSocket();
204
        }, $this->connections));
205
206
        $select = @socket_select($sockets, $write, $except, $this->timeout);
207
208
        if ( $select === false ) {
209
210
            if ( $this->checkSocketError() && $this->active ) {
211
                $this->logger->debug("Socket reset due to incoming signal");
212
                pcntl_signal_dispatch();
213
                return;
214
            }
215
216
            $socket_error_message = self::getSocketError($this->socket);
217
218
            throw new SocketException("Error selecting socket: $socket_error_message");
219
220
        }
221
222
        if ( $select < 1 ) {
223
            return;
224
        }
225
226
        if ( in_array($this->socket, $sockets) ) {
227
228
            for ( $i = 0; $i < $select; $i++ ) {
229
230
                if ( empty($this->connections[$i]) ) {
231
232
                    try {
233
234
                        $this->logger->debug("New incoming connection ($i)");
235
236
                        $this->connections[$i] = new Connection($this->socket, $i);
237
238
                        $this->open($this->connections[$i], 'connected');
239
240
                    } catch (SocketException $se) {
241
242
                        $this->logger->warning("Error accepting client: ".$se->getMessage());
243
244
                    }
245
246
                    unset($sockets[$i]);
247
248
                }
249
250
            }
251
252
        }
253
254
        for ( $i = 0; $i < $this->max_connections; $i++ ) {
255
256
            if ( isset($this->connections[$i]) ) {
257
258
                $client = $this->connections[$i];
259
260
                if ( in_array($client->getSocket(), $sockets) ) {
261
262
                    $message = $this->read($client);
263
264
                    if ( $message === null ) {
265
                    // if ($message == null) {
266
                         $this->hangup($client);
267
                    } else if ( $message === false ) {
268
                         continue;
269
                    } else {
270
                        $output = $this->serve($message);
271
                        $this->write($client, $output);
272
                    }
273
274
                }
275
276
            }
277
278
        }
279
280
    }
281
282
    private function write(Connection $connection, AbstractMessage $message) {
283
284
        $socket = $connection->getSocket();
285
        $datagram = $message->serialize()."\r\n";
286
287
        return @socket_write($socket, $datagram, strlen($datagram));
288
289
    }
290
291
    private function read(Connection $connection) {
292
293
        $datagram = '';
294
        $socket = $connection->getSocket();
295
296
        while ( true ) {
297
            $recv = @socket_read($socket, $this->read_buffer, PHP_NORMAL_READ);
298
            if ( $recv === false ) return null;
299
            $datagram .= $recv;
300
            if ( empty($recv) || strstr($recv, PHP_EOL) ) break;
301
        }
302
303
        $datagram = trim($datagram);
304
305
        if ( !empty($datagram) && $datagram !== false ) {
306
307
            $message = new Request();
308
309
            $message->unserialize($datagram);
310
311
            return $message;
312
313
        }
314
315
        // the null return is because of socket_read strange behaviour in
316
        // darwin/bsd OS. It will never match the PHP_EOL keeping socket channel
317
        // open indefinitely.
318
        return null;
319
320
    }
321
322
    private function serve(Request $request) {
323
324
        $response = new Response();
325
326
        if ( $request->content_type == 'application/json' ) {
0 ignored issues
show
Bug Best Practice introduced by
The property content_type does not exist on Comodojo\Daemon\Socket\Request. Since you implemented __get, consider adding a @property annotation.
Loading history...
327
            $this->rpc_server->setProtocol(RpcServer::JSONRPC);
328
        }
329
330
        try {
331
332
            $response->message = $this->rpc_server
0 ignored issues
show
Bug Best Practice introduced by
The property message does not exist on Comodojo\Daemon\Socket\Response. Since you implemented __set, consider adding a @property annotation.
Loading history...
333
                ->setPayload($request->message)
0 ignored issues
show
Bug Best Practice introduced by
The property message does not exist on Comodojo\Daemon\Socket\Request. Since you implemented __get, consider adding a @property annotation.
Loading history...
334
                ->serve();
335
336
            $response->status = true;
0 ignored issues
show
Bug Best Practice introduced by
The property status does not exist on Comodojo\Daemon\Socket\Response. Since you implemented __set, consider adding a @property annotation.
Loading history...
337
338
        } catch (Exception $e) {
339
340
            $response->message = $e->getMessage();
341
            $response->status = false;
342
343
        }
344
345
        return $response;
346
347
    }
348
349
    private function open(Connection $client, $status) {
350
351
        $idx = $client->getIndex();
352
353
        $this->logger->debug("Opening connection ($idx), sending greeter");
354
355
        $this->events->emit(new SocketEvent('client.connect', $this->process));
356
357
        $message = new Greeter();
358
359
        $message->status = $status;
0 ignored issues
show
Bug Best Practice introduced by
The property status does not exist on Comodojo\Daemon\Socket\Greeter. Since you implemented __set, consider adding a @property annotation.
Loading history...
360
361
        $this->write($client, $message);
362
363
    }
364
365
    private function hangup(Connection $connection) {
366
367
        $index = $connection->getIndex();
368
369
        $this->logger->debug("Client hangup ($index)");
370
371
        $this->connections[$index]->destroy();
372
        unset($this->connections[$index]);
373
374
        $this->events->emit(new SocketEvent('client.hangup', $this->process));
375
376
    }
377
378
    private function checkSocketError() {
379
380
        // this method is taken as-is from symphony ProcessPipes
381
        $lastError = error_get_last();
382
        // stream_select returns false when the `select` system call is interrupted by an incoming signal
383
        return isset($lastError['message']) && false !== stripos($lastError['message'], 'interrupted system call');
384
385
    }
386
387
}
388