Completed
Pull Request — master (#100)
by Maxime
03:29
created

Connection::processMessage()   B

Complexity

Conditions 6
Paths 10

Size

Total Lines 33
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 42

Importance

Changes 0
Metric Value
dl 0
loc 33
ccs 0
cts 26
cp 0
rs 8.439
c 0
b 0
f 0
cc 6
eloc 19
nc 10
nop 1
crap 42
1
<?php
2
3
/**
4
 * This file is a part of Woketo package.
5
 *
6
 * (c) Nekland <[email protected]>
7
 *
8
 * For the full license, take a look to the LICENSE file
9
 * on the root directory of this project
10
 */
11
12
namespace Nekland\Woketo\Client;
13
14
15
use Nekland\Woketo\Core\AbstractConnection;
16
use Nekland\Woketo\Exception\Http\IncompleteHttpMessageException;
17
use Nekland\Woketo\Exception\RuntimeException;
18
use Nekland\Woketo\Exception\WebsocketException;
19
use Nekland\Woketo\Http\Request;
20
use Nekland\Woketo\Http\Response;
21
use Nekland\Woketo\Message\MessageHandlerInterface;
22
use Nekland\Woketo\Rfc6455\Frame;
23
use Nekland\Woketo\Rfc6455\Handshake\ClientHandshake;
24
use Nekland\Woketo\Rfc6455\MessageProcessor;
25
use React\Promise\PromiseInterface;
26
use React\Stream\Stream;
27
28
/**
 * Client-side websocket connection.
 *
 * Once the promised stream is available, this class sends the HTTP upgrade
 * request, checks the server's response and, after the handshake, hands every
 * complete text/binary message to the message handler.
 */
class Connection extends AbstractConnection
{
    /**
     * True once the opening handshake request has been written to the stream.
     *
     * @var bool
     */
    private $requestSent;

    /**
     * Host passed to Request::createClientRequest() when building the handshake request.
     *
     * @var string
     */
    private $host;

    /**
     * Bytes received so far while waiting for a complete handshake response.
     *
     * @var string
     */
    private $buffer;

    /**
     * @param string                  $uri              URI passed to Request::createClientRequest().
     * @param string                  $host             Host passed to Request::createClientRequest().
     * @param PromiseInterface        $clientPromise    Promise that resolves with the stream to talk on.
     * @param MessageProcessor        $messageProcessor Forwarded to the parent constructor.
     * @param MessageHandlerInterface $handler          Receives connection and message callbacks.
     */
    public function __construct(string $uri, string $host, PromiseInterface $clientPromise, MessageProcessor $messageProcessor, MessageHandlerInterface $handler)
    {
        parent::__construct($messageProcessor, new ClientHandshake());

        $this->requestSent = false;
        // NOTE(review): $uri and $handler are not declared in this class; presumably inherited from AbstractConnection.
        $this->uri = $uri;
        $this->host = $host;
        $this->buffer = '';
        $this->handler = $handler;

        $clientPromise->then(function (Stream $stream) {
            // NOTE(review): static analysis reports that the $stream property is declared as
            // React\Socket\Connection while a React\Stream\Stream is assigned here. Either the
            // property annotation should be widened or an instanceof check added.
            $this->stream = $stream;
            $this->onConnection($stream);
        }, function (\Exception $error) {
            $this->onError($error);
        });
    }

    /**
     * Starts the exchange on a freshly opened stream.
     *
     * @param Stream $stream
     */
    private function onConnection(Stream $stream)
    {
        // React streams emit incoming bytes through the "data" event; a "message"
        // event is never emitted, so listening to it would leave the client deaf.
        // The listener is registered before anything is sent so no reply can be missed.
        $stream->on('data', function (string $data) {
            $this->onMessage($data);
        });

        // This is done because the handshake should come from the client.
        $this->processHandshake('');
    }

    /**
     * Drives the opening handshake.
     *
     * The first call writes the upgrade request and ignores $data. Later calls
     * accumulate $data until a complete HTTP response can be parsed, verify it
     * and flag the handshake as done.
     *
     * @param string $data Bytes received from the server (ignored on the first call).
     */
    protected function processHandshake(string $data)
    {
        // Sending initialization request
        if (!$this->requestSent) {
            $request = Request::createClientRequest($this->uri, $this->host);
            $this->stream->write($request->getRequestAsString());
            // Remember the request went out, otherwise every later call would resend
            // it and the response would never be read.
            $this->requestSent = true;

            return;
        }

        $this->buffer .= $data;

        // Receiving the response. The whole buffer is parsed (not only the last
        // chunk) so that a response split across several reads is handled.
        try {
            $response = Response::create($this->buffer);
        } catch (IncompleteHttpMessageException $e) {
            // Not enough data yet: wait for the next chunk.
            return;
        }

        // Verifying response data
        $this->handshake->verify($response);

        // Signaling the handshake is done to jump in the message exchange process
        $this->handshakeDone = true;
        $this->getHandler()->onConnection($this);
    }

    /**
     * Feeds received bytes to the message processor and dispatches complete messages.
     *
     * Complete text messages go to the handler's onMessage(), complete binary
     * messages to onBinary(); other opcodes are not forwarded. An incomplete
     * message arms a timer that calls the message processor's timeout().
     *
     * @param string $data
     */
    protected function processMessage(string $data)
    {
        // It may be a timeout going (we were waiting for data), let's clear it.
        if ($this->timeout !== null) {
            $this->timeout->cancel();
            $this->timeout = null;
        }

        foreach ($this->messageProcessor->onData($data, $this->stream, $this->currentMessage) as $message) {
            $this->currentMessage = $message;

            if (!$this->currentMessage->isComplete()) {
                // We wait for more data so we start a timeout.
                $this->timeout = $this->loop->addTimer(Connection::DEFAULT_TIMEOUT, function () {
                    $this->logger->notice('Connection to ' . $this->getIp() . ' timed out.');
                    $this->messageProcessor->timeout($this->stream);
                });

                continue;
            }

            // Sending the message through the woketo API.
            switch ($this->currentMessage->getOpcode()) {
                case Frame::OP_TEXT:
                    $this->getHandler()->onMessage($this->currentMessage->getContent(), $this);
                    break;
                case Frame::OP_BINARY:
                    $this->getHandler()->onBinary($this->currentMessage->getContent(), $this);
                    break;
            }

            // The message has been consumed; the next one starts from scratch.
            $this->currentMessage = null;
        }
    }

    /**
     * Sends a frame to the server through the message processor's writeMasked().
     *
     * @param string|Frame $frame
     * @param int          $opCode An int representing binary or text data (const of Frame class)
     * @throws \Nekland\Woketo\Exception\RuntimeException
     */
    public function write($frame, int $opCode = Frame::OP_TEXT)
    {
        try {
            $this->messageProcessor->writeMasked($frame, $this->stream, $opCode);
        } catch (WebsocketException $e) {
            // Keep the original exception as "previous" instead of stringifying it into the message.
            // NOTE(review): assumes RuntimeException keeps the standard \Exception constructor signature.
            throw new RuntimeException($e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Logs an error, e.g. the rejection reason of the connection promise.
     *
     * @param \Exception|string $error
     */
    private function onError($error)
    {
        $error = $error instanceof \Exception ? $error->getMessage() : $error;

        $this->logger->error(sprintf('An error occured: %s', $error));
    }
}
160