Completed
Push — master ( 6a7ca0...6c49a0 )
by Andrew
33s
created

MessagesHandler.close_file_receiver()   A

Complexity

Conditions 3

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 8
rs 9.4285
1
import json
2
import logging
3
import sys
4
import time
5
from datetime import timedelta
6
from numbers import Number
7
from threading import Thread
8
9
import tornado.gen
10
import tornado.httpclient
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q, F
18
from redis_sessions.session import SessionStore
19
from tornado import ioloop
20
from tornado.websocket import WebSocketHandler
21
22
from chat.cookies_middleware import create_id
23
from chat.log_filters import id_generator
24
from chat.utils import extract_photo
25
from chat.utils import get_or_create_ip
26
27
try:  # py2
28
	from urlparse import urlparse
29
except ImportError:  # py3
30
	from urllib.parse import urlparse
31
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM, \
32
	TORNADO_REDIS_PORT, WEBRTC_CONNECTION
33
from chat.models import User, Message, Room, get_milliseconds, UserJoinedInfo, RoomUsers, Image
34
35
PY3 = sys.version > '3'
36
str_type = str if PY3 else basestring
37
38
39
sessionStore = SessionStore()
40
41
parent_logger = logging.getLogger(__name__)
42
base_logger = logging.LoggerAdapter(parent_logger, {
43
	'id': 0,
44
	'ip': '000.000.000.000'
45
})
46
47
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
48
#CONNECTION_POOL = tornadoredis.ConnectionPool(
49
#	max_connections=500,
50
#	wait_for_available=True)
51
52
53
class Actions(object):
54
	LOGIN = 'addOnlineUser'
55
	SET_WS_ID = 'setWsId'
56
	LOGOUT = 'removeOnlineUser'
57
	SEND_MESSAGE = 'sendMessage'
58
	PRINT_MESSAGE = 'printMessage'
59
	WEBRTC = 'sendRtcData'
60
	CLOSE_FILE_CONNECTION = 'destroyFileConnection'
61
	CLOSE_CALL_CONNECTION = 'destroyCallConnection'
62
	CANCEL_CALL_CONNECTION = 'cancelCallConnection'
63
	ACCEPT_CALL = 'acceptCall'
64
	ACCEPT_FILE = 'acceptFile'
65
	ROOMS = 'setRooms'
66
	REFRESH_USER = 'setOnlineUsers'
67
	GROWL_MESSAGE = 'growl'
68
	GET_MESSAGES = 'loadMessages'
69
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
70
	DELETE_ROOM = 'deleteRoom'
71
	EDIT_MESSAGE = 'editMessage'
72
	DELETE_MESSAGE = 'deleteMessage'
73
	CREATE_ROOM_CHANNEL = 'addRoom'
74
	INVITE_USER = 'inviteUser'
75
	ADD_USER = 'addUserToDom'
76
	OFFLINE_MESSAGES = 'loadOfflineMessages'
77
	SET_WEBRTC_ID = 'setConnectionId'
78
	SET_WEBRTC_ERROR = 'setError'
79
	OFFER_FILE_CONNECTION = 'offerFile'
80
	OFFER_CALL_CONNECTION = 'offerCall'
81
	REPLY_FILE_CONNECTION = 'replyFile'
82
	REPLY_CALL_CONNECTION = 'replyCall'
83
84
85
class VarNames(object):
86
	WEBRTC_QUED_ID = 'id'
87
	USER = 'user'
88
	USER_ID = 'userId'
89
	TIME = 'time'
90
	CONTENT = 'content'
91
	IMG = 'images'
92
	IMG_B64 = 'b64'
93
	IMG_FILE_NAME = 'fileName'
94
	EVENT = 'action'
95
	MESSAGE_ID = 'id'
96
	GENDER = 'sex'
97
	ROOM_NAME = 'name'
98
	ROOM_ID = 'roomId'
99
	ROOM_USERS = 'users'
100
	CHANNEL = 'channel'
101
	WEBRTC_OPPONENT_ID = 'opponentWsId'
102
	GET_MESSAGES_COUNT = 'count'
103
	GET_MESSAGES_HEADER_ID = 'headerId'
104
	CHANNEL_NAME = 'channel'
105
	IS_ROOM_PRIVATE = 'private'
106
	CONNECTION_ID = 'connId'
107
	HANDLER_NAME = 'handler'
108
109
110
class HandlerNames:
111
	CHANNELS = 'channels'
112
	CHAT = 'chat'
113
	GROWL = 'growl'
114
	WEBRTC = 'webrtc'
115
	PEER_CONNECTION = 'peerConnection'
116
	WEBRTC_TRANSFER = 'webrtcTransfer'
117
	WS = 'ws'
118
119
120
class WebRtcRedisStates:
121
	RESPONDED = 'responded'
122
	READY = 'ready'
123
	OFFERED = 'offered'
124
	CLOSED = 'closed'
125
126
127
class RedisPrefix:
128
	USER_ID_CHANNEL_PREFIX = 'u'
129
	DEFAULT_CHANNEL = ALL_ROOM_ID
130
	CONNECTION_ID_LENGTH = 8  # should be secure
131
132
	@classmethod
133
	def generate_user(cls, key):
134
		return cls.USER_ID_CHANNEL_PREFIX + str(key)
135
136
137
class MessagesCreator(object):
138
139
	def __init__(self, *args, **kwargs):
140
		self.sex = None
141
		self.sender_name = None
142
		self.id = None  # child init
143
		self.user_id = 0  # anonymous by default
144
145
	def default(self, content, event, handler):
146
		"""
147
		:return: {"action": event, "content": content, "time": "20:48:57"}
148
		"""
149
		return {
150
			VarNames.EVENT: event,
151
			VarNames.CONTENT: content,
152
			VarNames.USER_ID: self.user_id,
153
			VarNames.TIME: get_milliseconds(),
154
			VarNames.HANDLER_NAME: handler
155
		}
156
157
	def reply_webrtc(self, event, connection_id):
158
		"""
159
		:return: {"action": event, "content": content, "time": "20:48:57"}
160
		"""
161
		return {
162
			VarNames.EVENT: event,
163
			VarNames.CONNECTION_ID: connection_id,
164
			VarNames.USER_ID: self.user_id,
165
			VarNames.USER: self.sender_name,
166
			VarNames.WEBRTC_OPPONENT_ID: self.id,
167
			VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
168
		}
169
170
	def set_ws_id(self, random, self_id):
171
		return {
172
			VarNames.HANDLER_NAME: HandlerNames.WS,
173
			VarNames.EVENT: Actions.SET_WS_ID,
174
			VarNames.CONTENT: random,
175
			VarNames.WEBRTC_OPPONENT_ID: self_id
176
		}
177
178
	def room_online(self, online, event, channel):
179
		"""
180
		:return: {"action": event, "content": content, "time": "20:48:57"}
181
		"""
182
		room_less = self.default(online, event, HandlerNames.CHAT)
183
		room_less[VarNames.CHANNEL_NAME] = channel
184
		room_less[VarNames.USER] = self.sender_name
185
		room_less[VarNames.GENDER] = self.sex
186
		return room_less
187
188
	def offer_webrtc(self, content, connection_id, room_id, action):
189
		"""
190
		:return: {"action": "call", "content": content, "time": "20:48:57"}
191
		"""
192
		message = self.default(content, action, HandlerNames.WEBRTC)
193
		message[VarNames.USER] = self.sender_name
194
		message[VarNames.CONNECTION_ID] = connection_id
195
		message[VarNames.WEBRTC_OPPONENT_ID] = self.id
196
		message[VarNames.CHANNEL] = room_id
197
		return message
198
199
	def set_connection_id(self, qued_id, connection_id):
200
		return {
201
			VarNames.EVENT: Actions.SET_WEBRTC_ID,
202
			VarNames.HANDLER_NAME: HandlerNames.WEBRTC,
203
			VarNames.CONNECTION_ID: connection_id,
204
			VarNames.WEBRTC_QUED_ID: qued_id
205
		}
206
207
	def set_webrtc_error(self, error, connection_id, qued_id=None):
208
		message = self.default(error, Actions.SET_WEBRTC_ERROR, HandlerNames.PEER_CONNECTION) # TODO file/call
209
		message[VarNames.CONNECTION_ID] = connection_id
210
		if qued_id:
211
			message[VarNames.WEBRTC_QUED_ID] = qued_id
212
		return message
213
214
	@classmethod
215
	def create_message(cls, message, images):
216
		res = {
217
			VarNames.USER_ID: message.sender_id,
218
			VarNames.CONTENT: message.content,
219
			VarNames.TIME: message.time,
220
			VarNames.MESSAGE_ID: message.id,
221
			VarNames.IMG: images
222
		}
223
		return res
224
225
	@classmethod
226
	def create_send_message(cls, message, event, imgs):
227
		"""
228
		:param message:
229
		:return: "action": "joined", "content": {"v5bQwtWp": "alien", "tRD6emzs": "Alien"},
230
		"sex": "Alien", "user": "tRD6emzs", "time": "20:48:57"}
231
		"""
232
		res = cls.create_message(message, imgs)
233
		res[VarNames.EVENT] = event
234
		res[VarNames.CHANNEL] = message.room_id
235
		res[VarNames.HANDLER_NAME] = HandlerNames.CHAT
236
		return res
237
238
	@classmethod
239
	def append_images(cls, messages, images):
240
		res_mess = []
241
		for message in messages:
242
			res_images = cls.prepare_img(images, message.id)
243
			res_mess.append(cls.create_message(message, res_images))
244
		return res_mess
245
246
	@classmethod
247
	def prepare_img(cls, images, message_id):
248
		if images:
249
			return {x.symbol: x.img.url for x in images if x.message_id == message_id}
250
251
	@classmethod
252
	def get_messages(cls, messages, channel, images):
253
		"""
254
		:type messages: list[Messages]
255
		:type channel: str
256
		:type messages: QuerySet[Messages]
257
		"""
258
		return {
259
			VarNames.CONTENT: cls.append_images(messages, images),
260
			VarNames.EVENT: Actions.GET_MESSAGES,
261
			VarNames.CHANNEL: channel,
262
			VarNames.HANDLER_NAME: HandlerNames.CHAT
263
		}
264
265
	@property
266
	def channel(self):
267
		return RedisPrefix.generate_user(self.user_id)
268
269
	def subscribe_direct_channel_message(self, room_id, other_user_id):
270
		return {
271
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
272
			VarNames.ROOM_ID: room_id,
273
			VarNames.ROOM_USERS: [self.user_id, other_user_id],
274
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS
275
		}
276
277
	def subscribe_room_channel_message(self, room_id, room_name):
278
		return {
279
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
280
			VarNames.ROOM_ID: room_id,
281
			VarNames.ROOM_USERS: [self.user_id],
282
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS,
283
			VarNames.ROOM_NAME: room_name
284
		}
285
286
	def invite_room_channel_message(self, room_id, user_id, room_name, users):
287
		return {
288
			VarNames.EVENT: Actions.INVITE_USER,
289
			VarNames.ROOM_ID: room_id,
290
			VarNames.USER_ID: user_id,
291
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS,
292
			VarNames.ROOM_NAME: room_name,
293
			VarNames.CONTENT: users
294
		}
295
296
	def add_user_to_room(self, channel, user_id, content):
297
		return {
298
			VarNames.EVENT: Actions.ADD_USER,
299
			VarNames.CHANNEL: channel,
300
			VarNames.USER_ID: user_id,
301
			VarNames.HANDLER_NAME: HandlerNames.CHAT,
302
			VarNames.GENDER: content[VarNames.GENDER], # SEX: 'Alien', USER: 'Andrew'
303
			VarNames.USER: content[VarNames.USER] # SEX: 'Alien', USER: 'Andrew'
304
		}
305
306
	def unsubscribe_direct_message(self, room_id):
307
		return {
308
			VarNames.EVENT: Actions.DELETE_ROOM,
309
			VarNames.ROOM_ID: room_id,
310
			VarNames.USER_ID: self.user_id,
311
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS,
312
			VarNames.TIME: get_milliseconds()
313
		}
314
315
	def load_offline_message(self, offline_messages, channel_key):
316
		res = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
317
		res[VarNames.CHANNEL] = channel_key
318
		return res
319
320
321
class MessagesHandler(MessagesCreator):
322
323
	def __init__(self, *args, **kwargs):
324
		self.closed_channels = None
325
		self.parsable_prefix = 'p'
326
		super(MessagesHandler, self).__init__(*args, **kwargs)
327
		self.webrtc_ids = {}
328
		self.ip = None
329
		from chat import global_redis
330
		self.async_redis_publisher = global_redis.async_redis_publisher
331
		self.sync_redis = global_redis.sync_redis
332
		self.channels = []
333
		self._logger = None
334
		self.async_redis = tornadoredis.Client(port=TORNADO_REDIS_PORT)
335
		self.patch_tornadoredis()
336
		self.pre_process_message = {
337
			Actions.GET_MESSAGES: self.process_get_messages,
338
			Actions.SEND_MESSAGE: self.process_send_message,
339
			Actions.WEBRTC: self.proxy_webrtc,
340
			Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
341
			Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
342
			Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
343
			Actions.ACCEPT_CALL: self.accept_call,
344
			Actions.ACCEPT_FILE: self.accept_file,
345
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
346
			Actions.DELETE_ROOM: self.delete_channel,
347
			Actions.EDIT_MESSAGE: self.edit_message,
348
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
349
			Actions.INVITE_USER: self.invite_user,
350
			Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection,
351
			Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection,
352
			Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
353
			Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
354
		}
355
		self.post_process_message = {
356
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
357
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
358
			Actions.DELETE_ROOM: self.send_client_delete_channel,
359
			Actions.INVITE_USER: self.send_client_new_channel,
360
			Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel,
361
			Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel
362
		}
363
364
	def patch_tornadoredis(self):  # TODO remove this
365
		fabric = type(self.async_redis.connection.readline)
366
		self.async_redis.connection.old_read = self.async_redis.connection.readline
367
		def new_read(new_self, callback=None):
368
			try:
369
				return new_self.old_read(callback=callback)
370
			except Exception as e:
371
				current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
372
				self.logger.error(e)
373
				self.logger.error(
374
					"Exception info: "
375
					"self.id: %s ;;; "
376
					"self.connected = '%s';;; "
377
					"Redis default channel online = '%s';;; "
378
					"self.channels = '%s';;; "
379
					"self.closed_channels  = '%s';;;",
380
					self.id, self.connected, current_online, self.channels, self.closed_channels
381
				)
382
				raise e
383
384
		self.async_redis.connection.readline = fabric(new_read, self.async_redis.connection)
385
386
	@property
387
	def connected(self):
388
		raise NotImplemented
389
390
	@connected.setter
391
	def connected(self, value):
392
		raise NotImplemented
393
394
	@tornado.gen.engine
395
	def listen(self, channels):
396
		yield tornado.gen.Task(
397
			self.async_redis.subscribe, channels)
398
		self.async_redis.listen(self.pub_sub_message)
399
400
	@property
401
	def logger(self):
402
		return self._logger if self._logger else base_logger
403
404
	@tornado.gen.engine
405
	def add_channel(self, channel):
406
		self.channels.append(channel)
407
		yield tornado.gen.Task(
408
			self.async_redis.subscribe, (channel,))
409
410
	def evaluate(self, query_set):
411
		self.do_db(len, query_set)
412
		return query_set
413
414
	def do_db(self, callback, *args, **kwargs):
415
		try:
416
			return callback(*args, **kwargs)
417
		except (OperationalError, InterfaceError) as e:
418
			if 'MySQL server has gone away' in str(e):
419
				self.logger.warning('%s, reconnecting' % e)
420
				connection.close()
421
				return callback(*args, **kwargs)
422
			else:
423
				raise e
424
425
	def execute_query(self, query, *args, **kwargs):
426
		cursor = connection.cursor()
427
		cursor.execute(query, *args, **kwargs)
428
		desc = cursor.description
429
		return [
430
			dict(zip([col[0] for col in desc], row))
431
			for row in cursor.fetchall()
432
			]
433
434
	def get_online_from_redis(self, channel, check_self_online=False):
435
		"""
436
		:rtype : dict
437
		returns (dict, bool) if check_type is present
438
		"""
439
		online = self.sync_redis.smembers(channel)
440
		self.logger.debug('!! channel %s redis online: %s', channel, online)
441
		result = set()
442
		user_is_online = False
443
		# redis stores8 REDIS_USER_FORMAT, so parse them
444
		if online:
445
			for raw in online:  # py2 iteritems
446
				decoded = raw.decode('utf-8')
447
				# : char specified in cookies_middleware.py.create_id
448
				user_id = int(decoded.split(':')[0])
449
				if user_id == self.user_id and decoded != self.id:
450
					user_is_online = True
451
				result.add(user_id)
452
		result = list(result)
453
		return (result, user_is_online) if check_self_online else result
454
455
	def add_online_user(self, room_id, offline_messages=None):
456
		"""
457
		adds to redis
458
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
459
		:return:
460
		"""
461
		self.async_redis_publisher.sadd(room_id, self.id)
462
		# since we add user to online first, latest trigger will always show correct online
463
		online, is_online = self.get_online_from_redis(room_id, True)
464
		if not is_online:  # if a new tab has been opened
465
			online.append(self.user_id)
466
			online_user_names_mes = self.room_online(
467
				online,
468
				Actions.LOGIN,
469
				room_id
470
			)
471
			self.logger.info('!! First tab, sending refresh online for all')
472
			self.publish(online_user_names_mes, room_id)
473
			if offline_messages:
474
				self.ws_write(self.load_offline_message(offline_messages, room_id))
475
		else:  # Send user names to self
476
			online_user_names_mes = self.room_online(
477
				online,
478
				Actions.REFRESH_USER,
479
				room_id
480
			)
481
			self.logger.info('!! Second tab, retrieving online for self')
482
			self.ws_write(online_user_names_mes)
483
484
	def publish(self, message, channel, parsable=False):
485
		jsoned_mess = json.dumps(message)
486
		self.logger.debug('<%s> %s', channel, jsoned_mess)
487
		if parsable:
488
			jsoned_mess = self.encode(jsoned_mess)
489
		self.async_redis_publisher.publish(channel, jsoned_mess)
490
491
	def encode(self, message):
492
		"""
493
		Marks message with prefix to specify that
494
		it should be decoded and proccesed before sending to client
495
		@param message: message to mark
496
		@return: marked message
497
		"""
498
		return self.parsable_prefix + message
499
500
	def remove_parsable_prefix(self, message):
501
		if message.startswith(self.parsable_prefix):
502
			return message[1:]
503
504
	def pub_sub_message(self, message):
505
		data = message.body
506
		if isinstance(data, str_type):  # subscribe event
507
			prefixless_str = self.remove_parsable_prefix(data)
508
			if prefixless_str:
509
				dict_message = json.loads(prefixless_str)
510
				res = self.post_process_message[dict_message[VarNames.EVENT]](dict_message)
511
				if not res:
512
					self.ws_write(prefixless_str)
513
			else:
514
				self.ws_write(data)
515
516
	def ws_write(self, message):
517
		raise NotImplementedError('WebSocketHandler implements')
518
519
	def process_send_message(self, message):
520
		"""
521
		:type message: dict
522
		"""
523
		channel = message[VarNames.CHANNEL]
524
		message_db = Message(
525
			sender_id=self.user_id,
526
			content=message[VarNames.CONTENT]
527
		)
528
		message_db.room_id = channel
529
		self.do_db(message_db.save)
530
		res_imgs = self.parse_imgs(message.get(VarNames.IMG), message_db.id)
531
		prepared_message = self.create_send_message(message_db, Actions.PRINT_MESSAGE, self.prepare_img(res_imgs, message_db.id))
532
		self.publish(prepared_message, channel)
533
534
	def parse_imgs(self, imgs, mess_id):
535
		res_imgs = []
536 View Code Duplication
		if imgs:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
537
			fetch = False
538
			for k in imgs:
539
				b64 =imgs[k].get(VarNames.IMG_B64)
540
				if b64:
541
					img = extract_photo(imgs[k][VarNames.IMG_B64], imgs[k][VarNames.IMG_FILE_NAME])
542
					res_imgs.append(Image(message_id=mess_id, img=img, symbol=k))
543
				else:
544
					fetch = True
545
			fetched_messages = None
546
			if fetch:
547
				fetched_messages = Image.objects.filter(message_id=mess_id)
548
				len(fetched_messages)  # fetch
549
			if res_imgs:
550
				Image.objects.bulk_create(res_imgs)
551
			if fetched_messages:
552
				for m in fetched_messages:
553
					res_imgs.append(m)
554
		return res_imgs
555
556 View Code Duplication
	def close_file_connection(self, in_message):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
557
		connection_id = in_message[VarNames.CONNECTION_ID]
558
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
559
		if not self_channel_status:
560
			raise Exception("Access Denied")
561
		if self_channel_status != WebRtcRedisStates.CLOSED:
562
			sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
563
			if sender_id == self.id:
564
				self.close_file_sender(connection_id)
565
			else:
566
				self.close_file_receiver(connection_id, in_message, sender_id)
567
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
568
569
	def close_call_connection(self, in_message):
570
		connection_id = in_message[VarNames.CONNECTION_ID]
571
		conn_users = self.sync_redis.shgetall(connection_id)
572
		if conn_users[self.id] in [WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED]:
573
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
574
			del conn_users[self.id]
575
			message = {
576
				VarNames.EVENT: Actions.CLOSE_CALL_CONNECTION,
577
				VarNames.CONNECTION_ID: connection_id,
578
				VarNames.USER_ID: self.user_id,
579
				VarNames.WEBRTC_OPPONENT_ID: self.id,
580
				VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
581
			}
582
			for user in conn_users:
583
				if conn_users[user] != WebRtcRedisStates.CLOSED:
584
					self.publish(message, user)
585
		else:
586
			raise ValidationError("Invalid channel status.")
587
588
	def cancel_call_connection(self, in_message, reply_action):
589
		self.send_call_answer(in_message, WebRtcRedisStates.CLOSED, Actions.CANCEL_CALL_CONNECTION)
590
591
	def close_file_receiver(self, connection_id, in_message, sender_id):
592
		sender_status = self.sync_redis.shget(connection_id, sender_id)
593
		if not sender_status:
594
			raise Exception("Access denied")
595
		if sender_status != WebRtcRedisStates.CLOSED:
596
			in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
597
			in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
598
			self.publish(in_message, sender_id)
599
600
	def close_file_sender(self, connection_id):
601
		values = self.sync_redis.shgetall(connection_id)
602
		del values[self.id]
603
		for ws_id in values:
604
			if values[ws_id] == WebRtcRedisStates.CLOSED:
605
				continue
606
			self.publish({
607
				VarNames.EVENT: Actions.CLOSE_FILE_CONNECTION,
608
				VarNames.CONNECTION_ID: connection_id,
609
				VarNames.WEBRTC_OPPONENT_ID: self.id,
610
				VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
611
			}, ws_id)
612
613
	def accept_file(self, in_message):
614
		connection_id = in_message[VarNames.CONNECTION_ID] # TODO accept all if call
615
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
616
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
617
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
618
		if sender_ws_status == WebRtcRedisStates.READY and self_ws_status == WebRtcRedisStates.RESPONDED:
619
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
620
			self.publish({
621
				VarNames.EVENT: Actions.ACCEPT_FILE,
622
				VarNames.CONNECTION_ID: connection_id,
623
				VarNames.WEBRTC_OPPONENT_ID: self.id,
624
				VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
625 View Code Duplication
			}, sender_ws_id)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
626
		else:
627
			raise ValidationError("Invalid channel status")
628
629
	# todo if we shgetall and only then do async hset
630
	# todo we can catch an issue when 2 concurrent users accepted the call
631
	# todo but we didn't  send them ACCEPT_CALL as they both were in status 'offered'
632
	# def accept_call(self, in_message):
633
	# 	connection_id = in_message[VarNames.CONNECTION_ID]
634
	# 	channel_status = self.sync_redis.shgetall(connection_id)
635
	# 	if channel_status and channel_status[self.id] == WebRtcRedisStates.RESPONDED:
636
	# 		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
637
	# 		for key in channel_status:  # del channel_status[self.id] not needed as self in responded
638
	# 			if channel_status[key] == WebRtcRedisStates.READY:
639
	# 				self.publish({
640
	# 					VarNames.EVENT: Actions.ACCEPT_CALL,
641
	# 					VarNames.CONNECTION_ID: connection_id,
642
	# 					VarNames.WEBRTC_OPPONENT_ID: self.id,
643
	# 					VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
644
	# 				}, key)
645 View Code Duplication
	# 	else:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
646
	# 		raise ValidationError("Invalid channel status")
647
648
	def accept_call(self, in_message):
649
		connection_id = in_message[VarNames.CONNECTION_ID]
650
		self_status = self.sync_redis.shget(connection_id, self.id)
651
		if self_status == WebRtcRedisStates.RESPONDED:
652
			self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.READY)
653
			channel_status = self.sync_redis.shgetall(connection_id)
654
			del channel_status[self.id]
655
			message = {
656
				VarNames.EVENT: Actions.ACCEPT_CALL,
657
				VarNames.USER_ID: self.user_id,
658
				VarNames.CONNECTION_ID: connection_id,
659
				VarNames.WEBRTC_OPPONENT_ID: self.id,
660
				VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
661
			}
662
			for key in channel_status:
663
				if channel_status[key] != WebRtcRedisStates.CLOSED:
664
					self.publish(message, key)
665
		else:
666
			raise ValidationError("Invalid channel status")
667
668
	def offer_webrtc_connection(self, in_message):
669
		room_id = in_message[VarNames.CHANNEL]
670
		content = in_message.get(VarNames.CONTENT)
671
		qued_id = in_message[VarNames.WEBRTC_QUED_ID]
672
		connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
673
		# use list because sets dont have 1st element which is offerer
674
		self.async_redis_publisher.hset(WEBRTC_CONNECTION, connection_id, self.id)
675
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
676
		opponents_message = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT])
677
		self_message = self.set_connection_id(qued_id, connection_id)
678
		self.ws_write(self_message)
679
		self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
680
		self.publish(opponents_message, room_id, True)
681
682
	def reply_call_connection(self, in_message):
683
		self.send_call_answer(in_message, WebRtcRedisStates.RESPONDED, Actions.REPLY_CALL_CONNECTION)
684
685
	def send_call_answer(self, in_message, status_set, reply_action):
686
		connection_id = in_message[VarNames.CONNECTION_ID]
687
		conn_users = self.sync_redis.shgetall(connection_id)
688
		if conn_users[self.id] == WebRtcRedisStates.OFFERED:
689
			self.async_redis_publisher.hset(connection_id, self.id, status_set)
690
			del conn_users[self.id]
691
			message = self.reply_webrtc(reply_action, connection_id)
692
			for user in conn_users:
693
				if conn_users[user] != WebRtcRedisStates.CLOSED:
694
					self.publish(message, user)
695
		else:
696
			raise ValidationError("Invalid channel status.")
697
698
	def reply_file_connection(self, in_message):
699
		connection_id = in_message[VarNames.CONNECTION_ID]
700
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
701
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
702
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
703
		if sender_ws_status == WebRtcRedisStates.READY and self_ws_status == WebRtcRedisStates.OFFERED:
704
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
705
			self.publish(self.reply_webrtc(Actions.REPLY_FILE_CONNECTION, connection_id), sender_ws_id)
706
		else:
707
			raise ValidationError("Invalid channel status.")
708
709
	def proxy_webrtc(self, in_message):
710
		"""
711
		:type in_message: dict
712
		"""
713
		connection_id = in_message[VarNames.CONNECTION_ID]
714
		channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
715
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
716
		opponent_channel_status = self.sync_redis.shget(connection_id, channel)
717
		if not (self_channel_status == WebRtcRedisStates.READY and opponent_channel_status == WebRtcRedisStates.READY):
718
			raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
719
				self_channel_status, opponent_channel_status
720
			)) # todo receiver should only accept proxy_webrtc from sender, sender can accept all
721
		# I mean somebody if there're 3 ppl in 1 channel and first is initing transfer to 2nd and 3rd,
722
		# 2nd guy can fraud 3rd guy webrtc traffic, which is allowed during the call, but not while transering file
723
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
724
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
725
		self.logger.debug("Forwarding message to channel %s, self %s, other status %s",
726
			channel, self_channel_status, opponent_channel_status
727
		)
728
		self.publish(in_message, channel)
729
730
	def create_new_room(self, message):
731
		room_name = message[VarNames.ROOM_NAME]
732
		if not room_name or len(room_name) > 16:
733
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
734
		room = Room(name=room_name)
735
		self.do_db(room.save)
736
		RoomUsers(room_id=room.id, user_id=self.user_id).save()
737
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
738
		self.publish(subscribe_message, self.channel, True)
739
740
	def invite_user(self, message):
741
		room_id = message[VarNames.ROOM_ID]
742
		user_id = message[VarNames.USER_ID]
743
		if room_id not in self.channels:
744
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
745
		room = self.do_db(Room.objects.get, id=room_id)
746
		if room.is_private:
747
			raise ValidationError("You can't add users to direct room, create a new room instead")
748
		try:
749
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
750
		except IntegrityError:
751
			raise ValidationError("User is already in channel")
752
		users_in_room = {}
753
		for user in room.users.all():
754
			self.set_js_user_structure(users_in_room, user.id, user.username, user.sex)
755
		self.publish(self.add_user_to_room(room_id, user_id, users_in_room[user_id]), room_id)
756
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
757
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
758
759
	def create_room(self, user_rooms, user_id):
760
		if self.user_id == user_id:
761
			room_ids = list([room['room_id'] for room in self.evaluate(user_rooms)])
762
			query_res = self.execute_query(SELECT_SELF_ROOM, [room_ids, ])
763
		else:
764
			rooms_query = RoomUsers.objects.filter(user_id=user_id, room__in=user_rooms)
765
			query_res = rooms_query.values('room__id', 'room__disabled')
766
		try:
767
			room = self.do_db(query_res.get)
768
			room_id = room['room__id']
769
			self.update_room(room_id, room['room__disabled'])
770
		except RoomUsers.DoesNotExist:
771
			room = Room()
772
			room.save()
773
			room_id = room.id
774
			if self.user_id == user_id:
775
				RoomUsers(user_id=self.user_id, room_id=room_id).save()
776
			else:
777
				RoomUsers.objects.bulk_create([
778
					RoomUsers(user_id=user_id, room_id=room_id),
779
					RoomUsers(user_id=self.user_id, room_id=room_id),
780
				])
781
		return room_id
782
783
	def update_room(self, room_id, disabled):
784
		if not disabled:
785
			raise ValidationError('This room already exist')
786
		else:
787
			Room.objects.filter(id=room_id).update(disabled=False)
788
789
	def create_user_channel(self, message):
790
		user_id = message[VarNames.USER_ID]
791
		# get all self private rooms ids
792
		user_rooms = Room.users.through.objects.filter(user_id=self.user_id, room__name__isnull=True).values('room_id')
793
		# get private room that contains another user from rooms above
794
		room_id = self.create_room(user_rooms, user_id)
795
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
796
		self.publish(subscribe_message, self.channel, True)
797
		other_channel = RedisPrefix.generate_user(user_id)
798
		if self.channel != other_channel:
799
			self.publish(subscribe_message, other_channel, True)
800
801
	def delete_channel(self, message):
802
		room_id = message[VarNames.ROOM_ID]
803
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
804
			raise ValidationError('You are not allowed to exit this room')
805
		room = self.do_db(Room.objects.get, id=room_id)
806
		if room.disabled:
807
			raise ValidationError('Room is already deleted')
808
		if room.name is None:  # if private then disable
809
			room.disabled = True
810
		else: # if public -> leave the room, delete the link
811
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
812
			online = self.get_online_from_redis(room_id)
813
			online.remove(self.user_id)
814
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
815
		room.save()
816
		message = self.unsubscribe_direct_message(room_id)
817
		self.publish(message, room_id, True)
818
819
	def edit_message(self, data):
820
		message_id = data[VarNames.MESSAGE_ID]
821
		message = Message.objects.get(id=message_id)
822
		if message.sender_id != self.user_id:
823
			raise ValidationError("You can only edit your messages")
824
		if message.time + 600000 < get_milliseconds():
825
			raise ValidationError("You can only edit messages that were send not more than 10 min ago")
826
		if message.deleted:
827
			raise ValidationError("Already deleted")
828
		message.content = data[VarNames.CONTENT]
829
		selector = Message.objects.filter(id=message_id)
830
		if message.content is None:
831
			imgs = None
832
			selector.update(deleted=True)
833
			action = Actions.DELETE_MESSAGE
834
		else:
835
			imgs = self.parse_imgs(data.get(VarNames.IMG), message.id)
836
			prep_imgs = self.prepare_img(imgs, message_id)
837
			action = Actions.EDIT_MESSAGE
838
			selector.update(content=message.content)
839
		self.publish(self.create_send_message(message, action, prep_imgs), message.room_id)
840
841
	def send_client_new_channel(self, message):
842
		room_id = message[VarNames.ROOM_ID]
843
		self.add_channel(room_id)
844
		self.add_online_user(room_id)
845
846
	def set_opponent_call_channel(self, message):
847
		connection_id = message[VarNames.CONNECTION_ID]
848
		if message[VarNames.WEBRTC_OPPONENT_ID] == self.id:
849
			return True
850
		self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
851
852
	def send_client_delete_channel(self, message):
853
		room_id = message[VarNames.ROOM_ID]
854
		self.async_redis.unsubscribe((room_id,))
855
		self.async_redis_publisher.hdel(room_id, self.id)
856
		self.channels.remove(room_id)
857
858
	def process_get_messages(self, data):
859
		"""
860
		:type data: dict
861
		"""
862
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
863
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
864
		room_id = data[VarNames.CHANNEL]
865
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
866
		if header_id is None:
867
			messages = Message.objects.filter(Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
868
		else:
869
			messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
870
		images = self.do_db(self.get_message_images, messages)
871
		response = self.get_messages(messages, room_id, images)
872
		self.ws_write(response)
873
874
	def get_message_images(self, messages):
875
		ids =[message.id for message in messages]
876
		images = Image.objects.filter(message_id__in=ids)
877
		self.logger.info('!! Messages have %d images', len(images))
878
		return images
879
880
	def get_offline_messages(self):
881
		res = {}
882
		off_mess = Message.objects.filter(
883
			id__gt=F('room__roomusers__last_read_message_id'),
884
			deleted=False,
885
			room__roomusers__user_id=self.user_id
886
		)
887
		images = self.do_db(self.get_message_images, off_mess)
888
		for message in off_mess:
889
			prep_m = self.create_message(message, self.prepare_img(images, message.id))
890
			res.setdefault(message.room_id, []).append(prep_m)
891
		return res
892
893
	def get_users_in_current_user_rooms(self):
894
		"""
895
		{
896
			"ROOM_ID:1": {
897
				"name": "All",
898
				"users": {
899
					"USER_ID:admin": {
900
						"name": "USER_NAME:admin",
901
						"sex": "SEX:Secret"
902
					},
903
					"USER_ID_2": {
904
						"name": "USER_NAME:Mike",
905
						"sex": "Male"
906
					}
907
				},
908
				"isPrivate": true
909
			}
910
		}
911
		"""
912
		user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
913
		res = {room['id']: {
914
				VarNames.ROOM_NAME: room['name'],
915
				VarNames.ROOM_USERS: {}
916
			} for room in user_rooms}
917
		room_ids = (room_id for room_id in res)
918
		rooms_users = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id')
919
		for user in rooms_users:
920
			self.set_js_user_structure(res[user['rooms__id']][VarNames.ROOM_USERS], user['id'], user['username'], user['sex'])
921
		return res
922
923
	def set_js_user_structure(self, user_dict, user_id, name, sex):
924
		user_dict[user_id] = {
925
			VarNames.USER: name,
926
			VarNames.GENDER: GENDERS[sex]
927
		}
928
929
	def save_ip(self):
930
		if (self.do_db(UserJoinedInfo.objects.filter(
931
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
932
			return
933
		ip_address = get_or_create_ip(self.ip, self.logger)
934
		UserJoinedInfo.objects.create(
935
			ip=ip_address,
936
			user_id=self.user_id
937
		)
938
939
	def publish_logout(self, channel, log_data):
940
		# seems like async solves problem with connection lost and wrong data status
941
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
942
		online, is_online = self.get_online_from_redis(channel, True)
943
		log_data[channel] = {'online': online, 'is_online': is_online}
944
		if not is_online:
945
			message = self.room_online(online, Actions.LOGOUT, channel)
946
			self.publish(message, channel)
947
			return True
948
949
950
class AntiSpam(object):
951
952
	def __init__(self):
953
		self.spammed = 0
954
		self.info = {}
955
956
	def check_spam(self, json_message):
957
		message_length = len(json_message)
958
		info_key = int(round(time.time() * 100))
959
		self.info[info_key] = message_length
960
		if message_length > MAX_MESSAGE_SIZE:
961
			self.spammed += 1
962
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
963
		self.check_timed_spam()
964
965
	def check_timed_spam(self):
966
		# TODO implement me
967
		pass
968
		# raise ValidationError("You're chatting too much, calm down a bit!")
969
970
971
class TornadoHandler(WebSocketHandler, MessagesHandler):
972
973
	def __init__(self, *args, **kwargs):
974
		super(TornadoHandler, self).__init__(*args, **kwargs)
975
		self.__connected__ = False
976
		self.anti_spam = AntiSpam()
977
978
	@property
979
	def connected(self):
980
		return self.__connected__
981
982
	@connected.setter
983
	def connected(self, value):
984
		self.__connected__ = value
985
986
	def data_received(self, chunk):
987
		pass
988
989
	def on_message(self, json_message):
990
		try:
991
			if not self.connected:
992
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
993
			if not json_message:
994
				raise Exception('Skipping null message')
995
			# self.anti_spam.check_spam(json_message)
996
			self.logger.debug('<< %.1000s', json_message)
997
			message = json.loads(json_message)
998
			if message[VarNames.EVENT] not in self.pre_process_message:
999
				raise Exception("event {} is unknown".format(message[VarNames.EVENT]))
1000
			channel = message.get(VarNames.CHANNEL)
1001
			if channel and channel not in self.channels:
1002
				raise Exception('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
1003
			self.pre_process_message[message[VarNames.EVENT]](message)
1004
		except ValidationError as e:
1005
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
1006
			self.ws_write(error_message)
1007
1008
	def on_close(self):
1009
		if self.async_redis.subscribed:
1010
			self.logger.info("Close event, unsubscribing from %s", self.channels)
1011
			self.async_redis.unsubscribe(self.channels)
1012
		else:
1013
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
1014
		log_data = {}
1015
		gone_offline = False
1016
		for channel in self.channels:
1017
			if not isinstance(channel, Number):
1018
				continue
1019
			self.sync_redis.srem(channel, self.id)
1020
			if self.connected:
1021
				gone_offline = self.publish_logout(channel, log_data) or gone_offline
1022
		if gone_offline:
1023
			res = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
1024
			self.logger.info("Updated %s last read message", res)
1025
		self.disconnect(json.dumps(log_data))
1026
1027
	def disconnect(self, log_data, tries=0):
1028
		"""
1029
		Closes a connection if it's not in proggress, otherwice timeouts closing
1030
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227
1031
		"""
1032
		self.connected = False
1033
		self.closed_channels = self.channels
1034
		self.channels = []
1035
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
1036
			self.logger.debug('Closing a connection timeouts')
1037
			ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, log_data, tries+1)
1038
		else:
1039
			self.logger.info("Close connection result: %s", log_data)
1040
			self.async_redis.disconnect()
1041
1042
	def generate_self_id(self):
1043
		"""
1044
		When user opens new tab in browser wsHandler.wsConnectionId stores Id of current ws
1045
		So if ws loses a connection it still can reconnect with same id,
1046
		and TornadoHandler can restore webrtc_connections to previous state
1047
		"""
1048
		conn_arg = self.get_argument('id', None)
1049
		self.id, random = create_id(self.user_id, conn_arg)
1050
		if random != conn_arg:
1051
			self.ws_write(self.set_ws_id(random, self.id))
1052
1053
	def open(self):
1054
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
1055
		if sessionStore.exists(session_key):
1056
			self.ip = self.get_client_ip()
1057
			session = SessionStore(session_key)
1058
			self.user_id = int(session["_auth_user_id"])
1059
			self.generate_self_id()
1060
			log_params = {
1061
				'id': self.id,
1062
				'ip': self.ip
1063
			}
1064
			self._logger = logging.LoggerAdapter(parent_logger, log_params)
1065
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
1066
			self.async_redis.connect()
1067
			user_db = self.do_db(User.objects.get, id=self.user_id)
1068
			self.sender_name = user_db.username
1069
			self.sex = user_db.sex_str
1070
			user_rooms = self.get_users_in_current_user_rooms()
1071
			self.ws_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
1072
			# get all missed messages
1073
			self.channels = []  # py2 doesn't support clear()
1074
			self.channels.append(self.channel)
1075
			self.channels.append(self.id)
1076
			for room_id in user_rooms:
1077
				self.channels.append(room_id)
1078
			self.listen(self.channels)
1079
			off_messages = self.get_offline_messages()
1080
			for room_id in user_rooms:
1081
				self.add_online_user(room_id, off_messages.get(room_id))
1082
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
1083
			self.connected = True
1084
			Thread(target=self.save_ip).start()
1085
		else:
1086
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
1087
			self.close(403, "Session key %s has been rejected" % session_key)
1088
1089
	def check_origin(self, origin):
1090
		"""
1091
		check whether browser set domain matches origin
1092
		"""
1093
		parsed_origin = urlparse(origin)
1094
		origin = parsed_origin.netloc
1095
		origin_domain = origin.split(':')[0].lower()
1096
		browser_set = self.request.headers.get("Host")
1097
		browser_domain = browser_set.split(':')[0]
1098
		return browser_domain == origin_domain
1099
1100
	def ws_write(self, message):
1101
		"""
1102
		Tries to send message, doesn't throw exception outside
1103
		:type self: MessagesHandler
1104
		"""
1105
		# self.logger.debug('<< THREAD %s >>', os.getppid())
1106
		try:
1107
			if isinstance(message, dict):
1108
				message = json.dumps(message)
1109
			if not isinstance(message, str_type):
1110
				raise ValueError('Wrong message type : %s' % str(message))
1111
			self.logger.debug(">> %.1000s", message)
1112
			self.write_message(message)
1113
		except tornado.websocket.WebSocketClosedError as e:
1114
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
1115
1116
	def get_client_ip(self):
1117
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip
1118