Completed
Push — master ( 83681b...007d34 )
by Taosikai
12:03
created

Client::getClientId()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 4
rs 10
cc 1
eloc 2
nc 1
nop 0
1
<?php
2
/**
3
 * Spike library
4
 * @author Tao <[email protected]>
5
 */
6
namespace Spike\Client;
7
8
use React\EventLoop\LoopInterface;
9
use React\EventLoop\Factory as LoopFactory;
10
use React\Socket\ConnectionInterface;
11
use React\Socket\Connector;
12
use Slince\Event\Dispatcher;
13
use Slince\Event\Event;
14
use Spike\Client\Timer\Heartbeat;
15
use Spike\Logger\Logger;
16
use Spike\Timer\MemoryWatcher;
17
use Spike\Timer\TimerInterface;
18
use Spike\Timer\UseTimerTrait;
19
use Spike\Tunnel\HttpTunnel;
20
use Spike\Tunnel\TunnelFactory;
21
use Spike\Tunnel\TunnelInterface;
22
use Spike\Client\TunnelClient\TunnelClientInterface;
23
use Spike\Exception\InvalidArgumentException;
24
use Spike\Exception\RuntimeException;
25
use Spike\Parser\SpikeParser;
26
use Spike\Protocol\Spike;
27
use Spike\Protocol\SpikeInterface;
28
29
class Client
30
{
31
    use UseTimerTrait;
32
33
    /**
34
     * @var LoopInterface
35
     */
36
    protected $loop;
37
38
    /**
39
     * @var Dispatcher
40
     */
41
    protected $dispatcher;
42
43
    /**
44
     * @var Connector
45
     */
46
    protected $connector;
47
48
    /**
49
     * @var ConnectionInterface
50
     */
51
    protected $controlConnection;
52
53
    /**
54
     * @var TunnelClientCollection
55
     */
56
    protected $tunnelClients;
57
58
    /**
59
     * @var string
60
     */
61
    protected $serverAddress;
62
63
    /**
64
     * @var array
65
     */
66
    protected $credential;
67
68
    /**
69
     * @var string
70
     */
71
    protected $id;
72
73
    /**
74
     * Tunnels collection
75
     * @var TunnelCollection
76
     */
77
    protected $tunnels = [];
78
79
    /**
80
     * Auth info
81
     * @var array
82
     */
83
    protected $auth;
84
85
    /**
86
     * @var Logger
87
     */
88
    protected $logger;
89
90
    public function __construct($serverAddress, $tunnels, $auth, LoopInterface $loop = null, Dispatcher $dispatcher = null)
91
    {
92
        $this->serverAddress = $serverAddress;
93
        $this->auth = $auth;
94
        $this->dispatcher = $dispatcher ?: new Dispatcher();
95
        $this->loop = $loop ?: LoopFactory::create();
96
        $this->connector = new Connector($this->loop);
97
        $this->tunnels = $this->createTunnels($tunnels);
98
        $this->tunnelClients = new TunnelClientCollection();
99
    }
100
101
    /**
102
     * Creates array of tunnels
103
     * @param array $data
104
     * @return TunnelCollection
105
     */
106
    protected function createTunnels($data)
107
    {
108
        $tunnels = [];
109
        foreach ($data as $info) {
110
            $tunnel = TunnelFactory::fromArray($info);
111
            $tunnels[] = $tunnel;
112
        }
113
        return new TunnelCollection($tunnels);
114
    }
115
116
    /**
117
     * Run the client
118
     */
119
    public function run()
120
    {
121
        $this->connector->connect($this->serverAddress)->then(function(ConnectionInterface $connection){
122
            //Emit the event
123
            $this->dispatcher->dispatch(new Event(EventStore::CONNECT_TO_SERVER, $this, [
124
                'connection' => $connection
125
            ]));
126
            $this->controlConnection = $connection;
127
            $this->requestAuth($connection);
128
            $this->handleControlConnection($connection);
129
        });
130
        $this->dispatcher->dispatch(EventStore::CLIENT_RUN);
131
        foreach ($this->getDefaultTimers() as $timer) {
132
            $this->addTimer($timer);
133
        }
134
        $this->loop->run();
135
    }
136
137
    /**
138
     * Close the client
139
     */
140
    public function close()
141
    {
142
        foreach ($this->timers as $timer) {
143
            $timer->cancel();
144
        }
145
        foreach ($this->tunnelClients as $tunnelClient) {
146
            $tunnelClient->close();
147
        }
148
        $this->controlConnection && $this->controlConnection->end();
149
    }
150
151
    /**
152
     * Handles the control connection
153
     * @param ConnectionInterface $connection
154
     */
155
    protected function handleControlConnection(ConnectionInterface $connection)
156
    {
157
        $parser = new SpikeParser();
158
        $connection->on('data', function($data) use($parser, $connection){
159
            $parser->pushIncoming($data);
160
            $messages = $parser->parse();
161
            foreach ($messages as $message) {
162
                $message = Spike::fromString($message);
163
                $this->dispatcher->dispatch(new Event(EventStore::RECEIVE_MESSAGE, $this, [
164
                    'message' => $message,
165
                    'connection' => $connection
166
                ]));
167
                $this->createMessageHandler($message, $connection)->handle($message);
168
            }
169
        });
170
        $connection->on('close', function(){
171
            $this->close();
172
        });
173
    }
174
175
    /**
176
     * Request for auth
177
     * @param ConnectionInterface $connection
178
     */
179
    protected function requestAuth(ConnectionInterface $connection)
180
    {
181
        $authInfo = array_replace([
182
            'os' => PHP_OS,
183
            'version' => '',
184
        ], $this->auth);
185
        $connection->write(new Spike('auth', $authInfo));
186
    }
187
188
    /**
189
     * Gets the client id
190
     * @return string
191
     */
192
    public function getClientId()
193
    {
194
        return $this->id;
195
    }
196
197
    /**
198
     * Sets the client id
199
     * @param string $id
200
     */
201
    public function setClientId($id)
202
    {
203
        $this->id = $id;
204
        Spike::setGlobalHeader('Client-ID', $id);
205
        $this->addTimer(new Heartbeat($this));
206
    }
207
208
    /**
209
     * Gets all tunnel clients
210
     * @return TunnelClientCollection
211
     */
212
    public function getTunnelClients()
213
    {
214
        return $this->tunnelClients;
215
    }
216
217
    /**
218
     * Gets all tunnels
219
     * @return TunnelCollection
220
     */
221
    public function getTunnels()
222
    {
223
        return $this->tunnels;
224
    }
225
226
    /**
227
     * Gets the dispatcher
228
     * @return Dispatcher
229
     */
230
    public function getDispatcher()
231
    {
232
        return $this->dispatcher;
233
    }
234
235
    /**
236
     * @return ConnectionInterface
237
     */
238
    public function getControlConnection()
239
    {
240
        return $this->controlConnection;
241
    }
242
243
    /**
244
     * Sets a logger
245
     * @param Logger $logger
246
     */
247
    public function setLogger($logger)
248
    {
249
        $this->logger = $logger;
250
    }
251
252
    /**
253
     * Gets the logger
254
     * @return Logger
255
     */
256
    public function getLogger()
257
    {
258
        return $this->logger;
259
    }
260
261
    /**
262
     * Gets the loop instance
263
     * @return LoopInterface
264
     */
265
    public function getLoop()
266
    {
267
        return $this->loop;
268
    }
269
270
    /**
271
     * Creates a tunnel client to process proxy connection
272
     * @param TunnelInterface $tunnel
273
     * @param string $proxyConnectionId
274
     * @return TunnelClientInterface
275
     */
276
    public function createTunnelClient(TunnelInterface $tunnel, $proxyConnectionId)
277
    {
278
        if ($tunnel instanceof HttpTunnel) {
279
            $tunnelClient = new TunnelClient\HttpTunnelClient($this, $tunnel, $proxyConnectionId, $this->serverAddress, $this->loop);
280
        } else {
281
            $tunnelClient = new TunnelClient\TcpTunnelClient($this, $tunnel, $proxyConnectionId, $this->serverAddress, $this->loop);
282
        }
283
        $tunnelClient->run();
284
        $this->tunnelClients->add($tunnelClient);
285
        return $tunnelClient;
286
    }
287
288
    /**
289
     * Creates the handler for the received message
290
     * @param SpikeInterface $message
291
     * @param ConnectionInterface $connection
292
     * @return Handler\HandlerInterface
293
     */
294
    protected function createMessageHandler(SpikeInterface $message, ConnectionInterface $connection)
295
    {
296
        switch ($message->getAction()) {
297
            case 'auth_response':
298
                $handler = new Handler\AuthResponseHandler($this, $connection);
299
                break;
300
            case 'register_tunnel_response':
301
                $handler = new Handler\RegisterTunnelResponseHandler($this, $connection);
302
                break;
303
            case 'request_proxy':
304
                $handler = new Handler\RequestProxyHandler($this, $connection);
305
                break;
306
            default:
307
                throw new InvalidArgumentException(sprintf('Cannot find handler for the message: "%s"',
308
                    $message->getAction()
309
                ));
310
        }
311
        return $handler;
312
    }
313
314
    /**
315
     * Creates default timers
316
     * @return TimerInterface[]
317
     */
318
    protected function getDefaultTimers()
319
    {
320
        return [
321
            new MemoryWatcher($this->logger)
322
        ];
323
    }
324
}