Completed
Push — master ( b0aa07...452d95 )
by Andrew
04:18 queued 03:00
created

MessagesHandler.listen()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 5
rs 9.4285
1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import tornado.gen
9
import tornado.httpclient
10
import tornado.ioloop
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q
18
from redis_sessions.session import SessionStore
19
from tornado.websocket import WebSocketHandler
20
21
from chat.utils import extract_photo
22
23
try:
24
	from urllib.parse import urlparse  # py3
25
except ImportError:
26
	from urlparse import urlparse  # py2
27
28
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, USER_ROOMS_QUERY, GENDERS, GET_DIRECT_ROOM_ID
29
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo
30
31
# True when running on Python 3 or newer. The numeric major version is compared
# instead of the free-form sys.version string, which only worked by lexicographic luck.
PY3 = sys.version_info[0] >= 3
32
33
api_url = getattr(settings, "IP_API_URL", None)
34
35
sessionStore = SessionStore()
36
37
logger = logging.getLogger(__name__)
38
39
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
40
#CONNECTION_POOL = tornadoredis.ConnectionPool(
41
#	max_connections=500,
42
#	wait_for_available=True)
43
44
45
class Actions:
	"""Values of the 'action' field of the messages built and dispatched in this module."""

	# online state of users in a room
	LOGIN = 'addOnlineUser'
	LOGOUT = 'removeOnlineUser'
	REFRESH_USER = 'setOnlineUsers'
	ADD_USER = 'addUserToAll'

	# chat messages
	SEND_MESSAGE = 'sendMessage'
	PRINT_MESSAGE = 'printMessage'
	GET_MESSAGES = 'loadMessages'
	GROWL_MESSAGE = 'growl'

	# calls
	CALL = 'call'

	# rooms and channels
	ROOMS = 'setRooms'
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
	CREATE_ROOM_CHANNEL = 'addRoom'
	DELETE_ROOM = 'deleteRoom'
	INVITE_USER = 'inviteUser'
60
61
62
class VarNames:
	"""Key names of the message dicts that are read from and written to clients."""

	# generic message fields
	EVENT = 'action'
	CONTENT = 'content'
	TIME = 'time'
	MESSAGE_ID = 'id'
	IMG = 'image'
	CALL_TYPE = 'type'

	# user fields
	USER = 'user'
	USER_ID = 'userId'
	GENDER = 'sex'

	# room fields
	ROOM_NAME = 'name'
	ROOM_ID = 'roomId'
	ROOM_USERS = 'users'
	IS_ROOM_PRIVATE = 'private'

	# CHANNEL and CHANNEL_NAME intentionally map to the same key
	CHANNEL = 'channel'
	CHANNEL_NAME = 'channel'

	# parameters of the "load messages" request
	GET_MESSAGES_COUNT = 'count'
	GET_MESSAGES_HEADER_ID = 'headerId'
82
83
84
class CallType:
	"""Values of the call message 'type' field that process_call treats specially."""

	# the only type that makes process_call look up the opponent's channel
	OFFER = 'offer'
86
87
88
class HandlerNames:
	"""The 'handler' message key (NAME) and the values stored under it."""

	# key under which one of the values below is stored in a message
	NAME = 'handler'

	# possible values
	CHANNELS = 'channels'
	CHAT = 'chat'
	GROWL = 'growl'
	WEBRTC = 'webrtc'
94
95
96
class RedisPrefix:
	"""Builds and parses redis channel names of the form <one-letter prefix><id>."""

	USER_ID_CHANNEL_PREFIX = 'u'
	ROOM_CHANNEL_PREFIX = 'r'
	__ROOM_ONLINE__ = 'o:{}'

	@classmethod
	def generate_user(cls, key):
		"""Returns the channel name of the user with id `key`, e.g. 'u5'."""
		suffix = str(key)
		return cls.USER_ID_CHANNEL_PREFIX + suffix

	@classmethod
	def generate_room(cls, key):
		"""Returns the channel name of the room with id `key`, e.g. 'r1'."""
		suffix = str(key)
		return cls.ROOM_CHANNEL_PREFIX + suffix

	@classmethod
	def extract_id(cls, channel):
		"""Returns the integer id of a channel name by dropping its first character."""
		numeric_part = channel[1:]
		return int(numeric_part)
112
113
# Channel name of the room with id ALL_ROOM_ID; attached after the class body
# because it is computed with the generate_room classmethod.
RedisPrefix.DEFAULT_CHANNEL = RedisPrefix.generate_room(ALL_ROOM_ID)
114
115
116
class MessagesCreator(object):
	"""Builds the dict payloads that handlers write to clients or publish to redis."""

	def __init__(self, *args, **kwargs):
		super(MessagesCreator, self).__init__(*args, **kwargs)
		self.user_id = 0  # anonymous by default
		self.sender_name = None
		self.sex = None

	def default(self, content, event, handler):
		"""
		Base payload of the current user.
		:return: {"action": event, "content": content, "userId": id, "time": ms, "handler": handler}
		"""
		payload = {}
		payload[VarNames.EVENT] = event
		payload[VarNames.CONTENT] = content
		payload[VarNames.USER_ID] = self.user_id
		payload[VarNames.TIME] = get_milliseconds()
		payload[HandlerNames.NAME] = handler
		return payload

	def room_online(self, online, event, channel):
		"""
		Chat payload carrying the `online` list of a channel,
		extended with the channel name and the current user's name and sex.
		"""
		payload = self.default(online, event, HandlerNames.CHAT)
		payload.update({
			VarNames.CHANNEL_NAME: channel,
			VarNames.USER: self.sender_name,
			VarNames.GENDER: self.sex,
		})
		return payload

	def offer_call(self, content, message_type):
		"""
		Webrtc payload with action "call", the given call type and the sender's name.
		"""
		payload = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
		payload.update({
			VarNames.CALL_TYPE: message_type,
			VarNames.USER: self.sender_name,
		})
		return payload

	@classmethod
	def create_message(cls, message):
		"""
		Converts a stored message to a dict with sender id, content, time and id;
		the image url is added only when the message has an image name.
		"""
		payload = {
			VarNames.USER_ID: message.sender_id,
			VarNames.CONTENT: message.content,
			VarNames.TIME: message.time,
			VarNames.MESSAGE_ID: message.id,
		}
		image = message.img
		if image.name:
			payload[VarNames.IMG] = image.url
		return payload

	@classmethod
	def create_send_message(cls, message):
		"""
		:param message: stored message
		:return: create_message() result extended with action "printMessage",
		the room channel of the message and the chat handler name
		"""
		room_channel = RedisPrefix.generate_room(message.room_id)
		payload = cls.create_message(message)
		payload.update({
			VarNames.EVENT: Actions.PRINT_MESSAGE,
			VarNames.CHANNEL: room_channel,
			HandlerNames.NAME: HandlerNames.CHAT,
		})
		return payload

	@classmethod
	def get_messages(cls, messages, channel):
		"""
		:param messages: iterable of stored messages
		:type channel: str
		:return: "loadMessages" payload with every message converted by create_message
		"""
		converted = [cls.create_message(item) for item in messages]
		return {
			VarNames.CONTENT: converted,
			VarNames.EVENT: Actions.GET_MESSAGES,
			VarNames.CHANNEL: channel,
			HandlerNames.NAME: HandlerNames.CHAT
		}

	@property
	def stored_redis_user(self):
		"""Value stored in the room's online hash for this connection."""
		return self.user_id

	@property
	def channel(self):
		"""Personal redis channel name of the current user."""
		return RedisPrefix.generate_user(self.user_id)

	def subscribe_direct_channel_message(self, room_id, other_user_id):
		"""Payload announcing a direct room between the current user and `other_user_id`."""
		participants = [self.user_id, other_user_id]
		return {
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: participants,
			HandlerNames.NAME: HandlerNames.CHANNELS
		}

	def subscribe_room_channel_message(self, room_id, room_name):
		"""Payload announcing a new named room whose only user is the current one."""
		participants = [self.user_id]
		return {
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: participants,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.ROOM_NAME: room_name
		}

	def invite_room_channel_message(self, room_id, user_id, room_name, users):
		"""Payload telling `user_id` about the room it was invited to; `users` goes to content."""
		payload = {}
		payload[VarNames.EVENT] = Actions.INVITE_USER
		payload[VarNames.ROOM_ID] = room_id
		payload[VarNames.USER_ID] = user_id
		payload[HandlerNames.NAME] = HandlerNames.CHANNELS
		payload[VarNames.ROOM_NAME] = room_name
		payload[VarNames.CONTENT] = users
		return payload

	def add_user_to_room(self, channel, user_id, content):
		"""
		Payload telling a channel that `user_id` joined it.
		`content` must hold the sex and user name entries, e.g. SEX: 'Alien', USER: 'Andrew'.
		"""
		gender = content[VarNames.GENDER]
		user_name = content[VarNames.USER]
		return {
			VarNames.EVENT: Actions.ADD_USER,
			VarNames.CHANNEL: channel,
			VarNames.USER_ID: user_id,
			HandlerNames.NAME: HandlerNames.CHAT,
			VarNames.GENDER: gender,
			VarNames.USER: user_name
		}

	def unsubscribe_direct_message(self, room_id):
		"""Payload telling that the current user leaves/deletes the room `room_id`."""
		payload = {}
		payload[VarNames.EVENT] = Actions.DELETE_ROOM
		payload[VarNames.ROOM_ID] = room_id
		payload[VarNames.USER_ID] = self.user_id
		payload[HandlerNames.NAME] = HandlerNames.CHANNELS
		payload[VarNames.TIME] = get_milliseconds()
		return payload
248
249
250
class MessagesHandler(MessagesCreator):
251
252
	def __init__(self, *args, **kwargs):
		"""
		Sets up per-connection state: log adapter, redis clients, the list of
		subscribed channels and the dispatch tables for incoming actions.
		"""
		# one-character marker that encode() prepends and decode() looks for
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__(*args, **kwargs)
		# identity of this handler object; also used as the field name in the room online hash
		self.id = id(self)
		# last four digits of the id, zero-padded, for log output
		self.log_id = str(self.id % 10000).rjust(4, '0')
		self.ip = None
		# placeholder log fields until open() replaces the adapter with real values
		log_params = {
			'user_id': '000',
			'id': self.log_id,
			'ip': 'initializing'
		}
		# imported here rather than at module top
		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		self.channels = []
		self.call_receiver_channel = None
		self.logger = logging.LoggerAdapter(logger, log_params)
		# a separate redis client per handler, used for subscriptions
		self.async_redis = tornadoredis.Client()
		# action -> method invoked from on_message for a message sent by the client
		self.pre_process_message = {
			Actions.GET_MESSAGES: self.process_get_messages,
			Actions.SEND_MESSAGE: self.process_send_message,
			Actions.CALL: self.process_call,
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
			Actions.DELETE_ROOM: self.delete_channel,
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
			Actions.INVITE_USER: self.invite_user,
		}
		# action -> method invoked from new_message for a prefixed ("parsable") redis message
		self.post_process_message = {
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
			Actions.DELETE_ROOM: self.send_client_delete_channel,
			Actions.INVITE_USER: self.send_client_new_channel,
			Actions.CALL: self.set_opponent_call_channel
		}
286
287
	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribes this handler's redis client to `channels` and then
		passes every received redis message to self.new_message.
		"""
		yield tornado.gen.Task(
			self.async_redis.subscribe, channels)
		# reached only after the subscribe task above has finished
		self.async_redis.listen(self.new_message)
292
293
	@tornado.gen.engine
	def add_channel(self, channel):
		"""
		Remembers `channel` in self.channels and subscribes this handler's redis client to it.
		"""
		# the channel is recorded before the subscription completes
		self.channels.append(channel)
		yield tornado.gen.Task(
			self.async_redis.subscribe, channel)
298
299
	def do_db(self, callback, *args, **kwargs):
		"""
		Calls `callback(*args, **kwargs)` and returns its result.
		On OperationalError/InterfaceError the django connection is closed
		and the callback is retried once; a second failure propagates.
		"""
		try:
			result = callback(*args, **kwargs)
		except (OperationalError, InterfaceError) as db_error:  # Connection has gone away
			self.logger.warning('%s, reconnecting' % db_error)  # TODO
			connection.close()
			result = callback(*args, **kwargs)
		return result
306
307
	def execute_query(self, query, *args, **kwargs):
		"""
		Runs a raw SQL `query` on the django connection and returns all rows.
		Extra positional/keyword arguments are forwarded to cursor.execute
		(callers in this file pass the list of query parameters).
		The cursor is closed even when execute/fetchall raises.
		"""
		cursor = connection.cursor()
		try:
			cursor.execute(query, *args, **kwargs)
			return cursor.fetchall()
		finally:
			# the original never released the cursor
			cursor.close()
311
312
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
		"""
		Reads the online hash of `channel` (connection hash -> user id) from redis.

		:param channel: redis key of the hash
		:param check_user_id: when not None, also report whether this user has
		a connection other than `check_hash` in the hash
		:param check_hash: connection hash that is ignored by the check above
		:return: list of distinct user ids, or (list, bool) if check_user_id is not None
		"""
		online = self.sync_redis.hgetall(channel)
		self.logger.debug('!! redis online: %s', online)
		result = set()
		user_is_online = False
		# redis returns raw bytes for both field names and values, so parse them
		if online:
			for key_hash, raw_user_id in online.items():  # py2 iteritems
				user_id = int(raw_user_id.decode('utf-8'))
				if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
					user_is_online = True
				result.add(user_id)
		result = list(result)
		# "is not None" instead of truthiness: user id 0 (the anonymous default)
		# must still produce the tuple that the caller unpacks
		if check_user_id is not None:
			return result, user_is_online
		return result
330
331
	def add_online_user(self, room_id):
		"""
		Registers this connection in the room's redis online hash
		{ connection_hash1: stored_redis_user1, connection_hash2: stored_redis_user2 }.
		If the user was not in the hash before, the new online list is published
		to the whole room; otherwise it is written to this client only.
		"""
		channel_key = RedisPrefix.generate_room(room_id)
		# snapshot is taken before this connection is added
		online = self.get_online_from_redis(channel_key)
		self.async_redis_publisher.hset(channel_key, self.id, self.stored_redis_user)
		first_tab = self.user_id not in online
		if first_tab:  # if a new tab has been opened
			online.append(self.user_id)
		event = Actions.LOGIN if first_tab else Actions.REFRESH_USER
		online_user_names_mes = self.room_online(online, event, channel_key)
		if first_tab:
			self.logger.info('!! First tab, sending refresh online for all')
			self.publish(online_user_names_mes, channel_key)
			return
		# Send user names to self
		self.logger.info('!! Second tab, retrieving online for self')
		self.safe_write(online_user_names_mes)
357
358
	def publish(self, message, channel=None, parsable=False):
		"""
		Serializes `message` to JSON and publishes it to the redis `channel`.

		:param message: JSON-serializable payload
		:param channel: target redis channel, required despite the default
		:param parsable: if True the payload is marked with the parsable prefix,
		so new_message decodes and post-processes it on receipt
		:raises ValidationError: if channel is None
		"""
		if channel is None:
			# was a placeholder text; this message can reach the client as a growl
			raise ValidationError('Channel to publish the message to is not specified')
		jsoned_mess = json.dumps(message)
		self.logger.debug('<%s> %s', channel, jsoned_mess)
		if parsable:
			jsoned_mess = self.encode(jsoned_mess)
		self.async_redis_publisher.publish(channel, jsoned_mess)
366
367
	def encode(self, message):
		"""
		Marks a message with the parsable prefix to specify that
		it should be decoded and processed before sending to the client
		@param message: message to mark
		@return: marked message
		"""
		marker = self.parsable_prefix
		return marker + message
375
376
	def decode(self, message):
		"""
		Checks if a message should be processed by the server before writing to the client
		@param message: message to check
		@return: Object structure of the message if it starts with the parsable prefix, None if not
		"""
		prefix = self.parsable_prefix
		if not message.startswith(prefix):
			return None
		# strip exactly the prefix, whatever its length (was a hard-coded single character)
		return json.loads(message[len(prefix):])
384
385
	def new_message(self, message):
		"""
		Callback for every message received from redis.
		Integer bodies are ignored; any other body is written to the client,
		and if it carried the parsable prefix the matching post-processor
		is run on the decoded structure afterwards.
		"""
		data = message.body
		if type(data) is int:  # subscribe event
			return
		decoded = self.decode(data)
		# a decoded structure replaces the raw string in what the client gets
		self.safe_write(decoded if decoded else data)
		if decoded:
			post_processor = self.post_process_message[decoded[VarNames.EVENT]]
			post_processor(decoded)
394
395
	def safe_write(self, message):
		"""
		Writes a message to the client. This base class has no transport,
		so it always raises; TornadoHandler provides the real implementation.
		"""
		reason = 'WebSocketHandler implements'
		raise NotImplementedError(reason)
397
398
	def process_send_message(self, message):
		"""
		Stores a chat message sent by the current user and publishes it to its channel.
		:type message: dict
		"""
		channel = message[VarNames.CHANNEL]
		message_db = Message(sender_id=self.user_id, content=message[VarNames.CONTENT])
		# the room id is taken from the channel name
		message_db.room_id = RedisPrefix.extract_id(channel)
		if VarNames.IMG in message:
			message_db.img = extract_photo(message[VarNames.IMG])
		self.do_db(message_db.save)  # exit on hacked id with exception
		self.publish(self.create_send_message(message_db), channel)
414
415
	def process_call(self, in_message):
		"""
		Publishes a call message to the opponent's personal channel.
		For an "offer" the opponent is looked up as the other user of the unnamed
		room behind the message channel and remembered in self.call_receiver_channel;
		for any other type the previously remembered channel is used.
		:type in_message: dict
		"""
		call_type = in_message.get(VarNames.CALL_TYPE)
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
		is_offer = call_type == CallType.OFFER
		if is_offer:
			to_channel = in_message[VarNames.CHANNEL]
			room_id = RedisPrefix.extract_id(to_channel)
			opponent = User.rooms.through.objects.get(
				~Q(user_id=self.user_id),
				Q(room_id=room_id),
				Q(room__name__isnull=True)
			)
			self.call_receiver_channel = RedisPrefix.generate_user(opponent.user_id)
			out_message[VarNames.CHANNEL] = to_channel
		self.logger.info('!! Offering a call to user with id %s',  self.call_receiver_channel)
		# only an offer is marked parsable, so that the receiver post-processes it
		self.publish(out_message, self.call_receiver_channel, is_offer)
431
432
	def create_new_room(self, message):
		"""
		Creates a named room containing the current user and announces it
		on the user's personal channel.
		:raises ValidationError: if the name is empty or longer than 16 characters
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = bool(room_name) and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		room = Room(name=room_name)
		self.do_db(room.save)
		room.users.add(self.user_id)
		room.save()
		self.publish(
			self.subscribe_room_channel_message(room.id, room_name),
			self.channel,
			True
		)
442
443
	def invite_user(self, message):
		"""
		Adds a user to a room the current connection is subscribed to.
		The room channel is told about the new user and the invited user
		gets the room description on the personal channel.
		:raises ValidationError: if the room is not among self.channels,
		is private, or already contains the user
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		channel = RedisPrefix.generate_room(room_id)
		if channel not in self.channels:
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
		room = self.do_db(Room.objects.get, id=room_id)
		if room.is_private:
			raise ValidationError("You can't add users to direct room, create a new room instead")
		try:
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		# collect every user of the room, the new one included
		users_in_room = {}
		for member in room.users.all():
			self.set_js_user_structure(users_in_room, member.id, member.username, member.sex)
		joined_message = self.add_user_to_room(channel, user_id, users_in_room[user_id])
		self.publish(joined_message, channel)
		invited_channel = RedisPrefix.generate_user(user_id)
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
		self.publish(subscribe_message, invited_channel, True)
462
463
	def create_user_channel(self, message):
		"""
		Opens a direct room between the current user and message['userId'].
		A disabled existing room is re-enabled, a missing one is created;
		the room is then announced on the personal channels of both users.
		:raises ValidationError: if an enabled direct room already exists
		"""
		user_id = message[VarNames.USER_ID]
		query_res = self.do_db(self.execute_query, GET_DIRECT_ROOM_ID, [self.user_id, user_id])
		if len(query_res) == 0:
			# no direct room yet
			room = Room()
			room.save()
			room.users.add(self.user_id, user_id)
			room.save()
			room_id = room.id
		else:
			first_row = query_res[0]
			room_id, disabled = first_row[0], first_row[1]
			if disabled is None:
				raise ValidationError('This room already exist')
			Room.objects.filter(id=room_id).update(disabled=None)
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
		own_channel = self.channel
		self.publish(subscribe_message, own_channel, True)
		other_channel = RedisPrefix.generate_user(user_id)
		# a room with oneself must not be announced twice
		if own_channel != other_channel:
			self.publish(subscribe_message, other_channel, True)
485
486
	def delete_channel(self, message):
		"""
		Makes the current user leave the room message['roomId'].
		A room without a name is marked disabled; for a named room the user's
		membership is removed and the room is told about the new online list.
		In both cases a "deleteRoom" message is published to the room channel.
		:raises ValidationError: if the room is not among self.channels, is the
		ALL room, or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		channel = RedisPrefix.generate_room(room_id)
		if channel not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to exit this room')
		room = self.do_db(Room.objects.get, id=room_id)
		if room.disabled is not None:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else:  # if public -> leave the room, delete the link
			room.users.remove(self.user_id)
			online = self.get_online_from_redis(channel)
			# the user may be missing from the redis hash; list.remove would then
			# raise a ValueError that nothing up the call chain handles
			if self.user_id in online:
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
		room.save()
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, channel, True)
504
505
	def send_client_new_channel(self, message):
		"""
		Post-processor of the room announcements: subscribes this connection to
		the room channel of message['roomId'] and registers it as online there.
		"""
		room_id = message[VarNames.ROOM_ID]
		self.add_channel(RedisPrefix.generate_room(room_id))
		self.add_online_user(room_id)  # TODO doesnt work if already subscribed
510
511
	def set_opponent_call_channel(self, message):
		"""
		Post-processor of a received call: remembers the personal channel
		of message['userId'] as the receiver of further call messages.
		"""
		opponent_id = message[VarNames.USER_ID]
		self.call_receiver_channel = RedisPrefix.generate_user(opponent_id)
513
514
	def send_client_delete_channel(self, message):
		"""
		Post-processor of "deleteRoom": unsubscribes this connection from the room
		channel, drops it from the room's online hash and from self.channels.
		"""
		channel = RedisPrefix.generate_room(message[VarNames.ROOM_ID])
		self.async_redis.unsubscribe(channel)
		self.async_redis_publisher.hdel(channel, self.id)
		self.channels.remove(channel)
520
521
	def process_get_messages(self, data):
		"""
		Writes to the client up to 'count' (default 10) newest messages of the
		room behind data['channel']; if 'headerId' is given, only messages
		with a smaller id are taken.
		:type data: dict
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		channel = data[VarNames.CHANNEL]
		room_id = RedisPrefix.extract_id(channel)
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		if header_id is None:
			conditions = [Q(room_id=room_id)]
		else:
			conditions = [Q(id__lt=header_id), Q(room_id=room_id)]
		messages = Message.objects.filter(*conditions).order_by('-pk')[:count]
		# the queryset is evaluated inside get_messages, hence under do_db
		self.safe_write(self.do_db(self.get_messages, messages, channel))
536
537
	def get_users_in_current_user_rooms(self):
		"""
		Groups the rows of USER_ROOMS_QUERY (user id, user name, sex, room id,
		room name) by room:
		{
			ROOM_ID: {
				"name": ROOM_NAME,
				"users": {
					USER_ID: {
						"user": USER_NAME,
						"sex": GENDERS[sex]
					},
					...
				}
			}
		}
		"""
		rooms = {}
		for row in self.execute_query(USER_ROOMS_QUERY, [self.user_id]):
			room_id = row[3]
			# the first row of a room creates its entry
			room = rooms.get(room_id)
			if room is None and room_id not in rooms:
				room = {
					VarNames.ROOM_NAME: row[4],
					VarNames.ROOM_USERS: {}
				}
				rooms[room_id] = room
			self.set_js_user_structure(rooms[room_id][VarNames.ROOM_USERS], row[0], row[1], row[2])
		return rooms
571
572
	def set_js_user_structure(self, user_dict, user_id, name, sex):
		"""
		Stores {"user": name, "sex": GENDERS[sex]} under `user_id` in `user_dict` (in place).
		"""
		entry = {}
		entry[VarNames.USER] = name
		entry[VarNames.GENDER] = GENDERS[sex]
		user_dict[user_id] = entry
577
578
	def save_ip(self):
		"""
		Records that the current user connected from self.ip,
		unless such a record already exists.
		"""
		known = UserJoinedInfo.objects.filter(
			Q(ip__ip=self.ip) & Q(user_id=self.user_id))
		if self.do_db(known.exists):
			return
		UserJoinedInfo.objects.create(
			ip=self.get_or_create_ip(),
			user_id=self.user_id
		)
587
588
	def get_or_create_ip(self):
		"""
		Returns the IpAddress record of self.ip, creating it when missing.
		A new record is filled with the data returned by the service at api_url;
		if that fails for any reason the record is created with the ip only.
		"""
		try:
			ip_address = IpAddress.objects.get(ip=self.ip)
		except IpAddress.DoesNotExist:
			try:
				if not api_url:
					raise Exception('api url is absent')
				self.logger.debug("Creating ip record %s", self.ip)
				f = urlopen(api_url % self.ip)
				try:
					raw_response = f.read().decode("utf-8")
				finally:
					# release the HTTP connection even if reading or decoding fails
					f.close()
				response = json.loads(raw_response)
				if response['status'] != "success":
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
				ip_address = IpAddress.objects.create(
					ip=self.ip,
					isp=response['isp'],
					country=response['country'],
					region=response['regionName'],
					city=response['city'],
					country_code=response['countryCode']
				)
			except Exception as e:
				# deliberate best effort: the lookup is optional, the ip record is not
				self.logger.error("Error while creating ip with country info, because %s", e)
				ip_address = IpAddress.objects.create(ip=self.ip)
		return ip_address
613
614
615
class AntiSpam(object):
	"""Per-connection message size check; the time-based check is not implemented yet."""

	def __init__(self):
		# timestamp in hundredths of a second -> length of the message checked at that time
		self.info = {}
		# number of oversized messages seen
		self.spammed = 0

	def check_spam(self, json_message):
		"""
		Records the message length and rejects messages longer than MAX_MESSAGE_SIZE.
		:raises ValidationError: if the message is too long
		"""
		message_length = len(json_message)
		info_key = int(round(time.time() * 100))
		self.info[info_key] = message_length
		if message_length <= MAX_MESSAGE_SIZE:
			self.check_timed_spam()
			return
		self.spammed += 1
		raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)

	def check_timed_spam(self):
		"""Placeholder for a frequency check; does nothing for now."""
		# TODO implement me
		# raise ValidationError("You're chatting too much, calm down a bit!")
		return None
634
635
636
class TornadoHandler(WebSocketHandler, MessagesHandler):
637
638
	def __init__(self, *args, **kwargs):
		"""
		Initializes both base classes through the cooperative super() chain and
		adds the websocket-specific state.
		"""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		# set to True at the end of a successful open(); on_message rejects input before that
		self.connected = False
		self.anti_spam = AntiSpam()
642
643
	def data_received(self, chunk):
		"""Intentionally empty: the received chunk is ignored."""
		pass
645
646
	def on_message(self, json_message):
		"""
		Entry point for every websocket frame sent by the client.
		Parses the JSON, checks that the action is known and that the message
		channel (if any) is one this connection is subscribed to, then runs the
		matching pre-processor. Every validation failure, including malformed
		input, is reported back to the client as a growl message.
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %s', json_message)
			try:
				message = json.loads(json_message)
			except ValueError:
				# malformed JSON used to escape this handler as an unhandled exception
				raise ValidationError('Message is not a valid JSON')
			if not isinstance(message, dict):
				raise ValidationError('Message should be a JSON object')
			event = message.get(VarNames.EVENT)
			try:
				handler = self.pre_process_message.get(event)
			except TypeError:
				# unhashable value (list/dict) in the action field
				handler = None
			if handler is None:
				raise ValidationError("event {} is unknown".format(event))
			channel = message.get(VarNames.CHANNEL)
			if channel and channel not in self.channels:
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
			handler(message)
		except ValidationError as e:
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.safe_write(error_message)
664
665
	def on_close(self):
		"""
		Cleans up after a closed websocket: unsubscribes from redis, removes this
		connection from the online hash of every room channel and, when the user
		has no other connection in a room, publishes a logout to that room.
		"""
		if self.async_redis.subscribed:
			self.async_redis.unsubscribe(self.channels)
		# per-channel result, only used for the log line at the end
		log_data = {}
		for channel in self.channels:
			# only room channels have an online hash; the personal channel is skipped
			if channel.startswith(RedisPrefix.ROOM_CHANNEL_PREFIX):
				self.sync_redis.hdel(channel, self.id)
				if self.connected:
					# seems like async solves problem with connection lost and wrong data status
					# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
					online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
					log_data[channel] = {'online': online, 'is_online': is_online}
					# is_online: the user still has another connection in this room
					if not is_online:
						message = self.room_online(online, Actions.LOGOUT, channel)
						self.publish(message, channel)

		self.logger.info("Close connection result: %s", json.dumps(log_data))
		self.async_redis.disconnect()
683
684
	def open(self):
		"""
		Handles a new websocket connection.
		With a session key that exists in the session store: loads the user,
		sends the rooms with their users, subscribes to the personal and room
		channels, registers the connection as online in every room and stores
		the client ip in a background thread. Otherwise closes with code 403.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if sessionStore.exists(session_key):
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
			self.ip = self.get_client_ip()
			session = SessionStore(session_key)
			self.user_id = int(session["_auth_user_id"])
			# from now on log records carry the real user id and ip
			log_params = {
				'user_id': str(self.user_id).zfill(3),
				'id': self.log_id,
				'ip': self.ip
			}
			self.logger = logging.LoggerAdapter(logger, log_params)
			self.async_redis.connect()
			user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
			self.sender_name = user_db.username
			self.sex = user_db.sex_str
			user_rooms = self.get_users_in_current_user_rooms()
			self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
			# channel list = personal channel + one channel per room of the user
			self.channels.clear()
			self.channels.append(self.channel)
			for room_id in user_rooms:
				self.channels.append(RedisPrefix.generate_room(room_id))
			self.listen(self.channels)
			for room_id in user_rooms:
				self.add_online_user(room_id)
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
			# from this point on_message accepts client messages
			self.connected = True
			# save_ip may do a blocking HTTP request, so it runs in its own thread
			Thread(target=self.save_ip).start()
		else:
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			self.close(403, "Session key %s has been rejected" % session_key)
716
717
	def check_origin(self, origin):
		"""
		Checks whether the domain of the Origin header matches the domain of
		the Host header; ports are ignored and the comparison is case-insensitive.
		:param origin: value of the Origin header
		:return: True if the domains match, False otherwise or if Host is absent
		"""
		browser_set = self.request.headers.get("Host")
		if not browser_set:
			# a missing Host header used to crash with AttributeError on None
			return False
		origin_domain = urlparse(origin).netloc.split(':')[0].lower()
		# host names are case-insensitive, so normalize both sides, not only the origin
		browser_domain = browser_set.split(':')[0].lower()
		return browser_domain == origin_domain
727
728
	def safe_write(self, message):
		"""
		Sends a message to the client; a dict is serialized to JSON first.
		A closed websocket is logged instead of raised.
		:type self: MessagesHandler
		:raises ValueError: if the message is neither a dict nor a string
		"""
		# self.logger.debug('<< THREAD %s >>', os.getppid())
		try:
			if isinstance(message, dict):
				message = json.dumps(message)
			# `unicode` is only looked up on python 2 thanks to the short circuit
			is_text = isinstance(message, str) or (not PY3 and isinstance(message, unicode))
			if not is_text:
				raise ValueError('Wrong message type : %s' % str(message))
			self.logger.debug(">> %s", message)
			self.write_message(message)
		except tornado.websocket.WebSocketClosedError as closed_error:
			self.logger.error("%s. Can't send << %s >> message", closed_error, str(message))
743
744
	def get_client_ip(self):
		"""
		Returns the X-Real-IP request header when it is present and non-empty,
		otherwise the remote ip of the request.
		"""
		request = self.request
		forwarded_ip = request.headers.get("X-Real-IP")
		if forwarded_ip:
			return forwarded_ip
		return request.remote_ip
746