Completed
Push — master ( 05544f...34252d )
by Maxime
02:24
created

Connection::getHandler()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 15
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 15
ccs 8
cts 8
cp 1
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 8
nc 3
nop 0
crap 3
1
<?php
2
3
/**
4
 * This file is a part of Woketo package.
5
 *
6
 * (c) Nekland <[email protected]>
7
 *
8
 * For the full license, take a look to the LICENSE file
9
 * on the root directory of this project
10
 */
11
12
namespace Nekland\Woketo\Server;
13
14
use Nekland\Woketo\Exception\NoHandlerException;
15
use Nekland\Woketo\Exception\RuntimeException;
16
use Nekland\Woketo\Exception\WebsocketException;
17
use Nekland\Woketo\Http\Request;
18
use Nekland\Woketo\Http\Response;
19
use Nekland\Woketo\Message\MessageHandlerInterface;
20
use Nekland\Woketo\Rfc6455\Frame;
21
use Nekland\Woketo\Rfc6455\Message;
22
use Nekland\Woketo\Rfc6455\MessageProcessor;
23
use Nekland\Woketo\Rfc6455\ServerHandshake;
24
use Psr\Log\LoggerAwareTrait;
25
use React\EventLoop\LoopInterface;
26
use React\EventLoop\Timer\TimerInterface;
27
use React\Socket\ConnectionInterface;
28
29
/**
 * Represents one client connection on the server side.
 *
 * It wraps the raw socket stream, performs the opening handshake on the first
 * chunk of data, then forwards every following chunk to the message processor
 * and dispatches complete messages to the user handler.
 */
class Connection
{
    use LoggerAwareTrait;

    /**
     * Delay (5 seconds) granted to the client to send the rest of an incomplete message.
     */
    const DEFAULT_TIMEOUT = 5;

    /**
     * @var ConnectionInterface
     */
    private $socketStream;

    /**
     * Either the resolved handler, or the closure used to resolve it lazily
     * from the request URI (see getHandler()).
     *
     * @var MessageHandlerInterface|\Closure
     */
    private $handler;

    /**
     * @var bool
     */
    private $handshakeDone = false;

    /**
     * @var ServerHandshake
     */
    private $handshake;

    /**
     * Message currently being assembled; null when no message is pending.
     *
     * @var Message|null
     */
    private $currentMessage;

    /**
     * @var MessageProcessor
     */
    private $messageProcessor;

    /**
     * @var LoopInterface
     */
    private $loop;

    /**
     * Timer running while an incomplete message is pending; null otherwise.
     *
     * @var TimerInterface|null
     */
    private $timeout;

    /**
     * URI of the handshake request; null until the handshake was verified.
     *
     * @var string|null
     */
    private $uri;

    /**
     * @param ConnectionInterface  $socketStream     Stream to read from and write to.
     * @param \Closure             $messageHandler   Called with (uri, connection); must return the handler or null.
     * @param LoopInterface        $loop             Loop used to schedule the incomplete-message timeout.
     * @param MessageProcessor     $messageProcessor Builds and writes messages for this connection.
     * @param ServerHandshake|null $handshake        Handshake helper; a default instance is created when omitted.
     */
    public function __construct(
        ConnectionInterface $socketStream,
        \Closure $messageHandler,
        LoopInterface $loop,
        MessageProcessor $messageProcessor,
        ServerHandshake $handshake = null
    ) {
        $this->socketStream = $socketStream;
        $this->handler = $messageHandler;
        $this->handshake = $handshake ?: new ServerHandshake;
        $this->loop = $loop;
        $this->messageProcessor = $messageProcessor;

        // Listeners are registered last so that a callback can never observe
        // a partially initialised connection.
        $this->initListeners();
    }

    /**
     * Subscribes to the "data" and "error" events of the socket stream.
     */
    private function initListeners()
    {
        $this->socketStream->on('data', function ($data) {
            $this->processData($data);
        });
        $this->socketStream->on('error', function ($data) {
            $this->error($data);
        });
    }

    /**
     * Entry point for every chunk received: runs the handshake for the first
     * chunk and the message processing for the following ones.
     *
     * @param string $data
     */
    private function processData($data)
    {
        try {
            if (!$this->handshakeDone) {
                $this->processHandcheck($data);
            } else {
                $this->processMessage($data);
            }
        } catch (WebsocketException $e) {
            $this->messageProcessor->close($this->socketStream);
            $this->getLogger()->notice('Connection to ' . $this->getIp() . ' closed with error : ' . $e->getMessage());
            $this->notifyError($e);
        } catch (NoHandlerException $e) {
            $this->getLogger()->info(sprintf('No handler found for uri %s. Connection closed.', $this->uri));
            $this->close();
        }
    }

    /**
     * This method build a message and buffer data in case of incomplete data.
     *
     * @param string $data
     */
    protected function processMessage($data)
    {
        // It may be a timeout going (we were waiting for data), let's clear it.
        if ($this->timeout !== null) {
            $this->timeout->cancel();
            $this->timeout = null;
        }

        foreach ($this->messageProcessor->onData($data, $this->socketStream, $this->currentMessage) as $message) {
            $this->currentMessage = $message;

            if ($this->currentMessage->isComplete()) {
                // Sending the message through the woketo API.
                // Opcodes other than text and binary are not dispatched here.
                switch ($this->currentMessage->getOpcode()) {
                    case Frame::OP_TEXT:
                        $this->getHandler()->onMessage($this->currentMessage->getContent(), $this);
                        break;
                    case Frame::OP_BINARY:
                        $this->getHandler()->onBinary($this->currentMessage->getContent(), $this);
                        break;
                }
                $this->currentMessage = null;
            } else {
                // We wait for more data so we start a timeout.
                $this->timeout = $this->loop->addTimer(self::DEFAULT_TIMEOUT, function () {
                    $this->getLogger()->notice('Connection to ' . $this->getIp() . ' timed out.');
                    $this->messageProcessor->timeout($this->socketStream);
                });
            }
        }
    }

    /**
     * Writes a frame (or raw content) to the client through the message processor.
     *
     * @param string|Frame $frame
     * @param int          $opCode An int representing binary or text data (const of Frame class)
     * @throws \Nekland\Woketo\Exception\RuntimeException
     */
    public function write($frame, int $opCode = Frame::OP_TEXT)
    {
        try {
            $this->messageProcessor->write($frame, $this->socketStream, $opCode);
        } catch (WebsocketException $e) {
            // NOTE(review): the caught exception is passed as the first constructor
            // argument; if RuntimeException uses the standard \Exception constructor,
            // passing ($e->getMessage(), $e->getCode(), $e) would chain it properly.
            throw new RuntimeException($e);
        }
    }

    /**
     * Logs a connectivity error and forwards it to the handler.
     *
     * @param mixed $data
     */
    protected function error($data)
    {
        $message = "A connectivity error occurred: " . $data;
        $this->getLogger()->error($message);
        $this->notifyError(new WebsocketException($message));
    }

    /**
     * If it's a new client, we need to make some special actions named the handshake.
     *
     * @param string $data
     */
    protected function processHandcheck($data)
    {
        if ($this->handshakeDone) {
            return;
        }

        $request = Request::create($data);
        $this->handshake->verify($request);
        $this->uri = $request->getUri();
        $response = Response::createSwitchProtocolResponse();
        // NOTE(review): static analysis reported that sign() is documented as
        // expecting a string while a Request object is passed here; confirm
        // against the ServerHandshake::sign() docblock.
        $this->handshake->sign($request, $response);
        $response->send($this->socketStream);

        $this->handshakeDone = true;
        $this->getHandler()->onConnection($this);
    }

    /**
     * @return string
     */
    public function getIp()
    {
        return $this->socketStream->getRemoteAddress();
    }

    /**
     * Returns the injected logger, or a no-op logger when none was injected,
     * so that logging never fails on a missing logger.
     *
     * @return \Psr\Log\LoggerInterface
     */
    public function getLogger()
    {
        if (null === $this->logger) {
            $this->logger = new \Psr\Log\NullLogger();
        }

        return $this->logger;
    }

    /**
     * Close the connection with normal close.
     */
    public function close()
    {
        $this->messageProcessor->close($this->socketStream);
    }

    /**
     * Forwards an error to the handler when one can be resolved.
     *
     * Resolving the handler may itself fail (for instance when the error
     * happened before the request URI was known); in that case the error is
     * only logged, because an exception thrown from here would not be caught
     * by the callers' own catch blocks.
     *
     * @param WebsocketException $exception
     */
    private function notifyError(WebsocketException $exception)
    {
        try {
            $this->getHandler()->onError($exception, $this);
        } catch (NoHandlerException $e) {
            $this->getLogger()->info(sprintf('No handler found for uri %s. Error not forwarded.', $this->uri));
        }
    }

    /**
     * Returns the message handler, resolving it from the closure on first use.
     *
     * Once resolved, the handler replaces the closure so the resolution only
     * happens once per connection.
     *
     * @return MessageHandlerInterface
     * @throws NoHandlerException When the closure returns null.
     */
    private function getHandler()
    {
        if ($this->handler instanceof \Closure) {
            $handler = $this->handler;
            $handler = $handler($this->uri, $this);

            if (null === $handler) {
                throw new NoHandlerException(sprintf('No handler for request URI %s.', $this->uri));
            }

            return $this->handler = $handler;
        }

        return $this->handler;
    }
}
255