Completed
Push — master ( 23aeee...23429a )
by Andrew
01:03
created

MessagesCreator.offer_call()   A

Complexity

Conditions 1

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 10
rs 9.4285
1
import json
2
import logging
3
import sys
4
from threading import Thread
5
from  urllib.request import urlopen
6
7
import redis
8
import time
9
import tornado.gen
10
import tornado.httpclient
11
import tornado.ioloop
12
import tornado.web
13
import tornado.websocket
14
import tornadoredis
15
from django.conf import settings
16
from django.core.exceptions import ValidationError
17
from django.db import connection, OperationalError, InterfaceError
18
from django.db.models import Q
19
from redis_sessions.session import SessionStore
20
from tornado.websocket import WebSocketHandler
21
import os
22
23
from chat.log_filters import id_generator
24
25
# urlparse lives in urllib.parse on Python 3 and in the urlparse module on Python 2.
try:
	from urllib.parse import urlparse  # Python 3
except ImportError:
	from urlparse import urlparse  # Python 2 fallback
29
30
from chat.settings import MAX_MESSAGE_SIZE, ANONYMOUS_REDIS_ROOM
31
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo
32
from chat.utils import check_user
33
34
# Use the structured version tuple: comparing the sys.version *string* with '3'
# gives the wrong answer for a two-digit major version such as "10.0".
PY3 = sys.version_info[0] >= 3
35
36
# Values read from the Django settings
user_cookie_name = settings.USER_COOKIE_NAME
api_url = getattr(settings, "IP_API_URL", None)  # None when IP_API_URL is not configured

# Gender assigned to users that are not signed in
ANONYMOUS_GENDER = 'Alien'
# Session key under which the name of an anonymous user is stored
SESSION_USER_VAR_KEY = 'user_name'

# Keys of the message dictionaries
MESSAGE_ID_VAR_NAME = 'id'
RECEIVER_USERNAME_VAR_NAME = 'receiverName'
RECEIVER_USERID_VAR_NAME = 'receiverId'
CALL_TYPE_VAR_NAME = 'type'
COUNT_VAR_NAME = 'count'
HEADER_ID_VAR_NAME = 'headerId'
USER_VAR_NAME = 'user'
USER_ID_VAR_NAME = 'userId'
TIME_VAR_NAME = 'time'
OLD_NAME_VAR_NAME = 'oldName'
IS_ANONYMOUS_VAR_NAME = 'anonymous'
CONTENT_VAR_NAME = 'content'
EVENT_VAR_NAME = 'action'
GENDER_VAR_NAME = 'sex'

# Values stored under the EVENT_VAR_NAME key
REFRESH_USER_EVENT = 'onlineUsers'
SYSTEM_MESSAGE_EVENT = 'system'
GROWL_MESSAGE_EVENT = 'growl'
GET_MESSAGES_EVENT = 'messages'
GET_MINE_USERNAME_EVENT = 'me'
ROOMS_EVENT = 'rooms'  # thread ex "main" , channel ex. 'r:main', "i:3"
LOGIN_EVENT = 'joined'
LOGOUT_EVENT = 'left'
SEND_MESSAGE_EVENT = 'send'
WEBRTC_EVENT = 'webrtc'
CALL_EVENT = 'call'
CHANGE_ANONYMOUS_NAME_EVENT = 'changed'

# Redis channel name templates and the name of the online-users hash
REDIS_USERNAME_CHANNEL_PREFIX = 'u:%s'
REDIS_USERID_CHANNEL_PREFIX = 'i:%s'
REDIS_ROOM_CHANNEL_PREFIX = 'r:%d'
REDIS_ONLINE_USERS = "online_users"
74
75
76
# Fetch the room shared by anonymous users, creating it on first start.
# NOTE: this runs a database query at import time.
anonymous_default_room = Room.objects.get_or_create(name=ANONYMOUS_REDIS_ROOM)[0]

ANONYMOUS_REDIS_CHANNEL = REDIS_ROOM_CHANNEL_PREFIX % anonymous_default_room.id
ANONYMOUS_ROOM_NAMES = {anonymous_default_room.id: anonymous_default_room.name}

# Shared store, used to check that a session key exists
sessionStore = SessionStore()

logger = logging.getLogger(__name__)

# TODO https://github.com/leporo/tornado-redis#connection-pool-support
CONNECTION_POOL = tornadoredis.ConnectionPool(
	max_connections=500,
	wait_for_available=True)
94
95
96
class MessagesCreator(object):
	"""
	Builds the dictionaries that are sent to chat clients on behalf of one sender.
	The sender is described by ``sender_name``, ``sex`` and ``user_id``.
	"""

	def __init__(self, *args, **kwargs):
		super(MessagesCreator, self).__init__(*args, **kwargs)
		# a fresh instance describes an anonymous sender
		self.user_id = 0
		self.sender_name = None
		self.sex = ANONYMOUS_GENDER

	def online_user_names(self, user_names_dict, action):
		"""
		Wraps the online users into a message of the given action.
		:type user_names_dict: dict
		:param action: value put under the "action" key
		:return: the default message extended with the sender name and its anonymous flag
		"""
		message = self.default(user_names_dict, action)
		message[USER_VAR_NAME] = self.sender_name
		message[IS_ANONYMOUS_VAR_NAME] = self.sex == ANONYMOUS_GENDER
		return message

	def change_user_nickname(self, old_nickname, online):
		"""
		self.sender_name must already hold the new nickname when this is called
		:type old_nickname: str
		:type online: dict
		:return: {action : changed, content: online, oldName : old_nickname, user: new name, ...}
		"""
		message = self.online_user_names(online, CHANGE_ANONYMOUS_NAME_EVENT)
		message[OLD_NAME_VAR_NAME] = old_nickname
		return message

	@classmethod
	def default(cls, content, event=SYSTEM_MESSAGE_EVENT):
		"""
		:return: {"action": event, "content": content, "time": get_milliseconds()}
		"""
		# keys are inserted in the order action, content, time
		message = {EVENT_VAR_NAME: event}
		message[CONTENT_VAR_NAME] = content
		message[TIME_VAR_NAME] = get_milliseconds()
		return message

	def offer_call(self, content, type):
		"""
		:return: {"action": "call", "user": sender name, "userId": sender id,
		"content": content, "type": type}
		"""
		message = {EVENT_VAR_NAME: CALL_EVENT}
		message[USER_VAR_NAME] = self.sender_name
		message[USER_ID_VAR_NAME] = self.user_id
		message[CONTENT_VAR_NAME] = content
		message[CALL_TYPE_VAR_NAME] = type
		return message

	@classmethod
	def create_send_message(cls, message):
		"""
		Same as get_message, with the "send" action added.
		:type message: Message
		"""
		prepared = cls.get_message(message)
		prepared[EVENT_VAR_NAME] = SEND_MESSAGE_EVENT
		return prepared

	@classmethod
	def get_message(cls, message):
		"""
		:param message: object with sender, content, time, id and receiver attributes
		:return: {"user": sender name, "userId": sender id, "content": ..., "time": ..., "id": ...}
		plus "receiverName" when the message has a receiver
		"""
		sender = message.sender
		prepared = {
			USER_VAR_NAME: sender.username,
			USER_ID_VAR_NAME: sender.id,
			CONTENT_VAR_NAME: message.content,
			TIME_VAR_NAME: message.time,
			MESSAGE_ID_VAR_NAME: message.id,
		}
		receiver = message.receiver
		if receiver is not None:
			prepared[RECEIVER_USERNAME_VAR_NAME] = receiver.username
		return prepared

	@classmethod
	def get_messages(cls, messages):
		"""
		:param messages: iterable of messages (a list or a QuerySet)
		:return: {"content": [one "send" message per item], "action": "messages"}
		"""
		prepared = list(map(cls.create_send_message, messages))
		return {
			CONTENT_VAR_NAME: prepared,
			EVENT_VAR_NAME: GET_MESSAGES_EVENT
		}

	def send_anonymous(self, content, receiver_anonymous, receiver_id):
		"""
		Builds a "send" message without touching a Message object.
		The receiver keys are added only when receiver_anonymous is not None.
		"""
		message = self.default(content, SEND_MESSAGE_EVENT)
		message[USER_VAR_NAME] = self.sender_name
		if receiver_anonymous is None:
			return message
		message[RECEIVER_USERNAME_VAR_NAME] = receiver_anonymous
		message[RECEIVER_USERID_VAR_NAME] = receiver_id
		return message

	@property
	def stored_redis_user(self):
		"""
		:return: 'name:sex:id' string describing the sender
		"""
		parts = (self.sender_name, self.sex, self.user_id)
		return '%s:%s:%d' % parts

	@property
	def channel(self):
		"""
		:return: the user id channel, or the user name channel when user_id is 0
		"""
		if self.user_id != 0:
			return REDIS_USERID_CHANNEL_PREFIX % self.user_id
		return REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name

	@staticmethod
	def online_js_structure(name, sex, user_id):
		"""
		:return: {name: {"sex": sex, "userId": user_id}}
		"""
		details = {
			GENDER_VAR_NAME: sex,
			USER_ID_VAR_NAME: user_id
		}
		return {name: details}

	@property
	def online_self_js_structure(self):
		"""
		:return: online_js_structure of the sender itself
		"""
		return self.online_js_structure(self.sender_name, self.sex, self.user_id)
219
220
221
class MessagesHandler(MessagesCreator):
222
223
	def __init__(self, *args, **kwargs):
		"""
		Prepares the per-connection logger, the redis client and the table that
		maps an incoming action to the method processing it.
		"""
		super(MessagesHandler, self).__init__(*args, **kwargs)
		# last four digits of the object id, zero padded
		self.log_id = str(id(self) % 10000).zfill(4)
		self.ip = None
		# placeholder values, used until the real user name and ip are known
		self.logger = logging.LoggerAdapter(logger, {
			'username': '00000000',
			'id': self.log_id,
			'ip': 'initializing'
		})
		self.async_redis = tornadoredis.Client()
		self.process_message = {
			GET_MINE_USERNAME_EVENT: self.process_change_username,
			GET_MESSAGES_EVENT: self.process_get_messages,
			SEND_MESSAGE_EVENT: self.process_send_message,
			CALL_EVENT: self.process_call
		}
240
241
	def do_db(self, callback, *arg, **args):
		"""
		Calls callback(*arg, **args). On OperationalError or InterfaceError the
		database connection is closed and the call is repeated exactly once;
		an error of the second attempt is not handled here.
		:return: whatever callback returns
		"""
		try:
			result = callback(*arg, **args)
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
			self.logger.warning('%s, reconnecting' % e)  # TODO
			connection.close()
			result = callback(*arg, **args)
		return result
248
249
	def get_online_from_redis(self, check_name=None, check_id=None):
		"""
		Reads the online users hash from redis and converts it for the client.
		:param check_name: user name to look for under the other hash keys
		:param check_id: hash key of the current connection, it is skipped in that search
		:rtype : dict
		:return: {name: {"sex": sex, "userId": id}}; when check_id is given a tuple
		(that dict, True if check_name was found under a key different from check_id)
		"""
		online = self.sync_redis.hgetall(REDIS_ONLINE_USERS)
		self.logger.debug('!! redis online: %s', online)
		result = {}
		user_is_online = False
		for key, raw_user in (online or {}).items():  # py2 iteritems
			# values are stored as 'name:sex:id'; splitting from the right keeps
			# a ':' inside the name from breaking the unpacking
			name, sex, user_id = raw_user.decode('utf-8').rsplit(':', 2)
			if name == check_name and check_id != int(key.decode('utf-8')):
				user_is_online = True
			# user_id is passed on as the string read from redis
			result.update(self.online_js_structure(name, sex, user_id))
		if check_id is not None:
			return result, user_is_online
		return result
269
270
	def add_online_user(self):
		"""
		Registers this connection in the redis hash
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
		When the user name was not online yet everybody is notified, otherwise
		only this connection receives the list. The own name is sent afterwards.
		:return:
		"""
		online = self.get_online_from_redis()
		self.async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user)
		already_online = self.sender_name in online
		if already_online:  # Send user names to self
			refresh_message = self.online_user_names(online, REFRESH_USER_EVENT)
			self.logger.info('!! Second tab, retrieving online for self')
			self.safe_write(refresh_message)
		else:  # first connection of this name
			online.update(self.online_self_js_structure)
			login_message = self.online_user_names(online, LOGIN_EVENT)
			self.logger.info('!! First tab, sending refresh online for all')
			self.publish(login_message)
		# tell the client its own user name
		self.safe_write(self.default(self.sender_name, GET_MINE_USERNAME_EVENT))
290
291
	def set_username(self, session_key):
		"""
		Case registered: Fetch userName and its rooms from database
		Case anonymous: takes the name from the session or generates a new one and saves it to the session
		In both cases the rooms message is written to the client.
		:param session_key: key of the session to read
		:return: channels user should subscribe
		"""
		session = SessionStore(session_key)
		try:
			self.user_id = int(session["_auth_user_id"])
			user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
			self.sender_name = user_db.username
			self.sex = user_db.sex_str
			rooms = user_db.rooms.all()  # do_db is used already
			room_names = {}
			channels = [self.channel, ]
			for room in rooms:
				room_names[room.id] = room.name
				channels.append(REDIS_ROOM_CHANNEL_PREFIX % room.id)
			rooms_message = self.default(room_names, ROOMS_EVENT)
			self.logger.info("!! User %s subscribes for %s", self.sender_name, room_names)
		except (KeyError, User.DoesNotExist):
			# Anonymous
			# The session may hold an id of a user that does not exist; without
			# this reset self.channel and publish_message would keep using it.
			self.user_id = 0
			self.sender_name = session.get(SESSION_USER_VAR_KEY)
			if self.sender_name is None:
				self.sender_name = id_generator(8)
				session[SESSION_USER_VAR_KEY] = self.sender_name
				session.save()
				self.logger.info("!! A new user log in, created username %s", self.sender_name)
			else:
				self.logger.info("!! Anonymous with name %s has logged", self.sender_name)
			channels = [ANONYMOUS_REDIS_CHANNEL, self.channel]
			rooms_message = self.default(ANONYMOUS_ROOM_NAMES, ROOMS_EVENT)
		self.safe_write(rooms_message)
		return channels
325
326
	def publish(self, message, channel=ANONYMOUS_REDIS_CHANNEL):
		"""
		Serializes message to json, logs it and publishes it to the redis channel.
		:param channel: target channel, the anonymous room channel when omitted
		"""
		payload = json.dumps(message)
		self.logger.debug('<%s> %s', channel, payload)
		self.async_redis_publisher.publish(channel, payload)
330
331
	# TODO really parse every single message for 1 action?
332
	def check_and_finish_change_name(self, message):
		"""
		Completes a nickname change of an anonymous user: when the json message is
		a "changed" event whose old name is the current sender name, the user name
		channel subscription and the redis online entry are switched to the new name.
		:param message: json string
		"""
		if self.sex != ANONYMOUS_GENDER:
			return
		parsed = json.loads(message)
		if parsed[EVENT_VAR_NAME] != CHANGE_ANONYMOUS_NAME_EVENT:
			return
		if parsed[OLD_NAME_VAR_NAME] != self.sender_name:
			return
		self.async_redis.unsubscribe(REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name)
		self.sender_name = parsed[USER_VAR_NAME]
		self.async_redis.subscribe(REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name)
		self.async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user)
341
342
	def new_message(self, message):
		"""
		Callback for a message of the redis subscription: forwards its body to the
		client and checks it for a finished nickname change.
		"""
		body = message.body
		if type(body) is int:  # subscribe event
			return
		self.safe_write(body)
		self.check_and_finish_change_name(body)
346
347
	def safe_write(self, message):
		"""
		Placeholder for sending a message to the client, always raises here.
		:raises NotImplementedError: on every call
		"""
		raise NotImplementedError('WebSocketHandler implements')
349
350
	def process_send_message(self, message):
		"""
		Picks the receiver channel of a "send" request and passes it to publish_message.
		A non zero receiver id selects the user id channel; otherwise a receiver
		name selects the user name channel and the message is not saved to db.
		:type message: dict
		"""
		content = message[CONTENT_VAR_NAME]
		receiver_id = message.get(RECEIVER_USERID_VAR_NAME)  # None when absent
		receiver_name = message.get(RECEIVER_USERNAME_VAR_NAME)
		self.logger.info('!! Sending message %s to username:%s, id:%s', content, receiver_name, receiver_id)
		if receiver_id is not None and receiver_id != 0:
			save_to_db = True
			receiver_channel = REDIS_USERID_CHANNEL_PREFIX % receiver_id
		elif receiver_name is not None:
			save_to_db = False
			receiver_channel = REDIS_USERNAME_CHANNEL_PREFIX % receiver_name
		else:
			save_to_db = True
			receiver_channel = None  # no receiver given
		self.publish_message(content, receiver_channel, receiver_id, receiver_name, save_to_db)
366
367
	def process_call(self, message):
		"""
		Publishes a call offer to the channel of the receiver named in the message.
		A non zero receiver id selects the user id channel, otherwise the receiver
		name selects the user name channel. Without a receiver nothing is published.
		:type message: dict
		"""
		# TODO refactor this, duplicate code with process_webrtc and send message
		receiver_id = message.get(RECEIVER_USERID_VAR_NAME)  # None when absent
		receiver_name = message.get(RECEIVER_USERNAME_VAR_NAME)
		self.logger.info('!! Offering a call to username:%s, id:%s', receiver_name, receiver_id)
		if receiver_id is not None and receiver_id != 0:
			receiver_channel = REDIS_USERID_CHANNEL_PREFIX % receiver_id
		elif receiver_name is not None:
			receiver_channel = REDIS_USERNAME_CHANNEL_PREFIX % receiver_name
		else:
			# Publishing with channel=None would override the default of publish
			# and target a channel literally named None, so stop here instead.
			self.logger.warning('!! Call offer without a receiver, skipping')
			return
		call_message = self.offer_call(message.get(CONTENT_VAR_NAME), message.get(CALL_TYPE_VAR_NAME))
		self.publish(call_message, receiver_channel)
382
383
	def publish_message(self, content, receiver_channel, receiver_id, receiver_name, save_to_db):
		"""
		Builds the outgoing message and publishes it.
		The message is saved as a Message only when the sender has a non zero
		user_id and save_to_db is set. With receiver_id None it goes to the
		default channel, otherwise to the sender channel and to receiver_channel.
		"""
		persist = self.user_id != 0 and save_to_db
		if persist:
			self.logger.debug('!! Saving it to db')
			message_db = Message(sender_id=self.user_id, content=content, receiver_id=receiver_id)
			self.do_db(message_db.save)  # exit on hacked id with exception
			prepared_message = self.create_send_message(message_db)
		else:
			self.logger.debug('!! NOT saving it')
			prepared_message = self.send_anonymous(content, receiver_name, receiver_id)
		if receiver_id is None:
			self.logger.debug('!! Detected as public')
			self.publish(prepared_message)
			return
		# the sender gets its own copy first
		own_channel = self.channel
		self.publish(prepared_message, own_channel)
		self.logger.debug('!! Detected as private, channel %s', receiver_channel)
		if receiver_channel != self.channel:
			self.publish(prepared_message, receiver_channel)
400
401
	def process_change_username(self, message):
		"""
		Handles a request to rename the current user to message["content"].
		The new name is saved to the session and a "changed" message is published;
		self.sender_name itself is switched later, see check_and_finish_change_name.
		A ValidationError is reported to the client as a growl message.
		:type message: dict
		"""
		new_username = message[CONTENT_VAR_NAME]
		self.logger.info('!! Changing username to %s', new_username)
		try:
			self.do_db(check_user, new_username)
			online = self.get_online_from_redis()
			if new_username in online:
				self.logger.info('!! This name is already used')
				raise ValidationError('This name is already used by another anonymous!')
			session = SessionStore(self.get_cookie(settings.SESSION_COOKIE_NAME))
			session[SESSION_USER_VAR_KEY] = new_username
			session.save()
			old_username = self.sender_name
			# the old name can be missing if two or more change_username events come in fast time
			online.pop(old_username, None)
			# temporary set username for creating messages with self.online_self_js_structure, change_user_nickname
			self.sender_name = new_username
			online.update(self.online_self_js_structure)
			message_all = self.change_user_nickname(old_username, online)
			self.sender_name = old_username  # see check_and_finish_change_name
			self.publish(message_all)
		except ValidationError as e:
			growl = self.default(str(e.message), event=GROWL_MESSAGE_EVENT)
			self.safe_write(growl)
430
431
	def process_get_messages(self, data):
		"""
		Writes to the client the latest messages it is allowed to see: those
		without a receiver and those sent by or addressed to the current user.
		:type data: dict
		:param data: may hold "headerId" (only messages with a smaller id are
		returned) and "count" (amount of messages, 10 by default)
		:raises ValidationError: if count is not a non negative integer
		"""
		header_id = data.get(HEADER_ID_VAR_NAME, None)
		# count comes from the client, so reject values that would break int() or the slice below
		try:
			count = int(data.get(COUNT_VAR_NAME, 10))
		except (TypeError, ValueError):
			raise ValidationError('Messages count should be a number')
		if count < 0:
			raise ValidationError("Messages count can't be negative")
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		# Only public, or sent by the user, or received by the user
		visible = Q(receiver=None) | Q(sender=self.user_id) | Q(receiver=self.user_id)
		if header_id is None:
			messages = Message.objects.filter(visible)
		else:
			messages = Message.objects.filter(Q(id__lt=header_id), visible)
		messages = messages.order_by('-pk')[:count]
		response = self.do_db(self.get_messages, messages)
		self.safe_write(response)
450
451
	def save_ip(self):
		"""
		Creates a UserJoinedInfo for the current ip and user unless an equal one exists.
		A user with user_id 0 is stored by its name, any other user by its id.
		"""
		is_anonymous = self.user_id == 0
		user_id = None if is_anonymous else self.user_id
		anon_name = self.sender_name if is_anonymous else None
		same_record = UserJoinedInfo.objects.filter(
			Q(ip__ip=self.ip) & Q(anon_name=anon_name) & Q(user_id=user_id))
		if self.do_db(same_record.exists):
			return
		ip_address = self.get_or_create_ip()
		UserJoinedInfo.objects.create(ip=ip_address, user_id=user_id, anon_name=anon_name)
463
464
	def get_or_create_ip(self):
		"""
		Returns the IpAddress of self.ip. A missing one is created with the
		location info fetched from api_url; if that fails for any reason the
		error is logged and the record is created with the ip only.
		:return: IpAddress
		"""
		try:
			ip_address = IpAddress.objects.get(ip=self.ip)
		except IpAddress.DoesNotExist:
			try:
				if not api_url:
					raise Exception('api url is absent')
				self.logger.debug("Creating ip record %s", self.ip)
				f = urlopen(api_url % self.ip)
				try:
					raw_response = f.read().decode("utf-8")
				finally:
					f.close()  # release the http connection even when reading fails
				response = json.loads(raw_response)
				if response['status'] != "success":
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
				ip_address = IpAddress.objects.create(
					ip=self.ip,
					isp=response['isp'],
					country=response['country'],
					region=response['regionName'],
					city=response['city']
				)
			except Exception as e:
				# best effort: the location info is optional
				self.logger.error("Error while creating ip with country info, because %s", e)
				ip_address = IpAddress.objects.create(ip=self.ip)
		return ip_address
488
489
490
class AntiSpam(object):
	"""
	Remembers the size of every checked message and rejects the ones that are too long.
	"""

	def __init__(self):
		self.info = {}  # time based key -> message length
		self.spammed = 0  # amount of rejected messages

	def check_spam(self, json_message):
		"""
		:param json_message: raw message received from the client
		:raises ValidationError: if the message is longer than MAX_MESSAGE_SIZE
		"""
		size = len(json_message)
		# key is the current time in hundredths of a second
		self.info[int(round(time.time() * 100))] = size
		if size > MAX_MESSAGE_SIZE:
			self.spammed += 1
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
		self.check_timed_spam()

	def check_timed_spam(self):
		"""
		Not implemented yet, does nothing.
		"""
		# TODO implement me
		# raise ValidationError("You're chatting too much, calm down a bit!")
		return None
509
510
511
class TornadoHandler(WebSocketHandler, MessagesHandler):
512
513
	def __init__(self, *args, **kwargs):
		"""
		Creates the handler in the not connected state and attaches the redis
		clients shared through chat.global_redis.
		"""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		# imported here rather than at module level, as in the original code
		from chat import global_redis
		self.sync_redis = global_redis.sync_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.anti_spam = AntiSpam()
		self.connected = False
520
521
	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribes to the channels and then passes every incoming redis message to new_message.
		self.channel should been set before calling
		:param channels: channels to subscribe
		"""
		subscription = tornado.gen.Task(self.async_redis.subscribe, channels)
		yield subscription
		self.async_redis.listen(self.new_message)
529
530
	def data_received(self, chunk):
		"""
		Does nothing with the chunk.
		"""
		return None
532
533
	def on_message(self, json_message):
		"""
		Validates a raw message of the client and passes it to the method
		registered in self.process_message for its action.
		Every ValidationError is logged and reported to the client as a growl
		message; that includes a message that is not a json object and a
		message with an unknown action.
		:param json_message: raw message received from the websocket
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %s', json_message)
			# the content comes from the client, so it can't be trusted to be valid
			try:
				message = json.loads(json_message)
			except ValueError:
				raise ValidationError('Message is not a valid json')
			if not isinstance(message, dict):
				raise ValidationError('Message should be a json object')
			try:
				handler = self.process_message[message[EVENT_VAR_NAME]]
			except (KeyError, TypeError):  # missing, unknown or unhashable action
				raise ValidationError('Unknown action')
			handler(message)
		except ValidationError as e:
			self.logger.warning("Message won't be send. Reason: %s", e.message)
			self.safe_write(self.default(str(e.message), event=GROWL_MESSAGE_EVENT))
546
547
	def on_close(self):
		"""
		Removes this connection from the redis online hash and, if the user has no
		other connection left, publishes the logout message. The redis subscription
		is dropped and the client disconnected in any case.
		"""
		try:
			connection_key = id(self)
			self.async_redis_publisher.hdel(REDIS_ONLINE_USERS, connection_key)
			if not self.connected:
				self.logger.warning('Dropping connection for not connected user')
			else:
				# seems like async solves problem with connection lost and wrong data status
				# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
				online, is_online = self.get_online_from_redis(self.sender_name, connection_key)
				self.logger.info('!! Closing connection, redis current online %s', online)
				if is_online:
					self.logger.debug('!! User is still online in other tabs')
				else:
					message = self.online_user_names(online, LOGOUT_EVENT)
					self.logger.debug('!! User closed the last tab, refreshing online for all')
					self.publish(message)
		finally:
			if self.async_redis.subscribed:
				#  TODO unsubscribe of all subscribed                  !IMPORTANT
				subscribed_channels = [ANONYMOUS_REDIS_CHANNEL, self.channel]
				self.async_redis.unsubscribe(subscribed_channels)
			self.async_redis.disconnect()
572
573
	def open(self):
		"""
		Accepts the connection when the session cookie refers to an existing
		session: resolves the user, subscribes to its channels, marks it online
		and saves its ip in a separate thread. Otherwise the connection is
		closed with code 403.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if not sessionStore.exists(session_key):
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			self.close(403, "Session key %s has been rejected" % session_key)
			return
		self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, id(self))
		self.async_redis.connect()
		channels = self.set_username(session_key)
		self.ip = self.get_client_ip()
		# from now on log with the real user name and ip
		self.logger = logging.LoggerAdapter(logger, {
			'username': self.sender_name.rjust(8),
			'id': self.log_id,
			'ip': self.ip
		})
		self.listen(channels)
		self.add_online_user()
		self.connected = True
		ip_saver = Thread(target=self.save_ip)
		ip_saver.start()
593
594
	def check_origin(self, origin):
		"""
		check whether the host of the origin matches the host of the Host header;
		ports are ignored and the comparison is case insensitive
		:param origin: value of the Origin header
		:return: True if both hosts are present and equal
		"""
		host_header = self.request.headers.get("Host")
		if not origin or not host_header:
			return False
		try:
			# hostname is lower cased, has no port and handles ipv6 literals
			origin_domain = urlparse(origin).hostname
			browser_domain = urlparse('//' + host_header).hostname
		except ValueError:  # malformed header
			return False
		if not origin_domain or not browser_domain:
			return False
		return browser_domain == origin_domain
604
605
	def safe_write(self, message):
		"""
		Sends the message to the client. A dict is converted to json first.
		A closed websocket is only logged.
		:type self: MessagesHandler
		:param message: dict or string
		:raises ValueError: if message is neither a dict nor a string
		"""
		self.logger.debug('<< THREAD %s >>', os.getppid())
		try:
			if isinstance(message, dict):
				message = json.dumps(message)
			# on python 2 unicode strings are accepted as well
			if not isinstance(message, str) and (PY3 or not isinstance(message, unicode)):
				raise ValueError('Wrong message type : %s' % str(message))
			self.logger.debug(">> %s", message)
			self.write_message(message)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
620
621
	def get_client_ip(self):
		"""
		:return: value of the X-Real-IP header, or request.remote_ip when the header is absent or empty
		"""
		# NOTE(review): the header is taken as is - presumably it is set by a proxy in front, confirm
		header_ip = self.request.headers.get("X-Real-IP")
		if header_ip:
			return header_ip
		return self.request.remote_ip
623