Completed
Pull Request — master (#100)
by Maxime
02:42
created

Connection::close()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
c 0
b 0
f 0
ccs 0
cts 3
cp 0
rs 10
cc 1
eloc 2
nc 1
nop 0
crap 2
1
<?php
2
3
/**
4
 * This file is a part of Woketo package.
5
 *
6
 * (c) Nekland <[email protected]>
7
 *
8
 * For the full license, take a look at the LICENSE file
9
 * on the root directory of this project
10
 */
11
12
namespace Nekland\Woketo\Server;
13
14
use Nekland\Woketo\Core\AbstractConnection;
15
use Nekland\Woketo\Exception\NoHandlerException;
16
use Nekland\Woketo\Exception\RuntimeException;
17
use Nekland\Woketo\Exception\WebsocketException;
18
use Nekland\Woketo\Http\Request;
19
use Nekland\Woketo\Http\Response;
20
use Nekland\Woketo\Message\MessageHandlerInterface;
21
use Nekland\Woketo\Rfc6455\Frame;
22
use Nekland\Woketo\Rfc6455\Message;
23
use Nekland\Woketo\Rfc6455\MessageProcessor;
24
use Nekland\Woketo\Rfc6455\Handshake\ServerHandshake;
25
use React\EventLoop\LoopInterface;
26
use React\EventLoop\Timer\TimerInterface;
27
use React\Socket\ConnectionInterface;
28
29
class Connection extends AbstractConnection
30
{
31
    /**
32
     * 5 seconds
33
     */
34
    const DEFAULT_TIMEOUT = 5;
35
36
    /**
37
     * @var Message
38
     */
39
    private $currentMessage;
40
41
    /**
42
     * @var LoopInterface
43
     */
44
    private $loop;
45
46
    /**
47
     * @var TimerInterface
48
     */
49
    private $timeout;
50
51
    /**
52
     * @var string
53
     */
54
    private $uri;
55
56 5
    public function __construct(
57
        ConnectionInterface $socketStream,
58
        \Closure $messageHandler,
59
        LoopInterface $loop,
60
        MessageProcessor $messageProcessor,
61
        ServerHandshake $handshake = null
62
    ) {
63 5
        parent::__construct($messageProcessor, $handshake ?: new ServerHandshake);
64 5
        $this->stream = $socketStream;
0 ignored issues
show
Documentation Bug introduced by
$socketStream is of type object<React\Socket\ConnectionInterface>, but the property $stream was declared to be of type object<React\Socket\Connection>. Are you sure that you always receive this specific sub-class here, or does it make sense to add an instanceof check?

Our type inference engine has found a suspicious assignment of a value to a property. This check raises an issue when a value that can be of a given class or a super-class is assigned to a property that is type hinted more strictly.

Either this assignment is in error or an instanceof check should be added for that assignment.

class Alien {}

class Dalek extends Alien {}

class Plot
{
    /** @var  Dalek */
    public $villain;
}

$alien = new Alien();
$plot = new Plot();
if ($alien instanceof Dalek) {
    $plot->villain = $alien;
}
Loading history...
65 5
        $this->initListeners();
66 5
        $this->handler = $messageHandler;
67 5
        $this->loop = $loop;
68 5
    }
69
70 5
    /**
     * Subscribes to the events emitted by the underlying stream.
     *
     * "data" payloads are forwarded to processData() and "error" payloads
     * are forwarded to error().
     */
    private function initListeners()
    {
        $onData = function ($chunk) {
            $this->processData($chunk);
        };
        $onError = function ($reason) {
            $this->error($reason);
        };

        $this->stream->on('data', $onData);
        $this->stream->on('error', $onError);
    }
79
80 5
    /**
     * Entry point for every chunk received on the stream.
     *
     * Until the handshake is done the chunk is handed to processHandshake();
     * afterwards it is handed to processMessage(). Websocket errors close the
     * stream through the message processor and are reported to the handler; a
     * missing handler is logged and the connection is closed.
     *
     * @param mixed $data Raw payload received from the stream.
     */
    private function processData($data)
    {
        try {
            if ($this->handshakeDone) {
                $this->processMessage($data);
            } else {
                $this->processHandshake($data);
            }
        } catch (WebsocketException $websocketError) {
            $this->messageProcessor->close($this->stream);
            $this->logger->notice('Connection to ' . $this->getIp() . ' closed with error : ' . $websocketError->getMessage());
            // NOTE(review): getHandler() may itself throw NoHandlerException here;
            // an exception raised inside this catch block is not handled by the
            // sibling catch below and will propagate to the caller.
            $this->getHandler()->onError($websocketError, $this);
        } catch (NoHandlerException $missingHandler) {
            $this->getLogger()->info(sprintf('No handler found for uri %s. Connection closed.', $this->uri));
            $this->close();
        }
    }
99
100
    /**
     * Builds messages out of the received data and dispatches the complete ones.
     *
     * Every message yielded by the message processor becomes the current
     * message. A complete text or binary message is passed to the handler and
     * the current message is then reset. An incomplete message is kept and a
     * timer of DEFAULT_TIMEOUT seconds is started; when it fires, the timeout
     * is logged and reported to the message processor.
     *
     * @param string $data
     */
    protected function processMessage(string $data)
    {
        // New data arrived: a timer started while waiting for it is now obsolete.
        if (null !== $this->timeout) {
            $this->timeout->cancel();
            $this->timeout = null;
        }

        $messages = $this->messageProcessor->onData($data, $this->stream, $this->currentMessage);

        foreach ($messages as $message) {
            $this->currentMessage = $message;

            if (!$message->isComplete()) {
                // Still waiting for the rest of the message, so arm the timeout.
                $this->timeout = $this->loop->addTimer(self::DEFAULT_TIMEOUT, function () {
                    $this->logger->notice('Connection to ' . $this->getIp() . ' timed out.');
                    $this->messageProcessor->timeout($this->stream);
                });

                continue;
            }

            // Hand the complete message over to the user handler (woketo API).
            switch ($message->getOpcode()) {
                case Frame::OP_TEXT:
                    $this->getHandler()->onMessage($message->getContent(), $this);
                    break;
                case Frame::OP_BINARY:
                    $this->getHandler()->onBinary($message->getContent(), $this);
                    break;
            }

            $this->currentMessage = null;
        }
    }
136
137
    /**
     * Sends a frame (or raw string data) to the client through the message processor.
     *
     * @param string|Frame $frame
     * @param int          $opCode An int representing binary or text data (const of Frame class)
     * @throws \Nekland\Woketo\Exception\RuntimeException When the message processor raises a WebsocketException.
     */
    public function write($frame, int $opCode = Frame::OP_TEXT)
    {
        try {
            $this->messageProcessor->write($frame, $this->stream, $opCode);
        } catch (WebsocketException $e) {
            // Pass the message and code explicitly and chain the original exception
            // as "previous", instead of handing the exception object over as the message.
            // NOTE(review): assumes RuntimeException keeps the standard
            // \Exception constructor signature - confirm.
            throw new RuntimeException($e->getMessage(), $e->getCode(), $e);
        }
    }
150
151
    /**
     * Logs a connectivity error and reports it to the handler as a WebsocketException.
     *
     * @param mixed $data Error payload; it is concatenated to the log message as a string.
     */
    protected function error($data)
    {
        $description = "A connectivity error occurred: " . $data;
        $this->logger->error($description);

        $handler = $this->getHandler();
        $handler->onError(new WebsocketException($description), $this);
    }
160
161
    /**
     * Performs the handshake for a client that has not completed it yet.
     *
     * The request is built from the raw data and verified, its URI is stored,
     * then a signed switch-protocol response is sent on the stream. Finally the
     * handshake is flagged as done and the handler is notified of the new
     * connection. Does nothing when the handshake is already done.
     *
     * @param string $data Raw handshake request.
     */
    protected function processHandshake(string $data)
    {
        // The handshake only has to happen once per connection.
        if ($this->handshakeDone) {
            return;
        }

        $upgradeRequest = Request::create($data);
        $this->handshake->verify($upgradeRequest);
        // The URI is stored because getHandler() passes it to the handler closure.
        $this->uri = $upgradeRequest->getUri();

        $upgradeResponse = Response::createSwitchProtocolResponse();
        $this->handshake->sign($upgradeRequest, $upgradeResponse);
        $upgradeResponse->send($this->stream);

        $this->handshakeDone = true;
        $this->getHandler()->onConnection($this);
    }
182
183
    /**
     * Closes the connection normally by asking the message processor
     * to close the underlying stream.
     */
    public function close()
    {
        $processor = $this->messageProcessor;
        $processor->close($this->stream);
    }
190
191
    /**
     * Returns the message handler of this connection.
     *
     * While the handler is still a closure, it is invoked with the request URI
     * and this connection; its result replaces the closure, so it is resolved
     * only once.
     *
     * @return MessageHandlerInterface
     * @throws NoHandlerException When the closure returns null for the current URI.
     */
    private function getHandler() : MessageHandlerInterface
    {
        // Already resolved: nothing more to do.
        if (!($this->handler instanceof \Closure)) {
            return $this->handler;
        }

        $factory = $this->handler;
        $resolved = $factory($this->uri, $this);

        if (null === $resolved) {
            throw new NoHandlerException(sprintf('No handler for request URI %s.', $this->uri));
        }

        $this->handler = $resolved;

        return $this->handler;
    }
210
}
211