Completed
Push — master ( 1ea769...678708 )
by Taosikai
13:47
created

TunnelServer::getServer()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
/**
3
 * Spike library
4
 * @author Tao <[email protected]>
5
 */
6
namespace Spike\Server\TunnelServer;
7
8
use React\EventLoop\LoopInterface;
9
use React\Socket\ConnectionInterface;
10
use React\Socket\Server as Socket;
11
use Slince\Event\Event;
12
use Spike\Exception\InvalidArgumentException;
13
use Spike\Protocol\Spike;
14
use Spike\Protocol\SpikeInterface;
15
use Spike\Server\EventStore;
16
use Spike\Server\Server;
17
use Spike\Server\TunnelServer\Timer\ReviewPublicConnection;
18
use Spike\Timer\UseTimerTrait;
19
use Spike\Tunnel\TunnelInterface;
20
use Slince\Event\Dispatcher;
21
use Spike\Timer\TimerInterface;
22
23
/**
 * Base class for tunnel servers.
 *
 * Listens on the address built from the server host and the tunnel's server
 * port. For every incoming public connection it writes a 'request_proxy'
 * message to the control connection, and once a proxy connection is
 * registered for that public connection it pipes the two together.
 */
abstract class TunnelServer implements TunnelServerInterface
{
    use UseTimerTrait;

    /**
     * Connection on which 'request_proxy' messages are written.
     *
     * @var ConnectionInterface
     */
    protected $controlConnection;

    /**
     * Public connections accepted by the listening socket.
     *
     * @var PublicConnectionCollection
     */
    protected $publicConnections;

    /**
     * Listening socket; only set once run() has been called.
     *
     * @var Socket
     */
    protected $socket;

    /**
     * Tunnel served by this instance.
     *
     * @var TunnelInterface
     */
    protected $tunnel;

    /**
     * Owning server; provides the host and the event dispatcher.
     *
     * @var Server
     */
    protected $server;

    /**
     * Event loop the listening socket is attached to.
     *
     * @var LoopInterface
     */
    protected $loop;

    /**
     * @param Server              $server            Owning server
     * @param ConnectionInterface $controlConnection Connection that receives 'request_proxy' messages
     * @param TunnelInterface     $tunnel            Tunnel to serve
     * @param LoopInterface       $loop              Event loop used for the listening socket
     */
    public function __construct(Server $server, ConnectionInterface $controlConnection, TunnelInterface $tunnel, LoopInterface $loop)
    {
        $this->server = $server;
        $this->controlConnection = $controlConnection;
        $this->tunnel = $tunnel;
        $this->loop = $loop;
        $this->publicConnections = new PublicConnectionCollection();
    }

    /**
     * {@inheritdoc}
     *
     * Opens the listening socket, registers the handler for incoming public
     * connections and installs the default timers.
     *
     * @codeCoverageIgnore
     */
    public function run()
    {
        $this->socket = new Socket($this->getListenAddress(), $this->loop);
        $this->socket->on('connection', function ($connection) {
            // Wrap the raw connection and track it before a proxy is requested for it
            $publicConnection = new PublicConnection($connection);
            $this->publicConnections->add($publicConnection);
            $this->handlePublicConnection($publicConnection);
        });
        // Creates default timers
        foreach ($this->getDefaultTimers() as $timer) {
            $this->addTimer($timer);
        }
    }

    /**
     * Gets the event dispatcher of the owning server
     *
     * @return Dispatcher
     */
    public function getDispatcher()
    {
        return $this->server->getDispatcher();
    }

    /**
     * {@inheritdoc}
     *
     * Closes every public connection, cancels all timers and closes the
     * listening socket if one has been opened.
     */
    public function close()
    {
        // Close all public connections
        foreach ($this->publicConnections as $publicConnection) {
            $this->closePublicConnection($publicConnection, 'The tunnel server has been closed');
        }
        // Cancel all timers
        foreach ($this->timers as $timer) {
            $timer->cancel();
        }
        // Reset to empty values instead of null: both properties are declared
        // as non-null, and close handlers registered in
        // registerProxyConnection() may still call methods on the collection
        // after the tunnel server has been closed.
        $this->publicConnections = new PublicConnectionCollection();
        $this->timers = [];
        // The socket only exists once run() has been called
        if ($this->socket) {
            $this->socket->close();
        }
    }

    /**
     * Handles the public connection
     *
     * Writes a 'request_proxy' message carrying the public connection id to
     * the control connection, dispatches the matching event, then removes all
     * listeners from the public connection and pauses it.
     *
     * @param PublicConnection $publicConnection
     * @codeCoverageIgnore
     */
    public function handlePublicConnection(PublicConnection $publicConnection)
    {
        $requestProxyMessage = new Spike('request_proxy', $this->tunnel->toArray(), [
            'Proxy-Connection-ID' => $publicConnection->getId()
        ]);
        $this->controlConnection->write($requestProxyMessage);
        // Fires 'request_proxy' event
        $this->getDispatcher()->dispatch(new Event(EventStore::REQUEST_PROXY, $this, [
            'message' => $requestProxyMessage
        ]));
        // The connection stays paused until registerProxyConnection() resumes it
        $publicConnection->removeAllListeners();
        $publicConnection->pause();
    }

    /**
     * Registers proxy connection
     *
     * Looks up the public connection named by the 'Proxy-Connection-ID'
     * header, writes a 'start_proxy' message to the proxy connection, pipes
     * the two connections into each other and makes the close/error of either
     * side end the other one.
     *
     * @param ConnectionInterface $proxyConnection
     * @param SpikeInterface $message
     * @throws InvalidArgumentException If no public connection matches the id in the message
     * @codeCoverageIgnore
     */
    public function registerProxyConnection(ConnectionInterface $proxyConnection, SpikeInterface $message)
    {
        $connectionId = $message->getHeader('Proxy-Connection-ID');
        $publicConnection = $this->publicConnections->findById($connectionId);
        if (null === $publicConnection) {
            throw new InvalidArgumentException(sprintf('Cannot find the public connection "%s"', $connectionId));
        }
        $startProxyMessage = new Spike('start_proxy');
        $proxyConnection->write($startProxyMessage);
        // NOTE(review): this was commented as firing the 'start_proxy' event but
        // dispatches EventStore::REQUEST_PROXY; confirm whether a dedicated
        // start-proxy event constant should be used here instead.
        $this->getDispatcher()->dispatch(new Event(EventStore::REQUEST_PROXY, $this, [
            'message' => $startProxyMessage
        ]));
        // Resumes the public connection and wires both directions together
        $publicConnection->resume();
        $publicConnection->pipe($proxyConnection);
        $proxyConnection->pipe($publicConnection->getConnection());
        $proxyConnection->write($publicConnection->getInitBuffer());

        // Handles public connection close.
        // $handleProxyConnectionClose is captured by reference because it is
        // only assigned further down; the reference lets this closure see the
        // handler once it exists so it can be detached.
        $handlePublicConnectionClose = function () use ($proxyConnection, $publicConnection, &$handleProxyConnectionClose) {
            $proxyConnection->removeListener('close', $handleProxyConnectionClose);
            $proxyConnection->removeListener('error', $handleProxyConnectionClose);
            $proxyConnection->end();
            $this->publicConnections->removeElement($publicConnection);
        };
        $publicConnection->on('close', $handlePublicConnectionClose);
        $publicConnection->on('error', $handlePublicConnectionClose);

        // Handles proxy connection close
        $handleProxyConnectionClose = function () use ($publicConnection, &$handlePublicConnectionClose) {
            $publicConnection->removeListener('close', $handlePublicConnectionClose);
            $publicConnection->removeListener('error', $handlePublicConnectionClose);
            $publicConnection->end();
            // The public close listener was detached above, so the collection
            // has to be cleaned up here or the entry would never be removed.
            $this->publicConnections->removeElement($publicConnection);
        };
        $proxyConnection->on('close', $handleProxyConnectionClose);
        $proxyConnection->on('error', $handleProxyConnectionClose);
    }

    /**
     * Gets the server
     *
     * @return Server
     */
    public function getServer()
    {
        return $this->server;
    }

    /**
     * Gets the server address to bind, in "host:port" form
     *
     * @return string
     */
    protected function getListenAddress()
    {
        return "{$this->server->getHost()}:{$this->tunnel->getServerPort()}";
    }

    /**
     * Creates default timers
     *
     * @return TimerInterface[]
     */
    protected function getDefaultTimers()
    {
        return [
            new ReviewPublicConnection($this)
        ];
    }

    /**
     * Gets the socket of the tunnel server
     *
     * @return Socket
     */
    public function getSocket()
    {
        return $this->socket;
    }

    /**
     * {@inheritdoc}
     */
    public function getControlConnection()
    {
        return $this->controlConnection;
    }

    /**
     * {@inheritdoc}
     */
    public function getLoop()
    {
        return $this->loop;
    }

    /**
     * {@inheritdoc}
     */
    public function getPublicConnections()
    {
        return $this->publicConnections;
    }

    /**
     * {@inheritdoc}
     */
    public function getTunnel()
    {
        return $this->tunnel;
    }
}