Completed
Push — master ( 6c49a0...37a203 )
by Andrew
53s
created

MessagesHandler.parse_imgs()   A

Complexity

Conditions 4

Size

Total Lines 14

Duplication

Lines 12
Ratio 85.71 %

Importance

Changes 2
Bugs 1 Features 0
Metric Value
cc 4
c 2
b 1
f 0
dl 12
loc 14
rs 9.2
1
import json
2
import logging
3
import sys
4
import time
5
from datetime import timedelta
6
from numbers import Number
7
from threading import Thread
8
9
import tornado.gen
10
import tornado.httpclient
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q, F
18
from redis_sessions.session import SessionStore
19
from tornado import ioloop
20
from tornado.websocket import WebSocketHandler
21
22
from chat.cookies_middleware import create_id
23
from chat.log_filters import id_generator
24
from chat.utils import extract_photo
25
from chat.utils import get_or_create_ip
26
27
try:  # py2
28
	from urlparse import urlparse
29
except ImportError:  # py3
30
	from urllib.parse import urlparse
31
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM, \
32
	TORNADO_REDIS_PORT, WEBRTC_CONNECTION
33
from chat.models import User, Message, Room, get_milliseconds, UserJoinedInfo, RoomUsers, Image
34
35
PY3 = sys.version > '3'
36
str_type = str if PY3 else basestring
37
38
39
sessionStore = SessionStore()
40
41
parent_logger = logging.getLogger(__name__)
42
base_logger = logging.LoggerAdapter(parent_logger, {
43
	'id': 0,
44
	'ip': '000.000.000.000'
45
})
46
47
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
48
#CONNECTION_POOL = tornadoredis.ConnectionPool(
49
#	max_connections=500,
50
#	wait_for_available=True)
51
52
53
class Actions(object):
54
	LOGIN = 'addOnlineUser'
55
	SET_WS_ID = 'setWsId'
56
	LOGOUT = 'removeOnlineUser'
57
	SEND_MESSAGE = 'sendMessage'
58
	PRINT_MESSAGE = 'printMessage'
59
	WEBRTC = 'sendRtcData'
60
	CLOSE_FILE_CONNECTION = 'destroyFileConnection'
61
	CLOSE_CALL_CONNECTION = 'destroyCallConnection'
62
	CANCEL_CALL_CONNECTION = 'cancelCallConnection'
63
	ACCEPT_CALL = 'acceptCall'
64
	ACCEPT_FILE = 'acceptFile'
65
	ROOMS = 'setRooms'
66
	REFRESH_USER = 'setOnlineUsers'
67
	GROWL_MESSAGE = 'growl'
68
	GET_MESSAGES = 'loadMessages'
69
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
70
	DELETE_ROOM = 'deleteRoom'
71
	EDIT_MESSAGE = 'editMessage'
72
	DELETE_MESSAGE = 'deleteMessage'
73
	CREATE_ROOM_CHANNEL = 'addRoom'
74
	INVITE_USER = 'inviteUser'
75
	ADD_USER = 'addUserToDom'
76
	OFFLINE_MESSAGES = 'loadOfflineMessages'
77
	SET_WEBRTC_ID = 'setConnectionId'
78
	SET_WEBRTC_ERROR = 'setError'
79
	OFFER_FILE_CONNECTION = 'offerFile'
80
	OFFER_CALL_CONNECTION = 'offerCall'
81
	REPLY_FILE_CONNECTION = 'replyFile'
82
	REPLY_CALL_CONNECTION = 'replyCall'
83
84
85
class VarNames(object):
86
	WEBRTC_QUED_ID = 'id'
87
	USER = 'user'
88
	USER_ID = 'userId'
89
	TIME = 'time'
90
	CONTENT = 'content'
91
	IMG = 'images'
92
	IMG_B64 = 'b64'
93
	IMG_FILE_NAME = 'fileName'
94
	EVENT = 'action'
95
	MESSAGE_ID = 'id'
96
	GENDER = 'sex'
97
	ROOM_NAME = 'name'
98
	ROOM_ID = 'roomId'
99
	ROOM_USERS = 'users'
100
	CHANNEL = 'channel'
101
	WEBRTC_OPPONENT_ID = 'opponentWsId'
102
	GET_MESSAGES_COUNT = 'count'
103
	GET_MESSAGES_HEADER_ID = 'headerId'
104
	CHANNEL_NAME = 'channel'
105
	IS_ROOM_PRIVATE = 'private'
106
	CONNECTION_ID = 'connId'
107
	HANDLER_NAME = 'handler'
108
109
110
class HandlerNames:
111
	CHANNELS = 'channels'
112
	CHAT = 'chat'
113
	GROWL = 'growl'
114
	WEBRTC = 'webrtc'
115
	PEER_CONNECTION = 'peerConnection'
116
	WEBRTC_TRANSFER = 'webrtcTransfer'
117
	WS = 'ws'
118
119
120
class WebRtcRedisStates:
121
	RESPONDED = 'responded'
122
	READY = 'ready'
123
	OFFERED = 'offered'
124
	CLOSED = 'closed'
125
126
127
class RedisPrefix:
128
	USER_ID_CHANNEL_PREFIX = 'u'
129
	DEFAULT_CHANNEL = ALL_ROOM_ID
130
	CONNECTION_ID_LENGTH = 8  # should be secure
131
132
	@classmethod
133
	def generate_user(cls, key):
134
		return cls.USER_ID_CHANNEL_PREFIX + str(key)
135
136
137
class MessagesCreator(object):
138
139
	def __init__(self, *args, **kwargs):
140
		self.sex = None
141
		self.sender_name = None
142
		self.id = None  # child init
143
		self.user_id = 0  # anonymous by default
144
145
	def default(self, content, event, handler):
146
		"""
147
		:return: {"action": event, "content": content, "time": "20:48:57"}
148
		"""
149
		return {
150
			VarNames.EVENT: event,
151
			VarNames.CONTENT: content,
152
			VarNames.USER_ID: self.user_id,
153
			VarNames.TIME: get_milliseconds(),
154
			VarNames.HANDLER_NAME: handler
155
		}
156
157
	def reply_webrtc(self, event, connection_id):
158
		"""
159
		:return: {"action": event, "content": content, "time": "20:48:57"}
160
		"""
161
		return {
162
			VarNames.EVENT: event,
163
			VarNames.CONNECTION_ID: connection_id,
164
			VarNames.USER_ID: self.user_id,
165
			VarNames.USER: self.sender_name,
166
			VarNames.WEBRTC_OPPONENT_ID: self.id,
167
			VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
168
		}
169
170
	def set_ws_id(self, random, self_id):
171
		return {
172
			VarNames.HANDLER_NAME: HandlerNames.WS,
173
			VarNames.EVENT: Actions.SET_WS_ID,
174
			VarNames.CONTENT: random,
175
			VarNames.WEBRTC_OPPONENT_ID: self_id
176
		}
177
178
	def room_online(self, online, event, channel):
179
		"""
180
		:return: {"action": event, "content": content, "time": "20:48:57"}
181
		"""
182
		room_less = self.default(online, event, HandlerNames.CHAT)
183
		room_less[VarNames.CHANNEL_NAME] = channel
184
		room_less[VarNames.USER] = self.sender_name
185
		room_less[VarNames.GENDER] = self.sex
186
		return room_less
187
188
	def offer_webrtc(self, content, connection_id, room_id, action):
189
		"""
190
		:return: {"action": "call", "content": content, "time": "20:48:57"}
191
		"""
192
		message = self.default(content, action, HandlerNames.WEBRTC)
193
		message[VarNames.USER] = self.sender_name
194
		message[VarNames.CONNECTION_ID] = connection_id
195
		message[VarNames.WEBRTC_OPPONENT_ID] = self.id
196
		message[VarNames.CHANNEL] = room_id
197
		return message
198
199
	def set_connection_id(self, qued_id, connection_id):
200
		return {
201
			VarNames.EVENT: Actions.SET_WEBRTC_ID,
202
			VarNames.HANDLER_NAME: HandlerNames.WEBRTC,
203
			VarNames.CONNECTION_ID: connection_id,
204
			VarNames.WEBRTC_QUED_ID: qued_id
205
		}
206
207
	def set_webrtc_error(self, error, connection_id, qued_id=None):
208
		message = self.default(error, Actions.SET_WEBRTC_ERROR, HandlerNames.PEER_CONNECTION) # TODO file/call
209
		message[VarNames.CONNECTION_ID] = connection_id
210
		if qued_id:
211
			message[VarNames.WEBRTC_QUED_ID] = qued_id
212
		return message
213
214
	@classmethod
215
	def create_message(cls, message, images):
216
		res = {
217
			VarNames.USER_ID: message.sender_id,
218
			VarNames.CONTENT: message.content,
219
			VarNames.TIME: message.time,
220
			VarNames.MESSAGE_ID: message.id,
221
			VarNames.IMG: images
222
		}
223
		return res
224
225
	@classmethod
226
	def create_send_message(cls, message, event, imgs):
227
		"""
228
		:param message:
229
		:return: "action": "joined", "content": {"v5bQwtWp": "alien", "tRD6emzs": "Alien"},
230
		"sex": "Alien", "user": "tRD6emzs", "time": "20:48:57"}
231
		"""
232
		res = cls.create_message(message, imgs)
233
		res[VarNames.EVENT] = event
234
		res[VarNames.CHANNEL] = message.room_id
235
		res[VarNames.HANDLER_NAME] = HandlerNames.CHAT
236
		return res
237
238
	@classmethod
239
	def append_images(cls, messages, images):
240
		res_mess = []
241
		for message in messages:
242
			res_images = cls.prepare_img(images, message.id)
243
			res_mess.append(cls.create_message(message, res_images))
244
		return res_mess
245
246
	@classmethod
247
	def prepare_img(cls, images, message_id):
248
		if images:
249
			return {x.symbol: x.img.url for x in images if x.message_id == message_id}
250
251
	@classmethod
252
	def get_messages(cls, messages, channel, images):
253
		"""
254
		:type messages: list[Messages]
255
		:type channel: str
256
		:type messages: QuerySet[Messages]
257
		"""
258
		return {
259
			VarNames.CONTENT: cls.append_images(messages, images),
260
			VarNames.EVENT: Actions.GET_MESSAGES,
261
			VarNames.CHANNEL: channel,
262
			VarNames.HANDLER_NAME: HandlerNames.CHAT
263
		}
264
265
	@property
266
	def channel(self):
267
		return RedisPrefix.generate_user(self.user_id)
268
269
	def subscribe_direct_channel_message(self, room_id, other_user_id):
270
		return {
271
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
272
			VarNames.ROOM_ID: room_id,
273
			VarNames.ROOM_USERS: [self.user_id, other_user_id],
274
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS
275
		}
276
277
	def subscribe_room_channel_message(self, room_id, room_name):
278
		return {
279
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
280
			VarNames.ROOM_ID: room_id,
281
			VarNames.ROOM_USERS: [self.user_id],
282
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS,
283
			VarNames.ROOM_NAME: room_name
284
		}
285
286
	def invite_room_channel_message(self, room_id, user_id, room_name, users):
287
		return {
288
			VarNames.EVENT: Actions.INVITE_USER,
289
			VarNames.ROOM_ID: room_id,
290
			VarNames.USER_ID: user_id,
291
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS,
292
			VarNames.ROOM_NAME: room_name,
293
			VarNames.CONTENT: users
294
		}
295
296
	def add_user_to_room(self, channel, user_id, content):
297
		return {
298
			VarNames.EVENT: Actions.ADD_USER,
299
			VarNames.CHANNEL: channel,
300
			VarNames.USER_ID: user_id,
301
			VarNames.HANDLER_NAME: HandlerNames.CHAT,
302
			VarNames.GENDER: content[VarNames.GENDER], # SEX: 'Alien', USER: 'Andrew'
303
			VarNames.USER: content[VarNames.USER] # SEX: 'Alien', USER: 'Andrew'
304
		}
305
306
	def unsubscribe_direct_message(self, room_id):
307
		return {
308
			VarNames.EVENT: Actions.DELETE_ROOM,
309
			VarNames.ROOM_ID: room_id,
310
			VarNames.USER_ID: self.user_id,
311
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS,
312
			VarNames.TIME: get_milliseconds()
313
		}
314
315
	def load_offline_message(self, offline_messages, channel_key):
316
		res = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
317
		res[VarNames.CHANNEL] = channel_key
318
		return res
319
320
321
class MessagesHandler(MessagesCreator):
322
323
	def __init__(self, *args, **kwargs):
324
		self.closed_channels = None
325
		self.parsable_prefix = 'p'
326
		super(MessagesHandler, self).__init__(*args, **kwargs)
327
		self.webrtc_ids = {}
328
		self.ip = None
329
		from chat import global_redis
330
		self.async_redis_publisher = global_redis.async_redis_publisher
331
		self.sync_redis = global_redis.sync_redis
332
		self.channels = []
333
		self._logger = None
334
		self.async_redis = tornadoredis.Client(port=TORNADO_REDIS_PORT)
335
		self.patch_tornadoredis()
336
		self.pre_process_message = {
337
			Actions.GET_MESSAGES: self.process_get_messages,
338
			Actions.SEND_MESSAGE: self.process_send_message,
339
			Actions.WEBRTC: self.proxy_webrtc,
340
			Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
341
			Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
342
			Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
343
			Actions.ACCEPT_CALL: self.accept_call,
344
			Actions.ACCEPT_FILE: self.accept_file,
345
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
346
			Actions.DELETE_ROOM: self.delete_channel,
347
			Actions.EDIT_MESSAGE: self.edit_message,
348
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
349
			Actions.INVITE_USER: self.invite_user,
350
			Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection,
351
			Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection,
352
			Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
353
			Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
354
		}
355
		self.post_process_message = {
356
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
357
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
358
			Actions.DELETE_ROOM: self.send_client_delete_channel,
359
			Actions.INVITE_USER: self.send_client_new_channel,
360
			Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel,
361
			Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel
362
		}
363
364
	def patch_tornadoredis(self):  # TODO remove this
365
		fabric = type(self.async_redis.connection.readline)
366
		self.async_redis.connection.old_read = self.async_redis.connection.readline
367
		def new_read(new_self, callback=None):
368
			try:
369
				return new_self.old_read(callback=callback)
370
			except Exception as e:
371
				current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
372
				self.logger.error(e)
373
				self.logger.error(
374
					"Exception info: "
375
					"self.id: %s ;;; "
376
					"self.connected = '%s';;; "
377
					"Redis default channel online = '%s';;; "
378
					"self.channels = '%s';;; "
379
					"self.closed_channels  = '%s';;;",
380
					self.id, self.connected, current_online, self.channels, self.closed_channels
381
				)
382
				raise e
383
384
		self.async_redis.connection.readline = fabric(new_read, self.async_redis.connection)
385
386
	@property
387
	def connected(self):
388
		raise NotImplemented
389
390
	@connected.setter
391
	def connected(self, value):
392
		raise NotImplemented
393
394
	@tornado.gen.engine
395
	def listen(self, channels):
396
		yield tornado.gen.Task(
397
			self.async_redis.subscribe, channels)
398
		self.async_redis.listen(self.pub_sub_message)
399
400
	@property
401
	def logger(self):
402
		return self._logger if self._logger else base_logger
403
404
	@tornado.gen.engine
405
	def add_channel(self, channel):
406
		self.channels.append(channel)
407
		yield tornado.gen.Task(
408
			self.async_redis.subscribe, (channel,))
409
410
	def evaluate(self, query_set):
411
		self.do_db(len, query_set)
412
		return query_set
413
414
	def do_db(self, callback, *args, **kwargs):
415
		try:
416
			return callback(*args, **kwargs)
417
		except (OperationalError, InterfaceError) as e:
418
			if 'MySQL server has gone away' in str(e):
419
				self.logger.warning('%s, reconnecting' % e)
420
				connection.close()
421
				return callback(*args, **kwargs)
422
			else:
423
				raise e
424
425
	def execute_query(self, query, *args, **kwargs):
426
		cursor = connection.cursor()
427
		cursor.execute(query, *args, **kwargs)
428
		desc = cursor.description
429
		return [
430
			dict(zip([col[0] for col in desc], row))
431
			for row in cursor.fetchall()
432
			]
433
434
	def get_online_from_redis(self, channel, check_self_online=False):
435
		"""
436
		:rtype : dict
437
		returns (dict, bool) if check_type is present
438
		"""
439
		online = self.sync_redis.smembers(channel)
440
		self.logger.debug('!! channel %s redis online: %s', channel, online)
441
		result = set()
442
		user_is_online = False
443
		# redis stores8 REDIS_USER_FORMAT, so parse them
444
		if online:
445
			for raw in online:  # py2 iteritems
446
				decoded = raw.decode('utf-8')
447
				# : char specified in cookies_middleware.py.create_id
448
				user_id = int(decoded.split(':')[0])
449
				if user_id == self.user_id and decoded != self.id:
450
					user_is_online = True
451
				result.add(user_id)
452
		result = list(result)
453
		return (result, user_is_online) if check_self_online else result
454
455
	def add_online_user(self, room_id, offline_messages=None):
456
		"""
457
		adds to redis
458
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
459
		:return:
460
		"""
461
		self.async_redis_publisher.sadd(room_id, self.id)
462
		# since we add user to online first, latest trigger will always show correct online
463
		online, is_online = self.get_online_from_redis(room_id, True)
464
		if not is_online:  # if a new tab has been opened
465
			online.append(self.user_id)
466
			online_user_names_mes = self.room_online(
467
				online,
468
				Actions.LOGIN,
469
				room_id
470
			)
471
			self.logger.info('!! First tab, sending refresh online for all')
472
			self.publish(online_user_names_mes, room_id)
473
			if offline_messages:
474
				self.ws_write(self.load_offline_message(offline_messages, room_id))
475
		else:  # Send user names to self
476
			online_user_names_mes = self.room_online(
477
				online,
478
				Actions.REFRESH_USER,
479
				room_id
480
			)
481
			self.logger.info('!! Second tab, retrieving online for self')
482
			self.ws_write(online_user_names_mes)
483
484
	def publish(self, message, channel, parsable=False):
485
		jsoned_mess = json.dumps(message)
486
		self.logger.debug('<%s> %s', channel, jsoned_mess)
487
		if parsable:
488
			jsoned_mess = self.encode(jsoned_mess)
489
		self.async_redis_publisher.publish(channel, jsoned_mess)
490
491
	def encode(self, message):
492
		"""
493
		Marks message with prefix to specify that
494
		it should be decoded and proccesed before sending to client
495
		@param message: message to mark
496
		@return: marked message
497
		"""
498
		return self.parsable_prefix + message
499
500
	def remove_parsable_prefix(self, message):
501
		if message.startswith(self.parsable_prefix):
502
			return message[1:]
503
504
	def pub_sub_message(self, message):
505
		data = message.body
506
		if isinstance(data, str_type):  # subscribe event
507
			prefixless_str = self.remove_parsable_prefix(data)
508
			if prefixless_str:
509
				dict_message = json.loads(prefixless_str)
510
				res = self.post_process_message[dict_message[VarNames.EVENT]](dict_message)
511
				if not res:
512
					self.ws_write(prefixless_str)
513
			else:
514
				self.ws_write(data)
515
516
	def ws_write(self, message):
517
		raise NotImplementedError('WebSocketHandler implements')
518
519
	def process_send_message(self, message):
520
		"""
521
		:type message: dict
522
		"""
523
		channel = message[VarNames.CHANNEL]
524
		message_db = Message(
525
			sender_id=self.user_id,
526
			content=message[VarNames.CONTENT]
527
		)
528
		message_db.room_id = channel
529
		self.do_db(message_db.save)
530
		res_imgs = self.parse_imgs(message.get(VarNames.IMG), message_db.id)
531
		prepared_message = self.create_send_message(message_db, Actions.PRINT_MESSAGE, self.prepare_img(res_imgs, message_db.id))
532
		self.publish(prepared_message, channel)
533
534
	def parse_imgs(self, imgs, mess_id):
535
		res_imgs = []
536 View Code Duplication
		if not imgs:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
537
			return res_imgs
538
		fetch = False
539
		for k in imgs:
540
			b64 = imgs[k].get(VarNames.IMG_B64)
541
			if b64:
542
				img = extract_photo(imgs[k][VarNames.IMG_B64], imgs[k][VarNames.IMG_FILE_NAME])
543
				res_imgs.append(Image(message_id=mess_id, img=img, symbol=k))
544
			else:
545
				fetch = True
546
		self.merge_came_imgs_with_db(fetch, mess_id, res_imgs)
547
		return res_imgs
548
549
	def merge_came_imgs_with_db(self, fetch, mess_id, res_imgs):
550
		fetched_messages = None
551
		if fetch:
552
			fetched_messages = Image.objects.filter(message_id=mess_id)
553
		if res_imgs:
554
			Image.objects.bulk_create(res_imgs)
555
		if fetched_messages:
556 View Code Duplication
			for m in fetched_messages:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
557
				res_imgs.append(m)
558
559
	def close_file_connection(self, in_message):
560
		connection_id = in_message[VarNames.CONNECTION_ID]
561
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
562
		if not self_channel_status:
563
			raise Exception("Access Denied")
564
		if self_channel_status != WebRtcRedisStates.CLOSED:
565
			sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
566
			if sender_id == self.id:
567
				self.close_file_sender(connection_id)
568
			else:
569
				self.close_file_receiver(connection_id, in_message, sender_id)
570
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
571
572
	def close_call_connection(self, in_message):
573
		connection_id = in_message[VarNames.CONNECTION_ID]
574
		conn_users = self.sync_redis.shgetall(connection_id)
575
		if conn_users[self.id] in [WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED]:
576
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
577
			del conn_users[self.id]
578
			message = {
579
				VarNames.EVENT: Actions.CLOSE_CALL_CONNECTION,
580
				VarNames.CONNECTION_ID: connection_id,
581
				VarNames.USER_ID: self.user_id,
582
				VarNames.WEBRTC_OPPONENT_ID: self.id,
583
				VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
584
			}
585
			for user in conn_users:
586
				if conn_users[user] != WebRtcRedisStates.CLOSED:
587
					self.publish(message, user)
588
		else:
589
			raise ValidationError("Invalid channel status.")
590
591
	def cancel_call_connection(self, in_message, reply_action):
592
		self.send_call_answer(in_message, WebRtcRedisStates.CLOSED, Actions.CANCEL_CALL_CONNECTION)
593
594
	def close_file_receiver(self, connection_id, in_message, sender_id):
595
		sender_status = self.sync_redis.shget(connection_id, sender_id)
596
		if not sender_status:
597
			raise Exception("Access denied")
598
		if sender_status != WebRtcRedisStates.CLOSED:
599
			in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
600
			in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
601
			self.publish(in_message, sender_id)
602
603
	def close_file_sender(self, connection_id):
604
		values = self.sync_redis.shgetall(connection_id)
605
		del values[self.id]
606
		for ws_id in values:
607
			if values[ws_id] == WebRtcRedisStates.CLOSED:
608
				continue
609
			self.publish({
610
				VarNames.EVENT: Actions.CLOSE_FILE_CONNECTION,
611
				VarNames.CONNECTION_ID: connection_id,
612
				VarNames.WEBRTC_OPPONENT_ID: self.id,
613
				VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
614
			}, ws_id)
615
616
	def accept_file(self, in_message):
617
		connection_id = in_message[VarNames.CONNECTION_ID] # TODO accept all if call
618
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
619
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
620
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
621
		if sender_ws_status == WebRtcRedisStates.READY and self_ws_status == WebRtcRedisStates.RESPONDED:
622
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
623
			self.publish({
624
				VarNames.EVENT: Actions.ACCEPT_FILE,
625 View Code Duplication
				VarNames.CONNECTION_ID: connection_id,
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
626
				VarNames.WEBRTC_OPPONENT_ID: self.id,
627
				VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
628
			}, sender_ws_id)
629
		else:
630
			raise ValidationError("Invalid channel status")
631
632
	# todo
633
	# we can use channel_status = self.sync_redis.shgetall(connection_id)
634
	# and then self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
635
	# if we shgetall and only then do async hset
636
	# we can catch an issue when 2 concurrent users accepted the call
637
	# but we didn't  send them ACCEPT_CALL as they both were in status 'offered'
638
	def accept_call(self, in_message):
639
		connection_id = in_message[VarNames.CONNECTION_ID]
640
		self_status = self.sync_redis.shget(connection_id, self.id)
641
		if self_status == WebRtcRedisStates.RESPONDED:
642
			self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.READY)
643
			channel_status = self.sync_redis.shgetall(connection_id)
644
			del channel_status[self.id]
645 View Code Duplication
			message = {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
646
				VarNames.EVENT: Actions.ACCEPT_CALL,
647
				VarNames.USER_ID: self.user_id,
648
				VarNames.CONNECTION_ID: connection_id,
649
				VarNames.WEBRTC_OPPONENT_ID: self.id,
650
				VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
651
			}
652
			for key in channel_status:
653
				if channel_status[key] != WebRtcRedisStates.CLOSED:
654
					self.publish(message, key)
655
		else:
656
			raise ValidationError("Invalid channel status")
657
658
	def offer_webrtc_connection(self, in_message):
659
		room_id = in_message[VarNames.CHANNEL]
660
		content = in_message.get(VarNames.CONTENT)
661
		qued_id = in_message[VarNames.WEBRTC_QUED_ID]
662
		connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
663
		# use list because sets dont have 1st element which is offerer
664
		self.async_redis_publisher.hset(WEBRTC_CONNECTION, connection_id, self.id)
665
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
666
		opponents_message = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT])
667
		self_message = self.set_connection_id(qued_id, connection_id)
668
		self.ws_write(self_message)
669
		self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
670
		self.publish(opponents_message, room_id, True)
671
672
	def reply_call_connection(self, in_message):
673
		self.send_call_answer(in_message, WebRtcRedisStates.RESPONDED, Actions.REPLY_CALL_CONNECTION)
674
675
	def send_call_answer(self, in_message, status_set, reply_action):
676
		connection_id = in_message[VarNames.CONNECTION_ID]
677
		conn_users = self.sync_redis.shgetall(connection_id)
678
		if conn_users[self.id] == WebRtcRedisStates.OFFERED:
679
			self.async_redis_publisher.hset(connection_id, self.id, status_set)
680
			del conn_users[self.id]
681
			message = self.reply_webrtc(reply_action, connection_id)
682
			for user in conn_users:
683
				if conn_users[user] != WebRtcRedisStates.CLOSED:
684
					self.publish(message, user)
685
		else:
686
			raise ValidationError("Invalid channel status.")
687
688
	def reply_file_connection(self, in_message):
689
		connection_id = in_message[VarNames.CONNECTION_ID]
690
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
691
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
692
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
693
		if sender_ws_status == WebRtcRedisStates.READY and self_ws_status == WebRtcRedisStates.OFFERED:
694
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
695
			self.publish(self.reply_webrtc(Actions.REPLY_FILE_CONNECTION, connection_id), sender_ws_id)
696
		else:
697
			raise ValidationError("Invalid channel status.")
698
699
	def proxy_webrtc(self, in_message):
700
		"""
701
		:type in_message: dict
702
		"""
703
		connection_id = in_message[VarNames.CONNECTION_ID]
704
		channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
705
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
706
		opponent_channel_status = self.sync_redis.shget(connection_id, channel)
707
		if not (self_channel_status == WebRtcRedisStates.READY and opponent_channel_status == WebRtcRedisStates.READY):
708
			raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
709
				self_channel_status, opponent_channel_status
710
			)) # todo receiver should only accept proxy_webrtc from sender, sender can accept all
711
		# I mean somebody if there're 3 ppl in 1 channel and first is initing transfer to 2nd and 3rd,
712
		# 2nd guy can fraud 3rd guy webrtc traffic, which is allowed during the call, but not while transering file
713
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
714
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
715
		self.logger.debug("Forwarding message to channel %s, self %s, other status %s",
716
			channel, self_channel_status, opponent_channel_status
717
		)
718
		self.publish(in_message, channel)
719
720
	def create_new_room(self, message):
721
		room_name = message[VarNames.ROOM_NAME]
722
		if not room_name or len(room_name) > 16:
723
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
724
		room = Room(name=room_name)
725
		self.do_db(room.save)
726
		RoomUsers(room_id=room.id, user_id=self.user_id).save()
727
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
728
		self.publish(subscribe_message, self.channel, True)
729
730
	def invite_user(self, message):
731
		room_id = message[VarNames.ROOM_ID]
732
		user_id = message[VarNames.USER_ID]
733
		if room_id not in self.channels:
734
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
735
		room = self.do_db(Room.objects.get, id=room_id)
736
		if room.is_private:
737
			raise ValidationError("You can't add users to direct room, create a new room instead")
738
		try:
739
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
740
		except IntegrityError:
741
			raise ValidationError("User is already in channel")
742
		users_in_room = {}
743
		for user in room.users.all():
744
			self.set_js_user_structure(users_in_room, user.id, user.username, user.sex)
745
		self.publish(self.add_user_to_room(room_id, user_id, users_in_room[user_id]), room_id)
746
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
747
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
748
749
	def create_room(self, user_rooms, user_id):
750
		if self.user_id == user_id:
751
			room_ids = list([room['room_id'] for room in self.evaluate(user_rooms)])
752
			query_res = self.execute_query(SELECT_SELF_ROOM, [room_ids, ])
753
		else:
754
			rooms_query = RoomUsers.objects.filter(user_id=user_id, room__in=user_rooms)
755
			query_res = rooms_query.values('room__id', 'room__disabled')
756
		try:
757
			room = self.do_db(query_res.get)
758
			room_id = room['room__id']
759
			self.update_room(room_id, room['room__disabled'])
760
		except RoomUsers.DoesNotExist:
761
			room = Room()
762
			room.save()
763
			room_id = room.id
764
			if self.user_id == user_id:
765
				RoomUsers(user_id=self.user_id, room_id=room_id).save()
766
			else:
767
				RoomUsers.objects.bulk_create([
768
					RoomUsers(user_id=user_id, room_id=room_id),
769
					RoomUsers(user_id=self.user_id, room_id=room_id),
770
				])
771
		return room_id
772
773
	def update_room(self, room_id, disabled):
774
		if not disabled:
775
			raise ValidationError('This room already exist')
776
		else:
777
			Room.objects.filter(id=room_id).update(disabled=False)
778
779
	def create_user_channel(self, message):
780
		user_id = message[VarNames.USER_ID]
781
		# get all self private rooms ids
782
		user_rooms = Room.users.through.objects.filter(user_id=self.user_id, room__name__isnull=True).values('room_id')
783
		# get private room that contains another user from rooms above
784
		room_id = self.create_room(user_rooms, user_id)
785
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
786
		self.publish(subscribe_message, self.channel, True)
787
		other_channel = RedisPrefix.generate_user(user_id)
788
		if self.channel != other_channel:
789
			self.publish(subscribe_message, other_channel, True)
790
791
	def delete_channel(self, message):
792
		room_id = message[VarNames.ROOM_ID]
793
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
794
			raise ValidationError('You are not allowed to exit this room')
795
		room = self.do_db(Room.objects.get, id=room_id)
796
		if room.disabled:
797
			raise ValidationError('Room is already deleted')
798
		if room.name is None:  # if private then disable
799
			room.disabled = True
800
		else: # if public -> leave the room, delete the link
801
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
802
			online = self.get_online_from_redis(room_id)
803
			online.remove(self.user_id)
804
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
805
		room.save()
806
		message = self.unsubscribe_direct_message(room_id)
807
		self.publish(message, room_id, True)
808
809
	def edit_message(self, data):
810
		message_id = data[VarNames.MESSAGE_ID]
811
		message = Message.objects.get(id=message_id)
812
		if message.sender_id != self.user_id:
813
			raise ValidationError("You can only edit your messages")
814
		if message.time + 600000 < get_milliseconds():
815
			raise ValidationError("You can only edit messages that were send not more than 10 min ago")
816
		if message.deleted:
817
			raise ValidationError("Already deleted")
818
		message.content = data[VarNames.CONTENT]
819
		selector = Message.objects.filter(id=message_id)
820
		if message.content is None:
821
			imgs = None
822
			selector.update(deleted=True)
823
			action = Actions.DELETE_MESSAGE
824
		else:
825
			imgs = self.parse_imgs(data.get(VarNames.IMG), message.id)
826
			prep_imgs = self.prepare_img(imgs, message_id)
827
			action = Actions.EDIT_MESSAGE
828
			selector.update(content=message.content)
829
		self.publish(self.create_send_message(message, action, prep_imgs), message.room_id)
830
831
	def send_client_new_channel(self, message):
832
		room_id = message[VarNames.ROOM_ID]
833
		self.add_channel(room_id)
834
		self.add_online_user(room_id)
835
836
	def set_opponent_call_channel(self, message):
837
		connection_id = message[VarNames.CONNECTION_ID]
838
		if message[VarNames.WEBRTC_OPPONENT_ID] == self.id:
839
			return True
840
		self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
841
842
	def send_client_delete_channel(self, message):
843
		room_id = message[VarNames.ROOM_ID]
844
		self.async_redis.unsubscribe((room_id,))
845
		self.async_redis_publisher.hdel(room_id, self.id)
846
		self.channels.remove(room_id)
847
848
	def process_get_messages(self, data):
849
		"""
850
		:type data: dict
851
		"""
852
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
853
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
854
		room_id = data[VarNames.CHANNEL]
855
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
856
		if header_id is None:
857
			messages = Message.objects.filter(Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
858
		else:
859
			messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
860
		images = self.do_db(self.get_message_images, messages)
861
		response = self.get_messages(messages, room_id, images)
862
		self.ws_write(response)
863
864
	def get_message_images(self, messages):
865
		ids =[message.id for message in messages]
866
		images = Image.objects.filter(message_id__in=ids)
867
		self.logger.info('!! Messages have %d images', len(images))
868
		return images
869
870
	def get_offline_messages(self):
871
		res = {}
872
		off_mess = Message.objects.filter(
873
			id__gt=F('room__roomusers__last_read_message_id'),
874
			deleted=False,
875
			room__roomusers__user_id=self.user_id
876
		)
877
		images = self.do_db(self.get_message_images, off_mess)
878
		for message in off_mess:
879
			prep_m = self.create_message(message, self.prepare_img(images, message.id))
880
			res.setdefault(message.room_id, []).append(prep_m)
881
		return res
882
883
	def get_users_in_current_user_rooms(self):
884
		"""
885
		{
886
			"ROOM_ID:1": {
887
				"name": "All",
888
				"users": {
889
					"USER_ID:admin": {
890
						"name": "USER_NAME:admin",
891
						"sex": "SEX:Secret"
892
					},
893
					"USER_ID_2": {
894
						"name": "USER_NAME:Mike",
895
						"sex": "Male"
896
					}
897
				},
898
				"isPrivate": true
899
			}
900
		}
901
		"""
902
		user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
903
		res = {room['id']: {
904
				VarNames.ROOM_NAME: room['name'],
905
				VarNames.ROOM_USERS: {}
906
			} for room in user_rooms}
907
		room_ids = (room_id for room_id in res)
908
		rooms_users = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id')
909
		for user in rooms_users:
910
			self.set_js_user_structure(res[user['rooms__id']][VarNames.ROOM_USERS], user['id'], user['username'], user['sex'])
911
		return res
912
913
	def set_js_user_structure(self, user_dict, user_id, name, sex):
914
		user_dict[user_id] = {
915
			VarNames.USER: name,
916
			VarNames.GENDER: GENDERS[sex]
917
		}
918
919
	def save_ip(self):
920
		if (self.do_db(UserJoinedInfo.objects.filter(
921
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
922
			return
923
		ip_address = get_or_create_ip(self.ip, self.logger)
924
		UserJoinedInfo.objects.create(
925
			ip=ip_address,
926
			user_id=self.user_id
927
		)
928
929
	def publish_logout(self, channel, log_data):
930
		# seems like async solves problem with connection lost and wrong data status
931
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
932
		online, is_online = self.get_online_from_redis(channel, True)
933
		log_data[channel] = {'online': online, 'is_online': is_online}
934
		if not is_online:
935
			message = self.room_online(online, Actions.LOGOUT, channel)
936
			self.publish(message, channel)
937
			return True
938
939
940
class AntiSpam(object):
941
942
	def __init__(self):
943
		self.spammed = 0
944
		self.info = {}
945
946
	def check_spam(self, json_message):
947
		message_length = len(json_message)
948
		info_key = int(round(time.time() * 100))
949
		self.info[info_key] = message_length
950
		if message_length > MAX_MESSAGE_SIZE:
951
			self.spammed += 1
952
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
953
		self.check_timed_spam()
954
955
	def check_timed_spam(self):
956
		# TODO implement me
957
		pass
958
		# raise ValidationError("You're chatting too much, calm down a bit!")
959
960
961
class TornadoHandler(WebSocketHandler, MessagesHandler):
962
963
	def __init__(self, *args, **kwargs):
964
		super(TornadoHandler, self).__init__(*args, **kwargs)
965
		self.__connected__ = False
966
		self.anti_spam = AntiSpam()
967
968
	@property
969
	def connected(self):
970
		return self.__connected__
971
972
	@connected.setter
973
	def connected(self, value):
974
		self.__connected__ = value
975
976
	def data_received(self, chunk):
977
		pass
978
979
	def on_message(self, json_message):
980
		try:
981
			if not self.connected:
982
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
983
			if not json_message:
984
				raise Exception('Skipping null message')
985
			# self.anti_spam.check_spam(json_message)
986
			self.logger.debug('<< %.1000s', json_message)
987
			message = json.loads(json_message)
988
			if message[VarNames.EVENT] not in self.pre_process_message:
989
				raise Exception("event {} is unknown".format(message[VarNames.EVENT]))
990
			channel = message.get(VarNames.CHANNEL)
991
			if channel and channel not in self.channels:
992
				raise Exception('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
993
			self.pre_process_message[message[VarNames.EVENT]](message)
994
		except ValidationError as e:
995
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
996
			self.ws_write(error_message)
997
998
	def on_close(self):
999
		if self.async_redis.subscribed:
1000
			self.logger.info("Close event, unsubscribing from %s", self.channels)
1001
			self.async_redis.unsubscribe(self.channels)
1002
		else:
1003
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
1004
		log_data = {}
1005
		gone_offline = False
1006
		for channel in self.channels:
1007
			if not isinstance(channel, Number):
1008
				continue
1009
			self.sync_redis.srem(channel, self.id)
1010
			if self.connected:
1011
				gone_offline = self.publish_logout(channel, log_data) or gone_offline
1012
		if gone_offline:
1013
			res = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
1014
			self.logger.info("Updated %s last read message", res)
1015
		self.disconnect(json.dumps(log_data))
1016
1017
	def disconnect(self, log_data, tries=0):
1018
		"""
1019
		Closes a connection if it's not in proggress, otherwice timeouts closing
1020
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227
1021
		"""
1022
		self.connected = False
1023
		self.closed_channels = self.channels
1024
		self.channels = []
1025
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
1026
			self.logger.debug('Closing a connection timeouts')
1027
			ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, log_data, tries+1)
1028
		else:
1029
			self.logger.info("Close connection result: %s", log_data)
1030
			self.async_redis.disconnect()
1031
1032
	def generate_self_id(self):
1033
		"""
1034
		When user opens new tab in browser wsHandler.wsConnectionId stores Id of current ws
1035
		So if ws loses a connection it still can reconnect with same id,
1036
		and TornadoHandler can restore webrtc_connections to previous state
1037
		"""
1038
		conn_arg = self.get_argument('id', None)
1039
		self.id, random = create_id(self.user_id, conn_arg)
1040
		if random != conn_arg:
1041
			self.ws_write(self.set_ws_id(random, self.id))
1042
1043
	def open(self):
1044
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
1045
		if sessionStore.exists(session_key):
1046
			self.ip = self.get_client_ip()
1047
			session = SessionStore(session_key)
1048
			self.user_id = int(session["_auth_user_id"])
1049
			self.generate_self_id()
1050
			log_params = {
1051
				'id': self.id,
1052
				'ip': self.ip
1053
			}
1054
			self._logger = logging.LoggerAdapter(parent_logger, log_params)
1055
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
1056
			self.async_redis.connect()
1057
			user_db = self.do_db(User.objects.get, id=self.user_id)
1058
			self.sender_name = user_db.username
1059
			self.sex = user_db.sex_str
1060
			user_rooms = self.get_users_in_current_user_rooms()
1061
			self.ws_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
1062
			# get all missed messages
1063
			self.channels = []  # py2 doesn't support clear()
1064
			self.channels.append(self.channel)
1065
			self.channels.append(self.id)
1066
			for room_id in user_rooms:
1067
				self.channels.append(room_id)
1068
			self.listen(self.channels)
1069
			off_messages = self.get_offline_messages()
1070
			for room_id in user_rooms:
1071
				self.add_online_user(room_id, off_messages.get(room_id))
1072
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
1073
			self.connected = True
1074
			Thread(target=self.save_ip).start()
1075
		else:
1076
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
1077
			self.close(403, "Session key %s has been rejected" % session_key)
1078
1079
	def check_origin(self, origin):
1080
		"""
1081
		check whether browser set domain matches origin
1082
		"""
1083
		parsed_origin = urlparse(origin)
1084
		origin = parsed_origin.netloc
1085
		origin_domain = origin.split(':')[0].lower()
1086
		browser_set = self.request.headers.get("Host")
1087
		browser_domain = browser_set.split(':')[0]
1088
		return browser_domain == origin_domain
1089
1090
	def ws_write(self, message):
1091
		"""
1092
		Tries to send message, doesn't throw exception outside
1093
		:type self: MessagesHandler
1094
		"""
1095
		# self.logger.debug('<< THREAD %s >>', os.getppid())
1096
		try:
1097
			if isinstance(message, dict):
1098
				message = json.dumps(message)
1099
			if not isinstance(message, str_type):
1100
				raise ValueError('Wrong message type : %s' % str(message))
1101
			self.logger.debug(">> %.1000s", message)
1102
			self.write_message(message)
1103
		except tornado.websocket.WebSocketClosedError as e:
1104
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
1105
1106
	def get_client_ip(self):
1107
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip
1108