Completed
Pull Request — master (#28)
by Maxime
01:54
created

Connection::processMessage()   B

Complexity

Conditions 6
Paths 10

Size

Total Lines 31
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 31
rs 8.439
cc 6
eloc 18
nc 10
nop 1
1
<?php
2
3
/**
4
 * This file is a part of Woketo package.
5
 *
6
 * (c) Ci-tron <[email protected]>
7
 *
8
 * For the full license, take a look to the LICENSE file
9
 * on the root directory of this project
10
 */
11
12
namespace Nekland\Woketo\Server;
13
14
use Nekland\Woketo\Exception\RuntimeException;
15
use Nekland\Woketo\Exception\WebsocketException;
16
use Nekland\Woketo\Http\Request;
17
use Nekland\Woketo\Http\Response;
18
use Nekland\Woketo\Message\MessageHandlerInterface;
19
use Nekland\Woketo\Rfc6455\Frame;
20
use Nekland\Woketo\Rfc6455\Message;
21
use Nekland\Woketo\Rfc6455\MessageProcessor;
22
use Nekland\Woketo\Rfc6455\ServerHandshake;
23
use React\EventLoop\LoopInterface;
24
use React\EventLoop\Timer\TimerInterface;
25
use React\Socket\ConnectionInterface;
26
27
class Connection
28
{
29
    /**
30
     * 5 seconds
31
     */
32
    const DEFAULT_TIMEOUT = 5;
33
34
    /**
35
     * @var ConnectionInterface
36
     */
37
    private $socketStream;
38
39
    /**
40
     * @var MessageHandlerInterface
41
     */
42
    private $handler;
43
44
    /**
45
     * @var bool
46
     */
47
    private $handshakeDone;
48
49
    /**
50
     * @var ServerHandshake
51
     */
52
    private $handshake;
53
54
    /**
55
     * @var Message
56
     */
57
    private $currentMessage;
58
59
    /**
60
     * @var MessageProcessor
61
     */
62
    private $messageProcessor;
63
64
    /**
65
     * @var LoopInterface
66
     */
67
    private $loop;
68
69
    /**
70
     * @var TimerInterface
71
     */
72
    private $timeout;
73
    
74
    public function __construct(ConnectionInterface $socketStream, MessageHandlerInterface $messageHandler, LoopInterface $loop, MessageProcessor $messageProcessor, ServerHandshake $handshake = null)
75
    {
76
        $this->socketStream = $socketStream;
77
        $this->initListeners();
78
        $this->handler = $messageHandler;
79
        $this->handshake = $handshake ?: new ServerHandshake;
80
        $this->loop = $loop;
81
        $this->messageProcessor = $messageProcessor;
82
    }
83
84
    private function initListeners()
85
    {
86
        $this->socketStream->on('data', function ($data) {
87
            $this->processData($data);
88
        });
89
        $this->socketStream->on('error', [$this, 'error']);
90
    }
91
92
    private function processData($data)
93
    {
94
        try {
95
            if (!$this->handshakeDone) {
96
                $this->processHandcheck($data);
97
            } else {
98
                $this->processMessage($data);
99
            }
100
101
            return;
102
        } catch (WebsocketException $e) {
103
            $this->messageProcessor->close($this->socketStream);
104
            $this->handler->onError($e, $this);
105
        }
106
    }
107
108
    /**
109
     * This method build a message and buffer data in case of incomplete data.
110
     *
111
     * @param string $data
112
     */
113
    protected function processMessage($data)
114
    {
115
        // It may be a timeout going (we were waiting for data), let's clear it.
116
        if ($this->timeout !== null) {
117
            $this->timeout->cancel();
118
            $this->timeout = null;
119
        }
120
121
        foreach ($this->messageProcessor->onData($data, $this->socketStream, $this->currentMessage) as $message) {
122
            $this->currentMessage = $message;
123
            if ($this->currentMessage->isComplete()) {
124
                // Sending the message through the woketo API.
125
                switch($this->currentMessage->getOpcode()) {
126
                    case Frame::OP_TEXT:
127
                        $this->handler->onMessage($this->currentMessage->getContent(), $this);
128
                        break;
129
                    case Frame::OP_BINARY:
130
                        $this->handler->onBinary($this->currentMessage->getContent(), $this);
131
                        break;
132
                }
133
                $this->currentMessage = null;
134
135
            } else {
136
                // We wait for more data so we start a timeout.
137
                $this->timeout = $this->loop->addTimer(Connection::DEFAULT_TIMEOUT, function () {
138
                    $this->messageProcessor->timeout($this->socketStream);
139
                });
140
            }
141
        }
142
143
    }
144
145
    /**
146
     * @param string|Frame $frame
147
     * @param int          $opCode An int representing binary or text data (const of Frame class)
148
     * @throws \Nekland\Woketo\Exception\RuntimeException
149
     */
150
    public function write($frame, int $opCode = Frame::OP_TEXT)
151
    {
152
        try {
153
            $this->messageProcessor->write($frame, $this->socketStream, $opCode);
154
        } catch (WebsocketException $e) {
155
            throw new RuntimeException($e);
156
        }
157
    }
158
159
    /**
160
     * @param $data
161
     */
162
    public function error($data)
163
    {
164
        echo "There is an error : \n" . $data . "\n\n";
165
    }
166
167
    /**
168
     * If it's a new client, we need to make some special actions named the handshake.
169
     *
170
     * @param string $data
171
     */
172
    protected function processHandcheck($data)
173
    {
174
        if ($this->handshakeDone) {
175
            return;
176
        }
177
178
        $request = Request::create($data);
179
        $this->handshake->verify($request);
180
        $response = Response::createSwitchProtocolResponse();
181
        $this->handshake->sign($request, $response);
0 ignored issues
show
Documentation introduced by
$request is of type object<Nekland\Woketo\Http\Request>, but the function expects a string.

It seems like the type of the argument is not accepted by the function/method which you are calling.

In some cases, in particular if PHP’s automatic type-juggling kicks in this might be fine. In other cases, however this might be a bug.

We suggest to add an explicit type cast like in the following example:

function acceptsInteger($int) { }

$x = '123'; // string "123"

// Instead of
acceptsInteger($x);

// we recommend to use
acceptsInteger((integer) $x);
Loading history...
182
        $response->send($this->socketStream);
183
        
184
        $this->handshakeDone = true;
185
        $this->handler->onConnection($this);
186
    }
187
}
188