Connection::processData()   A
last analyzed

Complexity

Conditions 4
Paths 6

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 4.1967

Importance

Changes 0
Metric Value
dl 0
loc 19
ccs 10
cts 13
cp 0.7692
rs 9.6333
c 0
b 0
f 0
cc 4
nc 6
nop 1
crap 4.1967
1
<?php
2
3
/**
4
 * This file is a part of Woketo package.
5
 *
6
 * (c) Nekland <[email protected]>
7
 *
8
 * For the full license, take a look at the LICENSE file
9
 * in the root directory of this project
10
 */
11
12
namespace Nekland\Woketo\Server;
13
14
use Nekland\Woketo\Core\AbstractConnection;
15
use Nekland\Woketo\Exception\NoHandlerException;
16
use Nekland\Woketo\Exception\RuntimeException;
17
use Nekland\Woketo\Exception\WebsocketException;
18
use Nekland\Woketo\Http\Request;
19
use Nekland\Woketo\Http\Response;
20
use Nekland\Woketo\Rfc6455\Frame;
21
use Nekland\Woketo\Rfc6455\MessageProcessor;
22
use Nekland\Woketo\Rfc6455\Handshake\ServerHandshake;
23
use React\EventLoop\LoopInterface;
24
use React\Socket\ConnectionInterface;
25
26
class Connection extends AbstractConnection
27
{
28 9
    /**
     * Builds the server-side connection and registers the stream listeners.
     *
     * @param ConnectionInterface  $socketStream     Stream stored on the connection; listeners are attached to it.
     * @param callable             $messageHandler   Stored as the connection handler.
     * @param LoopInterface        $loop             Forwarded to the parent constructor.
     * @param MessageProcessor     $messageProcessor Forwarded to the parent constructor.
     * @param ServerHandshake|null $handshake        Handshake to use; a new ServerHandshake is created when omitted.
     */
    public function __construct(
        ConnectionInterface $socketStream,
        callable $messageHandler,
        LoopInterface $loop,
        MessageProcessor $messageProcessor,
        ServerHandshake $handshake = null
    ) {
        if ($handshake === null) {
            $handshake = new ServerHandshake();
        }

        parent::__construct($messageProcessor, $loop, $handshake);

        $this->stream = $socketStream;
        $this->initListeners();
        $this->handler = $messageHandler;
    }
40
41 9
    /**
     * Subscribes to the "data", "end" and "error" events of the stream.
     *
     * - "data" is forwarded to processData().
     * - "end" (listened to once) notifies the handler of the disconnection, or only logs when the
     *   handshake was never completed.
     * - "error" is forwarded to error().
     */
    private function initListeners()
    {
        $stream = $this->stream;

        $stream->on('data', function ($chunk) {
            $this->processData($chunk);
        });

        $stream->once('end', function () {
            if ($this->handshakeDone) {
                $this->getHandler()->onDisconnect($this);

                return;
            }

            $this->logger->info('Disconnected but websocket didn\'t start.');
        });

        $stream->on('error', function ($reason) {
            $this->error($reason);
        });
    }
57
58 8
    /**
     * Routes raw data received from the stream.
     *
     * Until the handshake is done the data goes to processHandshake(); afterwards it goes to
     * processMessage(). Both exception types caught here end with a close being requested.
     *
     * @param mixed $data Data from the stream "data" event; both callees declare a string parameter.
     */
    private function processData($data)
    {
        try {
            if ($this->handshakeDone) {
                $this->processMessage($data);
            } else {
                $this->processHandshake($data);
            }
        } catch (WebsocketException $e) {
            $this->messageProcessor->close($this->stream);
            // Both branches now reach the logger through the same accessor instead of mixing
            // direct property access with getLogger().
            // NOTE(review): assumes getLogger() returns the logger held in $this->logger — confirm.
            $this->getLogger()->notice('Connection to ' . $this->getIp() . ' closed with error : ' . $e->getMessage());
            $this->getHandler()->onError($e, $this);
        } catch (NoHandlerException $e) {
            $this->getLogger()->info(sprintf('No handler found for uri %s. Connection closed.', $this->uri));
            $this->close(Frame::CLOSE_WRONG_DATA);
        }
    }
77
78
    /**
     * Feeds incoming data to the message processor and dispatches every completed message.
     *
     * A pending timeout timer is cancelled first. Complete text messages are handed to the handler's
     * onMessage(), complete binary messages to onBinary(). An incomplete message is kept as the
     * current message and a timer is started while waiting for the rest of it.
     *
     * @param string $data
     */
    protected function processMessage(string $data)
    {
        // New data arrived, so a timer armed while waiting for it must not fire anymore.
        if (null !== $this->timeout) {
            $this->loop->cancelTimer($this->timeout);
            $this->timeout = null;
        }

        $messages = $this->messageProcessor->onData($data, $this->stream, $this->currentMessage);

        foreach ($messages as $message) {
            $this->currentMessage = $message;

            if (!$message->isComplete()) {
                // More data is needed to finish this message: arm the timeout.
                $this->timeout = $this->loop->addTimer(self::DEFAULT_TIMEOUT, function () {
                    $this->logger->notice('Connection to ' . $this->getIp() . ' timed out.');
                    $this->messageProcessor->timeout($this->stream);
                });

                continue;
            }

            // Hand the finished message over to the woketo handler API.
            $opcode = $message->getOpcode();
            if ($opcode == Frame::OP_TEXT) {
                $this->getHandler()->onMessage($message->getContent(), $this);
            } elseif ($opcode == Frame::OP_BINARY) {
                $this->getHandler()->onBinary($message->getContent(), $this);
            }

            // Whatever the opcode was, a complete message is never kept around.
            $this->currentMessage = null;
        }
    }
114
115
    /**
     * Sends data to the peer through the message processor.
     *
     * @param mixed $frame  Payload handed to the message processor (its type is not constrained here).
     * @param int   $opCode Opcode passed along with the payload; defaults to Frame::OP_TEXT.
     *
     * @throws RuntimeException When the message processor raises a WebsocketException.
     */
    public function write($frame, int $opCode = Frame::OP_TEXT)
    {
        try {
            $this->messageProcessor->write($frame, $this->stream, $opCode);
        } catch (WebsocketException $e) {
            // Pass the message and chain the cause as "previous" instead of handing the exception
            // object itself over as the message (which stringified it, stack trace included).
            // NOTE(review): assumes RuntimeException keeps the standard \Exception constructor
            // signature (message, code, previous) — confirm.
            throw new RuntimeException($e->getMessage(), $e->getCode(), $e);
        }
    }
123
124
    /**
     * Logs a connectivity error and reports it to the handler wrapped in a WebsocketException.
     *
     * @param mixed $data Error detail; it is converted to a string and appended to the message.
     */
    protected function error($data)
    {
        $description = "A connectivity error occurred: {$data}";

        $this->logger->error($description);

        $handler = $this->getHandler();
        $handler->onError(new WebsocketException($description), $this);
    }
133
134
    /**
     * Performs the opening handshake with a new client.
     *
     * Does nothing when the handshake is already done. Otherwise the raw data is parsed into a
     * request, the request is verified, a signed "switch protocol" response is sent on the stream
     * and the handler is notified of the new connection.
     *
     * @param string $data Raw request data received before the handshake is done.
     */
    protected function processHandshake(string $data)
    {
        if ($this->handshakeDone) {
            return;
        }

        // This class's constructor only ever passes a ServerHandshake to the parent, so narrow the
        // type here: extractKeyFromRequest() exists on ServerHandshake but not on ClientHandshake,
        // which is what static analysis complains about when the call is made on the property.
        /** @var ServerHandshake $handshake */
        $handshake = $this->handshake;

        $request = Request::create($data);
        $handshake->verify($request);
        $this->uri = $request->getUri();

        $response = Response::createSwitchProtocolResponse();
        $handshake->sign($response, $handshake->extractKeyFromRequest($request));
        $response->send($this->stream);

        // Flag the handshake as done before notifying the handler of the new connection.
        $this->handshakeDone = true;
        $this->getHandler()->onConnection($this);
    }
155
156
    /**
     * Gives the remote address reported by the underlying stream.
     *
     * @return string
     */
    public function getIp()
    {
        $remoteAddress = $this->stream->getRemoteAddress();

        return $remoteAddress;
    }
163
}
164