Completed
Pull Request — master (#100)
by Maxime
02:25
created

Connection::processHandshake()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 15
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 2.003

Importance

Changes 0
Metric Value
dl 0
loc 15
ccs 10
cts 11
cp 0.9091
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 10
nc 2
nop 1
crap 2.003
1
<?php
2
3
/**
4
 * This file is a part of Woketo package.
5
 *
6
 * (c) Nekland <[email protected]>
7
 *
8
 * For the full license, take a look to the LICENSE file
9
 * on the root directory of this project
10
 */
11
12
namespace Nekland\Woketo\Server;
13
14
use Nekland\Woketo\Core\AbstractConnection;
15
use Nekland\Woketo\Exception\RuntimeException;
16
use Nekland\Woketo\Exception\WebsocketException;
17
use Nekland\Woketo\Http\Request;
18
use Nekland\Woketo\Http\Response;
19
use Nekland\Woketo\Message\MessageHandlerInterface;
20
use Nekland\Woketo\Rfc6455\Frame;
21
use Nekland\Woketo\Rfc6455\Message;
22
use Nekland\Woketo\Rfc6455\MessageProcessor;
23
use Nekland\Woketo\Rfc6455\Handshake\ServerHandshake;
24
use React\EventLoop\LoopInterface;
25
use React\EventLoop\Timer\TimerInterface;
26
use React\Socket\ConnectionInterface;
27
28
class Connection extends AbstractConnection
29
{
30
    /**
31
     * 5 seconds
32
     */
33
    const DEFAULT_TIMEOUT = 5;
34
35
    /**
36
     * @var Message
37
     */
38
    private $currentMessage;
39
40
    /**
41
     * @var LoopInterface
42
     */
43
    private $loop;
44
45
    /**
46
     * @var TimerInterface
47
     */
48
    private $timeout;
49
    
50 2
    public function __construct(
51
        ConnectionInterface $socketStream,
52
        MessageHandlerInterface $messageHandler,
53
        LoopInterface $loop,
54
        MessageProcessor $messageProcessor,
55
        ServerHandshake $handshake = null
56
    ) {
57 2
        parent::__construct($messageProcessor, $handshake ?: new ServerHandshake);
58 2
        $this->stream = $socketStream;
0 ignored issues
show
Documentation Bug introduced by
$socketStream is of type object<React\Socket\ConnectionInterface>, but the property $stream was declared to be of type object<React\Socket\Connection>. Are you sure that you always receive this specific sub-class here, or does it make sense to add an instanceof check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a given class or a super-class is assigned to a property that is type hinted more strictly.

Either this assignment is in error or an instanceof check should be added for that assignment.

class Alien {}

class Dalek extends Alien {}

class Plot
{
    /** @var  Dalek */
    public $villain;
}

$alien = new Alien();
$plot = new Plot();
if ($alien instanceof Dalek) {
    $plot->villain = $alien;
}
Loading history...
59 2
        $this->initListeners();
60 2
        $this->handler = $messageHandler;
61 2
        $this->loop = $loop;
62 2
    }
63
64 2
    private function initListeners()
65
    {
66
        $this->stream->on('data', function ($data) {
67 2
            $this->processData($data);
68 2
        });
69
        $this->stream->on('error', function ($data) {
70
            $this->error($data);
71 2
        });
72 2
    }
73
74 2
    private function processData($data)
75
    {
76
        try {
77 2
            if (!$this->handshakeDone) {
78 2
                $this->processHandshake($data);
79
            } else {
80 2
                $this->processMessage($data);
81
            }
82
83 2
            return;
84
        } catch (WebsocketException $e) {
85
            $this->messageProcessor->close($this->stream);
86
            $this->logger->notice('Connection to ' . $this->getIp() . ' closed with error : ' . $e->getMessage());
87
            $this->handler->onError($e, $this);
88
        }
89
    }
90
91
    /**
92
     * This method build a message and buffer data in case of incomplete data.
93
     *
94
     * @param string $data
95
     */
96 2
    protected function processMessage(string $data)
97
    {
98
        // It may be a timeout going (we were waiting for data), let's clear it.
99 2
        if ($this->timeout !== null) {
100
            $this->timeout->cancel();
101
            $this->timeout = null;
102
        }
103
104 2
        foreach ($this->messageProcessor->onData($data, $this->stream, $this->currentMessage) as $message) {
105 2
            $this->currentMessage = $message;
106 2
            if ($this->currentMessage->isComplete()) {
107
                // Sending the message through the woketo API.
108 2
                switch($this->currentMessage->getOpcode()) {
109 2
                    case Frame::OP_TEXT:
110 1
                        $this->handler->onMessage($this->currentMessage->getContent(), $this);
111 1
                        break;
112 1
                    case Frame::OP_BINARY:
113 1
                        $this->handler->onBinary($this->currentMessage->getContent(), $this);
114 1
                        break;
115
                }
116 2
                $this->currentMessage = null;
117
118
            } else {
119
                // We wait for more data so we start a timeout.
120 2
                $this->timeout = $this->loop->addTimer(Connection::DEFAULT_TIMEOUT, function () {
121
                    $this->logger->notice('Connection to ' . $this->getIp() . ' timed out.');
122
                    $this->messageProcessor->timeout($this->stream);
123 2
                });
124
            }
125
        }
126 2
    }
127
128
    /**
129
     * @param string|Frame $frame
130
     * @param int          $opCode An int representing binary or text data (const of Frame class)
131
     * @throws \Nekland\Woketo\Exception\RuntimeException
132
     */
133
    public function write($frame, int $opCode = Frame::OP_TEXT)
134
    {
135
        try {
136
            $this->messageProcessor->write($frame, $this->stream, $opCode);
137
        } catch (WebsocketException $e) {
138
            throw new RuntimeException($e);
139
        }
140
    }
141
142
    /**
143
     * @param mixed $data
144
     */
145
    protected function error($data)
146
    {
147
        $message = "A connectivity error occurred: " . $data;
148
        $this->logger->error($message);
149
        $this->handler->onError(new WebsocketException($message), $this);
150
    }
151
152
    /**
153
     * If it's a new client, we need to make some special actions named the handshake.
154
     *
155
     * @param string $data
156
     */
157 2
    protected function processHandshake(string $data)
158
    {
159 2
        if ($this->handshakeDone) {
160
            return;
161
        }
162
163 2
        $request = Request::create($data);
164 2
        $this->handshake->verify($request);
165 2
        $response = Response::createSwitchProtocolResponse();
166 2
        $this->handshake->sign($request, $response);
167 2
        $response->send($this->stream);
168
        
169 2
        $this->handshakeDone = true;
170 2
        $this->handler->onConnection($this);
171 2
    }
172
}
173