Completed
Pull Request — master (#100)
by Maxime
01:49
created

Connection::processMessage()   B

Complexity

Conditions 6
Paths 10

Size

Total Lines 33
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 42

Importance

Changes 0
Metric Value
dl 0
loc 33
ccs 0
cts 26
cp 0
rs 8.439
c 0
b 0
f 0
cc 6
eloc 19
nc 10
nop 1
crap 42
1
<?php
2
3
/**
4
 * This file is a part of Woketo package.
5
 *
6
 * (c) Nekland <[email protected]>
7
 *
8
 * For the full license, take a look to the LICENSE file
9
 * on the root directory of this project
10
 */
11
12
namespace Nekland\Woketo\Client;
13
14
15
use Nekland\Woketo\Core\AbstractConnection;
16
use Nekland\Woketo\Exception\Http\IncompleteHttpMessageException;
17
use Nekland\Woketo\Exception\RuntimeException;
18
use Nekland\Woketo\Exception\WebsocketException;
19
use Nekland\Woketo\Http\Response;
20
use Nekland\Woketo\Http\Url;
21
use Nekland\Woketo\Message\MessageHandlerInterface;
22
use Nekland\Woketo\Rfc6455\Frame;
23
use Nekland\Woketo\Rfc6455\Handshake\ClientHandshake;
24
use Nekland\Woketo\Rfc6455\MessageProcessor;
25
use React\EventLoop\LoopInterface;
26
use React\Promise\PromiseInterface;
27
use React\Stream\Stream;
28
29
class Connection extends AbstractConnection
30
{
31
    /**
32
     * Key of the handshake request that was sent to the server.
     * Stays null until the request has been written to the stream.
     *
     * @var string|null
33
     */
34
    private $handshakeKey;
35
36
    /**
37
     * Raw data received so far while waiting for a complete handshake response.
     *
     * @var string
38
     */
39
    private $buffer;
40
41
    /**
42
     * Url given to the constructor; used to build the handshake request and to report the host.
     *
     * @var Url
43
     */
44
    private $url;
45
46
    /**
     * Prepares the client connection and hooks it to the pending socket promise.
     *
     * The message processor and the loop are forwarded to the parent constructor together with
     * a new ClientHandshake. When the promise is fulfilled the stream is stored and
     * onConnection() is called; when it is rejected with an exception, onError() is called.
     *
     * @param Url                     $url              Url of the server to connect to
     * @param PromiseInterface        $clientPromise    Promise whose fulfilment callback receives a Stream
     * @param MessageProcessor        $messageProcessor
     * @param MessageHandlerInterface $handler          Handler notified of connection and message events
     * @param LoopInterface           $loop
     */
    public function __construct(Url $url, PromiseInterface $clientPromise, MessageProcessor $messageProcessor, MessageHandlerInterface $handler, LoopInterface $loop)
47
    {
48
        parent::__construct($messageProcessor, $loop, new ClientHandshake());
49
50
        $this->url = $url;
51
        $this->uri = $this->url->getUri();
52
        // The handshake response is accumulated here until it is complete.
        $this->buffer = '';
53
        $this->handler = $handler;
54
55
        // Nothing can be sent before the stream exists, so the rest of the setup waits for the promise.
        $clientPromise->then(function (Stream $stream) {
56
            $this->stream = $stream;
0 ignored issues
show
Documentation Bug introduced by
$stream is of type object<React\Stream\Stream>, but the property $stream was declared to be of type object<React\Socket\Connection>. Are you sure that you always receive this specific sub-class here, or does it make sense to add an instanceof check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a given class or a super-class is assigned to a property that is type hinted more strictly.

Either this assignment is in error or an instanceof check should be added for that assignment.

class Alien {}

class Dalek extends Alien {}

class Plot
{
    /** @var  Dalek */
    public $villain;
}

$alien = new Alien();
$plot = new Plot();
if ($alien instanceof Dalek) {
    $plot->villain = $alien;
}
Loading history...
57
            $this->onConnection($stream);
58
        }, function (\Exception $error){
59
            $this->onError($error);
60
        });
61
    }
62
63
    /**
     * Called once the stream is available: subscribes to incoming data and opens the handshake.
     *
     * @param Stream $stream Stream whose "data" events are forwarded to onMessage()
     */
    private function onConnection(Stream $stream)
    {
        // Each chunk of data emitted by the stream is handed over to onMessage().
        $forwardData = function (string $data) {
            $this->onMessage($data);
        };
        $stream->on('data', $forwardData);

        // The client is the side that starts the handshake, so it is triggered without any input.
        $this->processHandshake('');
    }
72
73
    /**
     * Drives the client side of the opening handshake.
     *
     * While no handshake key is stored, the request is built for the stored url, written to
     * the stream and its key is remembered; $data is not used in that case. On later calls
     * $data is appended to the buffer and a Response is built from it. If the response is
     * still incomplete the method returns and waits for more data. Otherwise the response is
     * verified against the key, the handshake is flagged as done, the handler is notified
     * and whatever is left in the buffer is passed to onMessage().
     *
73
    /**
74
     * @param string $data Data received from the server (empty string for the initial call)
75
     */
76
    protected function processHandshake(string $data)
77
    {
78
        // Sending initialization request
79
        if (null === $this->handshakeKey) {
80
            $request = $this->handshake->getRequest($this->url);
0 ignored issues
show
Bug introduced by
The method getRequest does only exist in Nekland\Woketo\Rfc6455\Handshake\ClientHandshake, but not in Nekland\Woketo\Rfc6455\Handshake\ServerHandshake.

It seems like the method you are trying to call exists only in some of the possible types.

Let’s take a look at an example:

class A
{
    public function foo() { }
}

class B extends A
{
    public function bar() { }
}

/**
 * @param A|B $x
 */
function someFunction($x)
{
    $x->foo(); // This call is fine as the method exists in A and B.
    $x->bar(); // This method only exists in B and might cause an error.
}

Available Fixes

  1. Add an additional type-check:

    /**
     * @param A|B $x
     */
    function someFunction($x)
    {
        $x->foo();
    
        if ($x instanceof B) {
            $x->bar();
        }
    }
    
  2. Only allow a single type to be passed if the variable comes from a parameter:

    function someFunction(B $x) { /** ... */ }
    
Loading history...
81
            $this->stream->write($request->getRequestAsString());
82
            // The key is kept so the server response can be verified against it later.
            $this->handshakeKey = $request->getKey();
83
84
            return;
85
        }
86
87
        $this->buffer .= $data;
88
89
        // Receiving the response
90
        try {
            // NOTE(review): presumably Response::create() consumes the parsed part of the
            // buffer and leaves only trailing data in it - confirm against its signature.
91
            $response = Response::create($this->buffer);
92
        } catch (IncompleteHttpMessageException $e) {
            // Not enough data yet: keep the buffer and wait for the next chunk.
93
            return;
94
        }
95
96
        // Verifying response data
97
        $this->handshake->verify($response, $this->handshakeKey);
98
99
        // Signaling the handshake is done to jump in the message exchange process
100
        $this->handshakeDone = true;
101
        $this->getHandler()->onConnection($this);
102
103
        // NOTE(review): empty() also treats the string "0" as empty, so such a buffer is not forwarded.
        if (!empty($this->buffer)) {
104
            $buffer = $this->buffer;
105
            // The buffer is reset before forwarding its content to onMessage().
            $this->buffer = '';
106
            $this->onMessage($buffer);
107
        }
108
    }
109
110
    /**
     * Feeds received data to the message processor and dispatches the resulting messages.
     *
     * Complete text messages go to the handler's onMessage(), complete binary messages to
     * onBinary(); other opcodes are not dispatched here. While a message is incomplete a
     * timer is started, and it is cancelled the next time data arrives.
     *
     * @param string $data Raw data received from the stream
     */
    protected function processMessage(string $data)
    {
        // Data arrived, so a timer left by a previous incomplete message is no longer relevant.
        if (null !== $this->timeout) {
            $this->timeout->cancel();
            $this->timeout = null;
        }

        $messages = $this->messageProcessor->onData($data, $this->stream, $this->currentMessage);

        foreach ($messages as $message) {
            $this->currentMessage = $message;

            if (!$this->currentMessage->isComplete()) {
                // More data is needed for this message: start a timeout while waiting for it.
                $this->timeout = $this->loop->addTimer(self::DEFAULT_TIMEOUT, function () {
                    $this->getLogger()->notice('Connection to ' . $this->getIp() . ' timed out.');
                    $this->messageProcessor->timeout($this->stream);
                });

                continue;
            }

            // The message is complete: hand it over through the woketo API.
            $opcode = $this->currentMessage->getOpcode();
            if ($opcode == Frame::OP_TEXT) {
                $this->getHandler()->onMessage($this->currentMessage->getContent(), $this);
            } elseif ($opcode == Frame::OP_BINARY) {
                $this->getHandler()->onBinary($this->currentMessage->getContent(), $this);
            }

            $this->currentMessage = null;
        }
    }
143
144
    /**
     * Sends data to the server using the message processor's masked write.
     *
     * @param string|Frame $frame  Data or frame to send
     * @param int          $opCode An int representing binary or text data (const of Frame class)
     * @throws \Nekland\Woketo\Exception\RuntimeException When the message processor raises a WebsocketException;
     *                                                    the original exception is available as "previous".
     */
    public function write($frame, int $opCode = Frame::OP_TEXT)
    {
        try {
            $this->messageProcessor->writeMasked($frame, $this->stream, $opCode);
        } catch (WebsocketException $e) {
            // The caught exception used to be passed as the message argument, which turned the
            // whole exception (trace included) into the message and dropped the exception chain.
            // NOTE(review): assumes RuntimeException keeps the standard \Exception constructor
            // (message, code, previous) - confirm.
            throw new RuntimeException($e->getMessage(), $e->getCode(), $e);
        }
    }
157
158
    /**
     * Logs an error, using the exception message when an exception is given.
     *
     * @param \Exception|string $error
     */
    private function onError($error)
    {
        if ($error instanceof \Exception) {
            $error = $error->getMessage();
        }

        $logger = $this->getLogger();
        $logger->error(sprintf('An error occured: %s', $error));
    }
167
168
    /**
     * Gives the host part of the url this connection targets (an ip or a hostname).
     *
     * @return string
     */
    public function getIp()
    {
        $host = $this->url->getHost();

        return $host;
    }
177
}
178