Completed
Push — master ( afc7e8...b138db )
by Andrew
34s
created

MessagesHandler.reply_file_connection()   A

Complexity

Conditions 3

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 10
rs 9.4285
1
import json
2
import logging
3
import sys
4
import time
5
from datetime import timedelta
6
from numbers import Number
7
from threading import Thread
8
9
import tornado.gen
10
import tornado.httpclient
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q, F
18
from redis_sessions.session import SessionStore
19
from tornado import ioloop
20
from tornado.websocket import WebSocketHandler
21
22
from chat.cookies_middleware import create_id
23
from chat.log_filters import id_generator
24
from chat.utils import extract_photo
25
from chat.utils import get_or_create_ip
26
27
try:  # py2
28
	from urlparse import urlparse
29
except ImportError:  # py3
30
	from urllib.parse import urlparse
31
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM, \
32
	TORNADO_REDIS_PORT, WEBRTC_CONNECTION
33
from chat.models import User, Message, Room, get_milliseconds, UserJoinedInfo, RoomUsers, Image
34
35
# True when running on Python 3 or newer. The numeric major version is compared
# because a lexicographic test on the version string misorders e.g. '10.0' < '3'.
PY3 = sys.version_info[0] >= 3
# Type used for "is this a text string" checks; `basestring` exists on Python 2 only.
str_type = str if PY3 else basestring
37
38
39
# Module-level SessionStore instance, created once when this module is imported.
sessionStore = SessionStore()
40
41
# Plain module logger plus an adapter that always supplies the `id` and `ip`
# record fields with placeholder values.
parent_logger = logging.getLogger(__name__)
base_logger = logging.LoggerAdapter(
	parent_logger,
	dict(id=0, ip='000.000.000.000'),
)
46
47
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
48
#CONNECTION_POOL = tornadoredis.ConnectionPool(
49
#	max_connections=500,
50
#	wait_for_available=True)
51
52
53
class Actions(object):
	"""Names of the actions (message events) exchanged with clients."""

	# online state
	LOGIN = 'addOnlineUser'
	LOGOUT = 'removeOnlineUser'
	REFRESH_USER = 'setOnlineUsers'
	SET_WS_ID = 'setWsId'

	# chat messages
	SEND_MESSAGE = 'sendMessage'
	PRINT_MESSAGE = 'printMessage'
	EDIT_MESSAGE = 'editMessage'
	DELETE_MESSAGE = 'deleteMessage'
	GET_MESSAGES = 'loadMessages'
	OFFLINE_MESSAGES = 'loadOfflineMessages'
	GROWL_MESSAGE = 'growl'

	# rooms and channels
	ROOMS = 'setRooms'
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
	CREATE_ROOM_CHANNEL = 'addRoom'
	DELETE_ROOM = 'deleteRoom'
	INVITE_USER = 'inviteUser'
	ADD_USER = 'addUserToDom'

	# webrtc: calls and file transfer
	WEBRTC = 'sendRtcData'
	SET_WEBRTC_ID = 'setConnectionId'
	SET_WEBRTC_ERROR = 'setError'
	OFFER_FILE_CONNECTION = 'offerFile'
	OFFER_CALL_CONNECTION = 'offerCall'
	REPLY_FILE_CONNECTION = 'replyFile'
	REPLY_CALL_CONNECTION = 'replyCall'
	ACCEPT_FILE = 'acceptFile'
	ACCEPT_CALL = 'acceptCall'
	CLOSE_FILE_CONNECTION = 'destroyFileConnection'
	CLOSE_CALL_CONNECTION = 'destroyCallConnection'
	CANCEL_CALL_CONNECTION = 'cancelCallConnection'
83
84
85
class VarNames(object):
	"""Key names used in the message dictionaries."""

	# envelope
	EVENT = 'action'
	HANDLER_NAME = 'handler'
	CONTENT = 'content'
	TIME = 'time'

	# users
	USER = 'user'
	USER_ID = 'userId'
	GENDER = 'sex'

	# messages and images
	MESSAGE_ID = 'id'
	IMG = 'images'
	IMG_B64 = 'b64'
	IMG_FILE_NAME = 'fileName'
	GET_MESSAGES_COUNT = 'count'
	GET_MESSAGES_HEADER_ID = 'headerId'

	# rooms and channels (CHANNEL and CHANNEL_NAME share the same key)
	CHANNEL = 'channel'
	CHANNEL_NAME = 'channel'
	ROOM_NAME = 'name'
	ROOM_ID = 'roomId'
	ROOM_USERS = 'users'
	IS_ROOM_PRIVATE = 'private'

	# webrtc (WEBRTC_QUED_ID shares the 'id' key with MESSAGE_ID)
	WEBRTC_QUED_ID = 'id'
	WEBRTC_OPPONENT_ID = 'opponentWsId'
	CONNECTION_ID = 'connId'
108
109
110
class HandlerNames(object):
	"""
	Values placed under the 'handler' key of outgoing messages.

	Derives from ``object`` like ``Actions`` and ``VarNames`` so it is a new-style class on Python 2 too.
	"""
	CHANNELS = 'channels'
	CHAT = 'chat'
	GROWL = 'growl'
	WEBRTC = 'webrtc'
	PEER_CONNECTION = 'peerConnection'
	WEBRTC_TRANSFER = 'webrtcTransfer'
	WS = 'ws'
118
119
120
class WebRtcRedisStates(object):
	"""
	Status values written to redis for a participant of a webrtc connection.

	Derives from ``object`` like ``Actions`` and ``VarNames`` so it is a new-style class on Python 2 too.
	"""
	RESPONDED = 'responded'
	READY = 'ready'
	OFFERED = 'offered'
	CLOSED = 'closed'
125
126
127
class RedisPrefix(object):
	"""
	Constants and helpers for naming redis channels.

	Derives from ``object`` like ``Actions`` and ``VarNames`` so it is a new-style class on Python 2 too.
	"""
	USER_ID_CHANNEL_PREFIX = 'u'
	DEFAULT_CHANNEL = ALL_ROOM_ID
	CONNECTION_ID_LENGTH = 8  # should be secure

	@classmethod
	def generate_user(cls, key):
		"""
		:param key: value identifying the user; it is converted with ``str``
		:return: ``USER_ID_CHANNEL_PREFIX`` followed by ``str(key)``
		"""
		return cls.USER_ID_CHANNEL_PREFIX + str(key)
135
136
137
class MessagesCreator(object):
	"""
	Builds the dictionaries that are sent out as messages.

	The sender attributes set in ``__init__`` are only placeholders here.
	"""

	def __init__(self, *args, **kwargs):
		# sender description, all placeholders at this level
		self.sex = None
		self.sender_name = None
		self.id = None  # child init
		self.user_id = 0  # anonymous by default

	def default(self, content, event, handler):
		"""
		Common payload: event, content, sender id, current time and handler name.

		:return: {"action": event, "content": content, "userId": ..., "time": ..., "handler": handler}
		"""
		payload = {VarNames.EVENT: event, VarNames.CONTENT: content}
		payload[VarNames.USER_ID] = self.user_id
		payload[VarNames.TIME] = get_milliseconds()
		payload[VarNames.HANDLER_NAME] = handler
		return payload

	def reply_webrtc(self, event, connection_id):
		"""
		Payload addressed to the 'webrtcTransfer' handler that identifies this sender.

		:return: {"action": event, "connId": connection_id, "userId": ..., "user": ..., "opponentWsId": ..., "handler": ...}
		"""
		reply = {VarNames.EVENT: event}
		reply[VarNames.CONNECTION_ID] = connection_id
		reply[VarNames.USER_ID] = self.user_id
		reply[VarNames.USER] = self.sender_name
		reply[VarNames.WEBRTC_OPPONENT_ID] = self.id
		reply[VarNames.HANDLER_NAME] = HandlerNames.WEBRTC_TRANSFER
		return reply

	def set_ws_id(self, random, self_id):
		"""
		:return: {"handler": "ws", "action": "setWsId", "content": random, "opponentWsId": self_id}
		"""
		answer = {VarNames.HANDLER_NAME: HandlerNames.WS}
		answer[VarNames.EVENT] = Actions.SET_WS_ID
		answer[VarNames.CONTENT] = random
		answer[VarNames.WEBRTC_OPPONENT_ID] = self_id
		return answer

	def room_online(self, online, event, channel):
		"""
		``default`` payload for the chat handler, extended with channel, sender name and sex.

		:param online: value stored under "content"
		"""
		payload = self.default(online, event, HandlerNames.CHAT)
		payload.update({VarNames.CHANNEL_NAME: channel})
		payload.update({VarNames.USER: self.sender_name})
		payload.update({VarNames.GENDER: self.sex})
		return payload

	def offer_webrtc(self, content, connection_id, room_id, action):
		"""
		``default`` payload for the webrtc handler, extended with sender name, connection id,
		this sender's ``id`` as opponent id, and the room as channel.
		"""
		offer = self.default(content, action, HandlerNames.WEBRTC)
		offer.update({VarNames.USER: self.sender_name})
		offer.update({VarNames.CONNECTION_ID: connection_id})
		offer.update({VarNames.WEBRTC_OPPONENT_ID: self.id})
		offer.update({VarNames.CHANNEL: room_id})
		return offer

	def set_connection_id(self, qued_id, connection_id):
		"""
		:return: {"action": "setConnectionId", "handler": "webrtc", "connId": connection_id, "id": qued_id}
		"""
		answer = {VarNames.EVENT: Actions.SET_WEBRTC_ID}
		answer[VarNames.HANDLER_NAME] = HandlerNames.WEBRTC
		answer[VarNames.CONNECTION_ID] = connection_id
		answer[VarNames.WEBRTC_QUED_ID] = qued_id
		return answer

	def set_webrtc_error(self, error, connection_id, qued_id=None):
		"""
		``default`` payload carrying ``error`` plus the connection id; the "id" key is only
		added when ``qued_id`` is truthy.
		"""
		payload = self.default(error, Actions.SET_WEBRTC_ERROR, HandlerNames.PEER_CONNECTION) # TODO file/call
		payload[VarNames.CONNECTION_ID] = connection_id
		if not qued_id:
			return payload
		payload[VarNames.WEBRTC_QUED_ID] = qued_id
		return payload

	@classmethod
	def create_message(cls, message, images):
		"""
		:param message: object exposing ``sender_id``, ``content``, ``time`` and ``id``
		:param images: value stored under "images" as is
		:return: {"userId": ..., "content": ..., "time": ..., "id": ..., "images": images}
		"""
		serialized = {VarNames.USER_ID: message.sender_id}
		serialized[VarNames.CONTENT] = message.content
		serialized[VarNames.TIME] = message.time
		serialized[VarNames.MESSAGE_ID] = message.id
		serialized[VarNames.IMG] = images
		return serialized

	@classmethod
	def create_send_message(cls, message, event, imgs):
		"""
		``create_message`` result extended with the event, the message's ``room_id`` as
		channel and the chat handler name.
		"""
		serialized = cls.create_message(message, imgs)
		serialized.update({VarNames.EVENT: event})
		serialized.update({VarNames.CHANNEL: message.room_id})
		serialized.update({VarNames.HANDLER_NAME: HandlerNames.CHAT})
		return serialized

	@classmethod
	def append_images(cls, messages, images):
		"""
		:return: list with one ``create_message`` dict per message, each with its own images attached
		"""
		return [
			cls.create_message(item, cls.prepare_img(images, item.id))
			for item in messages
		]

	@classmethod
	def prepare_img(cls, images, message_id):
		"""
		:param images: iterable of objects exposing ``symbol``, ``img.url`` and ``message_id``
		:return: {symbol: url} for the images of ``message_id``; None when ``images`` is empty or None
		"""
		if not images:
			return None
		by_symbol = {}
		for image in images:
			if image.message_id == message_id:
				by_symbol[image.symbol] = image.img.url
		return by_symbol

	@classmethod
	def get_messages(cls, messages, channel, images):
		"""
		:param messages: iterable of message objects
		:param channel: value stored under "channel"
		:param images: images passed on to ``append_images``
		:return: {"content": [...], "action": "loadMessages", "channel": channel, "handler": "chat"}
		"""
		answer = {VarNames.CONTENT: cls.append_images(messages, images)}
		answer[VarNames.EVENT] = Actions.GET_MESSAGES
		answer[VarNames.CHANNEL] = channel
		answer[VarNames.HANDLER_NAME] = HandlerNames.CHAT
		return answer

	@property
	def channel(self):
		"""Redis channel name derived from ``user_id`` via ``RedisPrefix.generate_user``."""
		own_id = self.user_id
		return RedisPrefix.generate_user(own_id)

	def subscribe_direct_channel_message(self, room_id, other_user_id):
		"""
		:return: 'addDirectChannel' message for ``room_id`` listing this user and ``other_user_id``
		"""
		answer = {VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL}
		answer[VarNames.ROOM_ID] = room_id
		answer[VarNames.ROOM_USERS] = [self.user_id, other_user_id]
		answer[VarNames.HANDLER_NAME] = HandlerNames.CHANNELS
		return answer

	def subscribe_room_channel_message(self, room_id, room_name):
		"""
		:return: 'addRoom' message for ``room_id`` with this user as its only listed user
		"""
		answer = {VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL}
		answer[VarNames.ROOM_ID] = room_id
		answer[VarNames.ROOM_USERS] = [self.user_id]
		answer[VarNames.HANDLER_NAME] = HandlerNames.CHANNELS
		answer[VarNames.ROOM_NAME] = room_name
		return answer

	def invite_room_channel_message(self, room_id, user_id, room_name, users):
		"""
		:param users: value stored under "content"
		:return: 'inviteUser' message for ``user_id`` and ``room_id``
		"""
		answer = {VarNames.EVENT: Actions.INVITE_USER}
		answer[VarNames.ROOM_ID] = room_id
		answer[VarNames.USER_ID] = user_id
		answer[VarNames.HANDLER_NAME] = HandlerNames.CHANNELS
		answer[VarNames.ROOM_NAME] = room_name
		answer[VarNames.CONTENT] = users
		return answer

	def add_user_to_room(self, channel, user_id, content):
		"""
		:param content: mapping holding the "sex" and "user" entries of the added user
		:return: 'addUserToDom' message for ``channel``
		"""
		answer = {VarNames.EVENT: Actions.ADD_USER}
		answer[VarNames.CHANNEL] = channel
		answer[VarNames.USER_ID] = user_id
		answer[VarNames.HANDLER_NAME] = HandlerNames.CHAT
		answer[VarNames.GENDER] = content[VarNames.GENDER]  # SEX: 'Alien', USER: 'Andrew'
		answer[VarNames.USER] = content[VarNames.USER]  # SEX: 'Alien', USER: 'Andrew'
		return answer

	def unsubscribe_direct_message(self, room_id):
		"""
		:return: 'deleteRoom' message for ``room_id`` stamped with this user's id and the current time
		"""
		answer = {VarNames.EVENT: Actions.DELETE_ROOM}
		answer[VarNames.ROOM_ID] = room_id
		answer[VarNames.USER_ID] = self.user_id
		answer[VarNames.HANDLER_NAME] = HandlerNames.CHANNELS
		answer[VarNames.TIME] = get_milliseconds()
		return answer

	def load_offline_message(self, offline_messages, channel_key):
		"""
		:return: ``default`` 'loadOfflineMessages' payload with ``channel_key`` stored under "channel"
		"""
		payload = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
		payload.update({VarNames.CHANNEL: channel_key})
		return payload
319
320
321
class MessagesHandler(MessagesCreator):
322
323
	def __init__(self, *args, **kwargs):
		"""
		Prepare redis clients, channel bookkeeping and the two action dispatch tables.

		``pre_process_message`` and ``post_process_message`` map an action name to a bound method.
		"""
		self.closed_channels = None
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__(*args, **kwargs)
		self.webrtc_ids = {}
		self.ip = None
		# imported inside the method, not at module level
		from chat import global_redis as shared_redis
		self.async_redis_publisher = shared_redis.async_redis_publisher
		self.sync_redis = shared_redis.sync_redis
		self.channels = []
		self._logger = None
		# every handler gets its own subscriber client
		self.async_redis = tornadoredis.Client(port=TORNADO_REDIS_PORT)
		self.patch_tornadoredis()

		# NOTE(review): presumably looked up for messages received from the client; the caller is not in view
		before = {
			Actions.GET_MESSAGES: self.process_get_messages,
			Actions.SEND_MESSAGE: self.process_send_message,
			Actions.WEBRTC: self.proxy_webrtc,
			Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
			Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
			Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
			Actions.ACCEPT_CALL: self.accept_call,
			Actions.ACCEPT_FILE: self.accept_file,
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
			Actions.DELETE_ROOM: self.delete_channel,
			Actions.EDIT_MESSAGE: self.edit_message,
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
			Actions.INVITE_USER: self.invite_user,
			Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection,
			Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection,
			Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
			Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
		}
		self.pre_process_message = before

		# used by pub_sub_message for redis messages that carry the parsable prefix
		after = {
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
			Actions.DELETE_ROOM: self.send_client_delete_channel,
			Actions.INVITE_USER: self.send_client_new_channel,
			Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel,
			Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel
		}
		self.post_process_message = after
363
364
	def patch_tornadoredis(self):  # TODO remove this
		"""
		Replace ``readline`` of the async redis connection with a logging wrapper.

		The original bound method is kept on the connection as ``old_read``. When it raises,
		the exception and a snapshot of this handler's state are logged, then the same
		exception is re-raised.
		"""
		redis_connection = self.async_redis.connection
		# type of a bound method, used below to bind new_read to the connection
		fabric = type(redis_connection.readline)
		redis_connection.old_read = redis_connection.readline

		def new_read(new_self, callback=None):
			try:
				return new_self.old_read(callback=callback)
			except Exception as e:
				current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
				self.logger.error(e)
				self.logger.error(
					"Exception info: "
					"self.id: %s ;;; "
					"self.connected = '%s';;; "
					"Redis default channel online = '%s';;; "
					"self.channels = '%s';;; "
					"self.closed_channels  = '%s';;;",
					self.id, self.connected, current_online, self.channels, self.closed_channels
				)
				# bare raise keeps the original traceback (``raise e`` loses it on Python 2)
				raise

		redis_connection.readline = fabric(new_read, redis_connection)
385
386
	@property
387
	def connected(self):
388
		raise NotImplemented
389
390
	@connected.setter
391
	def connected(self, value):
392
		raise NotImplemented
393
394
	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribe the async redis client to ``channels``, then start listening with
		``pub_sub_message`` as the callback.
		"""
		subscription = tornado.gen.Task(self.async_redis.subscribe, channels)
		yield subscription
		self.async_redis.listen(self.pub_sub_message)
399
400
	@property
401
	def logger(self):
402
		return self._logger if self._logger else base_logger
403
404
	@tornado.gen.engine
	def add_channel(self, channel):
		"""Remember ``channel`` in ``self.channels`` and subscribe the async redis client to it."""
		self.channels.append(channel)
		subscription = tornado.gen.Task(self.async_redis.subscribe, (channel,))
		yield subscription
409
410
	def evaluate(self, query_set):
411
		self.do_db(len, query_set)
412
		return query_set
413
414
	def do_db(self, callback, *args, **kwargs):
415
		try:
416
			return callback(*args, **kwargs)
417
		except (OperationalError, InterfaceError) as e:
418
			if 'MySQL server has gone away' in str(e):
419
				self.logger.warning('%s, reconnecting' % e)
420
				connection.close()
421
				return callback(*args, **kwargs)
422
			else:
423
				raise e
424
425
	def execute_query(self, query, *args, **kwargs):
		"""
		Execute raw SQL on the Django connection and return the rows as dictionaries.

		:param query: SQL text; ``args`` and ``kwargs`` are passed on to ``cursor.execute``
		:return: list of {column name: value}, one dict per fetched row
		"""
		cursor = connection.cursor()
		try:
			cursor.execute(query, *args, **kwargs)
			# column names are the same for every row, so build the list once
			columns = [col[0] for col in cursor.description]
			return [dict(zip(columns, row)) for row in cursor.fetchall()]
		finally:
			# release the cursor even when execute/fetch fails
			cursor.close()
433
434
	def get_online_from_redis(self, channel, check_self_online=False):
435
		"""
436
		:rtype : dict
437
		returns (dict, bool) if check_type is present
438
		"""
439
		online = self.sync_redis.smembers(channel)
440
		self.logger.debug('!! channel %s redis online: %s', channel, online)
441
		result = set()
442
		user_is_online = False
443
		# redis stores8 REDIS_USER_FORMAT, so parse them
444
		if online:
445
			for raw in online:  # py2 iteritems
446
				decoded = raw.decode('utf-8')
447
				# : char specified in cookies_middleware.py.create_id
448
				user_id = int(decoded.split(':')[0])
449
				if user_id == self.user_id and decoded != self.id:
450
					user_is_online = True
451
				result.add(user_id)
452
		result = list(result)
453
		return (result, user_is_online) if check_self_online else result
454
455
	def add_online_user(self, room_id, offline_messages=None):
		"""
		Add this connection to the redis set ``room_id`` and report who is online.

		When the user has no other connection in the room, a login message with the online
		list is published to the room and ``offline_messages`` (if any) are written to this
		socket. Otherwise the online list is written to this socket only.

		:param room_id: redis set / channel of the room
		:param offline_messages: optional payload for ``load_offline_message``
		"""
		self.async_redis_publisher.sadd(room_id, self.id)
		# since we add user to online first, latest trigger will always show correct online
		online, is_online = self.get_online_from_redis(room_id, True)
		if is_online:  # user already has another connection: send user names to self only
			online_user_names_mes = self.room_online(
				online,
				Actions.REFRESH_USER,
				room_id
			)
			self.logger.info('!! Second tab, retrieving online for self')
			self.ws_write(online_user_names_mes)
			return
		# the list read back may already contain this user (own connection added above),
		# so only append the id when it is missing, to avoid a duplicate entry
		if self.user_id not in online:
			online.append(self.user_id)
		online_user_names_mes = self.room_online(
			online,
			Actions.LOGIN,
			room_id
		)
		self.logger.info('!! First tab, sending refresh online for all')
		self.publish(online_user_names_mes, room_id)
		if offline_messages:
			self.ws_write(self.load_offline_message(offline_messages, room_id))
483
484
	def publish(self, message, channel, parsable=False):
485
		jsoned_mess = json.dumps(message)
486
		self.logger.debug('<%s> %s', channel, jsoned_mess)
487
		if parsable:
488
			jsoned_mess = self.encode(jsoned_mess)
489
		self.async_redis_publisher.publish(channel, jsoned_mess)
490
491
	def encode(self, message):
492
		"""
493
		Marks message with prefix to specify that
494
		it should be decoded and proccesed before sending to client
495
		@param message: message to mark
496
		@return: marked message
497
		"""
498
		return self.parsable_prefix + message
499
500
	def remove_parsable_prefix(self, message):
501
		if message.startswith(self.parsable_prefix):
502
			return message[1:]
503
504
	def pub_sub_message(self, message):
		"""
		Callback for messages arriving from redis.

		Only string bodies are handled. A body carrying the parsable prefix is JSON-decoded
		and passed to the ``post_process_message`` handler of its action; the unprefixed text
		is written to the socket only when that handler returns a falsy value. Any other
		string body is written to the socket unchanged.

		:param message: object whose ``body`` attribute holds the payload
		"""
		body = message.body
		if not isinstance(body, str_type):
			# NOTE(review): non-string bodies are presumably subscribe events; they are skipped
			return
		unprefixed = self.remove_parsable_prefix(body)
		if not unprefixed:
			self.ws_write(body)
			return
		parsed = json.loads(unprefixed)
		handler = self.post_process_message[parsed[VarNames.EVENT]]
		handled = handler(parsed)
		if not handled:
			self.ws_write(unprefixed)
515
516
	def ws_write(self, message):
517
		raise NotImplementedError('WebSocketHandler implements')
518
519
	def process_send_message(self, message):
		"""
		Save an incoming chat message with its images and publish it to the room channel.

		:type message: dict
		"""
		channel = message[VarNames.CHANNEL]
		message_db = Message(sender_id=self.user_id, content=message[VarNames.CONTENT])
		message_db.room_id = channel
		self.do_db(message_db.save)
		# images need the id of the saved message
		stored_images = self.parse_imgs(message.get(VarNames.IMG), message_db.id)
		images_by_symbol = self.prepare_img(stored_images, message_db.id)
		outgoing = self.create_send_message(message_db, Actions.PRINT_MESSAGE, images_by_symbol)
		self.publish(outgoing, channel)
533
534
	def parse_imgs(self, imgs, mess_id):
		"""
		Create ``Image`` rows for the entries of ``imgs`` that carry base64 data.

		:param imgs: mapping symbol -> dict with the optional 'b64' and the 'fileName' entries; may be empty or None
		:param mess_id: id of the message the images belong to
		:return: list with the newly created images, followed by the images already stored
			for ``mess_id`` when at least one entry had no base64 data
		"""
		res_imgs = []
		if not imgs:
			return res_imgs
		need_stored = False
		for symbol in imgs:
			entry = imgs[symbol]
			if entry.get(VarNames.IMG_B64):
				photo = extract_photo(entry[VarNames.IMG_B64], entry[VarNames.IMG_FILE_NAME])
				res_imgs.append(Image(message_id=mess_id, img=photo, symbol=symbol))
			else:
				need_stored = True
		stored = None
		if need_stored:
			stored = Image.objects.filter(message_id=mess_id)
			len(stored)  # fetch now, before the new rows are inserted below
		if res_imgs:
			Image.objects.bulk_create(res_imgs)
		if stored:
			res_imgs.extend(stored)
		return res_imgs
555
556 View Code Duplication
	def close_file_connection(self, in_message):
		"""
		Close this socket's side of a file connection and notify the other side(s).

		:param in_message: dict holding the connection id under 'connId'
		:raises Exception: "Access Denied" when this socket has no status in the connection
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		own_status = self.sync_redis.shget(connection_id, self.id)
		if not own_status:
			raise Exception("Access Denied")
		if own_status == WebRtcRedisStates.CLOSED:
			return  # already closed
		# the socket id stored under WEBRTC_CONNECTION is the one that offered the connection
		sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		if sender_id != self.id:
			self.close_receiver(connection_id, in_message, sender_id)
		else:
			self.close_sender(connection_id)
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
568
569
	def close_call_connection(self, in_message):
		"""
		Leave a call: mark this socket as closed and notify every participant still open.

		:param in_message: dict holding the connection id under 'connId'
		:raises ValidationError: when this socket is not in status 'ready' or 'responded'
			for the connection, including when it is not part of the connection at all
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		conn_users = self.sync_redis.shgetall(connection_id) or {}
		# .get(): the connection id comes from the client, so this socket may be absent from it
		if conn_users.get(self.id) not in (WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED):
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
		del conn_users[self.id]
		message = {
			VarNames.EVENT: Actions.CLOSE_CALL_CONNECTION,
			VarNames.CONNECTION_ID: connection_id,
			VarNames.USER_ID: self.user_id,
			VarNames.WEBRTC_OPPONENT_ID: self.id,
			VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
		}
		for user in conn_users:
			if conn_users[user] != WebRtcRedisStates.CLOSED:
				self.publish(message, user)
587
588 View Code Duplication
	def cancel_call_connection(self, in_message):
		"""
		Decline an offered call: mark this socket as closed and notify every participant still open.

		:param in_message: dict holding the connection id under 'connId'
		:raises ValidationError: when this socket is not in status 'offered' for the
			connection, including when it is not part of the connection at all
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		conn_users = self.sync_redis.shgetall(connection_id) or {}
		# .get(): the connection id comes from the client, so this socket may be absent from it
		if conn_users.get(self.id) != WebRtcRedisStates.OFFERED:
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
		del conn_users[self.id]
		message = self.reply_webrtc(Actions.CANCEL_CALL_CONNECTION, connection_id)
		for user in conn_users:
			if conn_users[user] != WebRtcRedisStates.CLOSED:
				self.publish(message, user)
600
601
	def close_receiver(self, connection_id, in_message, sender_id): # TODO for call we should close all
		"""
		Forward the close message to the sender unless the sender is already closed.

		``in_message`` is modified in place: opponent id and handler name are set before publishing.

		:raises Exception: "Access denied" when the sender has no status in the connection
		"""
		sender_status = self.sync_redis.shget(connection_id, sender_id)
		if not sender_status:
			raise Exception("Access denied")
		if sender_status == WebRtcRedisStates.CLOSED:
			return
		in_message.update({VarNames.WEBRTC_OPPONENT_ID: self.id})
		in_message.update({VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION})
		self.publish(in_message, sender_id)
609
610
	def close_sender(self, connection_id):
		"""
		Tell every other participant that is not closed yet that the file connection is destroyed.
		"""
		participants = self.sync_redis.shgetall(connection_id)
		del participants[self.id]
		# the notice is the same for every receiver
		notice = {
			VarNames.EVENT: Actions.CLOSE_FILE_CONNECTION,
			VarNames.CONNECTION_ID: connection_id,
			VarNames.WEBRTC_OPPONENT_ID: self.id,
			VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
		}
		for ws_id in participants:
			if participants[ws_id] != WebRtcRedisStates.CLOSED:
				self.publish(notice, ws_id)
622
623
	def accept_file(self, in_message):
		"""
		Accept a file transfer: move this socket from 'responded' to 'ready' and notify the sender.

		:param in_message: dict holding the connection id under 'connId'
		:raises ValidationError: unless the sender is 'ready' and this socket is 'responded'
		"""
		connection_id = in_message[VarNames.CONNECTION_ID] # TODO accept all if call
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
		sender_ready = sender_ws_status == WebRtcRedisStates.READY
		self_responded = self_ws_status == WebRtcRedisStates.RESPONDED
		if not (sender_ready and self_responded):
			raise ValidationError("Invalid channel status")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
		notice = {
			VarNames.EVENT: Actions.ACCEPT_FILE,
			VarNames.CONNECTION_ID: connection_id,
			VarNames.WEBRTC_OPPONENT_ID: self.id,
			VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
		}
		self.publish(notice, sender_ws_id)
638
639
	# TODO: if we read the statuses with shgetall and only afterwards do the async hset,
	# we can hit an issue when 2 users accept the call concurrently:
	# neither is sent ACCEPT_CALL, because both were still seen in status 'offered'.
642
	# def accept_call(self, in_message):
643
	# 	connection_id = in_message[VarNames.CONNECTION_ID]
644
	# 	channel_status = self.sync_redis.shgetall(connection_id)
645 View Code Duplication
	# 	if channel_status and channel_status[self.id] == WebRtcRedisStates.RESPONDED:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
646
	# 		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
647
	# 		for key in channel_status:  # del channel_status[self.id] not needed as self in responded
648
	# 			if channel_status[key] == WebRtcRedisStates.READY:
649
	# 				self.publish({
650
	# 					VarNames.EVENT: Actions.ACCEPT_CALL,
651
	# 					VarNames.CONNECTION_ID: connection_id,
652
	# 					VarNames.WEBRTC_OPPONENT_ID: self.id,
653
	# 					VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
654
	# 				}, key)
655
	# 	else:
656
	# 		raise ValidationError("Invalid channel status")
657
658
	def accept_call(self, in_message):
		"""
		Accept a call: move this socket from 'responded' to 'ready' and notify every
		participant that is not closed.

		:param in_message: dict holding the connection id under 'connId'
		:raises ValidationError: when this socket is not in status 'responded'
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		own_status = self.sync_redis.shget(connection_id, self.id)
		if own_status != WebRtcRedisStates.RESPONDED:
			raise ValidationError("Invalid channel status")
		# the status is written with the sync client before the participants are read back
		self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.READY)
		participants = self.sync_redis.shgetall(connection_id)
		del participants[self.id]
		notice = {
			VarNames.EVENT: Actions.ACCEPT_CALL,
			VarNames.USER_ID: self.user_id,
			VarNames.CONNECTION_ID: connection_id,
			VarNames.WEBRTC_OPPONENT_ID: self.id,
			VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
		}
		for ws_id in participants:
			if participants[ws_id] != WebRtcRedisStates.CLOSED:
				self.publish(notice, ws_id)
677
678
	def offer_webrtc_connection(self, in_message):
		"""
		Start a webrtc connection (file or call) in a room.

		A new connection id is generated and stored with this socket as its offerer in status
		'ready'. The id is written back to this socket, and the offer is published to the room
		as a parsable message.

		:param in_message: dict holding 'channel', 'id' and 'action', and optionally 'content'
		"""
		room_id = in_message[VarNames.CHANNEL]
		content = in_message.get(VarNames.CONTENT)
		qued_id = in_message[VarNames.WEBRTC_QUED_ID]
		connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
		publisher = self.async_redis_publisher
		# remember which socket offered the connection, then set its own status
		publisher.hset(WEBRTC_CONNECTION, connection_id, self.id)
		publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
		offer = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT])
		own_answer = self.set_connection_id(qued_id, connection_id)
		self.ws_write(own_answer)
		self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
		self.publish(offer, room_id, True)
691
692 View Code Duplication
	def reply_call_connection(self, in_message):
		"""
		Answer a call offer: move this socket from 'offered' to 'responded' and notify
		every participant that is not closed.

		:param in_message: dict holding the connection id under 'connId'
		:raises ValidationError: when this socket is not in status 'offered' for the
			connection, including when it is not part of the connection at all
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		conn_users = self.sync_redis.shgetall(connection_id) or {}
		# .get(): the connection id comes from the client, so this socket may be absent from it
		if conn_users.get(self.id) != WebRtcRedisStates.OFFERED:
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
		del conn_users[self.id]
		message = self.reply_webrtc(Actions.REPLY_CALL_CONNECTION, connection_id)
		for user in conn_users:
			if conn_users[user] != WebRtcRedisStates.CLOSED:
				self.publish(message, user)
704
705
	def reply_file_connection(self, in_message):
		"""
		Answer a file offer: move this socket from 'offered' to 'responded' and notify the sender.

		:param in_message: dict holding the connection id under 'connId'
		:raises ValidationError: unless the sender is 'ready' and this socket is 'offered'
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
		sender_ready = sender_ws_status == WebRtcRedisStates.READY
		self_offered = self_ws_status == WebRtcRedisStates.OFFERED
		if not (sender_ready and self_offered):
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
		reply = self.reply_webrtc(Actions.REPLY_FILE_CONNECTION, connection_id)
		self.publish(reply, sender_ws_id)
715
716
	def proxy_webrtc(self, in_message):
		"""
		Forward a webrtc message to the opponent socket when both sides are in status 'ready'.

		``in_message`` is modified in place before publishing: the opponent id is replaced
		with this socket's id and the handler name is set.

		:type in_message: dict
		:raises ValidationError: when this socket or the opponent is not 'ready'
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
		opponent_channel_status = self.sync_redis.shget(connection_id, channel)
		both_ready = (
			self_channel_status == WebRtcRedisStates.READY
			and opponent_channel_status == WebRtcRedisStates.READY
		)
		if not both_ready:
			# todo receiver should only accept proxy_webrtc from sender, sender can accept all
			# I mean somebody if there're 3 ppl in 1 channel and first is initing transfer to 2nd and 3rd,
			# 2nd guy can fraud 3rd guy webrtc traffic, which is allowed during the call, but not while transering file
			raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
				self_channel_status, opponent_channel_status
			))
		in_message.update({VarNames.WEBRTC_OPPONENT_ID: self.id})
		in_message.update({VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION})
		self.logger.debug(
			"Forwarding message to channel %s, self %s, other status %s",
			channel, self_channel_status, opponent_channel_status
		)
		self.publish(in_message, channel)
736
737
	def create_new_room(self, message):
		"""
		Create a named room with this user as member and publish the subscribe message
		to this user's own channel.

		:param message: dict holding the room name under 'name'
		:raises ValidationError: when the name is empty or longer than 16 characters
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = bool(room_name) and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		room = Room(name=room_name)
		self.do_db(room.save)
		membership = RoomUsers(room_id=room.id, user_id=self.user_id)
		membership.save()
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
		self.publish(subscribe_message, self.channel, True)
746
747
	def invite_user(self, message):
		"""
		Add another user to a room this socket is subscribed to.

		The room is told about the new user, and the invited user's own channel receives a
		parsable invitation carrying the users of the room.

		:param message: dict holding 'roomId' and 'userId'
		:raises ValidationError: when the room is not among ``self.channels``, the room is
			private, or the user is already in the room
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		if room_id not in self.channels:
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
		room = self.do_db(Room.objects.get, id=room_id)
		if room.is_private:
			raise ValidationError("You can't add users to direct room, create a new room instead")
		try:
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		users_in_room = {}
		for member in room.users.all():
			self.set_js_user_structure(users_in_room, member.id, member.username, member.sex)
		joined_notice = self.add_user_to_room(room_id, user_id, users_in_room[user_id])
		self.publish(joined_notice, room_id)
		invitation = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
		invited_channel = RedisPrefix.generate_user(user_id)
		self.publish(invitation, invited_channel, True)
765
766
	def create_room(self, user_rooms, user_id):
		"""
		Find the private room shared with ``user_id`` among ``user_rooms``, or create one.

		An existing room goes through ``update_room``, which re-enables it when disabled and
		raises otherwise. When no room exists, a new ``Room`` is created together with the
		``RoomUsers`` rows (one row when ``user_id`` is this user, two otherwise).

		:param user_rooms: queryset of this user's private rooms, yielding dicts with 'room_id'
		:param user_id: id of the other user; may equal ``self.user_id``
		:return: id of the found or created room
		:raises ValidationError: from ``update_room`` when the room exists and is not disabled
		"""
		if self.user_id == user_id:
			room_ids = [room['room_id'] for room in self.evaluate(user_rooms)]
			# execute_query returns a plain list of dicts, which has no QuerySet-style .get()
			# NOTE(review): assumes SELECT_SELF_ROOM yields the 'room__id' and 'room__disabled'
			# columns read below; confirm against chat.settings
			rows = self.execute_query(SELECT_SELF_ROOM, [room_ids, ])
			existing = rows[0] if rows else None
		else:
			rooms_query = RoomUsers.objects.filter(user_id=user_id, room__in=user_rooms)
			query_res = rooms_query.values('room__id', 'room__disabled')
			try:
				existing = self.do_db(query_res.get)
			except RoomUsers.DoesNotExist:
				existing = None
		if existing is not None:
			room_id = existing['room__id']
			self.update_room(room_id, existing['room__disabled'])
			return room_id
		room = Room()
		room.save()
		room_id = room.id
		if self.user_id == user_id:
			RoomUsers(user_id=self.user_id, room_id=room_id).save()
		else:
			RoomUsers.objects.bulk_create([
				RoomUsers(user_id=user_id, room_id=room_id),
				RoomUsers(user_id=self.user_id, room_id=room_id),
			])
		return room_id
789
790
	def update_room(self, room_id, disabled):
		"""
		Re-enable a disabled room.

		:param disabled: current ``disabled`` flag of the room
		:raises ValidationError: when the room is not disabled, i.e. it already exists
		"""
		if disabled:
			Room.objects.filter(id=room_id).update(disabled=False)
			return
		raise ValidationError('This room already exist')
795
796
	def create_user_channel(self, message):
		"""
		Open a direct channel with another user and publish the parsable subscribe message
		to both users' channels (only once when the other user is this user).

		:param message: dict holding the other user's id under 'userId'
		"""
		user_id = message[VarNames.USER_ID]
		# get all self private rooms ids
		memberships = Room.users.through.objects
		user_rooms = memberships.filter(user_id=self.user_id, room__name__isnull=True).values('room_id')
		# get private room that contains another user from rooms above
		room_id = self.create_room(user_rooms, user_id)
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
		own_channel = self.channel
		self.publish(subscribe_message, own_channel, True)
		other_channel = RedisPrefix.generate_user(user_id)
		if other_channel != own_channel:
			self.publish(subscribe_message, other_channel, True)
807
808
	def delete_channel(self, message):
		"""
		Removes the current user from the room message[VarNames.ROOM_ID].

		A private room (name is None) is marked as disabled; for a named room
		the RoomUsers link is deleted and the new online list is published.
		In both cases an unsubscribe message is published to the room channel.

		:type message: dict
		:raises ValidationError: if the room is not one of the user's channels,
			is the ALL_ROOM_ID room, or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to exit this room')
		room = self.do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		if room.name is not None:
			# public room -> leave it by deleting the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
			still_online = self.get_online_from_redis(room_id)
			still_online.remove(self.user_id)
			logout_message = self.room_online(still_online, Actions.LOGOUT, room_id)
			self.publish(logout_message, room_id)
		else:
			# private room -> only disable it
			room.disabled = True
		room.save()
		self.publish(self.unsubscribe_direct_message(room_id), room_id, True)
825
826
	def edit_message(self, data):
		"""
		Edits a message, or deletes it when the new content is None,
		and publishes the result to the message's room.

		:type data: dict
		:param data: must contain VarNames.MESSAGE_ID and VarNames.CONTENT,
			may contain VarNames.IMG
		:raises ValidationError: if the message belongs to another user,
			is older than 10 minutes or is already deleted
		"""
		message_id = data[VarNames.MESSAGE_ID]
		message = Message.objects.get(id=message_id)
		if message.sender_id != self.user_id:
			raise ValidationError("You can only edit your messages")
		# message.time is compared with get_milliseconds(): 600000 ms == 10 min
		if message.time + 600000 < get_milliseconds():
			raise ValidationError("You can only edit messages that were send not more than 10 min ago")
		if message.deleted:
			raise ValidationError("Already deleted")
		message.content = data[VarNames.CONTENT]
		selector = Message.objects.filter(id=message_id)
		if message.content is None:
			# a deleted message carries no images; prep_imgs has to be bound here,
			# otherwise the publish below fails with UnboundLocalError
			prep_imgs = None
			selector.update(deleted=True)
			action = Actions.DELETE_MESSAGE
		else:
			imgs = self.parse_imgs(data.get(VarNames.IMG), message.id)
			prep_imgs = self.prepare_img(imgs, message_id)
			action = Actions.EDIT_MESSAGE
			selector.update(content=message.content)
		self.publish(self.create_send_message(message, action, prep_imgs), message.room_id)
847
848
	def send_client_new_channel(self, message):
		"""
		Registers the room from message[VarNames.ROOM_ID] as a channel of this
		connection and adds the current user to that room's online list.

		:type message: dict
		"""
		new_room_id = message[VarNames.ROOM_ID]
		self.add_channel(new_room_id)
		self.add_online_user(new_room_id)
852
853
	def set_opponent_call_channel(self, message):
		"""
		Stores WebRtcRedisStates.OFFERED for this connection id in the redis hash
		named by message[VarNames.CONNECTION_ID].

		:type message: dict
		:return: True if message[VarNames.WEBRTC_OPPONENT_ID] is this very
			connection (nothing is stored then), otherwise None
		"""
		connection_id = message[VarNames.CONNECTION_ID]
		addressed_to_self = message[VarNames.WEBRTC_OPPONENT_ID] == self.id
		if addressed_to_self:
			return True
		self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
858
859
	def send_client_delete_channel(self, message):
		"""
		Stops listening to the room from message[VarNames.ROOM_ID]: unsubscribes
		in redis, removes this connection id from the room key and drops the
		room from self.channels.

		:type message: dict
		"""
		left_room_id = message[VarNames.ROOM_ID]
		self.async_redis.unsubscribe((left_room_id,))
		# NOTE(review): on_close removes the same id with srem, here it is hdel -
		# confirm which redis type the room key really has
		self.async_redis_publisher.hdel(left_room_id, self.id)
		self.channels.remove(left_room_id)
864
865
	def process_get_messages(self, data):
		"""
		Sends the client a page of non-deleted messages of a room, newest first.

		:type data: dict
		:param data: VarNames.CHANNEL is the room id; VarNames.GET_MESSAGES_COUNT
			is the page size (10 by default); VarNames.GET_MESSAGES_HEADER_ID,
			when present, restricts the page to messages with a smaller id
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		room_id = data[VarNames.CHANNEL]
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		conditions = [Q(room_id=room_id), Q(deleted=False)]
		if header_id is not None:
			conditions.insert(0, Q(id__lt=header_id))
		messages = Message.objects.filter(*conditions).order_by('-pk')[:count]
		images = self.do_db(self.get_message_images, messages)
		self.ws_write(self.get_messages(messages, room_id, images))
880
881
	def get_message_images(self, messages):
		"""
		Selects the Image rows attached to the given messages.

		:param messages: iterable of objects with an id attribute
		:return: queryset of Image filtered by message_id
		"""
		message_ids = [item.id for item in messages]
		found = Image.objects.filter(message_id__in=message_ids)
		self.logger.info('!! Messages have %d images', len(found))
		return found
886
887
	def get_offline_messages(self):
		"""
		Collects non-deleted messages with an id greater than the current user's
		last_read_message_id of the corresponding room.

		:return: dict {room_id: [prepared message, ...]}
		"""
		unread = Message.objects.filter(
			id__gt=F('room__roomusers__last_read_message_id'),
			deleted=False,
			room__roomusers__user_id=self.user_id
		)
		images = self.do_db(self.get_message_images, unread)
		by_room = {}
		for message in unread:
			prepared = self.create_message(message, self.prepare_img(images, message.id))
			if message.room_id not in by_room:
				by_room[message.room_id] = []
			by_room[message.room_id].append(prepared)
		return by_room
899
900
	def get_users_in_current_user_rooms(self):
		"""
		Describes every enabled room of the current user together with its members.

		Result layout (keys are the values of the VarNames constants):
		{
			<room id>: {
				VarNames.ROOM_NAME: <room name>,
				VarNames.ROOM_USERS: {
					<user id>: <structure built by set_js_user_structure>,
					...
				}
			},
			...
		}
		"""
		own_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
		res = {}
		for room in own_rooms:
			res[room['id']] = {
				VarNames.ROOM_NAME: room['name'],
				VarNames.ROOM_USERS: {}
			}
		members = User.objects.filter(
			rooms__in=(key for key in res)
		).values('id', 'username', 'sex', 'rooms__id')
		for member in members:
			users_of_room = res[member['rooms__id']][VarNames.ROOM_USERS]
			self.set_js_user_structure(users_of_room, member['id'], member['username'], member['sex'])
		return res
929
930
	def set_js_user_structure(self, user_dict, user_id, name, sex):
		"""
		Puts the description of a user into user_dict under the key user_id.

		:param user_dict: dict that is modified in place
		:param name: stored under VarNames.USER
		:param sex: key of GENDERS, the mapped value is stored under VarNames.GENDER
		"""
		description = {
			VarNames.USER: name,
			VarNames.GENDER: GENDERS[sex]
		}
		user_dict[user_id] = description
935
936
	def save_ip(self):
		"""
		Records that the current user has connected from self.ip,
		unless such a UserJoinedInfo row already exists.
		"""
		known_visits = UserJoinedInfo.objects.filter(
			Q(ip__ip=self.ip) & Q(user_id=self.user_id)
		)
		if self.do_db(known_visits.exists):
			return
		ip_address = get_or_create_ip(self.ip, self.logger)
		UserJoinedInfo.objects.create(ip=ip_address, user_id=self.user_id)
945
946
	def publish_logout(self, channel, log_data):
		"""
		Publishes a logout message to the channel if the user is not online
		there anymore.

		seems like async solves problem with connection lost and wrong data status
		http://programmers.stackexchange.com/questions/294663/how-to-store-online-status

		:param log_data: dict, the online state of the channel is written into it
		:return: True if the logout message has been published, otherwise None
		"""
		online, is_online = self.get_online_from_redis(channel, True)
		log_data[channel] = {'online': online, 'is_online': is_online}
		if is_online:
			return None
		self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
		return True
955
956
957
class AntiSpam(object):
	"""
	Keeps per-connection statistics about incoming messages and rejects
	the ones that are too long.
	"""

	def __init__(self):
		# {time of arrival in 1/100 of a second: length of the message}
		self.info = {}
		# amount of messages rejected because of their size
		self.spammed = 0

	def check_spam(self, json_message):
		"""
		Records the length of the message and validates it.

		:raises ValidationError: if the message is longer than MAX_MESSAGE_SIZE
		"""
		size = len(json_message)
		arrival_key = int(round(time.time() * 100))
		self.info[arrival_key] = size
		if size <= MAX_MESSAGE_SIZE:
			self.check_timed_spam()
			return
		self.spammed += 1
		raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)

	def check_timed_spam(self):
		"""Placeholder for frequency based spam detection, does nothing yet."""
		# TODO implement me
		pass
		# raise ValidationError("You're chatting too much, calm down a bit!")
976
977
978
class TornadoHandler(WebSocketHandler, MessagesHandler):
979
980
	def __init__(self, *args, **kwargs):
		"""Passes all arguments to the parent classes and marks the socket as not connected."""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		self.anti_spam = AntiSpam()
		# becomes True at the end of open(), reset in disconnect()
		self.__connected__ = False
984
985
	@property
	def connected(self):
		"""Flag stored in __connected__: whether open() has finished initializing this socket."""
		return self.__connected__

	@connected.setter
	def connected(self, value):
		"""Stores the new value of the flag."""
		self.__connected__ = value
992
993
	def data_received(self, chunk):
		"""Ignores the chunk, this handler does nothing with streamed request data."""
		return None
995
996
	def on_message(self, json_message):
		"""
		Parses an incoming websocket frame and passes it to the handler
		registered in pre_process_message for its event.

		A ValidationError is sent back to the client as a growl message,
		any other exception is propagated.

		:param json_message: raw json text received from the client
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise Exception('Skipping null message')
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %.1000s', json_message)
			message = json.loads(json_message)
			event = message[VarNames.EVENT]
			if event not in self.pre_process_message:
				raise Exception("event {} is unknown".format(event))
			channel = message.get(VarNames.CHANNEL)
			# a message that names a channel is accepted only for subscribed channels
			if channel and channel not in self.channels:
				raise Exception('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
			self.pre_process_message[event](message)
		except ValidationError as e:
			growl = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.ws_write(growl)
1014
1015
	def on_close(self):
		"""
		Cleans up after a closed websocket: unsubscribes from redis, removes this
		connection id from every numeric channel, publishes logout messages and,
		if the user went offline somewhere, runs UPDATE_LAST_READ_MESSAGE.
		Finally disconnects from redis.
		"""
		if not self.async_redis.subscribed:
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
		else:
			self.logger.info("Close event, unsubscribing from %s", self.channels)
			self.async_redis.unsubscribe(self.channels)
		log_data = {}
		gone_offline = False
		for channel in self.channels:
			# only channels that are numbers are processed, the rest is skipped
			if isinstance(channel, Number):
				self.sync_redis.srem(channel, self.id)
				if self.connected and self.publish_logout(channel, log_data):
					gone_offline = True
		if gone_offline:
			updated = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
			self.logger.info("Updated %s last read message", updated)
		self.disconnect(json.dumps(log_data))
1033
1034
	def disconnect(self, log_data, tries=0):
		"""
		Closes the redis connection if it's not in progress, otherwise postpones
		closing and tries again later
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227

		:param log_data: text that is logged when the connection gets closed
		:param tries: number of postponed attempts made so far, used by the
			method itself when it reschedules
		"""
		self.connected = False
		if tries == 0:
			# remember the channels only on the first attempt: on a retry
			# self.channels is already empty and would overwrite the saved list
			self.closed_channels = self.channels
		self.channels = []
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
			self.logger.debug('Closing a connection timeouts')
			# NOTE(review): the first positional argument of timedelta is days,
			# so this delay is 0.00001 day (~0.86 s) - confirm it is intended
			ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, log_data, tries+1)
		else:
			self.logger.info("Close connection result: %s", log_data)
			self.async_redis.disconnect()
1048
1049
	def generate_self_id(self):
		"""
		Assigns self.id from the user id and the 'id' request argument.

		When user opens new tab in browser wsHandler.wsConnectionId stores Id of current ws
		So if ws loses a connection it still can reconnect with same id,
		and TornadoHandler can restore webrtc_connections to previous state.
		If create_id returns another random part than the client has passed,
		the client is informed about the new one.
		"""
		requested = self.get_argument('id', None)
		self.id, assigned = create_id(self.user_id, requested)
		if assigned != requested:
			self.ws_write(self.set_ws_id(assigned, self.id))
1059
1060
	def open(self):
		"""
		Initializes a new websocket connection.

		The session cookie must refer to an existing session, otherwise the
		socket is closed. For a valid session the user is loaded, the list of
		his rooms is sent, redis channels are subscribed, the user is added
		to the online list of every room and his ip is saved in a separate thread.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if not sessionStore.exists(session_key):
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			self.close(403, "Session key %s has been rejected" % session_key)
			return
		self.ip = self.get_client_ip()
		session = SessionStore(session_key)
		self.user_id = int(session["_auth_user_id"])
		self.generate_self_id()
		# from now on every log record carries the connection id and the ip
		self._logger = logging.LoggerAdapter(parent_logger, {
			'id': self.id,
			'ip': self.ip
		})
		self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
		self.async_redis.connect()
		user_db = self.do_db(User.objects.get, id=self.user_id)
		self.sender_name = user_db.username
		self.sex = user_db.sex_str
		user_rooms = self.get_users_in_current_user_rooms()
		self.ws_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
		# channels: the user's own one, the one of this connection, then one per room
		self.channels = []  # py2 doesn't support clear()
		self.channels.extend([self.channel, self.id])
		self.channels.extend(user_rooms)
		self.listen(self.channels)
		# get all missed messages
		off_messages = self.get_offline_messages()
		for room_id in user_rooms:
			self.add_online_user(room_id, off_messages.get(room_id))
		self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
		self.connected = True
		Thread(target=self.save_ip).start()
1095
1096
	def check_origin(self, origin):
		"""
		Checks whether the domain of the Origin header matches the domain of
		the Host header. Ports are ignored, the comparison is case-insensitive.

		:param origin: value of the Origin header, e.g. "https://example.com:8888"
		:return: True if both domains are present and equal, otherwise False
		"""
		browser_set = self.request.headers.get("Host")
		# a request without Host (or with an empty origin) can't be matched
		if not origin or not browser_set:
			return False
		try:
			# hostname is lower-cased, has no port and handles [ipv6] literals,
			# unlike a plain split(':') that turns every ipv6 host into '['
			origin_domain = urlparse(origin).hostname
			browser_domain = urlparse('//' + browser_set).hostname
		except ValueError:
			# malformed host, e.g. unbalanced brackets
			return False
		if not origin_domain or not browser_domain:
			return False
		return browser_domain == origin_domain
1106
1107
	def ws_write(self, message):
		"""
		Sends a message to the client. A dict is serialized to json first.
		A closed websocket is only logged and doesn't raise an exception.

		:param message: dict or string
		:raises ValueError: if the message is neither a dict nor a string
		"""
		# self.logger.debug('<< THREAD %s >>', os.getppid())
		payload = message
		try:
			if isinstance(payload, dict):
				payload = json.dumps(payload)
			if not isinstance(payload, str_type):
				raise ValueError('Wrong message type : %s' % str(payload))
			self.logger.debug(">> %.1000s", payload)
			self.write_message(payload)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(payload))
1122
1123
	def get_client_ip(self):
		"""
		:return: value of the X-Real-IP header if it's present and not empty,
			otherwise remote_ip of the request
		"""
		forwarded_ip = self.request.headers.get("X-Real-IP")
		if forwarded_ip:
			return forwarded_ip
		return self.request.remote_ip
1125