Completed
Push — master ( b80779...c4c0b1 )
by Nazar
06:16
created

Server::onMessage()   C

Complexity

Conditions 14
Paths 17

Size

Total Lines 81
Code Lines 52

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 81
rs 5.0043
cc 14
eloc 52
nc 17
nop 2

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
/**
3
 * @package   WebSockets
4
 * @category  modules
5
 * @author    Nazar Mokrynskyi <[email protected]>
6
 * @copyright Copyright (c) 2015, Nazar Mokrynskyi
7
 * @license   MIT License, see license.txt
8
 */
9
namespace cs\modules\WebSockets;
10
use
11
	Ratchet\Client\Factory as Client_factory,
12
	Ratchet\Client\WebSocket as Client_websocket,
13
	Ratchet\ConnectionInterface,
14
	Ratchet\Http\HttpServer,
15
	Ratchet\MessageComponentInterface,
16
	Ratchet\Server\IoServer,
17
	Ratchet\WebSocket\WsServer,
18
	React\Dns\Resolver\Factory as Dns_factory,
19
	React\EventLoop\Factory as Loop_factory,
20
	cs\Config,
21
	cs\Event,
22
	cs\Session,
23
	cs\Singleton,
24
	cs\User,
25
	Exception,
26
	SplObjectStorage;
27
/**
28
 * @method static Server instance($check = false)
29
 */
30
class Server implements MessageComponentInterface {
31
	use
32
		Singleton;
33
	/**
34
	 * Message will be delivered to everyone
35
	 */
36
	const SEND_TO_ALL = 1;
37
	/**
38
	 * Message will be delivered to registered users only
39
	 */
40
	const SEND_TO_REGISTERED_USERS = 2;
41
	/**
42
	 * Message will be delivered to users, specified in target (might be array of users)
43
	 */
44
	const SEND_TO_SPECIFIC_USERS = 3;
45
	/**
46
	 * Message will be delivered to users from group, specified in target (might be array of groups)
47
	 */
48
	const SEND_TO_USERS_GROUP = 4;
49
	/**
50
	 * Message will be delivered to users whose connection objects have property with certain value, target should be an array with format [property, value]
51
	 */
52
	const SEND_TO_FILTER = 5;
53
	/**
54
	 * Each object additionally will have properties `user_id`, `session_id`, `session_expire` and `user_groups` with user id and ids of user groups
55
	 * correspondingly
56
	 *
57
	 * @var ConnectionInterface[]|SplObjectStorage
58
	 */
59
	protected $clients;
60
	/**
61
	 * @var ConnectionInterface[]|SplObjectStorage
62
	 */
63
	protected $servers;
64
	/**
65
	 * Connection to master server
66
	 *
67
	 * @var ConnectionInterface
68
	 */
69
	protected $connection_to_master;
70
	/**
71
	 * @var Pool
72
	 */
73
	protected $pool;
74
	/**
75
	 * Address to WebSockets server in format wss://server/WebSockets or ws://server/WebSockets, so that one WebSockets server can reach another (in case of
76
	 * several servers)
77
	 *
78
	 * @var string
79
	 */
80
	protected $address;
81
	/**
82
	 * @var IoServer
83
	 */
84
	protected $io_server;
85
	/**
86
	 * @var \React\EventLoop\LoopInterface
87
	 */
88
	protected $loop;
89
	/**
90
	 * @var Client_factory
91
	 */
92
	protected $client_connector;
93
	/**
94
	 * @var int
95
	 */
96
	protected $listen_port;
97
	/**
98
	 * @var string
99
	 */
100
	protected $listen_locally;
101
	/**
102
	 * @var string
103
	 */
104
	protected $dns_server;
105
	/**
106
	 * @var string
107
	 */
108
	protected $security_key;
109
	/**
110
	 * @var bool
111
	 */
112
	protected $remember_session_ip;
113
	protected function construct () {
114
		$Config                    = Config::instance();
115
		$module_data               = $Config->module('WebSockets');
116
		$this->listen_port         = $module_data->listen_port;
117
		$this->listen_locally      = $module_data->listen_locally ? '127.0.0.1' : '0.0.0.0';
118
		$this->dns_server          = $module_data->dns_server ?: '127.0.0.1';
119
		$this->dns_server          = $module_data->security_key;
120
		$this->remember_session_ip = $Config->core['remember_user_ip'];
121
		$this->pool                = Pool::instance();
122
		$this->clients             = new SplObjectStorage;
123
		$this->servers             = new SplObjectStorage;
124
		/**
125
		 * @var \cs\_SERVER $_SERVER
126
		 */
127
		$this->address = ($_SERVER->secure ? 'wss' : 'ws')."://$_SERVER->host/WebSockets";
128
	}
129
	/**
130
	 * Run WebSockets server
131
	 *
132
	 * @param null|string $address
133
	 */
134
	function run ($address = null) {
135
		$this->address = $address ?: $this->address;
136
		@ini_set('error_log', LOGS.'/WebSockets-server.log');
137
		$ws_server = new WsServer($this);
138
		// No encoding check - better performance, browsers do this anyway
139
		$ws_server->setEncodingChecks(false);
140
		// Disable all versions except RFC6455, which is supported by all modern browsers
141
		$ws_server->disableVersion(0);
142
		$ws_server->disableVersion(6);
143
		$this->io_server        = IoServer::factory(
144
			new HttpServer(
145
				new Connection_properties_injector($ws_server)
146
			),
147
			$this->listen_port,
148
			$this->listen_locally
149
		);
150
		$this->loop             = $this->io_server->loop;
151
		$this->client_connector = new Client_factory(
152
			$this->loop,
153
			(new Dns_factory)->create(
154
				$this->dns_server,
155
				$this->loop
156
			)
157
		);
158
		$this->connect_to_master();
159
		// Since we may work with a lot of different users - disable this cache in order to not run out of memory
160
		User::instance()->disable_memory_cache();
161
		$this->loop->run();
162
	}
163
	/**
164
	 * @param ConnectionInterface $connection
165
	 */
166
	function onOpen (ConnectionInterface $connection) {
167
		$this->clients->attach($connection);
168
	}
169
	/**
170
	 * @param ConnectionInterface $connection
171
	 * @param string              $message
172
	 */
173
	function onMessage (ConnectionInterface $connection, $message) {
174
		$from_master = $connection === $this->connection_to_master;
175
		if (!$this->parse_message($message, $action, $details, $send_to, $target)) {
176
			if (!$from_master) {
177
				$connection->close();
178
			}
179
			return;
180
		}
181
		switch ($action) {
182
			/**
183
			 * Connection to master server as server (by default all connections considered as clients)
184
			 */
185
			case "Server/connect:$this->security_key":
186
				/**
187
				 * Under certain circumstances it may happen so that one server become available through multiple addresses,
188
				 * in this case we need to remove one of them from list of pools
189
				 */
190
				if ($details['from_slave'] === $this->address) {
191
					$this->pool->del($details['to_master']);
192
					$connection->close();
193
				} else {
194
					$this->clients->detach($connection);
195
					$this->servers->attach($connection);
196
				}
197
				return;
198
			/**
199
			 * Internal connection from application
200
			 */
201
			case "Application/Internal:$this->security_key":
202
				/** @noinspection PhpUndefinedFieldInspection */
203
				if ($this->parse_message($details, $action_, $details_, $send_to_, $target_)) {
204
					$connection->close();
205
					$this->send_to_clients($action_, $details_, $send_to_, $target_);
206
				}
207
				return;
208
			case 'Client/authentication':
209
				$Session = Session::instance();
210
				/** @noinspection PhpUndefinedFieldInspection */
211
				$session = $Session->get($connection->session_id);
212
				/** @noinspection PhpUndefinedFieldInspection */
213
				if (
214
					!$session ||
215
					!$Session->is_session_owner($session['id'], $connection->user_agent, $connection->remote_addr, $connection->ip)
216
				) {
217
					$connection->send(
218
						_json_encode(['Client/authentication:error', $this->compose_error(403)])
219
					);
220
					$connection->close();
221
					return;
222
				}
223
				$connection->user_id        = $session['user'];
224
				$connection->session_id     = $session['id'];
225
				$connection->session_expire = $session['expire'];
226
				$connection->groups         = User::instance()->get_groups($session['user']);
227
				$connection->send(
228
					_json_encode(['Client/authentication', 'ok'])
229
				);
230
		}
231
		if ($from_master) {
232
			$this->send_to_clients_internal($action, $details, $send_to, $target);
233
		} elseif ($this->servers->contains($connection)) {
234
			$this->broadcast_message_to_servers($message, $connection);
235
			if (!$send_to) {
236
				return;
237
			}
238
			$this->send_to_clients_internal($action, $details, $send_to, $target);
239
		} elseif (isset($connection->user_id)) {
240
			/** @noinspection PhpUndefinedFieldInspection */
241
			Event::instance()->fire(
242
				"WebSockets/message",
243
				[
244
					'action'     => $action,
245
					'details'    => $details,
246
					'language'   => $connection->language,
247
					'user'       => $connection->user_id,
248
					'session'    => $connection->session_id,
249
					'connection' => $connection
250
				]
251
			);
252
		}
253
	}
254
	/**
255
	 * @param string    $message
256
	 * @param string    $action
257
	 * @param mixed     $details
258
	 * @param int|int[] $send_to
259
	 * @param int       $target
260
	 *
261
	 * @return bool
262
	 */
263
	protected function parse_message ($message, &$action, &$details, &$send_to, &$target) {
264
		$decoded_message = _json_decode($message);
265
		if (
266
			!is_array($decoded_message) ||
267
			!array_key_exists(0, $decoded_message) ||
268
			!array_key_exists(1, $decoded_message)
269
		) {
270
			return false;
271
		}
272
		list($action, $details) = $decoded_message;
273
		$send_to = isset($decoded_message[2]) ? $decoded_message[2] : 0;
274
		$target  = isset($decoded_message[3]) ? $decoded_message[3] : false;
275
		return true;
276
	}
277
	/**
278
	 * @param string                   $message
279
	 * @param ConnectionInterface|null $skip_server
280
	 */
281
	protected function broadcast_message_to_servers ($message, $skip_server = null) {
282
		foreach ($this->servers as $server) {
283
			if ($server === $skip_server) {
284
				continue;
285
			}
286
			$server->send($message);
287
		}
288
	}
289
	/**
290
	 * Compose error
291
	 *
292
	 * @param int         $error_code    HTTP status code
293
	 * @param null|string $error_message String representation of status code
294
	 *
295
	 * @return array Array to be passed as details to `::send_to_clients()`
0 ignored issues
show
Documentation introduced by
Consider making the return type a bit more specific; maybe use array<integer|null|string>.

This check looks for the generic type array as a return type and suggests a more specific type. This type is inferred from the actual code.

Loading history...
296
	 */
297
	function compose_error ($error_code, $error_message = null) {
298
		$error_message = $error_message ?: status_code_string($error_code);
299
		return [$error_code, $error_message];
300
	}
301
	/**
302
	 * Send request to client
303
	 *
304
	 * @param string          $action
305
	 * @param mixed           $details
306
	 * @param int             $send_to Constants `self::SEND_TO*` should be used here
307
	 * @param false|int|int[] $target  Id or array of ids in case of response to one or several users or groups
308
	 */
309
	function send_to_clients ($action, $details, $send_to, $target = false) {
310
		$message = _json_encode([$action, $details, $send_to, $target]);
311
		/**
312
		 * If server running in current process
313
		 */
314
		if ($this->io_server) {
315
			if ($this->connection_to_master) {
316
				$this->connection_to_master->send($message);
317
			} else {
318
				$this->broadcast_message_to_servers($message);
319
			}
320
			$this->send_to_clients_internal($action, $details, $send_to, $target);
321
			return;
322
		}
323
		$servers = $this->pool->get_all();
324
		if ($servers) {
325
			shuffle($servers);
326
			$loop      = Loop_factory::create();
327
			$connector = new Client_factory($loop);
328
			$connector($servers[0])->then(
329
				function (Client_websocket $connection) use ($message) {
330
					$connection->send(
331
						_json_encode(["Application/Internal:$this->security_key", $message])
332
					);
333
					// Connection will be closed by server itself, no need to stop loop here
334
				},
335
				function () use ($loop) {
336
					$loop->stop();
337
				}
338
			);
339
			$loop->run();
340
		}
341
	}
342
	/**
343
	 * Send request to client
344
	 *
345
	 * @param string                  $action
346
	 * @param mixed                   $details
347
	 * @param int                     $send_to Constants `self::SEND_TO_*` should be used here
348
	 * @param false|int|int[]|mixed[] $target  Id or array of ids in case of response to one or several users or groups, [property, value] for filter
349
	 */
350
	protected function send_to_clients_internal ($action, $details, $send_to, $target = false) {
351
		$message = _json_encode([$action, $details]);
352
		/**
353
		 * Special system actions
354
		 */
355
		switch ($action) {
356
			case 'Server/close_by_session':
357
				foreach ($this->clients as $client) {
358
					if ($client->session_id == $details) {
359
						$client->send(_json_encode('Server/close'));
360
						$client->close();
361
					}
362
				}
363
				return;
364
			case 'Server/close_by_user':
365
				foreach ($this->clients as $client) {
366
					if ($client->user_id == $details) {
367
						$client->send(_json_encode('Server/close'));
368
						$client->close();
369
					}
370
				}
371
				return;
372
		}
373
		switch ($send_to) {
374
			case self::SEND_TO_ALL:
375
				foreach ($this->clients as $client) {
376
					$client->send($message);
377
				}
378
				break;
379
			case self::SEND_TO_REGISTERED_USERS:
380
				foreach ($this->clients as $client) {
381
					if (isset($client->user_id)) {
382
						$this->send_to_client_if_not_expire($client, $message);
383
					}
384
				}
385
				break;
386
			case self::SEND_TO_SPECIFIC_USERS:
387
				$target = (array)$target;
388
				foreach ($this->clients as $client) {
389
					if (isset($client->user_id) && in_array($client->user_id, $target)) {
390
						$this->send_to_client_if_not_expire($client, $message);
391
					}
392
				}
393
				break;
394
			case self::SEND_TO_USERS_GROUP:
395
				$target = (array)$target;
396
				foreach ($this->clients as $client) {
397
					if (isset($client->user_groups) && array_intersect($client->user_groups, $target)) {
398
						$this->send_to_client_if_not_expire($client, $message);
399
					}
400
				}
401
				break;
402
			case self::SEND_TO_FILTER:
403
				list($property, $value) = $target;
404
				foreach ($this->clients as $client) {
405
					if (isset($client->$property) && $client->$property === $value) {
406
						$this->send_to_client_if_not_expire($client, $message);
407
					}
408
				}
409
				break;
410
		}
411
	}
412
	/**
413
	 * If session not expire - will send message, otherwise will disconnect
414
	 *
415
	 * @param ConnectionInterface $client
416
	 * @param string              $message
417
	 */
418
	protected function send_to_client_if_not_expire ($client, $message) {
419
		/** @noinspection PhpUndefinedFieldInspection */
420
		if ($client->session_expire >= time()) {
421
			$client->send($message);
422
		} else {
423
			$client->close();
424
		}
425
	}
426
	/**
427
	 * Close all client connections by specified session id
428
	 *
429
	 * @param string $session_id
430
	 */
431
	function close_by_session ($session_id) {
432
		$this->send_to_clients('Server/close_by_session', $session_id, 0);
433
	}
434
	/**
435
	 * Close all client connections by specified user id
436
	 *
437
	 * @param string $user_id
438
	 */
439
	function close_by_user ($user_id) {
440
		$this->send_to_clients('Server/close_by_user', $user_id, 0);
441
	}
442
	/**
443
	 * Connect to master server
444
	 *
445
	 * Two trials, if server do not respond twice - it will be removed from servers pool, and next server will become master
446
	 */
447
	protected function connect_to_master () {
448
		static $last_trial = '';
449
		// Add server to connections pool and connect to master if any
450
		$this->pool->add($this->address);
451
		$master = $this->pool->get_master();
452
		if ($master && $master != $this->address) {
453
			call_user_func($this->client_connector, $master)->then(
454
				function (Client_websocket $connection) use (&$last_trial, $master) {
455
					$last_trial                 = '';
456
					$this->connection_to_master = $connection;
457
					$connection->on(
458
						'message',
459
						function ($message) use ($connection) {
460
							$this->onMessage($connection, $message);
461
						}
462
					);
463
					$connection->on(
464
						'error',
465
						function () use ($connection) {
466
							$connection->close();
467
						}
468
					);
469
					$connection->on(
470
						'close',
471
						function () {
472
							$this->connection_to_master = null;
473
							$this->loop->addTimer(
474
								1,
475
								function () {
476
									$this->connect_to_master();
477
								}
478
							);
479
						}
480
					);
481
					/**
482
					 * Tell master that we are server also, not regular client
483
					 */
484
					$connection->send(
485
						_json_encode(
486
							[
487
								"Server/connect:$this->security_key",
488
								[
489
									'to_master'  => $master,
490
									'from_slave' => $this->address
491
								]
492
							]
493
						)
494
					);
495
				},
496
				function () use (&$last_trial, $master) {
497
					if ($last_trial == $master) {
498
						$this->pool->del($master);
499
					} else {
500
						$last_trial = $master;
501
					}
502
					$this->loop->addTimer(
503
						1,
504
						function () {
505
							$this->connect_to_master();
506
						}
507
					);
508
					$this->connect_to_master();
509
				}
510
			);
511
		} else {
512
			$last_trial = '';
513
			/**
514
			 * Sometimes other servers may loose connection with master server, so new master will be selected and we need to handle this nicely
515
			 */
516
			$this->loop->addTimer(
517
				30,
518
				function () {
519
					$this->connect_to_master();
520
				}
521
			);
522
		}
523
	}
524
	/**
525
	 * Get event loop instance
526
	 *
527
	 * @return \React\EventLoop\LoopInterface
528
	 */
529
	function get_loop () {
530
		return $this->loop;
531
	}
532
	/**
533
	 * @param ConnectionInterface $connection
534
	 */
535
	function onClose (ConnectionInterface $connection) {
536
		/**
537
		 * Generate pseudo-event when client is disconnected
538
		 */
539
		if (isset($connection->user_id) && $this->clients->contains($connection)) {
540
			/** @noinspection PhpUndefinedFieldInspection */
541
			Event::instance()->fire(
542
				"WebSockets/message",
543
				[
544
					'action'     => 'Client/disconnection',
545
					'details'    => null,
546
					'language'   => $connection->language,
547
					'user'       => $connection->user_id,
548
					'session'    => $connection->session_id,
549
					'connection' => $connection
550
				]
551
			);
552
		}
553
		// The connection is closed, remove it, as we can no longer send it messages
554
		$this->clients->detach($connection);
555
		$this->servers->detach($connection);
556
	}
557
	/**
558
	 * @param ConnectionInterface $connection
559
	 * @param Exception           $e
560
	 */
561
	function onError (ConnectionInterface $connection, Exception $e) {
562
		$connection->close();
563
	}
564
	function __destruct () {
565
		$this->pool->del($this->address);
566
	}
567
}
568