Completed
Push — master ( 390e4e...b0ec12 )
by Taosikai
12:57
created

TunnelServer::handleProxyConnection()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 13
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 13
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 8
nc 1
nop 1
1
<?php
2
/**
3
 * Spike library
4
 * @author Tao <[email protected]>
5
 */
6
namespace Spike\Server\TunnelServer;
7
8
use React\EventLoop\LoopInterface;
9
use React\Socket\ConnectionInterface;
10
use React\Socket\Server as Socket;
11
use Slince\Event\Event;
12
use Spike\Exception\InvalidArgumentException;
13
use Spike\Protocol\Spike;
14
use Spike\Protocol\SpikeInterface;
15
use Spike\Server\EventStore;
16
use Spike\Server\Server;
17
use Spike\Server\TunnelServer\Timer\ReviewPublicConnection;
18
use Spike\Timer\UseTimerTrait;
19
use Spike\Tunnel\TunnelInterface;
20
use Slince\Event\Dispatcher;
21
use Spike\Timer\TimerInterface;
22
23
/**
 * Base class for tunnel servers.
 *
 * Binds a socket to "<server host>:<tunnel server port>", asks the client
 * (over the control connection) for a proxy connection whenever a public
 * connection arrives, and pipes the public and proxy connections together
 * once the proxy connection is registered.
 */
abstract class TunnelServer implements TunnelServerInterface
{
    use UseTimerTrait;

    /**
     * Connection used to send 'request_proxy' messages to the client.
     *
     * @var ConnectionInterface
     */
    protected $controlConnection;

    /**
     * Public connections accepted by this tunnel server.
     *
     * @var PublicConnectionCollection
     */
    protected $publicConnections;

    /**
     * Listening socket that accepts public connections.
     *
     * @var Socket
     */
    protected $socket;

    /**
     * Tunnel served by this instance.
     *
     * @var TunnelInterface
     */
    protected $tunnel;

    /**
     * Owning server; provides the host and the event dispatcher.
     *
     * @var Server
     */
    protected $server;

    /**
     * @var LoopInterface
     */
    protected $loop;

    /**
     * Stores the collaborators and opens the listening socket.
     *
     * @param Server              $server
     * @param ConnectionInterface $controlConnection
     * @param TunnelInterface     $tunnel
     * @param LoopInterface       $loop
     */
    public function __construct(Server $server, ConnectionInterface $controlConnection, TunnelInterface $tunnel, LoopInterface $loop)
    {
        // $server and $tunnel must be assigned before getListenAddress() is called.
        $this->server = $server;
        $this->controlConnection = $controlConnection;
        $this->tunnel = $tunnel;
        $this->loop = $loop;
        $this->publicConnections = new PublicConnectionCollection();
        $this->socket = new Socket($this->getListenAddress(), $loop);
    }

    /**
     * {@inheritdoc}
     */
    public function run()
    {
        // Wrap, track and handle every incoming public connection.
        $this->socket->on('connection', function (ConnectionInterface $connection) {
            $publicConnection = new PublicConnection($connection);
            $this->publicConnections->add($publicConnection);
            $this->handlePublicConnection($publicConnection);
        });

        // Register the default timers.
        foreach ($this->getDefaultTimers() as $timer) {
            $this->addTimer($timer);
        }
    }

    /**
     * Gets the event dispatcher
     * @return Dispatcher
     */
    public function getDispatcher()
    {
        return $this->server->getDispatcher();
    }

    /**
     * {@inheritdoc}
     */
    public function close()
    {
        // Close every public connection that is still tracked.
        foreach ($this->publicConnections as $publicConnection) {
            $this->closePublicConnection($publicConnection, 'The tunnel server has been closed');
        }

        // Cancel all timers.
        foreach ($this->timers as $timer) {
            $timer->cancel();
        }

        // Reset to empty values instead of null: both properties are declared
        // with non-nullable types, and close listeners registered in
        // registerProxyConnection() may still call
        // $this->publicConnections->removeElement() after this method returns.
        $this->publicConnections = new PublicConnectionCollection();
        $this->timers = [];

        $this->socket->close();
    }

    /**
     * Handles the public connection
     *
     * Sends a 'request_proxy' message for the connection over the control
     * connection, fires the matching event, then strips the connection's
     * listeners and pauses it until a proxy connection is registered.
     *
     * @param PublicConnection $publicConnection
     */
    public function handlePublicConnection(PublicConnection $publicConnection)
    {
        $headers = [
            'Proxy-Connection-ID' => $publicConnection->getId(),
        ];
        $requestProxyMessage = new Spike('request_proxy', $this->tunnel->toArray(), $headers);
        $this->controlConnection->write($requestProxyMessage);

        // Fires 'request_proxy' event
        $this->getDispatcher()->dispatch(new Event(EventStore::REQUEST_PROXY, $this, [
            'message' => $requestProxyMessage,
        ]));

        $publicConnection->removeAllListeners();
        $publicConnection->pause();
    }

    /**
     * Registers proxy connection
     *
     * Looks up the public connection named by the message's
     * 'Proxy-Connection-ID' header, sends 'start_proxy' to the proxy
     * connection, pipes both connections into each other and makes sure that
     * when either side closes or errors the other side is ended and the public
     * connection is dropped from the collection.
     *
     * @param ConnectionInterface $proxyConnection
     * @param SpikeInterface $message
     *
     * @throws InvalidArgumentException When no public connection matches the ID
     */
    public function registerProxyConnection(ConnectionInterface $proxyConnection, SpikeInterface $message)
    {
        $connectionId = $message->getHeader('Proxy-Connection-ID');
        $publicConnection = $this->publicConnections->findById($connectionId);
        if (null === $publicConnection) {
            throw new InvalidArgumentException(sprintf('Cannot find the public connection "%s"', $connectionId));
        }

        $startProxyMessage = new Spike('start_proxy');
        $proxyConnection->write($startProxyMessage);

        // NOTE(review): this announces a 'start_proxy' message but is dispatched
        // under EventStore::REQUEST_PROXY; this looks like a copy-paste slip.
        // Confirm whether a dedicated start-proxy event constant should be used.
        $this->getDispatcher()->dispatch(new Event(EventStore::REQUEST_PROXY, $this, [
            'message' => $startProxyMessage,
        ]));

        // Resume the public connection and wire both directions together.
        $publicConnection->resume();
        $publicConnection->pipe($proxyConnection);
        $proxyConnection->pipe($publicConnection->getConnection());
        $proxyConnection->write($publicConnection->getInitBuffer());

        // Declared up front so the first closure can capture it by reference;
        // it is assigned below, before any listener can run.
        $onProxyClosed = null;

        // Public side closed or failed: detach the proxy-side handlers so they
        // do not fire in response, end the proxy and forget the connection.
        $onPublicClosed = function () use ($proxyConnection, $publicConnection, &$onProxyClosed) {
            $proxyConnection->removeListener('close', $onProxyClosed);
            $proxyConnection->removeListener('error', $onProxyClosed);
            $proxyConnection->end();
            $this->publicConnections->removeElement($publicConnection);
        };

        // Proxy side closed or failed: mirror image of the handler above. The
        // public connection is removed here as well, otherwise it would stay in
        // the collection because its own close handler has just been detached.
        $onProxyClosed = function () use ($publicConnection, $onPublicClosed) {
            $publicConnection->removeListener('close', $onPublicClosed);
            $publicConnection->removeListener('error', $onPublicClosed);
            $publicConnection->end();
            $this->publicConnections->removeElement($publicConnection);
        };

        $publicConnection->on('close', $onPublicClosed);
        $publicConnection->on('error', $onPublicClosed);
        $proxyConnection->on('close', $onProxyClosed);
        $proxyConnection->on('error', $onProxyClosed);
    }

    /**
     * Gets the server address to bind
     * @return string "<server host>:<tunnel server port>"
     */
    protected function getListenAddress()
    {
        return "{$this->server->getHost()}:{$this->tunnel->getServerPort()}";
    }

    /**
     * Creates default timers
     * @return TimerInterface[]
     */
    protected function getDefaultTimers()
    {
        return [
            new ReviewPublicConnection($this),
        ];
    }

    /**
     * {@inheritdoc}
     */
    public function getControlConnection()
    {
        return $this->controlConnection;
    }

    /**
     * {@inheritdoc}
     */
    public function getLoop()
    {
        return $this->loop;
    }

    /**
     * {@inheritdoc}
     */
    public function getPublicConnections()
    {
        return $this->publicConnections;
    }

    /**
     * {@inheritdoc}
     */
    public function getTunnel()
    {
        return $this->tunnel;
    }
}