Completed
Push — master ( 37a203...0fb357 )
by Andrew
35s
created

MessagesHandler.edit_message()   F

Complexity

Conditions 10

Size

Total Lines 31

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 1 Features 0
Metric Value
cc 10
c 2
b 1
f 0
dl 0
loc 31
rs 3.1304

How to fix   Complexity   

Complexity

Complex methods like MessagesHandler.edit_message() often do a lot of different things. To break such a method (or the class that contains it) down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import json
2
import logging
3
import sys
4
import time
5
from datetime import timedelta
6
from numbers import Number
7
from threading import Thread
8
9
import tornado.gen
10
import tornado.httpclient
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q, F
18
from redis_sessions.session import SessionStore
19
from tornado import ioloop
20
from tornado.websocket import WebSocketHandler
21
22
from chat.cookies_middleware import create_id
23
from chat.log_filters import id_generator
24
from chat.utils import extract_photo, get_max_key
25
from chat.utils import get_or_create_ip
26
27
try:  # py2
28
	from urlparse import urlparse
29
except ImportError:  # py3
30
	from urllib.parse import urlparse
31
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM, \
32
	TORNADO_REDIS_PORT, WEBRTC_CONNECTION
33
from chat.models import User, Message, Room, get_milliseconds, UserJoinedInfo, RoomUsers, Image
34
35
# True when running under Python 3 or later. The parsed major version is
# compared instead of the raw version string: a lexical comparison such as
# '10.0' > '3' would give the wrong answer.
PY3 = sys.version_info[0] >= 3
# Base text type for isinstance() checks; `basestring` only exists on Python 2
# and is never evaluated on Python 3.
str_type = str if PY3 else basestring
37
38
39
# Module-level SessionStore instance.
sessionStore = SessionStore()

# Logger for this module, and an adapter around it carrying placeholder
# 'id' / 'ip' values. MessagesHandler.logger falls back to this adapter
# while a handler has no logger of its own.
parent_logger = logging.getLogger(__name__)
base_logger = logging.LoggerAdapter(parent_logger, dict(id=0, ip='000.000.000.000'))
46
47
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
48
#CONNECTION_POOL = tornadoredis.ConnectionPool(
49
#	max_connections=500,
50
#	wait_for_available=True)
51
52
53
class Actions(object):
	"""
	Action names carried in the VarNames.EVENT ('action') field of a message.

	The same strings are used as keys of the handler dispatch tables
	(``pre_process_message`` / ``post_process_message``) in MessagesHandler.
	"""

	# online state and websocket identity
	LOGIN = 'addOnlineUser'
	LOGOUT = 'removeOnlineUser'
	REFRESH_USER = 'setOnlineUsers'
	SET_WS_ID = 'setWsId'
	ADD_USER = 'addUserToDom'

	# chat messages
	SEND_MESSAGE = 'sendMessage'
	PRINT_MESSAGE = 'printMessage'
	EDIT_MESSAGE = 'editMessage'
	DELETE_MESSAGE = 'deleteMessage'
	GET_MESSAGES = 'loadMessages'
	OFFLINE_MESSAGES = 'loadOfflineMessages'
	GROWL_MESSAGE = 'growl'

	# rooms and channels
	ROOMS = 'setRooms'
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
	CREATE_ROOM_CHANNEL = 'addRoom'
	DELETE_ROOM = 'deleteRoom'
	INVITE_USER = 'inviteUser'

	# webrtc
	WEBRTC = 'sendRtcData'
	SET_WEBRTC_ID = 'setConnectionId'
	SET_WEBRTC_ERROR = 'setError'
	OFFER_FILE_CONNECTION = 'offerFile'
	OFFER_CALL_CONNECTION = 'offerCall'
	REPLY_FILE_CONNECTION = 'replyFile'
	REPLY_CALL_CONNECTION = 'replyCall'
	ACCEPT_CALL = 'acceptCall'
	ACCEPT_FILE = 'acceptFile'
	CLOSE_FILE_CONNECTION = 'destroyFileConnection'
	CLOSE_CALL_CONNECTION = 'destroyCallConnection'
	CANCEL_CALL_CONNECTION = 'cancelCallConnection'
83
84
85
class VarNames(object):
	"""
	Key names of the dictionaries that are exchanged as messages.

	Some constants intentionally or accidentally share a value:
	WEBRTC_QUED_ID and MESSAGE_ID are both 'id', CHANNEL and CHANNEL_NAME
	are both 'channel'.
	"""

	# message envelope
	EVENT = 'action'
	HANDLER_NAME = 'handler'
	CONTENT = 'content'
	TIME = 'time'
	MESSAGE_ID = 'id'
	SYMBOL = 'symbol'

	# images attached to a message
	IMG = 'images'
	IMG_B64 = 'b64'
	IMG_FILE_NAME = 'fileName'

	# user
	USER = 'user'
	USER_ID = 'userId'
	GENDER = 'sex'

	# rooms and channels
	ROOM_NAME = 'name'
	ROOM_ID = 'roomId'
	ROOM_USERS = 'users'
	CHANNEL = 'channel'
	CHANNEL_NAME = 'channel'
	IS_ROOM_PRIVATE = 'private'

	# message history requests
	GET_MESSAGES_COUNT = 'count'
	GET_MESSAGES_HEADER_ID = 'headerId'

	# webrtc
	WEBRTC_QUED_ID = 'id'
	WEBRTC_OPPONENT_ID = 'opponentWsId'
	CONNECTION_ID = 'connId'
109
110
111
class HandlerNames:
	"""Values of the VarNames.HANDLER_NAME ('handler') field of a message."""

	WS = 'ws'
	CHAT = 'chat'
	CHANNELS = 'channels'
	GROWL = 'growl'
	WEBRTC = 'webrtc'
	WEBRTC_TRANSFER = 'webrtcTransfer'
	PEER_CONNECTION = 'peerConnection'
119
120
121
class WebRtcRedisStates:
	"""
	Per-websocket status values stored in the redis hash of a webrtc connection.

	The handlers in this file move a participant from OFFERED to RESPONDED to
	READY, and set CLOSED when the participant leaves or cancels.
	"""

	OFFERED = 'offered'
	RESPONDED = 'responded'
	READY = 'ready'
	CLOSED = 'closed'
126
127
128
class RedisPrefix:
	"""Constants and helpers for building redis channel names."""

	USER_ID_CHANNEL_PREFIX = 'u'
	DEFAULT_CHANNEL = ALL_ROOM_ID
	CONNECTION_ID_LENGTH = 8  # should be secure

	@classmethod
	def generate_user(cls, key):
		"""
		Build the name of a user's personal channel.

		:param key: value identifying the user (callers pass a user id)
		:return: USER_ID_CHANNEL_PREFIX followed by str(key), e.g. 'u42'
		"""
		suffix = str(key)
		return cls.USER_ID_CHANNEL_PREFIX + suffix
136
137
138
class MessagesCreator(object):
	"""
	Builders for the plain dictionaries that are sent around as messages.

	Instances hold the sender's identity (``user_id``, ``sender_name``,
	``sex`` and the websocket ``id``); several builders copy these values
	into the message they create.
	"""

	def __init__(self, *args, **kwargs):
		# positional and keyword arguments are accepted but not used here
		self.user_id = 0  # anonymous by default
		self.id = None  # child init
		self.sender_name = None
		self.sex = None

	def default(self, content, event, handler):
		"""
		Base message stamped with the sender's user id and the current time.

		:return: {"action": event, "content": content, "userId": user id,
		"time": get_milliseconds(), "handler": handler}
		"""
		base = {VarNames.EVENT: event, VarNames.CONTENT: content}
		base[VarNames.USER_ID] = self.user_id
		base[VarNames.TIME] = get_milliseconds()
		base[VarNames.HANDLER_NAME] = handler
		return base

	def reply_webrtc(self, event, connection_id):
		"""
		Reply addressed to the webrtc transfer handler; identifies the
		sender by user id, name and websocket id.

		:return: {"action": event, "connId": connection_id, "userId": ...,
		"user": ..., "opponentWsId": ..., "handler": "webrtcTransfer"}
		"""
		reply = {VarNames.EVENT: event, VarNames.CONNECTION_ID: connection_id}
		reply[VarNames.USER_ID] = self.user_id
		reply[VarNames.USER] = self.sender_name
		reply[VarNames.WEBRTC_OPPONENT_ID] = self.id
		reply[VarNames.HANDLER_NAME] = HandlerNames.WEBRTC_TRANSFER
		return reply

	def set_ws_id(self, random, self_id):
		"""
		:return: SET_WS_ID message for the 'ws' handler with `random` as
		content and `self_id` as the opponent websocket id
		"""
		ws_message = {VarNames.HANDLER_NAME: HandlerNames.WS}
		ws_message[VarNames.EVENT] = Actions.SET_WS_ID
		ws_message[VarNames.CONTENT] = random
		ws_message[VarNames.WEBRTC_OPPONENT_ID] = self_id
		return ws_message

	def room_online(self, online, event, channel):
		"""
		Base chat message with `online` as content, extended with the
		channel and the sender's name and sex.
		"""
		message = self.default(online, event, HandlerNames.CHAT)
		extra_fields = (
			(VarNames.CHANNEL_NAME, channel),
			(VarNames.USER, self.sender_name),
			(VarNames.GENDER, self.sex),
		)
		for key, value in extra_fields:
			message[key] = value
		return message

	def offer_webrtc(self, content, connection_id, room_id, action):
		"""
		Base message for the 'webrtc' handler with `action` as event, extended
		with the sender name, connection id, sender websocket id and channel.
		"""
		offer = self.default(content, action, HandlerNames.WEBRTC)
		extra_fields = (
			(VarNames.USER, self.sender_name),
			(VarNames.CONNECTION_ID, connection_id),
			(VarNames.WEBRTC_OPPONENT_ID, self.id),
			(VarNames.CHANNEL, room_id),
		)
		for key, value in extra_fields:
			offer[key] = value
		return offer

	def set_connection_id(self, qued_id, connection_id):
		"""
		:return: SET_WEBRTC_ID message for the 'webrtc' handler pairing
		`qued_id` with the generated `connection_id`
		"""
		result = {VarNames.EVENT: Actions.SET_WEBRTC_ID}
		result[VarNames.HANDLER_NAME] = HandlerNames.WEBRTC
		result[VarNames.CONNECTION_ID] = connection_id
		result[VarNames.WEBRTC_QUED_ID] = qued_id
		return result

	def set_webrtc_error(self, error, connection_id, qued_id=None):
		"""
		Base SET_WEBRTC_ERROR message with `error` as content; the queued id
		is only included when it is truthy.
		"""
		# TODO file/call
		failure = self.default(error, Actions.SET_WEBRTC_ERROR, HandlerNames.PEER_CONNECTION)
		failure[VarNames.CONNECTION_ID] = connection_id
		if not qued_id:
			return failure
		failure[VarNames.WEBRTC_QUED_ID] = qued_id
		return failure

	@classmethod
	def create_message(cls, message, images):
		"""
		:param message: object exposing sender_id, content, time and id
		:param images: value stored under the 'images' key as is
		:return: dict with the userId, content, time, id and images keys
		"""
		fields = {VarNames.USER_ID: message.sender_id}
		fields[VarNames.CONTENT] = message.content
		fields[VarNames.TIME] = message.time
		fields[VarNames.MESSAGE_ID] = message.id
		fields[VarNames.IMG] = images
		return fields

	@classmethod
	def create_send_message(cls, message, event, imgs):
		"""
		create_message() result extended with the event, the message's
		room id (as channel), its symbol and the 'chat' handler name.

		:param message: object exposing room_id and symbol in addition to
		the attributes read by create_message()
		"""
		outgoing = cls.create_message(message, imgs)
		outgoing[VarNames.EVENT] = event
		outgoing[VarNames.CHANNEL] = message.room_id
		outgoing[VarNames.SYMBOL] = message.symbol
		outgoing[VarNames.HANDLER_NAME] = HandlerNames.CHAT
		return outgoing

	@classmethod
	def append_images(cls, messages, images):
		"""
		:return: list with one create_message() dict per message, each
		carrying the images that belong to that message
		"""
		return [
			cls.create_message(single, cls.prepare_img(images, single.id))
			for single in messages
		]

	@classmethod
	def prepare_img(cls, images, message_id):
		"""
		:return: {symbol: image url} for the images whose message_id equals
		`message_id`; None when `images` is empty or None
		"""
		if not images:
			return None
		urls = {}
		for image in images:
			if image.message_id == message_id:
				urls[image.symbol] = image.img.url
		return urls

	@classmethod
	def get_messages(cls, messages, channel, images):
		"""
		:param messages: iterable of message objects
		:type channel: str
		:param images: images of those messages, see prepare_img()
		:return: GET_MESSAGES message for the 'chat' handler
		"""
		content = cls.append_images(messages, images)
		history = {VarNames.CONTENT: content}
		history[VarNames.EVENT] = Actions.GET_MESSAGES
		history[VarNames.CHANNEL] = channel
		history[VarNames.HANDLER_NAME] = HandlerNames.CHAT
		return history

	@property
	def channel(self):
		"""Name of this user's personal channel, derived from user_id."""
		own_channel = RedisPrefix.generate_user(self.user_id)
		return own_channel

	def subscribe_direct_channel_message(self, room_id, other_user_id):
		"""
		:return: CREATE_DIRECT_CHANNEL message listing this user and
		`other_user_id` as the room's users
		"""
		members = [self.user_id, other_user_id]
		result = {VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL}
		result[VarNames.ROOM_ID] = room_id
		result[VarNames.ROOM_USERS] = members
		result[VarNames.HANDLER_NAME] = HandlerNames.CHANNELS
		return result

	def subscribe_room_channel_message(self, room_id, room_name):
		"""
		:return: CREATE_ROOM_CHANNEL message with this user as the only
		room user
		"""
		result = {VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL}
		result[VarNames.ROOM_ID] = room_id
		result[VarNames.ROOM_USERS] = [self.user_id]
		result[VarNames.HANDLER_NAME] = HandlerNames.CHANNELS
		result[VarNames.ROOM_NAME] = room_name
		return result

	def invite_room_channel_message(self, room_id, user_id, room_name, users):
		"""
		:return: INVITE_USER message for `user_id` with `users` as content
		"""
		result = {VarNames.EVENT: Actions.INVITE_USER}
		result[VarNames.ROOM_ID] = room_id
		result[VarNames.USER_ID] = user_id
		result[VarNames.HANDLER_NAME] = HandlerNames.CHANNELS
		result[VarNames.ROOM_NAME] = room_name
		result[VarNames.CONTENT] = users
		return result

	def add_user_to_room(self, channel, user_id, content):
		"""
		:param content: mapping that holds the user's 'sex' and 'user'
		entries, e.g. {"sex": "Alien", "user": "Andrew"}
		:return: ADD_USER message for the 'chat' handler
		"""
		result = {VarNames.EVENT: Actions.ADD_USER}
		result[VarNames.CHANNEL] = channel
		result[VarNames.USER_ID] = user_id
		result[VarNames.HANDLER_NAME] = HandlerNames.CHAT
		result[VarNames.GENDER] = content[VarNames.GENDER]
		result[VarNames.USER] = content[VarNames.USER]
		return result

	def unsubscribe_direct_message(self, room_id):
		"""
		:return: DELETE_ROOM message carrying this user's id and the
		current time
		"""
		result = {VarNames.EVENT: Actions.DELETE_ROOM}
		result[VarNames.ROOM_ID] = room_id
		result[VarNames.USER_ID] = self.user_id
		result[VarNames.HANDLER_NAME] = HandlerNames.CHANNELS
		result[VarNames.TIME] = get_milliseconds()
		return result

	def load_offline_message(self, offline_messages, channel_key):
		"""
		:return: base OFFLINE_MESSAGES chat message with the channel set to
		`channel_key`
		"""
		offline = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
		offline[VarNames.CHANNEL] = channel_key
		return offline
321
322
323
class MessagesHandler(MessagesCreator):
324
325
	def __init__(self, *args, **kwargs):
		"""
		Set up the redis clients, bookkeeping attributes and the two
		action -> handler dispatch tables.
		"""
		self.closed_channels = None
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__(*args, **kwargs)
		self.webrtc_ids = {}
		self.ip = None
		from chat import global_redis
		self.async_redis_publisher, self.sync_redis = (
			global_redis.async_redis_publisher,
			global_redis.sync_redis,
		)
		self.channels = []
		self._logger = None
		# every handler gets a redis client of its own for subscriptions
		self.async_redis = tornadoredis.Client(port=TORNADO_REDIS_PORT)
		self.patch_tornadoredis()

		# handlers looked up by the action name of an incoming message
		incoming = {}
		incoming[Actions.GET_MESSAGES] = self.process_get_messages
		incoming[Actions.SEND_MESSAGE] = self.process_send_message
		incoming[Actions.WEBRTC] = self.proxy_webrtc
		incoming[Actions.CLOSE_FILE_CONNECTION] = self.close_file_connection
		incoming[Actions.CLOSE_CALL_CONNECTION] = self.close_call_connection
		incoming[Actions.CANCEL_CALL_CONNECTION] = self.cancel_call_connection
		incoming[Actions.ACCEPT_CALL] = self.accept_call
		incoming[Actions.ACCEPT_FILE] = self.accept_file
		incoming[Actions.CREATE_DIRECT_CHANNEL] = self.create_user_channel
		incoming[Actions.DELETE_ROOM] = self.delete_channel
		incoming[Actions.EDIT_MESSAGE] = self.edit_message
		incoming[Actions.CREATE_ROOM_CHANNEL] = self.create_new_room
		incoming[Actions.INVITE_USER] = self.invite_user
		incoming[Actions.OFFER_FILE_CONNECTION] = self.offer_webrtc_connection
		incoming[Actions.OFFER_CALL_CONNECTION] = self.offer_webrtc_connection
		incoming[Actions.REPLY_FILE_CONNECTION] = self.reply_file_connection
		incoming[Actions.REPLY_CALL_CONNECTION] = self.reply_call_connection
		self.pre_process_message = incoming

		# handlers applied by pub_sub_message() to prefixed ("parsable")
		# messages received from redis
		from_redis = {}
		from_redis[Actions.CREATE_DIRECT_CHANNEL] = self.send_client_new_channel
		from_redis[Actions.CREATE_ROOM_CHANNEL] = self.send_client_new_channel
		from_redis[Actions.DELETE_ROOM] = self.send_client_delete_channel
		from_redis[Actions.INVITE_USER] = self.send_client_new_channel
		from_redis[Actions.OFFER_FILE_CONNECTION] = self.set_opponent_call_channel
		from_redis[Actions.OFFER_CALL_CONNECTION] = self.set_opponent_call_channel
		self.post_process_message = from_redis
365
366
	def patch_tornadoredis(self):  # TODO remove this
367
		fabric = type(self.async_redis.connection.readline)
368
		self.async_redis.connection.old_read = self.async_redis.connection.readline
369
		def new_read(new_self, callback=None):
370
			try:
371
				return new_self.old_read(callback=callback)
372
			except Exception as e:
373
				current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
374
				self.logger.error(e)
375
				self.logger.error(
376
					"Exception info: "
377
					"self.id: %s ;;; "
378
					"self.connected = '%s';;; "
379
					"Redis default channel online = '%s';;; "
380
					"self.channels = '%s';;; "
381
					"self.closed_channels  = '%s';;;",
382
					self.id, self.connected, current_online, self.channels, self.closed_channels
383
				)
384
				raise e
385
386
		self.async_redis.connection.readline = fabric(new_read, self.async_redis.connection)
387
388
	@property
389
	def connected(self):
390
		raise NotImplemented
391
392
	@connected.setter
393
	def connected(self, value):
394
		raise NotImplemented
395
396
	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribe the handler's redis client to `channels`, then register
		pub_sub_message() as the callback for what arrives on them.
		"""
		subscribed = tornado.gen.Task(self.async_redis.subscribe, channels)
		yield subscribed
		self.async_redis.listen(self.pub_sub_message)
401
402
	@property
403
	def logger(self):
404
		return self._logger if self._logger else base_logger
405
406
	@tornado.gen.engine
	def add_channel(self, channel):
		"""Remember `channel` in self.channels and subscribe the redis client to it."""
		self.channels.append(channel)
		single_channel = (channel,)
		yield tornado.gen.Task(self.async_redis.subscribe, single_channel)
411
412
	def evaluate(self, query_set):
413
		self.do_db(len, query_set)
414
		return query_set
415
416
	def do_db(self, callback, *args, **kwargs):
417
		try:
418
			return callback(*args, **kwargs)
419
		except (OperationalError, InterfaceError) as e:
420
			if 'MySQL server has gone away' in str(e):
421
				self.logger.warning('%s, reconnecting' % e)
422
				connection.close()
423
				return callback(*args, **kwargs)
424
			else:
425
				raise e
426
427
	def execute_query(self, query, *args, **kwargs):
		"""
		Execute raw SQL on the default connection.

		:param query: SQL text; extra arguments are passed to cursor.execute()
		:return: list with one {column name: value} dict per fetched row
		"""
		cursor = connection.cursor()
		try:
			cursor.execute(query, *args, **kwargs)
			# column names are the same for every row, so compute them once
			columns = [col[0] for col in cursor.description or ()]
			return [dict(zip(columns, row)) for row in cursor.fetchall()]
		finally:
			# release the cursor even when execute() or fetchall() fails
			cursor.close()
435
436
	def get_online_from_redis(self, channel, check_self_online=False):
437
		"""
438
		:rtype : dict
439
		returns (dict, bool) if check_type is present
440
		"""
441
		online = self.sync_redis.smembers(channel)
442
		self.logger.debug('!! channel %s redis online: %s', channel, online)
443
		result = set()
444
		user_is_online = False
445
		# redis stores8 REDIS_USER_FORMAT, so parse them
446
		if online:
447
			for raw in online:  # py2 iteritems
448
				decoded = raw.decode('utf-8')
449
				# : char specified in cookies_middleware.py.create_id
450
				user_id = int(decoded.split(':')[0])
451
				if user_id == self.user_id and decoded != self.id:
452
					user_is_online = True
453
				result.add(user_id)
454
		result = list(result)
455
		return (result, user_is_online) if check_self_online else result
456
457
	def add_online_user(self, room_id, offline_messages=None):
		"""
		Add this websocket id to the redis set `room_id` and announce the
		resulting online list.

		When the user has no other connection in the set, a LOGIN message is
		published to the room and, if given, `offline_messages` are written to
		this websocket. Otherwise only this websocket receives a REFRESH_USER
		message.

		:param room_id: redis set / channel of the room
		:param offline_messages: optional content for load_offline_message()
		"""
		self.async_redis_publisher.sadd(room_id, self.id)
		# since we add user to online first, latest trigger will always show correct online
		online, is_online = self.get_online_from_redis(room_id, True)
		if not is_online:  # if a new tab has been opened
			# the sadd above goes through the async publisher, so the set read
			# back may or may not contain this user yet; add the id only when
			# it is missing so it never appears twice
			if self.user_id not in online:
				online.append(self.user_id)
			online_user_names_mes = self.room_online(
				online,
				Actions.LOGIN,
				room_id
			)
			self.logger.info('!! First tab, sending refresh online for all')
			self.publish(online_user_names_mes, room_id)
			if offline_messages:
				self.ws_write(self.load_offline_message(offline_messages, room_id))
		else:  # Send user names to self
			online_user_names_mes = self.room_online(
				online,
				Actions.REFRESH_USER,
				room_id
			)
			self.logger.info('!! Second tab, retrieving online for self')
			self.ws_write(online_user_names_mes)
485
486
	def publish(self, message, channel, parsable=False):
487
		jsoned_mess = json.dumps(message)
488
		self.logger.debug('<%s> %s', channel, jsoned_mess)
489
		if parsable:
490
			jsoned_mess = self.encode(jsoned_mess)
491
		self.async_redis_publisher.publish(channel, jsoned_mess)
492
493
	def encode(self, message):
494
		"""
495
		Marks message with prefix to specify that
496
		it should be decoded and proccesed before sending to client
497
		@param message: message to mark
498
		@return: marked message
499
		"""
500
		return self.parsable_prefix + message
501
502
	def remove_parsable_prefix(self, message):
503
		if message.startswith(self.parsable_prefix):
504
			return message[1:]
505
506
	def pub_sub_message(self, message):
		"""
		Callback for messages arriving from redis.

		Bodies that are not text are ignored. Text without the parsable
		prefix is written to the websocket as is. Prefixed text is parsed and
		passed to the post_process_message handler registered for its action;
		the prefix-less text is written to the websocket only when that
		handler returns a falsy value.
		"""
		body = message.body
		if not isinstance(body, str_type):
			# non-text bodies (the original marks these as subscribe events)
			return
		stripped = self.remove_parsable_prefix(body)
		if not stripped:
			self.ws_write(body)
			return
		parsed = json.loads(stripped)
		post_process = self.post_process_message[parsed[VarNames.EVENT]]
		if not post_process(parsed):
			self.ws_write(stripped)
517
518
	def ws_write(self, message):
519
		raise NotImplementedError('WebSocketHandler implements')
520
521
	def process_send_message(self, message):
		"""
		Store an incoming chat message with its images and publish it to
		the room it was sent to.

		:type message: dict
		"""
		channel = message[VarNames.CHANNEL]
		raw_imgs = message.get(VarNames.IMG)
		message_db = Message(
			sender_id=self.user_id,
			content=message[VarNames.CONTENT],
			symbol=get_max_key(raw_imgs)
		)
		message_db.room_id = channel
		self.do_db(message_db.save)
		# images are saved after the message so they can refer to its id
		stored_images = self.save_images(raw_imgs, message_db.id)
		image_urls = self.prepare_img(stored_images, message_db.id)
		outgoing = self.create_send_message(message_db, Actions.PRINT_MESSAGE, image_urls)
		self.publish(outgoing, channel)
541
542
	def close_file_connection(self, in_message):
		"""
		Close this websocket's side of a file connection.

		Does nothing when this side is already CLOSED. Otherwise notifies
		the other side(s), depending on whether this websocket is the one
		registered for the connection in WEBRTC_CONNECTION, and then marks
		itself CLOSED.

		:raises Exception: "Access Denied" when this websocket has no status
		in the connection
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		own_status = self.sync_redis.shget(connection_id, self.id)
		if not own_status:
			raise Exception("Access Denied")
		if own_status == WebRtcRedisStates.CLOSED:
			return
		sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		if sender_id != self.id:
			self.close_file_receiver(connection_id, in_message, sender_id)
		else:
			self.close_file_sender(connection_id)
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
554
555
	def close_call_connection(self, in_message):
		"""
		Leave a call: mark this websocket CLOSED and send a
		CLOSE_CALL_CONNECTION message to every other participant that is
		not closed.

		:raises ValidationError: when this websocket is neither READY nor
		RESPONDED in the connection
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		conn_users = self.sync_redis.shgetall(connection_id)
		leavable = (WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED)
		if conn_users[self.id] not in leavable:
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
		del conn_users[self.id]
		notification = {
			VarNames.EVENT: Actions.CLOSE_CALL_CONNECTION,
			VarNames.CONNECTION_ID: connection_id,
			VarNames.USER_ID: self.user_id,
			VarNames.WEBRTC_OPPONENT_ID: self.id,
			VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
		}
		for ws_id in conn_users:
			if conn_users[ws_id] == WebRtcRedisStates.CLOSED:
				continue
			self.publish(notification, ws_id)
573
574
	def cancel_call_connection(self, in_message, reply_action=None):
		"""
		Decline an offered call: this websocket becomes CLOSED and the other
		participants get a CANCEL_CALL_CONNECTION message (see send_call_answer).

		:param in_message: incoming message holding the connection id
		:param reply_action: unused. It now has a default because the method is
		registered in pre_process_message next to handlers that take a single
		message argument — presumably the dispatcher calls it that way too,
		which failed with a TypeError before; verify against the caller
		:raises ValidationError: when this websocket's status is not OFFERED
		"""
		self.send_call_answer(in_message, WebRtcRedisStates.CLOSED, Actions.CANCEL_CALL_CONNECTION)
576
577
	def close_file_receiver(self, connection_id, in_message, sender_id):
		"""
		Forward the incoming close message to `sender_id`, unless that side
		is already CLOSED.

		The message is modified in place: the opponent id is set to this
		websocket and the handler to the peer connection handler.

		:raises Exception: "Access denied" when `sender_id` has no status in
		the connection
		"""
		sender_status = self.sync_redis.shget(connection_id, sender_id)
		if not sender_status:
			raise Exception("Access denied")
		if sender_status == WebRtcRedisStates.CLOSED:
			return
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
		self.publish(in_message, sender_id)
585
586
	def close_file_sender(self, connection_id):
		"""
		Send a CLOSE_FILE_CONNECTION message to every other websocket of the
		connection that is not CLOSED yet.
		"""
		statuses = self.sync_redis.shgetall(connection_id)
		del statuses[self.id]
		open_ws_ids = (
			ws_id for ws_id in statuses
			if statuses[ws_id] != WebRtcRedisStates.CLOSED
		)
		for ws_id in open_ws_ids:
			notification = {
				VarNames.EVENT: Actions.CLOSE_FILE_CONNECTION,
				VarNames.CONNECTION_ID: connection_id,
				VarNames.WEBRTC_OPPONENT_ID: self.id,
				VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
			}
			self.publish(notification, ws_id)
598
599
	def accept_file(self, in_message):
		"""
		Accept a file transfer: this websocket becomes READY and the sender
		gets an ACCEPT_FILE message.

		:raises ValidationError: unless the sender is READY and this websocket
		is RESPONDED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]  # TODO accept all if call
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
		sender_ready = sender_ws_status == WebRtcRedisStates.READY
		if not (sender_ready and self_ws_status == WebRtcRedisStates.RESPONDED):
			raise ValidationError("Invalid channel status")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
		acceptance = {
			VarNames.EVENT: Actions.ACCEPT_FILE,
			VarNames.CONNECTION_ID: connection_id,
			VarNames.WEBRTC_OPPONENT_ID: self.id,
			VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
		}
		self.publish(acceptance, sender_ws_id)
614
615
	# todo
616
	# we can use channel_status = self.sync_redis.shgetall(connection_id)
617
	# and then self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
618
	# if we shgetall and only then do async hset
619
	# we can catch an issue when 2 concurrent users accepted the call
620
	# but we didn't  send them ACCEPT_CALL as they both were in status 'offered'
621
	def accept_call(self, in_message):
		"""
		Accept a call: this websocket becomes READY and every other
		participant that is not CLOSED gets an ACCEPT_CALL message.

		:raises ValidationError: when this websocket's status is not RESPONDED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		self_status = self.sync_redis.shget(connection_id, self.id)
		if self_status != WebRtcRedisStates.RESPONDED:
			raise ValidationError("Invalid channel status")
		# the status is written through the sync client before the
		# participants are read back
		self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.READY)
		channel_status = self.sync_redis.shgetall(connection_id)
		del channel_status[self.id]
		acceptance = {
			VarNames.EVENT: Actions.ACCEPT_CALL,
			VarNames.USER_ID: self.user_id,
			VarNames.CONNECTION_ID: connection_id,
			VarNames.WEBRTC_OPPONENT_ID: self.id,
			VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
		}
		for ws_id in channel_status:
			if channel_status[ws_id] == WebRtcRedisStates.CLOSED:
				continue
			self.publish(acceptance, ws_id)
640
641
	def offer_webrtc_connection(self, in_message):
		"""
		Start a webrtc connection (file or call, depending on the incoming
		action) for a room.

		Generates a connection id, registers this websocket for it in
		WEBRTC_CONNECTION with status READY, reports the id back to this
		websocket and publishes the offer to the room as a parsable message.
		"""
		room_id = in_message[VarNames.CHANNEL]
		content = in_message.get(VarNames.CONTENT)
		qued_id = in_message[VarNames.WEBRTC_QUED_ID]
		connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
		publisher = self.async_redis_publisher
		publisher.hset(WEBRTC_CONNECTION, connection_id, self.id)
		publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
		offer = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT])
		self.ws_write(self.set_connection_id(qued_id, connection_id))
		self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
		self.publish(offer, room_id, True)
654
655
	def reply_call_connection(self, in_message):
		"""
		Answer an offered call: this websocket becomes RESPONDED and the other
		participants get a REPLY_CALL_CONNECTION message (see send_call_answer).
		"""
		new_status = WebRtcRedisStates.RESPONDED
		self.send_call_answer(in_message, new_status, Actions.REPLY_CALL_CONNECTION)
657
658
	def send_call_answer(self, in_message, status_set, reply_action):
		"""
		Answer an offered call.

		:param status_set: status stored for this websocket in the connection
		:param reply_action: action name of the message sent to every other
		participant that is not CLOSED
		:raises ValidationError: when this websocket's status is not OFFERED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		conn_users = self.sync_redis.shgetall(connection_id)
		if conn_users[self.id] != WebRtcRedisStates.OFFERED:
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, status_set)
		del conn_users[self.id]
		answer = self.reply_webrtc(reply_action, connection_id)
		for ws_id in conn_users:
			if conn_users[ws_id] == WebRtcRedisStates.CLOSED:
				continue
			self.publish(answer, ws_id)
670
671
	def reply_file_connection(self, in_message):
		"""
		Answer a file offer: this websocket becomes RESPONDED and the sender
		gets a REPLY_FILE_CONNECTION message.

		:raises ValidationError: unless the sender is READY and this websocket
		is OFFERED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
		sender_ready = sender_ws_status == WebRtcRedisStates.READY
		if not (sender_ready and self_ws_status == WebRtcRedisStates.OFFERED):
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
		reply = self.reply_webrtc(Actions.REPLY_FILE_CONNECTION, connection_id)
		self.publish(reply, sender_ws_id)
681
682
	def proxy_webrtc(self, in_message):
		"""
		Forward a webrtc message to the opponent websocket named in it.

		The message is modified in place before forwarding: the opponent id
		is replaced with this websocket's id and the handler is set to the
		peer connection handler.

		:type in_message: dict
		:raises ValidationError: unless both this websocket and the opponent
		are READY in the connection
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
		opponent_channel_status = self.sync_redis.shget(connection_id, channel)
		ready = WebRtcRedisStates.READY
		if self_channel_status != ready or opponent_channel_status != ready:
			# todo receiver should only accept proxy_webrtc from sender, sender can accept all
			# If there are 3 people in 1 channel and the first one starts a transfer
			# to the 2nd and 3rd, the 2nd can forge webrtc traffic to the 3rd, which
			# is allowed during a call but not while transferring a file
			error_text = 'Error in connection status, your status is {} while opponent is {}'.format(
				self_channel_status, opponent_channel_status
			)
			raise ValidationError(error_text)
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
		self.logger.debug(
			"Forwarding message to channel %s, self %s, other status %s",
			channel, self_channel_status, opponent_channel_status
		)
		self.publish(in_message, channel)
702
703
	def create_new_room(self, message):
		"""
		Create a named room with this user as its member and publish the
		subscribe message to the user's own channel.

		:raises ValidationError: when the room name is empty or longer than
		16 characters
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = bool(room_name) and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		room = Room(name=room_name)
		self.do_db(room.save)
		membership = RoomUsers(room_id=room.id, user_id=self.user_id)
		membership.save()
		self.publish(self.subscribe_room_channel_message(room.id, room_name), self.channel, True)
712
713
	def invite_user(self, message):
		"""
		Add another user to a room this handler is subscribed to.

		The room is told about the new user, and the invited user receives
		an invite on their personal channel as a parsable message.

		:raises ValidationError: when the room is not among self.channels,
		when the room is private, or when the user is already in the room
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		if room_id not in self.channels:
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
		room = self.do_db(Room.objects.get, id=room_id)
		if room.is_private:
			raise ValidationError("You can't add users to direct room, create a new room instead")
		try:
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		# set_js_user_structure() is defined outside this view; it is expected
		# to fill users_in_room with one entry per user id
		users_in_room = {}
		for member in room.users.all():
			self.set_js_user_structure(users_in_room, member.id, member.username, member.sex)
		room_notice = self.add_user_to_room(room_id, user_id, users_in_room[user_id])
		self.publish(room_notice, room_id)
		invitation = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
		self.publish(invitation, RedisPrefix.generate_user(user_id), True)
731
732
	def create_room(self, user_rooms, user_id):
		"""
		Find or create the private room shared by this user and `user_id`.

		An existing room is passed to update_room(), which re-enables it when
		it is disabled and raises otherwise. When no room is found, a new Room
		is created together with the RoomUsers rows linking the user(s) to it.

		:param user_rooms: values queryset with the 'room_id' of this user's
		unnamed rooms (see create_user_channel)
		:param user_id: id of the other user; may equal self.user_id
		:return: id of the found or created room
		:raises ValidationError: from update_room() when the room exists and
		is not disabled
		"""
		if self.user_id == user_id:
			room_ids = list([room['room_id'] for room in self.evaluate(user_rooms)])
			# NOTE(review): execute_query() returns a plain list of dicts, so the
			# `query_res.get` below looks like it raises AttributeError on this
			# branch; the keys of those dicts depend on SELECT_SELF_ROOM, which is
			# not visible here — confirm before relying on the self-room case
			query_res = self.execute_query(SELECT_SELF_ROOM, [room_ids, ])
		else:
			rooms_query = RoomUsers.objects.filter(user_id=user_id, room__in=user_rooms)
			query_res = rooms_query.values('room__id', 'room__disabled')
		try:
			room = self.do_db(query_res.get)
			room_id = room['room__id']
			self.update_room(room_id, room['room__disabled'])
		except RoomUsers.DoesNotExist:
			# no shared room yet: create it and link the participant(s)
			room = Room()
			room.save()
			room_id = room.id
			if self.user_id == user_id:
				RoomUsers(user_id=self.user_id, room_id=room_id).save()
			else:
				RoomUsers.objects.bulk_create([
					RoomUsers(user_id=user_id, room_id=room_id),
					RoomUsers(user_id=self.user_id, room_id=room_id),
				])
		return room_id
755
756
	def update_room(self, room_id, disabled):
		"""
		Re-enable a disabled room.

		:param disabled: current 'disabled' flag of the room
		:raises ValidationError: when the room is not disabled, i.e. it
		already exists
		"""
		if disabled:
			Room.objects.filter(id=room_id).update(disabled=False)
			return
		raise ValidationError('This room already exist')
761
762
	def create_user_channel(self, message):
		"""
		Open a direct channel with the user named in `message`.

		The subscribe message is published, as a parsable message, to this
		user's channel and to the other user's channel when that is a
		different one.
		"""
		user_id = message[VarNames.USER_ID]
		# ids of all unnamed rooms of the current user
		own_rooms = Room.users.through.objects.filter(
			user_id=self.user_id, room__name__isnull=True
		).values('room_id')
		# the room among those that also contains the other user
		room_id = self.create_room(own_rooms, user_id)
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
		self.publish(subscribe_message, self.channel, True)
		other_channel = RedisPrefix.generate_user(user_id)
		if other_channel == self.channel:
			return
		self.publish(subscribe_message, other_channel, True)
773
774
	def delete_channel(self, message):
		"""
		Leave a room.

		A room without a name is disabled. For a named room the user's link
		to it is deleted and a LOGOUT message with the remaining online users
		is published. In both cases the room is saved and a DELETE_ROOM
		message is published to the room as a parsable message.

		:raises ValidationError: when the room is ALL_ROOM_ID or not among
		self.channels, or when the room is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		if room_id == ALL_ROOM_ID or room_id not in self.channels:
			raise ValidationError('You are not allowed to exit this room')
		room = self.do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		is_private = room.name is None
		if is_private:  # if private then disable
			room.disabled = True
		else:  # if public -> leave the room, delete the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
			still_online = self.get_online_from_redis(room_id)
			still_online.remove(self.user_id)
			logout_message = self.room_online(still_online, Actions.LOGOUT, room_id)
			self.publish(logout_message, room_id)
		room.save()
		self.publish(self.unsubscribe_direct_message(room_id), room_id, True)
791
792
	def edit_message(self, data):
		"""
		Edit or delete one of the current user's messages and publish the result
		to the message's room.

		A ``None`` content marks the message as deleted; any other content
		replaces the stored text and may bring additional images.

		:param data: dict with ``VarNames.MESSAGE_ID``, ``VarNames.CONTENT`` and
			optionally ``VarNames.IMG`` (symbol -> image description)
		:raises ValidationError: if the message is not the user's own, is older
			than 10 minutes or is already deleted
		"""
		message_id = data[VarNames.MESSAGE_ID]
		message = Message.objects.get(id=message_id)
		self._check_message_editable(message)
		message.content = data[VarNames.CONTENT]
		selector = Message.objects.filter(id=message_id)
		if message.content is None:
			# deletion keeps the row and only raises the flag
			prep_imgs = None
			selector.update(deleted=True)
			action = Actions.DELETE_MESSAGE
		else:
			db_images = self._store_edited_images(data.get(VarNames.IMG), message)
			prep_imgs = self.prepare_img(db_images, message_id)
			action = Actions.EDIT_MESSAGE
			selector.update(content=message.content, symbol=message.symbol)
		self.publish(self.create_send_message(message, action, prep_imgs), message.room_id)

	def _check_message_editable(self, message):
		"""Raise ValidationError unless the current user may still edit ``message``."""
		if message.sender_id != self.user_id:
			raise ValidationError("You can only edit your messages")
		# message.time is compared in milliseconds: 600000 ms == 10 minutes
		if message.time + 600000 < get_milliseconds():
			raise ValidationError("You can only edit messages that were send not more than 10 min ago")
		if message.deleted:
			raise ValidationError("Already deleted")

	def _store_edited_images(self, images, message):
		"""
		Save the images sent with an edit, advance ``message.symbol`` when needed
		and return the images that belong to the message.
		"""
		if images:
			if message.symbol:
				# the message already owns symbols, incoming ones may clash with them
				self.replace_symbols_if_needed(images, message)
			new_symbol = get_max_key(images)
			if message.symbol is None or new_symbol > message.symbol:
				message.symbol = new_symbol
		db_images = self.save_images(images, message.id)
		if message.symbol:  # fetch all, including the ones stored just now
			db_images = Image.objects.filter(message_id=message.id)
		return db_images
823
824
	def save_images(self, images, message_id):
		"""
		Create ``Image`` rows for the images attached to a message.

		:param images: dict mapping a symbol to a dict with ``VarNames.IMG_B64``
			and ``VarNames.IMG_FILE_NAME``; may be empty or None
		:param message_id: id of the message the images belong to
		:return: list of the created ``Image`` objects, empty when there is nothing to save
		"""
		if not images:
			return []
		created = []
		for symbol in images:
			description = images[symbol]
			created.append(Image(
				message_id=message_id,
				img=extract_photo(
					description[VarNames.IMG_B64],
					description[VarNames.IMG_FILE_NAME]
				),
				symbol=symbol
			))
		# a single bulk call for all images of the message
		Image.objects.bulk_create(created)
		return created
836
837
	def replace_symbols_if_needed(self, images, message):
		"""
		Re-key incoming images whose symbol is not greater than ``message.symbol``.

		If a message was edited and the user wasn't notified about that, and he
		edits the message again, his symbols can go out of sync with the ones the
		message already uses. Every such image gets the next free symbol above
		``message.symbol``, and the same replacement is applied to
		``message.content``. Both ``images`` and ``message.content`` are modified
		in place.

		:param images: dict mapping a one-character symbol to an image description
		:param message: object with ``symbol`` (truthy, one character) and ``content``
		"""
		order = ord(message.symbol)
		renames = []
		for old_symbol in images:
			if not old_symbol <= message.symbol:
				continue
			order += 1
			# images above message.symbol keep their key; skipping their symbols
			# prevents the re-keyed image from overwriting one of them below
			while chr(order) in images:
				order += 1
			new_symbol = chr(order)
			renames.append((old_symbol, new_symbol, images[old_symbol]))
			message.content = message.content.replace(old_symbol, new_symbol)
		# applied after the loop: a dict must not change size during iteration
		for old_symbol, new_symbol, description in renames:
			del images[old_symbol]
			images[new_symbol] = description
855
856
	def send_client_new_channel(self, message):
		"""
		Pass the room id from ``message`` to ``add_channel`` and ``add_online_user``.

		:param message: dict holding the room id under ``VarNames.ROOM_ID``
		"""
		new_room_id = message[VarNames.ROOM_ID]
		self.add_channel(new_room_id)
		self.add_online_user(new_room_id)
860
861
	def set_opponent_call_channel(self, message):
		"""
		Store ``WebRtcRedisStates.OFFERED`` for this connection in the redis hash
		of the webrtc connection, unless this connection is the one named under
		``VarNames.WEBRTC_OPPONENT_ID``.

		:param message: dict with ``VarNames.CONNECTION_ID`` and ``VarNames.WEBRTC_OPPONENT_ID``
		:return: True when ``VarNames.WEBRTC_OPPONENT_ID`` equals this connection's
			id (nothing is stored), otherwise None
		"""
		connection_id = message[VarNames.CONNECTION_ID]
		names_this_connection = message[VarNames.WEBRTC_OPPONENT_ID] == self.id
		if not names_this_connection:
			self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
			return None
		return True
866
867
	def send_client_delete_channel(self, message):
		"""
		Stop listening to the room named in ``message`` and forget it locally.

		:param message: dict holding the room id under ``VarNames.ROOM_ID``
		"""
		left_room_id = message[VarNames.ROOM_ID]
		self.async_redis.unsubscribe((left_room_id,))
		# NOTE(review): on_close() removes the connection id from the same key
		# with srem (a set command) while this uses hdel (a hash command);
		# confirm which redis type the room key really has
		self.async_redis_publisher.hdel(left_room_id, self.id)
		self.channels.remove(left_room_id)
872
873
	def process_get_messages(self, data):
		"""
		Send the client a page of non-deleted messages of one room, newest first.

		``VarNames.GET_MESSAGES_HEADER_ID``, when present, limits the page to
		messages with a smaller id; ``VarNames.GET_MESSAGES_COUNT`` is the page
		size and defaults to 10.

		:type data: dict
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		room_id = data[VarNames.CHANNEL]
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		criteria = [Q(room_id=room_id), Q(deleted=False)]
		if header_id is not None:
			# only messages older than the one the client already has
			criteria.insert(0, Q(id__lt=header_id))
		messages = Message.objects.filter(*criteria).order_by('-pk')[:count]
		images = self.do_db(self.get_message_images, messages)
		self.ws_write(self.get_messages(messages, room_id, images))
888
889
	def get_message_images(self, messages):
		"""
		Fetch the images of those ``messages`` that have a truthy ``symbol``.

		:param messages: iterable of objects with ``id`` and ``symbol``
		:return: ``Image`` queryset for the matching messages, or an empty list
			when no message has a symbol
		"""
		ids_with_images = [msg.id for msg in messages if msg.symbol]
		# no query at all when there is nothing to look for
		images = Image.objects.filter(message_id__in=ids_with_images) if ids_with_images else []
		self.logger.info('!! Messages have %d images', len(images))
		return images
897
898
	def get_offline_messages(self):
		"""
		Collect the non-deleted messages with an id above the current user's
		``last_read_message_id`` of the room they were posted to.

		:return: dict mapping a room id to the list of its prepared messages
		"""
		unread = Message.objects.filter(
			id__gt=F('room__roomusers__last_read_message_id'),
			deleted=False,
			room__roomusers__user_id=self.user_id
		)
		images = self.do_db(self.get_message_images, unread)
		by_room = {}
		for message in unread:
			prepared = self.create_message(message, self.prepare_img(images, message.id))
			if message.room_id not in by_room:
				by_room[message.room_id] = []
			by_room[message.room_id].append(prepared)
		return by_room
910
911
	def get_users_in_current_user_rooms(self):
		"""
		Describe every enabled room of the current user together with its users.

		Layout of the result (keys are the ``VarNames`` constants)::

			{
				<room id>: {
					VarNames.ROOM_NAME: <room name>,
					VarNames.ROOM_USERS: {
						<user id>: {
							VarNames.USER: <username>,
							VarNames.GENDER: GENDERS[<sex>]
						},
						...
					}
				},
				...
			}
		"""
		user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
		res = {}
		for room in user_rooms:
			res[room['id']] = {
				VarNames.ROOM_NAME: room['name'],
				VarNames.ROOM_USERS: {}
			}
		# one query for the members of all rooms; 'rooms__id' tells which room a row is for
		members = User.objects.filter(rooms__in=list(res)).values('id', 'username', 'sex', 'rooms__id')
		for member in members:
			room_users = res[member['rooms__id']][VarNames.ROOM_USERS]
			self.set_js_user_structure(room_users, member['id'], member['username'], member['sex'])
		return res
940
941
	def set_js_user_structure(self, user_dict, user_id, name, sex):
		"""
		Put the description of one user into ``user_dict`` under ``user_id``.

		:param user_dict: dict to fill, modified in place
		:param user_id: key of the new entry
		:param name: stored under ``VarNames.USER``
		:param sex: key into ``GENDERS``; the looked-up value is stored under ``VarNames.GENDER``
		"""
		entry = {}
		entry[VarNames.USER] = name
		entry[VarNames.GENDER] = GENDERS[sex]
		user_dict[user_id] = entry
946
947
	def save_ip(self):
		"""
		Create a ``UserJoinedInfo`` row for the current user and ip unless one
		already exists.
		"""
		already_recorded = self.do_db(
			UserJoinedInfo.objects.filter(Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists
		)
		if not already_recorded:
			UserJoinedInfo.objects.create(
				ip=get_or_create_ip(self.ip, self.logger),
				user_id=self.user_id
			)
956
957
	def publish_logout(self, channel, log_data):
		"""
		Publish a logout message to ``channel`` when the user is no longer online there.

		:param channel: room id
		:param log_data: dict that receives the online state of the channel, modified in place
		:return: True when the logout message was published, otherwise None
		"""
		# seems like async solves problem with connection lost and wrong data status
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
		online, is_online = self.get_online_from_redis(channel, True)
		log_data[channel] = {'online': online, 'is_online': is_online}
		if is_online:
			return None
		self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
		return True
966
967
968
class AntiSpam(object):
	"""
	Per-instance bookkeeping of received message sizes.

	``info`` maps a timestamp (hundredths of a second) to the length of the
	message checked at that moment, ``spammed`` counts oversized messages.
	"""

	def __init__(self):
		self.info = {}
		self.spammed = 0

	def check_spam(self, json_message):
		"""
		Record the size of ``json_message`` and reject it when it is too long.

		:raises ValidationError: if the message is longer than ``MAX_MESSAGE_SIZE``
		"""
		size = len(json_message)
		self.info[int(round(time.time() * 100))] = size
		if size <= MAX_MESSAGE_SIZE:
			self.check_timed_spam()
			return
		self.spammed += 1
		raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)

	def check_timed_spam(self):
		"""Placeholder for a rate check; does nothing yet."""
		# TODO implement me, expected to end with something like
		# raise ValidationError("You're chatting too much, calm down a bit!")
		return None
987
988
989
class TornadoHandler(WebSocketHandler, MessagesHandler):
990
991
	def __init__(self, *args, **kwargs):
		"""Forward all arguments to the parent classes and start as not connected."""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		self.anti_spam = AntiSpam()
		# set to True at the end of a successful open()
		self.__connected__ = False
995
996
	@property
	def connected(self):
		"""Current value of the connected flag."""
		return self.__connected__

	@connected.setter
	def connected(self, value):
		"""Replace the connected flag with ``value``."""
		self.__connected__ = value
1003
1004
	def data_received(self, chunk):
		"""Ignore ``chunk``; this handler does nothing with it."""
		return None
1006
1007
	def on_message(self, json_message):
		"""
		Decode an incoming websocket message and dispatch it by its event.

		A ``ValidationError`` raised on the way is sent back to the client as a
		growl message; every other exception leaves this method.

		:param json_message: raw json text received from the client
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise Exception('Skipping null message')
			# NOTE: the anti-spam check is switched off here:
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %.1000s', json_message)
			message = json.loads(json_message)
			event = message[VarNames.EVENT]
			if event not in self.pre_process_message:
				raise Exception("event {} is unknown".format(event))
			channel = message.get(VarNames.CHANNEL)
			# a message that names a channel must name one of ours
			if channel and channel not in self.channels:
				raise Exception('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels))
			process = self.pre_process_message[event]
			process(message)
		except ValidationError as e:
			self.ws_write(self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL))
1025
1026
	def on_close(self):
		"""
		Clean up after the websocket is closed.

		Unsubscribes from redis, removes this connection id from every numeric
		channel, publishes logout messages while the handler still counts as
		connected, runs ``UPDATE_LAST_READ_MESSAGE`` when at least one logout was
		published, and finally calls ``disconnect``.
		"""
		if self.async_redis.subscribed:
			self.logger.info("Close event, unsubscribing from %s", self.channels)
			self.async_redis.unsubscribe(self.channels)
		else:
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
		log_data = {}
		gone_offline = False
		for channel in self.channels:
			# only numeric channels are handled below, the rest is skipped
			if isinstance(channel, Number):
				self.sync_redis.srem(channel, self.id)
				if self.connected and self.publish_logout(channel, log_data):
					gone_offline = True
		if gone_offline:
			res = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
			self.logger.info("Updated %s last read message", res)
		self.disconnect(json.dumps(log_data))
1044
1045
	def disconnect(self, log_data, tries=0):
		"""
		Closes the redis connection if it's not in progress, otherwise schedules
		another attempt on the IOLoop.
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227

		:param log_data: text written to the log once the connection is closed
		:param tries: number of attempts already made; retries stop after 1000
		"""
		self.connected = False
		if tries == 0:
			# Only the first attempt takes the snapshot. A retry runs after
			# self.channels was emptied, so repeating the assignment would
			# replace the saved channels with an empty list.
			self.closed_channels = self.channels
			self.channels = []
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
			self.logger.debug('Closing a connection timeouts')
			# timedelta's first positional argument is days: 0.00001 day == 0.864 s
			retry_delay = timedelta(days=0.00001)
			ioloop.IOLoop.instance().add_timeout(retry_delay, self.disconnect, log_data, tries + 1)
		else:
			self.logger.info("Close connection result: %s", log_data)
			self.async_redis.disconnect()
1059
1060
	def generate_self_id(self):
		"""
		Set ``self.id`` from the user id and the optional ``id`` request argument.

		When user opens new tab in browser wsHandler.wsConnectionId stores Id of current ws
		So if ws loses a connection it still can reconnect with same id,
		and TornadoHandler can restore webrtc_connections to previous state

		The client is sent the id part returned by ``create_id`` whenever it
		differs from the one it asked for.
		"""
		requested_id = self.get_argument('id', None)
		self.id, issued_id = create_id(self.user_id, requested_id)
		if issued_id != requested_id:
			self.ws_write(self.set_ws_id(issued_id, self.id))
1070
1071
	def open(self):
		"""
		Handle a new websocket connection.

		A session key unknown to ``sessionStore`` closes the socket with code 403.
		Otherwise the user is loaded from the session, the room list is sent to
		the client, the handler subscribes to its channels, ``add_online_user`` is
		called for every room with that room's offline messages, and the ip is
		saved in a background thread.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if not sessionStore.exists(session_key):
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			self.close(403, "Session key %s has been rejected" % session_key)
			return
		self.ip = self.get_client_ip()
		session = SessionStore(session_key)
		self.user_id = int(session["_auth_user_id"])
		self.generate_self_id()
		self._logger = logging.LoggerAdapter(parent_logger, {'id': self.id, 'ip': self.ip})
		self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
		self.async_redis.connect()
		user_db = self.do_db(User.objects.get, id=self.user_id)
		self.sender_name = user_db.username
		self.sex = user_db.sex_str
		user_rooms = self.get_users_in_current_user_rooms()
		self.ws_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
		# own channel and connection id first, then every room id
		self.channels = []  # py2 doesn't support clear()
		self.channels.extend((self.channel, self.id))
		self.channels.extend(user_rooms)
		self.listen(self.channels)
		# get all missed messages
		off_messages = self.get_offline_messages()
		for room_id in user_rooms:
			self.add_online_user(room_id, off_messages.get(room_id))
		self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
		self.connected = True
		ip_saver = Thread(target=self.save_ip)
		ip_saver.start()
1106
1107
	def check_origin(self, origin):
		"""
		Check whether the host of the ``Origin`` header matches the host of the
		``Host`` header set by the browser.

		Ports are ignored, the comparison is case-insensitive and bracketed IPv6
		literals are compared by their address.

		:param origin: value of the ``Origin`` request header
		:return: True when both host names are equal, False otherwise, including
			when a header is missing, empty or cannot be parsed
		"""
		host_header = self.request.headers.get("Host")
		if not origin or not host_header:
			return False
		try:
			# .hostname drops the port, lower-cases the name and unwraps [ipv6]
			origin_domain = urlparse(origin).hostname
			# Host has no scheme; the leading '//' makes urlparse read it as a netloc
			browser_domain = urlparse('//' + host_header).hostname
		except ValueError:  # raised for a malformed IPv6 literal
			return False
		if not origin_domain or not browser_domain:
			return False
		return browser_domain == origin_domain
1117
1118
	def ws_write(self, message):
		"""
		Send ``message`` to the client; a dict is serialized to json first.

		A closed websocket is only logged. A message that is neither a dict nor
		a ``str_type`` raises ``ValueError``.

		:type self: MessagesHandler
		"""
		# self.logger.debug('<< THREAD %s >>', os.getppid())
		payload = message
		try:
			if isinstance(payload, dict):
				payload = json.dumps(payload)
			if not isinstance(payload, str_type):
				raise ValueError('Wrong message type : %s' % str(payload))
			self.logger.debug(">> %.1000s", payload)
			self.write_message(payload)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(payload))
1133
1134
	def get_client_ip(self):
		"""
		Return the ``X-Real-IP`` request header when it is set and non-empty,
		otherwise the remote ip of the request.
		"""
		forwarded_ip = self.request.headers.get("X-Real-IP")
		if forwarded_ip:
			return forwarded_ip
		return self.request.remote_ip
1136