Completed
Pull Request — master (#28)
by Maxime
03:40
created

Connection::processMessage()   B

Complexity

Conditions 6
Paths 10

Size

Total Lines 33
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 33
rs 8.439
cc 6
eloc 19
nc 10
nop 1
1
<?php
2
3
/**
4
 * This file is a part of Woketo package.
5
 *
6
 * (c) Ci-tron <[email protected]>
7
 *
8
 * For the full license, take a look to the LICENSE file
9
 * on the root directory of this project
10
 */
11
12
namespace Nekland\Woketo\Server;
13
14
use Nekland\Woketo\Exception\RuntimeException;
15
use Nekland\Woketo\Exception\WebsocketException;
16
use Nekland\Woketo\Http\Request;
17
use Nekland\Woketo\Http\Response;
18
use Nekland\Woketo\Message\MessageHandlerInterface;
19
use Nekland\Woketo\Rfc6455\Frame;
20
use Nekland\Woketo\Rfc6455\Message;
21
use Nekland\Woketo\Rfc6455\MessageProcessor;
22
use Nekland\Woketo\Rfc6455\ServerHandshake;
23
use React\EventLoop\LoopInterface;
24
use React\EventLoop\Timer\TimerInterface;
25
use React\Socket\ConnectionInterface;
26
27
/**
 * Wraps one client socket: performs the opening handshake on the first chunk
 * of data, then turns every following chunk into messages that are forwarded
 * to the user-provided message handler.
 */
class Connection
{
    /**
     * Delay, in seconds (5 seconds), given to the client to send the rest of
     * an incomplete message.
     */
    const DEFAULT_TIMEOUT = 5;

    /**
     * Socket the data is read from and written to.
     *
     * @var ConnectionInterface
     */
    private $socketStream;

    /**
     * User handler notified of connections, messages and errors.
     *
     * @var MessageHandlerInterface
     */
    private $handler;

    /**
     * Whether the opening handshake has already been completed.
     *
     * @var bool
     */
    private $handshakeDone = false;

    /**
     * @var ServerHandshake
     */
    private $handshake;

    /**
     * Message still being assembled, or null when no message is pending.
     *
     * @var Message|null
     */
    private $currentMessage;

    /**
     * @var MessageProcessor
     */
    private $messageProcessor;

    /**
     * @var LoopInterface
     */
    private $loop;

    /**
     * Timer armed while waiting for the rest of an incomplete message, or null.
     *
     * @var TimerInterface|null
     */
    private $timeout;

    /**
     * @param ConnectionInterface     $socketStream     Socket of the client.
     * @param MessageHandlerInterface $messageHandler   Receives connection, message and error callbacks.
     * @param LoopInterface           $loop             Event loop used to schedule the incomplete-message timeout.
     * @param MessageProcessor        $messageProcessor Builds messages from raw data and writes/closes on the socket.
     * @param ServerHandshake|null    $handshake        Handshake helper; a default instance is created when omitted.
     */
    public function __construct(ConnectionInterface $socketStream, MessageHandlerInterface $messageHandler, LoopInterface $loop, MessageProcessor $messageProcessor, ServerHandshake $handshake = null)
    {
        $this->socketStream = $socketStream;
        $this->handler = $messageHandler;
        $this->handshake = $handshake ?? new ServerHandshake();
        $this->loop = $loop;
        $this->messageProcessor = $messageProcessor;

        // Listeners are attached last so a callback can never run against a
        // partially initialised connection.
        $this->initListeners();
    }

    /**
     * Subscribes to the "data" and "error" events of the socket.
     */
    private function initListeners()
    {
        // processData() is private, hence the closure instead of an array callable.
        $this->socketStream->on('data', function ($data) {
            $this->processData($data);
        });
        $this->socketStream->on('error', [$this, 'error']);
    }

    /**
     * Routes incoming data to the handshake or to the message processing,
     * depending on whether the handshake has already been done.
     *
     * Any WebsocketException raised on the way makes the message processor
     * close the socket, then the handler is notified through onError().
     *
     * @param string $data
     */
    private function processData($data)
    {
        try {
            if ($this->handshakeDone) {
                $this->processMessage($data);
            } else {
                $this->processHandcheck($data);
            }
        } catch (WebsocketException $e) {
            $this->messageProcessor->close($this->socketStream);
            $this->handler->onError($e, $this);
        }
    }

    /**
     * Builds messages from the received data and buffers the current message
     * when the data received so far is incomplete.
     *
     * Complete text messages are given to the handler's onMessage(), complete
     * binary messages to onBinary(); complete messages with any other opcode
     * are not forwarded by this method. An incomplete message is kept as the
     * current message and a timeout is armed while waiting for the rest.
     *
     * @param string $data
     */
    protected function processMessage($data)
    {
        // A timeout may be running (we were waiting for data), let's clear it.
        $this->clearTimeout();

        // The pending message (if any) is handed back so it can be completed.
        $messages = $this->messageProcessor->onData($data, $this->socketStream, $this->currentMessage);

        foreach ($messages as $message) {
            $this->currentMessage = $message;

            if (!$message->isComplete()) {
                // We wait for more data so we start a timeout.
                $this->startTimeout();
                continue;
            }

            // Sending the message through the woketo API.
            switch ($message->getOpcode()) {
                case Frame::OP_TEXT:
                    $this->handler->onMessage($message->getContent(), $this);
                    break;
                case Frame::OP_BINARY:
                    $this->handler->onBinary($message->getContent(), $this);
                    break;
            }

            $this->currentMessage = null;
        }
    }

    /**
     * Cancels the incomplete-message timer, if one is armed.
     */
    private function clearTimeout()
    {
        if ($this->timeout !== null) {
            $this->timeout->cancel();
            $this->timeout = null;
        }
    }

    /**
     * Arms the incomplete-message timer, replacing any timer already armed so
     * that a previous one is never left running without a reference to it.
     */
    private function startTimeout()
    {
        $this->clearTimeout();

        $this->timeout = $this->loop->addTimer(self::DEFAULT_TIMEOUT, function () {
            // The timer has fired: drop the reference before reporting the timeout.
            $this->timeout = null;
            $this->messageProcessor->timeout($this->socketStream);
        });
    }

    /**
     * Writes a frame (or raw data) to the client through the message processor.
     *
     * @param string|Frame $frame
     * @param int          $opCode An int representing binary or text data (const of Frame class)
     * @throws \Nekland\Woketo\Exception\RuntimeException When the message processor raises a WebsocketException.
     */
    public function write($frame, int $opCode = Frame::OP_TEXT)
    {
        try {
            $this->messageProcessor->write($frame, $this->socketStream, $opCode);
        } catch (WebsocketException $e) {
            // Keep the original exception as "previous" instead of passing it
            // as the message, which stringified it and lost the chain.
            // NOTE(review): assumes RuntimeException keeps the standard
            // \Exception constructor (message, code, previous) - confirm.
            throw new RuntimeException($e->getMessage(), (int) $e->getCode(), $e);
        }
    }

    /**
     * Listener of the socket "error" event: prints the error on the standard output.
     *
     * @param mixed $data Value emitted with the event; it must be convertible to
     *                    a string since it is concatenated into the output.
     */
    public function error($data)
    {
        echo "There is an error : \n" . $data . "\n\n";
    }

    /**
     * If it's a new client, we need to perform some special actions named the handshake.
     *
     * Does nothing when the handshake is already done. Otherwise the data is
     * parsed as a request, verified, and a signed switch-protocol response is
     * sent on the socket; the handler is then notified of the new connection.
     *
     * @param string $data
     */
    protected function processHandcheck($data)
    {
        if ($this->handshakeDone) {
            return;
        }

        $request = Request::create($data);
        $this->handshake->verify($request);
        $response = Response::createSwitchProtocolResponse();
        // NOTE(review): a static-analysis report attached to this call states
        // that sign() expects a string as first argument while a Request
        // object is given; presumably the documentation of
        // ServerHandshake::sign() is what needs fixing - confirm.
        $this->handshake->sign($request, $response);
        $response->send($this->socketStream);

        $this->handshakeDone = true;
        $this->handler->onConnection($this);
    }
}
190