1
|
|
|
<?php
|
2
|
|
|
|
3
|
|
|
/**
|
4
|
|
|
* This file is a part of Woketo package.
|
5
|
|
|
*
|
6
|
|
|
* (c) Nekland <[email protected]>
|
7
|
|
|
*
|
8
|
|
|
* For the full license, take a look to the LICENSE file
|
9
|
|
|
* on the root directory of this project
|
10
|
|
|
*/
|
11
|
|
|
|
12
|
|
|
namespace Nekland\Woketo\Client;
|
13
|
|
|
|
14
|
|
|
|
15
|
|
|
use Nekland\Woketo\Core\AbstractConnection;
|
16
|
|
|
use Nekland\Woketo\Exception\Http\IncompleteHttpMessageException;
|
17
|
|
|
use Nekland\Woketo\Exception\RuntimeException;
|
18
|
|
|
use Nekland\Woketo\Exception\WebsocketException;
|
19
|
|
|
use Nekland\Woketo\Http\Response;
|
20
|
|
|
use Nekland\Woketo\Http\Url;
|
21
|
|
|
use Nekland\Woketo\Message\MessageHandlerInterface;
|
22
|
|
|
use Nekland\Woketo\Rfc6455\Frame;
|
23
|
|
|
use Nekland\Woketo\Rfc6455\Handshake\ClientHandshake;
|
24
|
|
|
use Nekland\Woketo\Rfc6455\MessageProcessor;
|
25
|
|
|
use React\EventLoop\LoopInterface;
|
26
|
|
|
use React\Promise\PromiseInterface;
|
27
|
|
|
use React\Socket\ConnectionInterface;
|
28
|
|
|
|
29
|
|
|
class Connection extends AbstractConnection
|
30
|
|
|
{
|
31
|
|
|
/**
|
32
|
|
|
* @var string|null
|
33
|
|
|
*/
|
34
|
|
|
private $handshakeKey;
|
35
|
|
|
|
36
|
|
|
/**
|
37
|
|
|
* @var string
|
38
|
|
|
*/
|
39
|
|
|
private $buffer;
|
40
|
|
|
|
41
|
|
|
/**
|
42
|
|
|
* @var Url
|
43
|
|
|
*/
|
44
|
|
|
private $url;
|
45
|
|
|
|
46
|
3 |
|
public function __construct(Url $url, PromiseInterface $clientPromise, MessageProcessor $messageProcessor, MessageHandlerInterface $handler, LoopInterface $loop)
|
47
|
|
|
{
|
48
|
3 |
|
parent::__construct($messageProcessor, $loop, new ClientHandshake());
|
49
|
|
|
|
50
|
3 |
|
$this->url = $url;
|
51
|
3 |
|
$this->uri = $this->url->getUri();
|
52
|
3 |
|
$this->buffer = '';
|
53
|
3 |
|
$this->handler = $handler;
|
|
|
|
|
54
|
|
|
|
55
|
|
|
$clientPromise->then(function (ConnectionInterface $stream) {
|
56
|
3 |
|
$this->stream = $stream;
|
57
|
3 |
|
$this->onConnection($stream);
|
58
|
|
|
}, function (\Exception $error) {
|
59
|
|
|
$this->onError($error);
|
60
|
3 |
|
});
|
61
|
3 |
|
}
|
62
|
|
|
|
63
|
3 |
|
private function onConnection(ConnectionInterface $stream)
|
64
|
|
|
{
|
65
|
|
|
$stream->on('data', function (string $data) {
|
66
|
|
|
$this->onMessage($data);
|
67
|
3 |
|
});
|
68
|
|
|
|
69
|
|
|
// This is done because the handshake should come from the client.
|
70
|
3 |
|
$this->processHandshake('');
|
71
|
3 |
|
}
|
72
|
|
|
|
73
|
|
|
/**
|
74
|
|
|
* {@inheritdoc}
|
75
|
|
|
*/
|
76
|
3 |
|
protected function processHandshake(string $data)
|
77
|
|
|
{
|
78
|
|
|
// Sending initialization request
|
79
|
3 |
|
if (null === $this->handshakeKey) {
|
80
|
3 |
|
$request = $this->handshake->getRequest($this->url);
|
|
|
|
|
81
|
3 |
|
$this->stream->write($request->getRequestAsString());
|
82
|
3 |
|
$this->handshakeKey = $request->getKey();
|
83
|
|
|
|
84
|
3 |
|
return;
|
85
|
|
|
}
|
86
|
|
|
|
87
|
|
|
$this->buffer .= $data;
|
88
|
|
|
|
89
|
|
|
// Receiving the response
|
90
|
|
|
try {
|
91
|
|
|
$response = Response::create($this->buffer);
|
92
|
|
|
} catch (IncompleteHttpMessageException $e) {
|
93
|
|
|
return;
|
94
|
|
|
}
|
95
|
|
|
|
96
|
|
|
// Verifying response data
|
97
|
|
|
$this->handshake->verify($response, $this->handshakeKey);
|
98
|
|
|
|
99
|
|
|
// Signaling the handshake is done to jump in the message exchange process
|
100
|
|
|
$this->handshakeDone = true;
|
101
|
|
|
$this->getHandler()->onConnection($this);
|
102
|
|
|
|
103
|
|
|
if (!empty($this->buffer)) {
|
104
|
|
|
$buffer = $this->buffer;
|
105
|
|
|
$this->buffer = '';
|
106
|
|
|
$this->onMessage($buffer);
|
107
|
|
|
}
|
108
|
|
|
}
|
109
|
|
|
|
110
|
|
|
/**
|
111
|
|
|
* {@inheritdoc}
|
112
|
|
|
*/
|
113
|
|
|
protected function processMessage(string $data)
|
114
|
|
|
{
|
115
|
|
|
// It may be a timeout going (we were waiting for data), let's clear it.
|
116
|
|
|
if ($this->timeout !== null) {
|
117
|
|
|
$this->loop->cancelTimer($this->timeout);
|
118
|
|
|
$this->timeout = null;
|
119
|
|
|
}
|
120
|
|
|
|
121
|
|
|
foreach ($this->messageProcessor->onData($data, $this->stream, $this->currentMessage) as $message) {
|
122
|
|
|
$this->currentMessage = $message;
|
123
|
|
|
|
124
|
|
|
if ($this->currentMessage->isComplete()) {
|
125
|
|
|
// Sending the message through the woketo API.
|
126
|
|
|
switch($this->currentMessage->getOpcode()) {
|
127
|
|
|
case Frame::OP_TEXT:
|
128
|
|
|
$this->getHandler()->onMessage($this->currentMessage->getContent(), $this);
|
129
|
|
|
break;
|
130
|
|
|
case Frame::OP_BINARY:
|
131
|
|
|
$this->getHandler()->onBinary($this->currentMessage->getContent(), $this);
|
132
|
|
|
break;
|
133
|
|
|
}
|
134
|
|
|
$this->currentMessage = null;
|
135
|
|
|
|
136
|
|
|
} else {
|
137
|
|
|
// We wait for more data so we start a timeout.
|
138
|
|
|
$this->timeout = $this->loop->addTimer(Connection::DEFAULT_TIMEOUT, function () {
|
139
|
|
|
$this->getLogger()->notice('Connection to ' . $this->getIp() . ' timed out.');
|
140
|
|
|
$this->messageProcessor->timeout($this->stream);
|
141
|
|
|
});
|
142
|
|
|
}
|
143
|
|
|
}
|
144
|
|
|
}
|
145
|
|
|
|
146
|
|
|
/**
|
147
|
|
|
* @param string|Frame $frame
|
148
|
|
|
* @param int $opCode An int representing binary or text data (const of Frame class)
|
149
|
|
|
* @throws \Nekland\Woketo\Exception\RuntimeException
|
150
|
|
|
*/
|
151
|
1 |
|
public function write($frame, int $opCode = Frame::OP_TEXT)
|
152
|
|
|
{
|
153
|
|
|
try {
|
154
|
1 |
|
$this->messageProcessor->write($frame, $this->stream, $opCode);
|
155
|
|
|
} catch (WebsocketException $e) {
|
156
|
|
|
throw new RuntimeException($e);
|
157
|
|
|
}
|
158
|
1 |
|
}
|
159
|
|
|
|
160
|
|
|
/**
|
161
|
|
|
* @param \Exception|string $error
|
162
|
|
|
*/
|
163
|
|
|
private function onError($error)
|
164
|
|
|
{
|
165
|
|
|
$error = $error instanceof \Exception ? $error->getMessage() : $error;
|
166
|
|
|
|
167
|
|
|
$this->getLogger()->error(sprintf('An error occured: %s', $error));
|
168
|
|
|
}
|
169
|
|
|
|
170
|
|
|
/**
|
171
|
|
|
* {@inheritdoc}
|
172
|
|
|
*/
|
173
|
|
|
public function getIp()
|
174
|
|
|
{
|
175
|
|
|
return $this->url->getHost();
|
176
|
|
|
}
|
177
|
|
|
}
|
178
|
|
|
|
Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.
Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..