Completed
Push — master ( ca8586...ca93ac )
by Taosikai
10:52
created

Client::handleControlConnection()   B

Complexity

Conditions 3
Paths 1

Size

Total Lines 24
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 24
rs 8.9713
cc 3
eloc 17
nc 1
nop 1
1
<?php
2
/**
3
 * Spike library
4
 * @author Tao <[email protected]>
5
 */
6
namespace Spike\Client;
7
8
use React\EventLoop\LoopInterface;
9
use React\EventLoop\Factory as LoopFactory;
10
use React\Socket\ConnectionInterface;
11
use React\Socket\Connector;
12
use Slince\Event\Dispatcher;
13
use Slince\Event\Event;
14
use Spike\Client\Timer\Heartbeat;
15
use Spike\Logger\Logger;
16
use Spike\Timer\MemoryWatcher;
17
use Spike\Timer\TimerInterface;
18
use Spike\Timer\UseTimerTrait;
19
use Spike\Tunnel\HttpTunnel;
20
use Spike\Tunnel\TunnelFactory;
21
use Spike\Tunnel\TunnelInterface;
22
use Spike\Client\TunnelClient\TunnelClientInterface;
23
use Spike\Exception\InvalidArgumentException;
24
use Spike\Exception\RuntimeException;
25
use Spike\Parser\SpikeParser;
26
use Spike\Protocol\Spike;
27
use Spike\Protocol\SpikeInterface;
28
29
/**
 * Client side of a Spike connection.
 *
 * Opens a control connection to the configured server address, sends the auth
 * request over it and hands every message received on that connection to the
 * matching handler. Tunnel clients created through createTunnelClient() are
 * tracked so that close() can shut them down.
 */
class Client
{
    use UseTimerTrait;

    /**
     * Event loop handed to the connector and to every tunnel client; started by run()
     * @var LoopInterface
     */
    protected $loop;

    /**
     * Dispatcher used to emit the EventStore events of this client
     * @var Dispatcher
     */
    protected $dispatcher;

    /**
     * Connector used by run() to reach the server
     * @var Connector
     */
    protected $connector;

    /**
     * Tunnels collection
     * @var TunnelCollection
     */
    protected $tunnels;

    /**
     * Control connection to the server; only set once run() has connected
     * @var ConnectionInterface
     */
    protected $controlConnection;

    /**
     * Tunnel clients created by createTunnelClient()
     * @var TunnelClientCollection
     */
    protected $tunnelClients;

    /**
     * Address passed to the connector and to every tunnel client
     * @var string
     */
    protected $serverAddress;

    /**
     * Not read or written by any method of this class
     * @var array
     */
    protected $credential;

    /**
     * Client id; only set once setClientId() has been called
     * @var string
     */
    protected $id;

    /**
     * Auth info
     * @var array
     */
    protected $auth;

    /**
     * Logger; only set once setLogger() has been called
     * @var Logger
     */
    protected $logger;

    /**
     * @param string $serverAddress Address the control connection is opened to
     * @param array $tunnels Tunnel definitions, each one is passed to TunnelFactory::fromArray()
     * @param array $auth Auth info sent to the server; merged over the default "os"/"version" entries
     * @param LoopInterface|null $loop Event loop; a new one is created when omitted
     * @param Dispatcher|null $dispatcher Event dispatcher; a new one is created when omitted
     */
    public function __construct($serverAddress, $tunnels, $auth, LoopInterface $loop = null, Dispatcher $dispatcher = null)
    {
        $this->serverAddress = $serverAddress;
        $this->auth = $auth;
        $this->dispatcher = $dispatcher ?: new Dispatcher();
        $this->loop = $loop ?: LoopFactory::create();
        $this->connector = new Connector($this->loop);
        $this->tunnels = $this->createTunnels($tunnels);
        $this->tunnelClients = new TunnelClientCollection();
    }

    /**
     * Creates the tunnel collection from raw tunnel definitions
     * @param array $data Tunnel definitions, each one is passed to TunnelFactory::fromArray()
     * @return TunnelCollection
     */
    protected function createTunnels($data)
    {
        $tunnels = [];
        foreach ($data as $info) {
            $tunnels[] = TunnelFactory::fromArray($info);
        }
        return new TunnelCollection($tunnels);
    }

    /**
     * Run the client
     *
     * Starts connecting to the server, dispatches CLIENT_RUN, registers the
     * default timers and then runs the event loop. Once the connection is
     * established the auth request is sent and the control connection
     * listeners are attached; if it cannot be established only
     * CANNOT_CONNECT_TO_SERVER is dispatched.
     * @codeCoverageIgnore
     */
    public function run()
    {
        $this->connector->connect($this->serverAddress)->then(function (ConnectionInterface $connection) {
            //Emit the event
            $this->dispatcher->dispatch(new Event(EventStore::CONNECT_TO_SERVER, $this, [
                'connection' => $connection
            ]));
            $this->controlConnection = $connection;
            $this->requestAuth($connection);
            $this->handleControlConnection($connection);
        }, function () {
            $this->dispatcher->dispatch(new Event(EventStore::CANNOT_CONNECT_TO_SERVER, $this));
        });
        $this->dispatcher->dispatch(EventStore::CLIENT_RUN);
        foreach ($this->getDefaultTimers() as $timer) {
            $this->addTimer($timer);
        }
        $this->loop->run();
    }

    /**
     * Close the client
     *
     * Cancels every timer, closes every tunnel client and ends the control
     * connection if there is one.
     * @codeCoverageIgnore
     */
    public function close()
    {
        foreach ($this->timers as $timer) {
            $timer->cancel();
        }
        foreach ($this->tunnelClients as $tunnelClient) {
            $tunnelClient->close();
        }
        if ($this->controlConnection) {
            // Detach the listener first so that ending the connection here
            // does not call handleDisconnectServer() (and close()) again.
            $this->controlConnection->removeListener('close', [$this, 'handleDisconnectServer']);
            $this->controlConnection->end();
        }
    }

    /**
     * Handles the control connection
     *
     * Every chunk of incoming data is pushed into a parser; each parsed
     * message is announced with RECEIVE_MESSAGE and then passed to its
     * handler. A failure while parsing or handling is reported with
     * CONNECTION_ERROR and ends processing of the current chunk.
     * @param ConnectionInterface $connection
     * @codeCoverageIgnore
     */
    protected function handleControlConnection(ConnectionInterface $connection)
    {
        $parser = new SpikeParser();
        $connection->on('data', function ($data) use ($parser, $connection) {
            $parser->pushIncoming($data);
            try {
                $messages = $parser->parse();
                foreach ($messages as $message) {
                    $message = Spike::fromString($message);
                    $this->dispatcher->dispatch(new Event(EventStore::RECEIVE_MESSAGE, $this, [
                        'message' => $message,
                        'connection' => $connection
                    ]));
                    $this->createMessageHandler($message, $connection)->handle($message);
                }
            } catch (RuntimeException $exception) {
                $this->dispatchConnectionError($connection, $exception);
            } catch (InvalidArgumentException $exception) {
                // createMessageHandler() throws this for an unknown action;
                // report it like any other connection error instead of
                // letting it escape the event-loop callback.
                $this->dispatchConnectionError($connection, $exception);
            }
        });
        $connection->on('close', [$this, 'handleDisconnectServer']);
    }

    /**
     * Dispatches the CONNECTION_ERROR event for a failure on the given connection
     * @param ConnectionInterface $connection
     * @param \Exception $exception
     * @codeCoverageIgnore
     */
    protected function dispatchConnectionError(ConnectionInterface $connection, \Exception $exception)
    {
        $this->dispatcher->dispatch(new Event(EventStore::CONNECTION_ERROR, $this, [
            'connection' => $connection,
            'exception' => $exception
        ]));
    }

    /**
     * If the client disconnect from the server
     *
     * Dispatches DISCONNECT_SERVER and then closes the client.
     * @codeCoverageIgnore
     */
    public function handleDisconnectServer()
    {
        $this->dispatcher->dispatch(new Event(EventStore::DISCONNECT_SERVER, $this));
        $this->close();
    }

    /**
     * Request for auth
     *
     * Writes an "auth" message to the connection. The message carries the
     * configured auth info merged over the default "os" and "version" entries.
     * @param ConnectionInterface $connection
     * @codeCoverageIgnore
     */
    protected function requestAuth(ConnectionInterface $connection)
    {
        // The cast keeps array_replace() working when no auth info (null)
        // was given to the constructor.
        $authInfo = array_replace([
            'os' => PHP_OS,
            'version' => '',
        ], (array)$this->auth);
        $connection->write(new Spike('auth', $authInfo));
    }

    /**
     * Gets the client id
     * @return string
     */
    public function getClientId()
    {
        return $this->id;
    }

    /**
     * Sets the client id
     *
     * Also registers the id as the global "Client-ID" header on Spike and
     * adds a heartbeat timer for this client.
     * @param string $id
     * @codeCoverageIgnore
     */
    public function setClientId($id)
    {
        $this->id = $id;
        Spike::setGlobalHeader('Client-ID', $id);
        $this->addTimer(new Heartbeat($this));
    }

    /**
     * Gets all tunnel clients
     * @return TunnelClientCollection
     */
    public function getTunnelClients()
    {
        return $this->tunnelClients;
    }

    /**
     * Gets all tunnels
     * @return TunnelCollection
     */
    public function getTunnels()
    {
        return $this->tunnels;
    }

    /**
     * Gets the dispatcher
     * @return Dispatcher
     */
    public function getDispatcher()
    {
        return $this->dispatcher;
    }

    /**
     * Gets the control connection
     * @return ConnectionInterface
     */
    public function getControlConnection()
    {
        return $this->controlConnection;
    }

    /**
     * Sets a logger
     * @param Logger $logger
     */
    public function setLogger($logger)
    {
        $this->logger = $logger;
    }

    /**
     * Gets the logger
     * @return Logger
     */
    public function getLogger()
    {
        return $this->logger;
    }

    /**
     * Gets the loop instance
     * @return LoopInterface
     */
    public function getLoop()
    {
        return $this->loop;
    }

    /**
     * Creates a tunnel client to process proxy connection
     *
     * An HttpTunnel gets an HttpTunnelClient, any other tunnel a
     * TcpTunnelClient. The new tunnel client is run and added to the
     * collection returned by getTunnelClients().
     * @param TunnelInterface $tunnel
     * @param string $proxyConnectionId
     * @return TunnelClientInterface
     */
    public function createTunnelClient(TunnelInterface $tunnel, $proxyConnectionId)
    {
        if ($tunnel instanceof HttpTunnel) {
            $tunnelClient = new TunnelClient\HttpTunnelClient($this, $tunnel, $proxyConnectionId, $this->serverAddress, $this->loop);
        } else {
            $tunnelClient = new TunnelClient\TcpTunnelClient($this, $tunnel, $proxyConnectionId, $this->serverAddress, $this->loop);
        }
        $tunnelClient->run();
        $this->tunnelClients->add($tunnelClient);
        return $tunnelClient;
    }

    /**
     * Creates the handler for the received message
     * @param SpikeInterface $message
     * @param ConnectionInterface $connection
     * @return Handler\HandlerInterface
     * @throws InvalidArgumentException If no handler exists for the action of the message
     * @codeCoverageIgnore
     */
    protected function createMessageHandler(SpikeInterface $message, ConnectionInterface $connection)
    {
        switch ($message->getAction()) {
            case 'auth_response':
                $handler = new Handler\AuthResponseHandler($this, $connection);
                break;
            case 'register_tunnel_response':
                $handler = new Handler\RegisterTunnelResponseHandler($this, $connection);
                break;
            case 'request_proxy':
                $handler = new Handler\RequestProxyHandler($this, $connection);
                break;
            default:
                throw new InvalidArgumentException(sprintf('Cannot find handler for the message: "%s"',
                    $message->getAction()
                ));
        }
        return $handler;
    }

    /**
     * Creates default timers
     *
     * The memory watcher receives whatever logger is set at the time of the call.
     * @return TimerInterface[]
     * @codeCoverageIgnore
     */
    protected function getDefaultTimers()
    {
        return [
            new MemoryWatcher($this->logger)
        ];
    }
}