Completed
Push — master ( 465e79...7365f5 )
by Andrew
32s
created

MessagesHandler.process_images()   B

Complexity

Conditions 6

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 6
c 1
b 0
f 0
dl 0
loc 11
rs 8
1
import json
2
import logging
3
import sys
4
import time
5
from datetime import timedelta
6
from numbers import Number
7
from threading import Thread
8
9
import tornado.gen
10
import tornado.httpclient
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q, F
18
from redis_sessions.session import SessionStore
19
from tornado import ioloop
20
from tornado.websocket import WebSocketHandler
21
22
from chat.cookies_middleware import create_id
23
from chat.log_filters import id_generator
24
from chat.utils import extract_photo, get_max_key
25
from chat.utils import get_or_create_ip
26
27
try:  # py2
28
	from urlparse import urlparse
29
except ImportError:  # py3
30
	from urllib.parse import urlparse
31
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM, \
32
	TORNADO_REDIS_PORT, WEBRTC_CONNECTION
33
from chat.models import User, Message, Room, get_milliseconds, UserJoinedInfo, RoomUsers, Image
34
35
PY3 = sys.version > '3'
36
str_type = str if PY3 else basestring
37
38
39
sessionStore = SessionStore()
40
41
parent_logger = logging.getLogger(__name__)
42
base_logger = logging.LoggerAdapter(parent_logger, {
43
	'id': 0,
44
	'ip': '000.000.000.000'
45
})
46
47
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
48
#CONNECTION_POOL = tornadoredis.ConnectionPool(
49
#	max_connections=500,
50
#	wait_for_available=True)
51
52
53
class Actions(object):
54
	LOGIN = 'addOnlineUser'
55
	SET_WS_ID = 'setWsId'
56
	LOGOUT = 'removeOnlineUser'
57
	SEND_MESSAGE = 'sendMessage'
58
	PRINT_MESSAGE = 'printMessage'
59
	WEBRTC = 'sendRtcData'
60
	CLOSE_FILE_CONNECTION = 'destroyFileConnection'
61
	CLOSE_CALL_CONNECTION = 'destroyCallConnection'
62
	CANCEL_CALL_CONNECTION = 'cancelCallConnection'
63
	ACCEPT_CALL = 'acceptCall'
64
	ACCEPT_FILE = 'acceptFile'
65
	ROOMS = 'setRooms'
66
	REFRESH_USER = 'setOnlineUsers'
67
	GROWL_MESSAGE = 'growl'
68
	GET_MESSAGES = 'loadMessages'
69
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
70
	DELETE_ROOM = 'deleteRoom'
71
	EDIT_MESSAGE = 'editMessage'
72
	DELETE_MESSAGE = 'deleteMessage'
73
	CREATE_ROOM_CHANNEL = 'addRoom'
74
	INVITE_USER = 'inviteUser'
75
	ADD_USER = 'addUserToDom'
76
	OFFLINE_MESSAGES = 'loadOfflineMessages'
77
	SET_WEBRTC_ID = 'setConnectionId'
78
	SET_WEBRTC_ERROR = 'setError'
79
	OFFER_FILE_CONNECTION = 'offerFile'
80
	OFFER_CALL_CONNECTION = 'offerCall'
81
	REPLY_FILE_CONNECTION = 'replyFile'
82
	REPLY_CALL_CONNECTION = 'replyCall'
83
84
85
class VarNames(object):
86
	WEBRTC_QUED_ID = 'id'
87
	USER = 'user'
88
	USER_ID = 'userId'
89
	TIME = 'time'
90
	CONTENT = 'content'
91
	IMG = 'images'
92
	IMG_B64 = 'b64'
93
	IMG_FILE_NAME = 'fileName'
94
	EVENT = 'action'
95
	MESSAGE_ID = 'id'
96
	GENDER = 'sex'
97
	ROOM_NAME = 'name'
98
	ROOM_ID = 'roomId'
99
	ROOM_USERS = 'users'
100
	CHANNEL = 'channel'
101
	WEBRTC_OPPONENT_ID = 'opponentWsId'
102
	GET_MESSAGES_COUNT = 'count'
103
	GET_MESSAGES_HEADER_ID = 'headerId'
104
	CHANNEL_NAME = 'channel'
105
	IS_ROOM_PRIVATE = 'private'
106
	CONNECTION_ID = 'connId'
107
	HANDLER_NAME = 'handler'
108
	SYMBOL = 'symbol'
109
110
111
class HandlerNames:
112
	CHANNELS = 'channels'
113
	CHAT = 'chat'
114
	GROWL = 'growl'
115
	WEBRTC = 'webrtc'
116
	PEER_CONNECTION = 'peerConnection'
117
	WEBRTC_TRANSFER = 'webrtcTransfer'
118
	WS = 'ws'
119
120
121
class WebRtcRedisStates:
122
	RESPONDED = 'responded'
123
	READY = 'ready'
124
	OFFERED = 'offered'
125
	CLOSED = 'closed'
126
127
128
class RedisPrefix:
129
	USER_ID_CHANNEL_PREFIX = 'u'
130
	DEFAULT_CHANNEL = ALL_ROOM_ID
131
	CONNECTION_ID_LENGTH = 8  # should be secure
132
133
	@classmethod
134
	def generate_user(cls, key):
135
		return cls.USER_ID_CHANNEL_PREFIX + str(key)
136
137
138
class MessagesCreator(object):
139
140
	def __init__(self, *args, **kwargs):
141
		self.sex = None
142
		self.sender_name = None
143
		self.id = None  # child init
144
		self.user_id = 0  # anonymous by default
145
146
	def default(self, content, event, handler):
147
		"""
148
		:return: {"action": event, "content": content, "time": "20:48:57"}
149
		"""
150
		return {
151
			VarNames.EVENT: event,
152
			VarNames.CONTENT: content,
153
			VarNames.USER_ID: self.user_id,
154
			VarNames.TIME: get_milliseconds(),
155
			VarNames.HANDLER_NAME: handler
156
		}
157
158
	def reply_webrtc(self, event, connection_id):
159
		"""
160
		:return: {"action": event, "content": content, "time": "20:48:57"}
161
		"""
162
		return {
163
			VarNames.EVENT: event,
164
			VarNames.CONNECTION_ID: connection_id,
165
			VarNames.USER_ID: self.user_id,
166
			VarNames.USER: self.sender_name,
167
			VarNames.WEBRTC_OPPONENT_ID: self.id,
168
			VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
169
		}
170
171
	def set_ws_id(self, random, self_id):
172
		return {
173
			VarNames.HANDLER_NAME: HandlerNames.WS,
174
			VarNames.EVENT: Actions.SET_WS_ID,
175
			VarNames.CONTENT: random,
176
			VarNames.WEBRTC_OPPONENT_ID: self_id
177
		}
178
179
	def room_online(self, online, event, channel):
180
		"""
181
		:return: {"action": event, "content": content, "time": "20:48:57"}
182
		"""
183
		room_less = self.default(online, event, HandlerNames.CHAT)
184
		room_less[VarNames.CHANNEL_NAME] = channel
185
		room_less[VarNames.USER] = self.sender_name
186
		room_less[VarNames.GENDER] = self.sex
187
		return room_less
188
189
	def offer_webrtc(self, content, connection_id, room_id, action):
190
		"""
191
		:return: {"action": "call", "content": content, "time": "20:48:57"}
192
		"""
193
		message = self.default(content, action, HandlerNames.WEBRTC)
194
		message[VarNames.USER] = self.sender_name
195
		message[VarNames.CONNECTION_ID] = connection_id
196
		message[VarNames.WEBRTC_OPPONENT_ID] = self.id
197
		message[VarNames.CHANNEL] = room_id
198
		return message
199
200
	def set_connection_id(self, qued_id, connection_id):
201
		return {
202
			VarNames.EVENT: Actions.SET_WEBRTC_ID,
203
			VarNames.HANDLER_NAME: HandlerNames.WEBRTC,
204
			VarNames.CONNECTION_ID: connection_id,
205
			VarNames.WEBRTC_QUED_ID: qued_id
206
		}
207
208
	def set_webrtc_error(self, error, connection_id, qued_id=None):
209
		message = self.default(error, Actions.SET_WEBRTC_ERROR, HandlerNames.PEER_CONNECTION) # TODO file/call
210
		message[VarNames.CONNECTION_ID] = connection_id
211
		if qued_id:
212
			message[VarNames.WEBRTC_QUED_ID] = qued_id
213
		return message
214
215
	@classmethod
216
	def create_message(cls, message, images):
217
		res = {
218
			VarNames.USER_ID: message.sender_id,
219
			VarNames.CONTENT: message.content,
220
			VarNames.TIME: message.time,
221
			VarNames.MESSAGE_ID: message.id,
222
			VarNames.IMG: images
223
		}
224
		return res
225
226
	@classmethod
227
	def create_send_message(cls, message, event, imgs):
228
		"""
229
		:param message:
230
		:return: "action": "joined", "content": {"v5bQwtWp": "alien", "tRD6emzs": "Alien"},
231
		"sex": "Alien", "user": "tRD6emzs", "time": "20:48:57"}
232
		"""
233
		res = cls.create_message(message, imgs)
234
		res[VarNames.EVENT] = event
235
		res[VarNames.CHANNEL] = message.room_id
236
		res[VarNames.SYMBOL] = message.symbol
237
		res[VarNames.HANDLER_NAME] = HandlerNames.CHAT
238
		return res
239
240
	@classmethod
241
	def append_images(cls, messages, images):
242
		res_mess = []
243
		for message in messages:
244
			res_images = cls.prepare_img(images, message.id)
245
			res_mess.append(cls.create_message(message, res_images))
246
		return res_mess
247
248
	@classmethod
249
	def prepare_img(cls, images, message_id):
250
		if images:
251
			return {x.symbol: x.img.url for x in images if x.message_id == message_id}
252
253
	@classmethod
254
	def get_messages(cls, messages, channel, images):
255
		"""
256
		:type messages: list[Messages]
257
		:type channel: str
258
		:type messages: QuerySet[Messages]
259
		"""
260
		return {
261
			VarNames.CONTENT: cls.append_images(messages, images),
262
			VarNames.EVENT: Actions.GET_MESSAGES,
263
			VarNames.CHANNEL: channel,
264
			VarNames.HANDLER_NAME: HandlerNames.CHAT
265
		}
266
267
	@property
268
	def channel(self):
269
		return RedisPrefix.generate_user(self.user_id)
270
271
	def subscribe_direct_channel_message(self, room_id, other_user_id):
272
		return {
273
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
274
			VarNames.ROOM_ID: room_id,
275
			VarNames.ROOM_USERS: [self.user_id, other_user_id],
276
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS
277
		}
278
279
	def subscribe_room_channel_message(self, room_id, room_name):
280
		return {
281
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
282
			VarNames.ROOM_ID: room_id,
283
			VarNames.ROOM_USERS: [self.user_id],
284
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS,
285
			VarNames.ROOM_NAME: room_name
286
		}
287
288
	def invite_room_channel_message(self, room_id, user_id, room_name, users):
289
		return {
290
			VarNames.EVENT: Actions.INVITE_USER,
291
			VarNames.ROOM_ID: room_id,
292
			VarNames.USER_ID: user_id,
293
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS,
294
			VarNames.ROOM_NAME: room_name,
295
			VarNames.CONTENT: users
296
		}
297
298
	def add_user_to_room(self, channel, user_id, content):
299
		return {
300
			VarNames.EVENT: Actions.ADD_USER,
301
			VarNames.CHANNEL: channel,
302
			VarNames.USER_ID: user_id,
303
			VarNames.HANDLER_NAME: HandlerNames.CHAT,
304
			VarNames.GENDER: content[VarNames.GENDER], # SEX: 'Alien', USER: 'Andrew'
305
			VarNames.USER: content[VarNames.USER] # SEX: 'Alien', USER: 'Andrew'
306
		}
307
308
	def unsubscribe_direct_message(self, room_id):
309
		return {
310
			VarNames.EVENT: Actions.DELETE_ROOM,
311
			VarNames.ROOM_ID: room_id,
312
			VarNames.USER_ID: self.user_id,
313
			VarNames.HANDLER_NAME: HandlerNames.CHANNELS,
314
			VarNames.TIME: get_milliseconds()
315
		}
316
317
	def load_offline_message(self, offline_messages, channel_key):
318
		res = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
319
		res[VarNames.CHANNEL] = channel_key
320
		return res
321
322
323
class MessagesHandler(MessagesCreator):
324
325
	def __init__(self, *args, **kwargs):
326
		self.closed_channels = None
327
		self.parsable_prefix = 'p'
328
		super(MessagesHandler, self).__init__(*args, **kwargs)
329
		self.webrtc_ids = {}
330
		self.ip = None
331
		from chat import global_redis
332
		self.async_redis_publisher = global_redis.async_redis_publisher
333
		self.sync_redis = global_redis.sync_redis
334
		self.channels = []
335
		self._logger = None
336
		self.async_redis = tornadoredis.Client(port=TORNADO_REDIS_PORT)
337
		self.patch_tornadoredis()
338
		self.pre_process_message = {
339
			Actions.GET_MESSAGES: self.process_get_messages,
340
			Actions.SEND_MESSAGE: self.process_send_message,
341
			Actions.WEBRTC: self.proxy_webrtc,
342
			Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
343
			Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
344
			Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
345
			Actions.ACCEPT_CALL: self.accept_call,
346
			Actions.ACCEPT_FILE: self.accept_file,
347
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
348
			Actions.DELETE_ROOM: self.delete_channel,
349
			Actions.EDIT_MESSAGE: self.edit_message,
350
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
351
			Actions.INVITE_USER: self.invite_user,
352
			Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection,
353
			Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection,
354
			Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
355
			Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
356
		}
357
		self.post_process_message = {
358
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
359
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
360
			Actions.DELETE_ROOM: self.send_client_delete_channel,
361
			Actions.INVITE_USER: self.send_client_new_channel,
362
			Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel,
363
			Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel
364
		}
365
366
	def patch_tornadoredis(self):  # TODO remove this
367
		fabric = type(self.async_redis.connection.readline)
368
		self.async_redis.connection.old_read = self.async_redis.connection.readline
369
		def new_read(new_self, callback=None):
370
			try:
371
				return new_self.old_read(callback=callback)
372
			except Exception as e:
373
				current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
374
				self.logger.error(e)
375
				self.logger.error(
376
					"Exception info: "
377
					"self.id: %s ;;; "
378
					"self.connected = '%s';;; "
379
					"Redis default channel online = '%s';;; "
380
					"self.channels = '%s';;; "
381
					"self.closed_channels  = '%s';;;",
382
					self.id, self.connected, current_online, self.channels, self.closed_channels
383
				)
384
				raise e
385
386
		self.async_redis.connection.readline = fabric(new_read, self.async_redis.connection)
387
388
	@property
389
	def connected(self):
390
		raise NotImplemented
391
392
	@connected.setter
393
	def connected(self, value):
394
		raise NotImplemented
395
396
	@tornado.gen.engine
397
	def listen(self, channels):
398
		yield tornado.gen.Task(
399
			self.async_redis.subscribe, channels)
400
		self.async_redis.listen(self.pub_sub_message)
401
402
	@property
403
	def logger(self):
404
		return self._logger if self._logger else base_logger
405
406
	@tornado.gen.engine
407
	def add_channel(self, channel):
408
		self.channels.append(channel)
409
		yield tornado.gen.Task(
410
			self.async_redis.subscribe, (channel,))
411
412
	def evaluate(self, query_set):
413
		self.do_db(len, query_set)
414
		return query_set
415
416
	def do_db(self, callback, *args, **kwargs):
417
		try:
418
			return callback(*args, **kwargs)
419
		except (OperationalError, InterfaceError) as e:
420
			if 'MySQL server has gone away' in str(e):
421
				self.logger.warning('%s, reconnecting' % e)
422
				connection.close()
423
				return callback(*args, **kwargs)
424
			else:
425
				raise e
426
427
	def execute_query(self, query, *args, **kwargs):
428
		cursor = connection.cursor()
429
		cursor.execute(query, *args, **kwargs)
430
		desc = cursor.description
431
		return [
432
			dict(zip([col[0] for col in desc], row))
433
			for row in cursor.fetchall()
434
			]
435
436
	def get_online_from_redis(self, channel, check_self_online=False):
437
		"""
438
		:rtype : dict
439
		returns (dict, bool) if check_type is present
440
		"""
441
		online = self.sync_redis.smembers(channel)
442
		self.logger.debug('!! channel %s redis online: %s', channel, online)
443
		result = set()
444
		user_is_online = False
445
		# redis stores8 REDIS_USER_FORMAT, so parse them
446
		if online:
447
			for raw in online:  # py2 iteritems
448
				decoded = raw.decode('utf-8')
449
				# : char specified in cookies_middleware.py.create_id
450
				user_id = int(decoded.split(':')[0])
451
				if user_id == self.user_id and decoded != self.id:
452
					user_is_online = True
453
				result.add(user_id)
454
		result = list(result)
455
		return (result, user_is_online) if check_self_online else result
456
457
	def add_online_user(self, room_id, offline_messages=None):
458
		"""
459
		adds to redis
460
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
461
		:return:
462
		"""
463
		self.async_redis_publisher.sadd(room_id, self.id)
464
		# since we add user to online first, latest trigger will always show correct online
465
		online, is_online = self.get_online_from_redis(room_id, True)
466
		if not is_online:  # if a new tab has been opened
467
			online.append(self.user_id)
468
			online_user_names_mes = self.room_online(
469
				online,
470
				Actions.LOGIN,
471
				room_id
472
			)
473
			self.logger.info('!! First tab, sending refresh online for all')
474
			self.publish(online_user_names_mes, room_id)
475
			if offline_messages:
476
				self.ws_write(self.load_offline_message(offline_messages, room_id))
477
		else:  # Send user names to self
478
			online_user_names_mes = self.room_online(
479
				online,
480
				Actions.REFRESH_USER,
481
				room_id
482
			)
483
			self.logger.info('!! Second tab, retrieving online for self')
484
			self.ws_write(online_user_names_mes)
485
486
	def publish(self, message, channel, parsable=False):
487
		jsoned_mess = json.dumps(message)
488
		self.logger.debug('<%s> %s', channel, jsoned_mess)
489
		if parsable:
490
			jsoned_mess = self.encode(jsoned_mess)
491
		self.async_redis_publisher.publish(channel, jsoned_mess)
492
493
	def encode(self, message):
494
		"""
495
		Marks message with prefix to specify that
496
		it should be decoded and proccesed before sending to client
497
		@param message: message to mark
498
		@return: marked message
499
		"""
500
		return self.parsable_prefix + message
501
502
	def remove_parsable_prefix(self, message):
503
		if message.startswith(self.parsable_prefix):
504
			return message[1:]
505
506
	def pub_sub_message(self, message):
507
		data = message.body
508
		if isinstance(data, str_type):  # subscribe event
509
			prefixless_str = self.remove_parsable_prefix(data)
510
			if prefixless_str:
511
				dict_message = json.loads(prefixless_str)
512
				res = self.post_process_message[dict_message[VarNames.EVENT]](dict_message)
513
				if not res:
514
					self.ws_write(prefixless_str)
515
			else:
516
				self.ws_write(data)
517
518
	def ws_write(self, message):
519
		raise NotImplementedError('WebSocketHandler implements')
520
521
	def process_send_message(self, message):
522
		"""
523
		:type message: dict
524
		"""
525
		raw_imgs = message.get(VarNames.IMG)
526
		channel = message[VarNames.CHANNEL]
527
		message_db = Message(
528
			sender_id=self.user_id,
529
			content=message[VarNames.CONTENT],
530
			symbol=get_max_key(raw_imgs)
531
		)
532
		message_db.room_id = channel
533
		self.do_db(message_db.save)
534
		db_images = self.save_images(raw_imgs, message_db.id)
535
		prepared_message = self.create_send_message(
536 View Code Duplication
			message_db,
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
537
			Actions.PRINT_MESSAGE,
538
			self.prepare_img(db_images, message_db.id)
539
		)
540
		self.publish(prepared_message, channel)
541
542
	def close_file_connection(self, in_message):
543
		connection_id = in_message[VarNames.CONNECTION_ID]
544
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
545
		if not self_channel_status:
546
			raise Exception("Access Denied")
547
		if self_channel_status != WebRtcRedisStates.CLOSED:
548
			sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
549
			if sender_id == self.id:
550
				self.close_file_sender(connection_id)
551
			else:
552
				self.close_file_receiver(connection_id, in_message, sender_id)
553
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
554
555
	def close_call_connection(self, in_message):
556 View Code Duplication
		connection_id = in_message[VarNames.CONNECTION_ID]
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
557
		conn_users = self.sync_redis.shgetall(connection_id)
558
		if conn_users[self.id] in [WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED]:
559
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
560
			del conn_users[self.id]
561
			message = {
562
				VarNames.EVENT: Actions.CLOSE_CALL_CONNECTION,
563
				VarNames.CONNECTION_ID: connection_id,
564
				VarNames.USER_ID: self.user_id,
565
				VarNames.WEBRTC_OPPONENT_ID: self.id,
566
				VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
567
			}
568
			for user in conn_users:
569
				if conn_users[user] != WebRtcRedisStates.CLOSED:
570
					self.publish(message, user)
571
		else:
572
			raise ValidationError("Invalid channel status.")
573
574
	def cancel_call_connection(self, in_message, reply_action):
575
		self.send_call_answer(in_message, WebRtcRedisStates.CLOSED, Actions.CANCEL_CALL_CONNECTION)
576
577
	def close_file_receiver(self, connection_id, in_message, sender_id):
578
		sender_status = self.sync_redis.shget(connection_id, sender_id)
579
		if not sender_status:
580
			raise Exception("Access denied")
581
		if sender_status != WebRtcRedisStates.CLOSED:
582
			in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
583
			in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
584
			self.publish(in_message, sender_id)
585
586
	def close_file_sender(self, connection_id):
587
		values = self.sync_redis.shgetall(connection_id)
588
		del values[self.id]
589
		for ws_id in values:
590
			if values[ws_id] == WebRtcRedisStates.CLOSED:
591
				continue
592
			self.publish({
593
				VarNames.EVENT: Actions.CLOSE_FILE_CONNECTION,
594
				VarNames.CONNECTION_ID: connection_id,
595
				VarNames.WEBRTC_OPPONENT_ID: self.id,
596
				VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
597
			}, ws_id)
598
599
	def accept_file(self, in_message):
600
		connection_id = in_message[VarNames.CONNECTION_ID] # TODO accept all if call
601
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
602
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
603
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
604
		if sender_ws_status == WebRtcRedisStates.READY and self_ws_status == WebRtcRedisStates.RESPONDED:
605
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
606
			self.publish({
607
				VarNames.EVENT: Actions.ACCEPT_FILE,
608
				VarNames.CONNECTION_ID: connection_id,
609
				VarNames.WEBRTC_OPPONENT_ID: self.id,
610
				VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
611
			}, sender_ws_id)
612
		else:
613
			raise ValidationError("Invalid channel status")
614
615
	# todo
616
	# we can use channel_status = self.sync_redis.shgetall(connection_id)
617
	# and then self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
618
	# if we shgetall and only then do async hset
619
	# we can catch an issue when 2 concurrent users accepted the call
620
	# but we didn't  send them ACCEPT_CALL as they both were in status 'offered'
621
	def accept_call(self, in_message):
622
		connection_id = in_message[VarNames.CONNECTION_ID]
623
		self_status = self.sync_redis.shget(connection_id, self.id)
624
		if self_status == WebRtcRedisStates.RESPONDED:
625 View Code Duplication
			self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.READY)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
626
			channel_status = self.sync_redis.shgetall(connection_id)
627
			del channel_status[self.id]
628
			message = {
629
				VarNames.EVENT: Actions.ACCEPT_CALL,
630
				VarNames.USER_ID: self.user_id,
631
				VarNames.CONNECTION_ID: connection_id,
632
				VarNames.WEBRTC_OPPONENT_ID: self.id,
633
				VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
634
			}
635
			for key in channel_status:
636
				if channel_status[key] != WebRtcRedisStates.CLOSED:
637
					self.publish(message, key)
638
		else:
639
			raise ValidationError("Invalid channel status")
640
641
	def offer_webrtc_connection(self, in_message):
642
		room_id = in_message[VarNames.CHANNEL]
643
		content = in_message.get(VarNames.CONTENT)
644
		qued_id = in_message[VarNames.WEBRTC_QUED_ID]
645 View Code Duplication
		connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
1 ignored issue
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
646
		# use list because sets dont have 1st element which is offerer
647
		self.async_redis_publisher.hset(WEBRTC_CONNECTION, connection_id, self.id)
648
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
649
		opponents_message = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT])
650
		self_message = self.set_connection_id(qued_id, connection_id)
651
		self.ws_write(self_message)
652
		self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
653
		self.publish(opponents_message, room_id, True)
654
655
	def reply_call_connection(self, in_message):
656
		self.send_call_answer(in_message, WebRtcRedisStates.RESPONDED, Actions.REPLY_CALL_CONNECTION)
657
658
	def send_call_answer(self, in_message, status_set, reply_action):
659
		connection_id = in_message[VarNames.CONNECTION_ID]
660
		conn_users = self.sync_redis.shgetall(connection_id)
661
		if conn_users[self.id] == WebRtcRedisStates.OFFERED:
662
			self.async_redis_publisher.hset(connection_id, self.id, status_set)
663
			del conn_users[self.id]
664
			message = self.reply_webrtc(reply_action, connection_id)
665
			for user in conn_users:
666
				if conn_users[user] != WebRtcRedisStates.CLOSED:
667
					self.publish(message, user)
668
		else:
669
			raise ValidationError("Invalid channel status.")
670
671
	def reply_file_connection(self, in_message):
672
		connection_id = in_message[VarNames.CONNECTION_ID]
673
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
674
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
675
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
676
		if sender_ws_status == WebRtcRedisStates.READY and self_ws_status == WebRtcRedisStates.OFFERED:
677
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
678
			self.publish(self.reply_webrtc(Actions.REPLY_FILE_CONNECTION, connection_id), sender_ws_id)
679
		else:
680
			raise ValidationError("Invalid channel status.")
681
682
	def proxy_webrtc(self, in_message):
683
		"""
684
		:type in_message: dict
685
		"""
686
		connection_id = in_message[VarNames.CONNECTION_ID]
687
		channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
688
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
689
		opponent_channel_status = self.sync_redis.shget(connection_id, channel)
690
		if not (self_channel_status == WebRtcRedisStates.READY and opponent_channel_status == WebRtcRedisStates.READY):
691
			raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
692
				self_channel_status, opponent_channel_status
693
			)) # todo receiver should only accept proxy_webrtc from sender, sender can accept all
694
		# I mean somebody if there're 3 ppl in 1 channel and first is initing transfer to 2nd and 3rd,
695
		# 2nd guy can fraud 3rd guy webrtc traffic, which is allowed during the call, but not while transering file
696
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
697
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
698
		self.logger.debug("Forwarding message to channel %s, self %s, other status %s",
699
			channel, self_channel_status, opponent_channel_status
700
		)
701
		self.publish(in_message, channel)
702
703
	def create_new_room(self, message):
704
		room_name = message[VarNames.ROOM_NAME]
705
		if not room_name or len(room_name) > 16:
706
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
707
		room = Room(name=room_name)
708
		self.do_db(room.save)
709
		RoomUsers(room_id=room.id, user_id=self.user_id).save()
710
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
711
		self.publish(subscribe_message, self.channel, True)
712
713
	def invite_user(self, message):
714
		room_id = message[VarNames.ROOM_ID]
715
		user_id = message[VarNames.USER_ID]
716
		if room_id not in self.channels:
717
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
718
		room = self.do_db(Room.objects.get, id=room_id)
719
		if room.is_private:
720
			raise ValidationError("You can't add users to direct room, create a new room instead")
721
		try:
722
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
723
		except IntegrityError:
724
			raise ValidationError("User is already in channel")
725
		users_in_room = {}
726
		for user in room.users.all():
727
			self.set_js_user_structure(users_in_room, user.id, user.username, user.sex)
728
		self.publish(self.add_user_to_room(room_id, user_id, users_in_room[user_id]), room_id)
729
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
730
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
731
732
	def create_room(self, user_rooms, user_id):
733
		if self.user_id == user_id:
734
			room_ids = list([room['room_id'] for room in self.evaluate(user_rooms)])
735
			query_res = self.execute_query(SELECT_SELF_ROOM, [room_ids, ])
736
		else:
737
			rooms_query = RoomUsers.objects.filter(user_id=user_id, room__in=user_rooms)
738
			query_res = rooms_query.values('room__id', 'room__disabled')
739
		try:
740
			room = self.do_db(query_res.get)
741
			room_id = room['room__id']
742
			self.update_room(room_id, room['room__disabled'])
743
		except RoomUsers.DoesNotExist:
744
			room = Room()
745
			room.save()
746
			room_id = room.id
747
			if self.user_id == user_id:
748
				RoomUsers(user_id=self.user_id, room_id=room_id).save()
749
			else:
750
				RoomUsers.objects.bulk_create([
751
					RoomUsers(user_id=user_id, room_id=room_id),
752
					RoomUsers(user_id=self.user_id, room_id=room_id),
753
				])
754
		return room_id
755
756
	def update_room(self, room_id, disabled):
757
		if not disabled:
758
			raise ValidationError('This room already exist')
759
		else:
760
			Room.objects.filter(id=room_id).update(disabled=False)
761
762
	def create_user_channel(self, message):
763
		user_id = message[VarNames.USER_ID]
764
		# get all self private rooms ids
765
		user_rooms = Room.users.through.objects.filter(user_id=self.user_id, room__name__isnull=True).values('room_id')
766
		# get private room that contains another user from rooms above
767
		room_id = self.create_room(user_rooms, user_id)
768
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
769
		self.publish(subscribe_message, self.channel, True)
770
		other_channel = RedisPrefix.generate_user(user_id)
771
		if self.channel != other_channel:
772
			self.publish(subscribe_message, other_channel, True)
773
774
	def delete_channel(self, message):
775
		room_id = message[VarNames.ROOM_ID]
776
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
777
			raise ValidationError('You are not allowed to exit this room')
778
		room = self.do_db(Room.objects.get, id=room_id)
779
		if room.disabled:
780
			raise ValidationError('Room is already deleted')
781
		if room.name is None:  # if private then disable
782
			room.disabled = True
783
		else: # if public -> leave the room, delete the link
784
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
785
			online = self.get_online_from_redis(room_id)
786
			online.remove(self.user_id)
787
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
788
		room.save()
789
		message = self.unsubscribe_direct_message(room_id)
790
		self.publish(message, room_id, True)
791
792
	def edit_message(self, data):
793
		# ord(next (iter (message['images'])))
794
		message_id = data[VarNames.MESSAGE_ID]
795
		message = Message.objects.get(id=message_id)
796
		if message.sender_id != self.user_id:
797
			raise ValidationError("You can only edit your messages")
798
		if message.time + 600000 < get_milliseconds():
799
			raise ValidationError("You can only edit messages that were send not more than 10 min ago")
800
		if message.deleted:
801
			raise ValidationError("Already deleted")
802
		message.content = data[VarNames.CONTENT]
803
		selector = Message.objects.filter(id=message_id)
804
		if message.content is None:
805
			action = Actions.DELETE_MESSAGE
806
			prep_imgs = None
807
			selector.update(deleted=True)
808
		else:
809
			action = Actions.EDIT_MESSAGE
810
			prep_imgs = self.process_images(data.get(VarNames.IMG), message)
811
			selector.update(content=message.content, symbol=message.symbol)
812
		self.publish(self.create_send_message(message, action, prep_imgs), message.room_id)
813
814
	def process_images(self, images, message):
815
		if images:
816
			if message.symbol:
817
				self.replace_symbols_if_needed(images, message)
818
			new_symbol = get_max_key(images)
819
			if message.symbol is None or new_symbol > message.symbol:
820
				message.symbol = new_symbol
821
		db_images = self.save_images(images, message.id)
822
		if message.symbol:  # fetch all, including that we just store
823
			db_images = Image.objects.filter(message_id=message.id)
824
		return  self.prepare_img(db_images, message.id)
825
826
	def save_images(self, images, message_id):
827
		db_images = []
828
		if images:
829
			db_images = [Image(
830
				message_id=message_id,
831
				img=extract_photo(
832
					images[k][VarNames.IMG_B64],
833
					images[k][VarNames.IMG_FILE_NAME]
834
				),
835
				symbol=k) for k in images]
836
			Image.objects.bulk_create(db_images)
837
		return db_images
838
839
	def replace_symbols_if_needed(self, images, message):
840
		# if message was edited user wasn't notified about that and he edits message again
841
		# his symbol can go out of sync
842
		order = ord(message.symbol)
843
		new_dict = []
844
		for img in images:
845
			if img <= message.symbol:
846
				order += 1
847
				new_symb = chr(order)
848
				new_dict.append({
849
					'new': new_symb,
850
					'old': img,
851
					'value': images[img]
852
				})
853
				message.content = message.content.replace(img, new_symb)
854
		for d in new_dict:  # dictionary changed size during iteration
855
			del images[d['old']]
856
			images[d['new']] = d['value']
857
858
	def send_client_new_channel(self, message):
859
		room_id = message[VarNames.ROOM_ID]
860
		self.add_channel(room_id)
861
		self.add_online_user(room_id)
862
863
	def set_opponent_call_channel(self, message):
864
		connection_id = message[VarNames.CONNECTION_ID]
865
		if message[VarNames.WEBRTC_OPPONENT_ID] == self.id:
866
			return True
867
		self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
868
869
	def send_client_delete_channel(self, message):
870
		room_id = message[VarNames.ROOM_ID]
871
		self.async_redis.unsubscribe((room_id,))
872
		self.async_redis_publisher.hdel(room_id, self.id)
873
		self.channels.remove(room_id)
874
875
	def process_get_messages(self, data):
876
		"""
877
		:type data: dict
878
		"""
879
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
880
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
881
		room_id = data[VarNames.CHANNEL]
882
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
883
		if header_id is None:
884
			messages = Message.objects.filter(Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
885
		else:
886
			messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
887
		images = self.do_db(self.get_message_images, messages)
888
		response = self.get_messages(messages, room_id, images)
889
		self.ws_write(response)
890
891
	def get_message_images(self, messages):
892
		ids = [message.id for message in messages if message.symbol]
893
		if ids:
894
			images = Image.objects.filter(message_id__in=ids)
895
		else:
896
			images = []
897
		self.logger.info('!! Messages have %d images', len(images))
898
		return images
899
900
	def get_offline_messages(self):
901
		res = {}
902
		off_mess = Message.objects.filter(
903
			id__gt=F('room__roomusers__last_read_message_id'),
904
			deleted=False,
905
			room__roomusers__user_id=self.user_id
906
		)
907
		images = self.do_db(self.get_message_images, off_mess)
908
		for message in off_mess:
909
			prep_m = self.create_message(message, self.prepare_img(images, message.id))
910
			res.setdefault(message.room_id, []).append(prep_m)
911
		return res
912
913
	def get_users_in_current_user_rooms(self):
914
		"""
915
		{
916
			"ROOM_ID:1": {
917
				"name": "All",
918
				"users": {
919
					"USER_ID:admin": {
920
						"name": "USER_NAME:admin",
921
						"sex": "SEX:Secret"
922
					},
923
					"USER_ID_2": {
924
						"name": "USER_NAME:Mike",
925
						"sex": "Male"
926
					}
927
				},
928
				"isPrivate": true
929
			}
930
		}
931
		"""
932
		user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
933
		res = {room['id']: {
934
				VarNames.ROOM_NAME: room['name'],
935
				VarNames.ROOM_USERS: {}
936
			} for room in user_rooms}
937
		room_ids = (room_id for room_id in res)
938
		rooms_users = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id')
939
		for user in rooms_users:
940
			self.set_js_user_structure(res[user['rooms__id']][VarNames.ROOM_USERS], user['id'], user['username'], user['sex'])
941
		return res
942
943
	def set_js_user_structure(self, user_dict, user_id, name, sex):
944
		user_dict[user_id] = {
945
			VarNames.USER: name,
946
			VarNames.GENDER: GENDERS[sex]
947
		}
948
949
	def save_ip(self):
950
		if (self.do_db(UserJoinedInfo.objects.filter(
951
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
952
			return
953
		ip_address = get_or_create_ip(self.ip, self.logger)
954
		UserJoinedInfo.objects.create(
955
			ip=ip_address,
956
			user_id=self.user_id
957
		)
958
959
	def publish_logout(self, channel, log_data):
960
		# seems like async solves problem with connection lost and wrong data status
961
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
962
		online, is_online = self.get_online_from_redis(channel, True)
963
		log_data[channel] = {'online': online, 'is_online': is_online}
964
		if not is_online:
965
			message = self.room_online(online, Actions.LOGOUT, channel)
966
			self.publish(message, channel)
967
			return True
968
969
970
class AntiSpam(object):
971
972
	def __init__(self):
973
		self.spammed = 0
974
		self.info = {}
975
976
	def check_spam(self, json_message):
977
		message_length = len(json_message)
978
		info_key = int(round(time.time() * 100))
979
		self.info[info_key] = message_length
980
		if message_length > MAX_MESSAGE_SIZE:
981
			self.spammed += 1
982
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
983
		self.check_timed_spam()
984
985
	def check_timed_spam(self):
986
		# TODO implement me
987
		pass
988
		# raise ValidationError("You're chatting too much, calm down a bit!")
989
990
991
class TornadoHandler(WebSocketHandler, MessagesHandler):
992
993
	def __init__(self, *args, **kwargs):
994
		super(TornadoHandler, self).__init__(*args, **kwargs)
995
		self.__connected__ = False
996
		self.anti_spam = AntiSpam()
997
998
	@property
999
	def connected(self):
1000
		return self.__connected__
1001
1002
	@connected.setter
1003
	def connected(self, value):
1004
		self.__connected__ = value
1005
1006
	def data_received(self, chunk):
1007
		pass
1008
1009
	def on_message(self, json_message):
1010
		try:
1011
			if not self.connected:
1012
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
1013
			if not json_message:
1014
				raise Exception('Skipping null message')
1015
			# self.anti_spam.check_spam(json_message)
1016
			self.logger.debug('<< %.1000s', json_message)
1017
			message = json.loads(json_message)
1018
			if message[VarNames.EVENT] not in self.pre_process_message:
1019
				raise Exception("event {} is unknown".format(message[VarNames.EVENT]))
1020
			channel = message.get(VarNames.CHANNEL)
1021
			if channel and channel not in self.channels:
1022
				raise Exception('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
1023
			self.pre_process_message[message[VarNames.EVENT]](message)
1024
		except ValidationError as e:
1025
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
1026
			self.ws_write(error_message)
1027
1028
	def on_close(self):
1029
		if self.async_redis.subscribed:
1030
			self.logger.info("Close event, unsubscribing from %s", self.channels)
1031
			self.async_redis.unsubscribe(self.channels)
1032
		else:
1033
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
1034
		log_data = {}
1035
		gone_offline = False
1036
		for channel in self.channels:
1037
			if not isinstance(channel, Number):
1038
				continue
1039
			self.sync_redis.srem(channel, self.id)
1040
			if self.connected:
1041
				gone_offline = self.publish_logout(channel, log_data) or gone_offline
1042
		if gone_offline:
1043
			res = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
1044
			self.logger.info("Updated %s last read message", res)
1045
		self.disconnect(json.dumps(log_data))
1046
1047
	def disconnect(self, log_data, tries=0):
1048
		"""
1049
		Closes a connection if it's not in proggress, otherwice timeouts closing
1050
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227
1051
		"""
1052
		self.connected = False
1053
		self.closed_channels = self.channels
1054
		self.channels = []
1055
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
1056
			self.logger.debug('Closing a connection timeouts')
1057
			ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, log_data, tries+1)
1058
		else:
1059
			self.logger.info("Close connection result: %s", log_data)
1060
			self.async_redis.disconnect()
1061
1062
	def generate_self_id(self):
1063
		"""
1064
		When user opens new tab in browser wsHandler.wsConnectionId stores Id of current ws
1065
		So if ws loses a connection it still can reconnect with same id,
1066
		and TornadoHandler can restore webrtc_connections to previous state
1067
		"""
1068
		conn_arg = self.get_argument('id', None)
1069
		self.id, random = create_id(self.user_id, conn_arg)
1070
		if random != conn_arg:
1071
			self.ws_write(self.set_ws_id(random, self.id))
1072
1073
	def open(self):
1074
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
1075
		if sessionStore.exists(session_key):
1076
			self.ip = self.get_client_ip()
1077
			session = SessionStore(session_key)
1078
			self.user_id = int(session["_auth_user_id"])
1079
			self.generate_self_id()
1080
			log_params = {
1081
				'id': self.id,
1082
				'ip': self.ip
1083
			}
1084
			self._logger = logging.LoggerAdapter(parent_logger, log_params)
1085
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
1086
			self.async_redis.connect()
1087
			user_db = self.do_db(User.objects.get, id=self.user_id)
1088
			self.sender_name = user_db.username
1089
			self.sex = user_db.sex_str
1090
			user_rooms = self.get_users_in_current_user_rooms()
1091
			self.ws_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
1092
			# get all missed messages
1093
			self.channels = []  # py2 doesn't support clear()
1094
			self.channels.append(self.channel)
1095
			self.channels.append(self.id)
1096
			for room_id in user_rooms:
1097
				self.channels.append(room_id)
1098
			self.listen(self.channels)
1099
			off_messages = self.get_offline_messages()
1100
			for room_id in user_rooms:
1101
				self.add_online_user(room_id, off_messages.get(room_id))
1102
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
1103
			self.connected = True
1104
			Thread(target=self.save_ip).start()
1105
		else:
1106
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
1107
			self.close(403, "Session key %s has been rejected" % session_key)
1108
1109
	def check_origin(self, origin):
1110
		"""
1111
		check whether browser set domain matches origin
1112
		"""
1113
		parsed_origin = urlparse(origin)
1114
		origin = parsed_origin.netloc
1115
		origin_domain = origin.split(':')[0].lower()
1116
		browser_set = self.request.headers.get("Host")
1117
		browser_domain = browser_set.split(':')[0]
1118
		return browser_domain == origin_domain
1119
1120
	def ws_write(self, message):
1121
		"""
1122
		Tries to send message, doesn't throw exception outside
1123
		:type self: MessagesHandler
1124
		"""
1125
		# self.logger.debug('<< THREAD %s >>', os.getppid())
1126
		try:
1127
			if isinstance(message, dict):
1128
				message = json.dumps(message)
1129
			if not isinstance(message, str_type):
1130
				raise ValueError('Wrong message type : %s' % str(message))
1131
			self.logger.debug(">> %.1000s", message)
1132
			self.write_message(message)
1133
		except tornado.websocket.WebSocketClosedError as e:
1134
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
1135
1136
	def get_client_ip(self):
1137
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip
1138