Completed
Push — master ( 752250...a6f716 )
by Andrew
01:11
created

MessagesHandler   F

Complexity

Total Complexity 63

Size/Duplication

Total Lines 366
Duplicated Lines 0 %

Importance

Changes 12
Bugs 2 Features 0
Metric Value
c 12
b 2
f 0
dl 0
loc 366
rs 3.6585
wmc 63

24 Methods

Rating   Name   Duplication   Size   Complexity  
B __init__() 0 31 1
A publish() 0 8 3
A set_js_user_structure() 0 4 1
B create_user_channel() 0 24 4
B add_online_user() 0 26 2
A encode() 0 8 1
A save_ip() 0 8 2
A listen() 0 5 1
A new_message() 0 9 4
A create_new_room() 0 10 3
A safe_write() 0 2 1
B get_or_create_ip() 0 25 5
A do_db() 0 7 2
A add_channel() 0 5 1
A send_client_delete_channel() 0 6 1
C process_send_message() 0 31 7
B delete_channel() 0 18 5
A decode() 0 8 2
B get_users_in_current_user_rooms() 0 36 3
A send_client_new_channel() 0 5 1
B get_online_from_redis() 0 18 6
A invite_user() 0 17 4
A process_get_messages() 0 19 2
A process_call() 0 8 1

How to fix   Complexity   

Complex Class

Complex classes like MessagesHandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import tornado.gen
9
import tornado.httpclient
10
import tornado.ioloop
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q
18
from redis_sessions.session import SessionStore
19
from tornado.websocket import WebSocketHandler
20
21
from chat.utils import extract_photo
22
23
try:
	from urllib.parse import urlparse  # Python 3 location
except ImportError:
	from urlparse import urlparse  # Python 2 fallback
27
28
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, USER_ROOMS_QUERY, GENDERS, GET_DIRECT_ROOM_ID
29
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo
30
31
# Compare the numeric major version: a lexical comparison of the sys.version
# string with '3' would misorder a two-digit major version.
PY3 = sys.version_info[0] >= 3
32
33
# Optional settings.IP_API_URL; used below as a %-style template that is filled with the client ip
api_url = getattr(settings, "IP_API_URL", None)
34
35
# Module-wide session store instance, used to check whether an incoming session key exists
sessionStore = SessionStore()
36
37
# Base module logger; handlers wrap it in a LoggerAdapter that carries user id, connection id and ip
logger = logging.getLogger(__name__)
38
39
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
40
#CONNECTION_POOL = tornadoredis.ConnectionPool(
41
#	max_connections=500,
42
#	wait_for_available=True)
43
44
45
class Actions:
	"""Values of the "action" field of messages exchanged with the client."""

	# online presence
	LOGIN = 'addOnlineUser'
	LOGOUT = 'removeOnlineUser'
	REFRESH_USER = 'setOnlineUsers'

	# messaging
	SEND_MESSAGE = 'sendMessage'
	PRINT_MESSAGE = 'printMessage'
	GET_MESSAGES = 'messages'
	SYSTEM_MESSAGE = 'system'
	GROWL_MESSAGE = 'growl'
	CALL = 'call'

	# rooms and channels
	ROOMS = 'setRooms'
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
	CREATE_ROOM_CHANNEL = 'addRoom'
	DELETE_ROOM = 'deleteRoom'
	INVITE_USER = 'inviteUser'
60
61
62
class VarNames:
	"""Key names used inside the message dicts exchanged with the client."""

	# message envelope
	EVENT = 'action'
	CONTENT = 'content'
	TIME = 'time'
	MESSAGE_ID = 'id'
	IMG = 'image'
	CALL_TYPE = 'type'

	# users
	USER = 'user'
	USER_ID = 'userId'
	GENDER = 'sex'
	RECEIVER_ID = 'receiverId'
	RECEIVER_NAME = 'receiverName'

	# rooms and channels (CHANNEL and CHANNEL_NAME share the same key)
	ROOM_ID = 'roomId'
	ROOM_NAME = 'name'
	ROOM_USERS = 'users'
	CHANNEL = 'channel'
	CHANNEL_NAME = 'channel'
	IS_ROOM_PRIVATE = 'private'
82
83
84
class HandlerNames:
	"""The key under which a message names its handler, and the handler names stored under it."""

	# key of the handler field inside a message
	NAME = 'handler'

	# possible values of that field
	CHAT = 'chat'
	CHANNELS = 'channels'
	GROWL = 'growl'
	WEBRTC = 'webrtc'
90
91
92
class RedisPrefix:
	"""Prefixes and helpers used to build redis channel names for users and rooms."""

	USER_ID_CHANNEL_PREFIX = 'u'
	ROOM_CHANNEL_PREFIX = 'r'
	__ROOM_ONLINE__ = 'o:{}'

	@classmethod
	def generate_user(cls, key):
		"""
		:param key: user identifier, converted with str()
		:return: user channel name, e.g. 'u5' for key 5
		"""
		return ''.join((cls.USER_ID_CHANNEL_PREFIX, str(key)))

	@classmethod
	def generate_room(cls, key):
		"""
		:param key: room identifier, converted with str()
		:return: room channel name, e.g. 'r5' for key 5
		"""
		return ''.join((cls.ROOM_CHANNEL_PREFIX, str(key)))
104
105
# Attached after the class body because it is built with the generate_room classmethod
RedisPrefix.DEFAULT_CHANNEL = RedisPrefix.generate_room(ALL_ROOM_ID)
106
107
108
class MessagesCreator(object):
	"""
	Builds the dict payloads that are later serialized to JSON for clients.
	Keeps the identity (id, name, sex) of the user the payloads are created for.
	"""

	def __init__(self, *args, **kwargs):
		super(MessagesCreator, self).__init__(*args, **kwargs)
		self.user_id = 0  # anonymous by default
		self.sender_name = None
		self.sex = None

	def default(self, content, event, handler):
		"""
		Creates the base payload most outgoing messages start from
		:param content: value stored under "content"
		:param event: value stored under "action"
		:param handler: value stored under "handler"
		:return: {"action": event, "content": content, "userId": self.user_id,
		"time": get_milliseconds(), "handler": handler}
		"""
		payload = {}
		payload[VarNames.EVENT] = event
		payload[VarNames.CONTENT] = content
		payload[VarNames.USER_ID] = self.user_id
		payload[VarNames.TIME] = get_milliseconds()
		payload[HandlerNames.NAME] = handler
		return payload

	def room_online(self, online, event, channel):
		"""
		Creates a chat-handler payload about the online state of a channel
		:param online: value stored under "content" (callers in this module pass a list of user ids)
		:param event: value stored under "action"
		:param channel: value stored under "channel"
		:return: the default() payload extended with "channel", "user" (sender name) and "sex"
		"""
		payload = self.default(online, event, HandlerNames.CHAT)
		payload.update([
			(VarNames.CHANNEL_NAME, channel),
			(VarNames.USER, self.sender_name),
			(VarNames.GENDER, self.sex),
		])
		return payload

	def offer_call(self, content, message_type):
		"""
		Creates a webrtc-handler payload with the "call" action
		:param content: value stored under "content"
		:param message_type: value stored under "type"
		:return: the default() payload extended with "type"
		"""
		payload = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
		payload[VarNames.CALL_TYPE] = message_type
		return payload

	@classmethod
	def create_send_message(cls, message):
		"""
		Converts a stored message into a "printMessage" payload for the chat handler
		:param message: object exposing receiver_id, room_id, sender_id, content, time, id,
		img and receiver (presumably chat.models.Message - confirm)
		:return: dict with "userId", "content", "time", "id", "action", "channel" and "handler";
		"image" is added when the message has an image, "receiverId" and "receiverName"
		when the message has a receiver
		:raises ValidationError: if the message has neither a receiver nor a room
		"""
		receiver_id = message.receiver_id
		if receiver_id:
			channel = RedisPrefix.generate_user(receiver_id)
		else:
			room_id = message.room_id
			if not room_id:
				raise ValidationError('Channel is none')
			channel = RedisPrefix.generate_room(room_id)
		payload = {}
		payload[VarNames.USER_ID] = message.sender_id
		payload[VarNames.CONTENT] = message.content
		payload[VarNames.TIME] = message.time
		payload[VarNames.MESSAGE_ID] = message.id
		payload[VarNames.EVENT] = Actions.PRINT_MESSAGE
		payload[VarNames.CHANNEL] = channel
		payload[HandlerNames.NAME] = HandlerNames.CHAT
		if message.img.name:
			payload[VarNames.IMG] = message.img.url
		if receiver_id:
			receiver = message.receiver
			payload[VarNames.RECEIVER_ID] = receiver.id
			payload[VarNames.RECEIVER_NAME] = receiver.username
		return payload

	@classmethod
	def get_messages(cls, messages):
		"""
		Wraps several stored messages into one "messages" payload
		:param messages: iterable of objects accepted by create_send_message
		:return: {"content": [payload of every message], "action": "messages"}
		"""
		converted = list(map(cls.create_send_message, messages))
		return {
			VarNames.CONTENT: converted,
			VarNames.EVENT: Actions.GET_MESSAGES
		}

	@property
	def stored_redis_user(self):
		"""Value written to the redis online hash for this connection: the user id"""
		return self.user_id

	@property
	def channel(self):
		"""Name of the personal redis channel of the current user"""
		return RedisPrefix.generate_user(self.user_id)

	def subscribe_direct_channel_message(self, room_id, other_user_id):
		"""
		:return: "addDirectChannel" payload for the channels handler;
		"users" holds the current user id and other_user_id
		"""
		participants = [self.user_id, other_user_id]
		return {
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: participants,
			HandlerNames.NAME: HandlerNames.CHANNELS
		}

	def subscribe_room_channel_message(self, room_id, room_name):
		"""
		:return: "addRoom" payload for the channels handler;
		"users" holds only the current user id
		"""
		participants = [self.user_id]
		return {
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: participants,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.ROOM_NAME: room_name
		}

	def invite_room_channel_message(self, room_id, user_id, room_name, users):
		"""
		:param users: value stored under "content"
		:return: "inviteUser" payload for the channels handler
		"""
		payload = {}
		payload[VarNames.EVENT] = Actions.INVITE_USER
		payload[VarNames.ROOM_ID] = room_id
		payload[VarNames.USER_ID] = user_id
		payload[HandlerNames.NAME] = HandlerNames.CHANNELS
		payload[VarNames.ROOM_NAME] = room_name
		payload[VarNames.CONTENT] = users
		return payload

	def unsubscribe_direct_message(self, room_id):
		"""
		:return: "deleteRoom" payload for the channels handler, stamped with
		the current user id and get_milliseconds()
		"""
		payload = {}
		payload[VarNames.EVENT] = Actions.DELETE_ROOM
		payload[VarNames.ROOM_ID] = room_id
		payload[VarNames.USER_ID] = self.user_id
		payload[HandlerNames.NAME] = HandlerNames.CHANNELS
		payload[VarNames.TIME] = get_milliseconds()
		return payload
230
231
232
class MessagesHandler(MessagesCreator):
233
234
	def __init__(self, *args, **kwargs):
		"""
		Sets up connection identifiers, redis clients, the logger and
		the tables that map an action name to the method handling it
		"""
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__(*args, **kwargs)
		self.id = id(self)
		# last four digits of the id, zero padded, used in log records
		self.log_id = str(self.id % 10000).zfill(4)
		self.ip = None
		self.channels = []
		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		self.async_redis = tornadoredis.Client()
		# placeholder values until open() learns the real user and ip
		self.logger = logging.LoggerAdapter(logger, {
			'user_id': '000',
			'id': self.log_id,
			'ip': 'initializing'
		})
		# handlers of actions received from the client
		self.pre_process_message = {
			Actions.GET_MESSAGES: self.process_get_messages,
			Actions.SEND_MESSAGE: self.process_send_message,
			Actions.CALL: self.process_call,
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
			Actions.DELETE_ROOM: self.delete_channel,
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
			Actions.INVITE_USER: self.invite_user,
		}
		# handlers run after a prefixed (parsable) redis message has been written to the client
		self.post_process_message = {
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
			Actions.DELETE_ROOM: self.send_client_delete_channel,
			Actions.INVITE_USER: self.send_client_new_channel
		}
266
267
	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribes the async redis client to channels, then registers
		new_message as the callback for everything it receives
		"""
		subscription = tornado.gen.Task(self.async_redis.subscribe, channels)
		yield subscription
		self.async_redis.listen(self.new_message)
272
273
	@tornado.gen.engine
	def add_channel(self, channel):
		"""
		Remembers channel in self.channels and subscribes the async redis client to it
		"""
		self.channels.append(channel)
		subscription = tornado.gen.Task(self.async_redis.subscribe, channel)
		yield subscription
278
279
	def do_db(self, callback, *arg, **args):
		"""
		Calls callback(*arg, **args); on OperationalError or InterfaceError
		closes the django connection and calls it one more time
		:return: whatever callback returns
		"""
		try:
			result = callback(*arg, **args)
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
			self.logger.warning('%s, reconnecting', e)  # TODO
			connection.close()
			result = callback(*arg, **args)
		return result
286
287
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
		"""
		Reads the redis hash stored under channel and collects the distinct user ids in it
		:param channel: redis key of the hash
		:param check_user_id: when given, also report whether this user owns
		an entry whose key differs from check_hash
		:param check_hash: hash key to ignore while looking for check_user_id
		:return: list of user ids, or (list of user ids, bool) if check_user_id is not None
		"""
		online = self.sync_redis.hgetall(channel)
		self.logger.debug('!! redis online: %s', online)
		result = set()
		user_is_online = False
		if online:
			# keys and values are decoded from utf-8, so they are expected to be bytes
			for key_hash, raw_user_id in online.items():
				user_id = int(raw_user_id.decode('utf-8'))
				if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
					user_is_online = True
				result.add(user_id)
		result = list(result)
		# compare with None: user id 0 is falsy but still has to produce the tuple form
		if check_user_id is None:
			return result
		return result, user_is_online
305
306
	def add_online_user(self, room_id):
		"""
		Registers this connection in the redis online hash of the room
		(field: connection id, value: stored_redis_user) and reports the online list:
		published to the whole room if the user was not online there yet,
		otherwise written only to this connection
		"""
		channel_key = RedisPrefix.generate_room(room_id)
		online = self.get_online_from_redis(channel_key)
		self.async_redis_publisher.hset(channel_key, self.id, self.stored_redis_user)
		if self.user_id in online:  # Send user names to self
			refresh_message = self.room_online(online, Actions.REFRESH_USER, channel_key)
			self.logger.info('!! Second tab, retrieving online for self')
			self.safe_write(refresh_message)
			return
		# if a new tab has been opened
		online.append(self.user_id)
		login_message = self.room_online(online, Actions.LOGIN, channel_key)
		self.logger.info('!! First tab, sending refresh online for all')
		self.publish(login_message, channel_key)
332
333
	def publish(self, message, channel=None, parsable=False):
		"""
		Serializes message to JSON and publishes it to a redis channel
		:param message: JSON serializable object
		:param channel: redis channel name, required
		:param parsable: if True the JSON is marked with encode(), so that receivers
		decode and post-process it before writing it to the client
		:raises ValidationError: if channel is None
		"""
		if channel is None:
			raise ValidationError('Channel is not specified')
		jsoned_mess = json.dumps(message)
		self.logger.debug('<%s> %s', channel, jsoned_mess)
		if parsable:
			jsoned_mess = self.encode(jsoned_mess)
		self.async_redis_publisher.publish(channel, jsoned_mess)
341
342
	def encode(self, message):
		"""
		Puts parsable_prefix in front of the message, which tells the receiver
		to decode and process it before sending it to the client
		@param message: message to mark
		@return: marked message
		"""
		marked = ''.join((self.parsable_prefix, message))
		return marked
350
351
	def decode(self, message):
		"""
		Checks whether the message was marked by encode() and therefore
		should be processed by the server before it is written to the client
		@param message: message to check
		@return: parsed JSON of the message without the prefix if it is marked, None otherwise
		"""
		prefix = self.parsable_prefix
		if message.startswith(prefix):
			# strip exactly what encode() added instead of assuming a one character prefix
			return json.loads(message[len(prefix):])
		return None
359
360
	def new_message(self, message):
		"""
		Callback for messages coming from redis. Writes the body to the client
		and, if the body was marked as parsable, runs the post processor of its action
		:param message: object with a body attribute
		"""
		payload = message.body
		if type(payload) is int:  # subscribe event
			return
		parsed = self.decode(payload)
		self.safe_write(parsed if parsed else payload)
		if parsed:
			post_processor = self.post_process_message[parsed[VarNames.EVENT]]
			post_processor(parsed)
369
370
	def safe_write(self, message):
		"""
		Placeholder for writing a message to the client; always raises here
		:raises NotImplementedError: always
		"""
		raise NotImplementedError('WebSocketHandler implements')
372
373
	def process_send_message(self, message):
		"""
		Saves a message sent by the client and publishes it to the target channel
		:type message: dict
		:param message: must contain "content" and "channel"; "image" is optional;
		"receiverId" is only used for logging
		:raises ValidationError: if the channel is malformed, or is a room channel
		the connection is not subscribed to
		"""
		content = message[VarNames.CONTENT]
		receiver_id = message.get(VarNames.RECEIVER_ID)  # logged only, the target is taken from the channel
		self.logger.info('!! Sending message %s to user with id %s', content, receiver_id)
		channel = message[VarNames.CHANNEL]
		try:
			channel_id = int(channel[1:])
		except (TypeError, ValueError):
			# a malformed channel is a client error, report it the same way as a forbidden one
			raise ValidationError('Access denied for channel {}'.format(channel))
		message_db = Message(
			sender_id=self.user_id,
			content=content
		)
		if channel.startswith(RedisPrefix.USER_ID_CHANNEL_PREFIX):
			message_db.receiver_id = channel_id
		elif channel.startswith(RedisPrefix.ROOM_CHANNEL_PREFIX) and channel in self.channels:
			message_db.room_id = channel_id
		else:
			raise ValidationError('Access denied for channel {}'.format(channel))
		if VarNames.IMG in message:
			message_db.img = extract_photo(message[VarNames.IMG])
		self.do_db(message_db.save)  # exit on hacked id with exception
		prepared_message = self.create_send_message(message_db)
		if message_db.receiver_id is None:
			self.logger.debug('!! Detected as public')
			self.publish(prepared_message, channel)
		else:
			# the sender gets the direct message on the own channel as well
			self.publish(prepared_message, self.channel)
			self.logger.debug('!! Detected as private, channel %s', channel)
			if channel != self.channel:
				self.publish(prepared_message, channel)
404
405
	def process_call(self, message):
		"""
		Publishes a call offer to the personal channel of the receiver
		:type message: dict
		:param message: should contain "receiverId"; "content" and "type" are forwarded
		:raises ValidationError: if "receiverId" is absent
		"""
		receiver_id = message.get(VarNames.RECEIVER_ID)
		if receiver_id is None:
			# without a receiver the offer would go to a channel nobody owns
			raise ValidationError('Receiver is not specified')
		self.logger.info('!! Offering a call to user with id %s',  receiver_id)
		message = self.offer_call(message.get(VarNames.CONTENT), message.get(VarNames.CALL_TYPE))
		self.publish(message, RedisPrefix.generate_user(receiver_id))
413
414
	def create_new_room(self, message):
		"""
		Creates a named room with the current user as its only member and publishes
		a parsable "addRoom" message to the personal channel of the user
		:param message: dict containing "name"
		:raises ValidationError: if the name is empty or longer than 16 characters
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = bool(room_name) and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		room = Room(name=room_name)
		room.save()
		room.users.add(self.user_id)
		room.save()
		self.publish(
			self.subscribe_room_channel_message(room.id, room_name),
			self.channel,
			True
		)
424
425
	def invite_user(self, message):
		"""
		Adds a user to a room the current connection is subscribed to and publishes
		a parsable "inviteUser" message to the personal channel of that user
		:param message: dict containing "roomId" and "userId"
		:raises ValidationError: if the connection is not subscribed to the room
		or the user is already in it
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		channel = RedisPrefix.generate_room(room_id)
		if channel not in self.channels:
			raise ValidationError("Access denied")
		try:
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		users_in_room = {}
		room = Room.objects.get(id=room_id)
		for user in room.users.all():
			self.set_js_user_structure(users_in_room, user.id, user.username, user.sex)
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
442
443
	def create_user_channel(self, message):
		"""
		Opens a direct room between the current user and another one: re-enables
		a disabled room or creates a new one, then publishes a parsable
		"addDirectChannel" message to the personal channels of both users
		:param message: dict containing "userId" of the other user
		:raises ValidationError: if an enabled direct room already exists
		"""
		user_id = message[VarNames.USER_ID]
		cursor = connection.cursor()
		try:
			cursor.execute(GET_DIRECT_ROOM_ID, [self.user_id, user_id])
			query_res = cursor.fetchall()
		finally:
			cursor.close()  # release the cursor even if the query fails
		if query_res:
			# first row, columns are used as (room id, disabled flag)
			room_id, disabled = query_res[0][:2]
			if disabled is None:
				raise ValidationError('This room already exist')
			Room.objects.filter(id=room_id).update(disabled=None)
		else:
			room = Room()
			room.save()
			room.users.add(self.user_id, user_id)
			room.save()
			room_id = room.id
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
		self.publish(subscribe_message, self.channel, True)
		other_channel = RedisPrefix.generate_user(user_id)
		if self.channel != other_channel:  # do not notify twice when chatting with self
			self.publish(subscribe_message, other_channel, True)
467
468
	def delete_channel(self, message):
		"""
		Disables a room without a name, or removes the current user from a named room,
		then publishes a parsable "deleteRoom" message to the room channel
		:param message: dict containing "roomId"
		:raises ValidationError: if the connection is not subscribed to the room,
		the room is ALL_ROOM_ID, or the room is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		channel = RedisPrefix.generate_room(room_id)
		if channel not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to delete this room')
		room = Room.objects.get(id=room_id)
		if room.disabled is not None:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else: # if public -> leave the room, delete the link
			room.users.remove(self.user_id)
			online = self.get_online_from_redis(channel)
			# the redis hash may not list the user, list.remove would raise ValueError then
			if self.user_id in online:
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
		room.save()
		# NOTE(review): this goes to the room channel, so every subscribed connection
		# runs send_client_delete_channel - confirm this is intended for named rooms
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, channel, True)
486
487
	def send_client_new_channel(self, message):
		"""
		Post processor: subscribes this connection to the room named in the message
		and registers the user as online there
		:param message: dict containing "roomId"
		"""
		room_id = message[VarNames.ROOM_ID]
		self.add_channel(RedisPrefix.generate_room(room_id))
		self.add_online_user(room_id)  # TODO doesnt work if already subscribed
492
493
	def send_client_delete_channel(self, message):
		"""
		Post processor: unsubscribes this connection from the room named in the message,
		deletes its entry from the redis online hash and forgets the channel
		:param message: dict containing "roomId"
		"""
		room_channel = RedisPrefix.generate_room(message[VarNames.ROOM_ID])
		self.async_redis.unsubscribe(room_channel)
		self.async_redis_publisher.hdel(room_channel, self.id)
		self.channels.remove(room_channel)
499
500
	def process_get_messages(self, data):
		"""
		Writes to the client a page of messages, newest first, restricted to messages
		without a receiver or sent by / addressed to the current user
		:type data: dict
		:param data: may contain "headerId" (only messages with a smaller id are returned)
		and "count" (page size, 10 if absent)
		:raises ValidationError: if "count" is not a non-negative integer
		"""
		header_id = data.get('headerId', None)
		raw_count = data.get('count', 10)
		try:
			count = int(raw_count)
		except (TypeError, ValueError):
			raise ValidationError('Incorrect messages count "{}"'.format(raw_count))
		if count < 0:  # a queryset can't be sliced with a negative bound
			raise ValidationError('Incorrect messages count "{}"'.format(raw_count))
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		# public messages or direct ones the current user takes part in
		criteria = [Q(receiver=None) | Q(sender=self.user_id) | Q(receiver=self.user_id)]
		if header_id is not None:
			criteria.insert(0, Q(id__lt=header_id))
		messages = Message.objects.filter(*criteria).order_by('-pk')[:count]
		# the lazy queryset is evaluated inside do_db, so a lost connection is retried
		response = self.do_db(self.get_messages, messages)
		self.safe_write(response)
519
520
	def get_users_in_current_user_rooms(self):
		"""
		Runs USER_ROOMS_QUERY for the current user and groups the rows by room
		:return: dict of the following shape
		{
			ROOM_ID: {
				"name": ROOM_NAME,
				"users": {
					USER_ID: {
						"user": USER_NAME,
						"sex": GENDERS[USER_SEX]
					}
				}
			}
		}
		"""
		cursor = connection.cursor()
		try:
			cursor.execute(USER_ROOMS_QUERY, [self.user_id])
			query_res = cursor.fetchall()
		finally:
			cursor.close()  # release the cursor even if the query fails
		res = {}
		for row in query_res:
			# the first five columns are used as: user id, user name, user sex, room id, room name
			user_id, user_name, user_sex, room_id, room_name = row[:5]
			if room_id not in res:
				res[room_id] = {
					VarNames.ROOM_NAME: room_name,
					VarNames.ROOM_USERS: {}
				}
			self.set_js_user_structure(res[room_id][VarNames.ROOM_USERS], user_id, user_name, user_sex)
		return res
556
557
	def set_js_user_structure(self, user_dict, user_id, name, sex):
		"""
		Stores the description of a user into user_dict under user_id
		:param user_dict: dict to fill, modified in place
		:param sex: key looked up in GENDERS
		"""
		description = {}
		description[VarNames.USER] = name
		description[VarNames.GENDER] = GENDERS[sex]
		user_dict[user_id] = description
562
563
	def save_ip(self):
		"""
		Creates a UserJoinedInfo record for the pair (current ip, current user)
		unless one already exists
		"""
		known_pair = UserJoinedInfo.objects.filter(
			Q(ip__ip=self.ip) & Q(user_id=self.user_id))
		already_saved = self.do_db(known_pair.exists)
		if not already_saved:
			ip_address = self.get_or_create_ip()
			UserJoinedInfo.objects.create(
				ip=ip_address,
				user_id=self.user_id
			)
572
573
	def get_or_create_ip(self):
		"""
		Returns the IpAddress record of the current ip. If there is none, creates it
		with the location info requested from api_url; when that fails for any reason
		the error is logged and a record holding only the ip is created
		:return: IpAddress instance
		"""
		try:
			ip_address = IpAddress.objects.get(ip=self.ip)
		except IpAddress.DoesNotExist:
			try:
				if not api_url:
					raise Exception('api url is absent')
				self.logger.debug("Creating ip record %s", self.ip)
				f = urlopen(api_url % self.ip)
				try:
					raw_response = f.read().decode("utf-8")
				finally:
					f.close()  # do not leak the http connection
				response = json.loads(raw_response)
				if response['status'] != "success":
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
				ip_address = IpAddress.objects.create(
					ip=self.ip,
					isp=response['isp'],
					country=response['country'],
					region=response['regionName'],
					city=response['city'],
					country_code=response['countryCode']
				)
			except Exception as e:
				# best effort: the ip is stored even when the lookup is impossible
				self.logger.error("Error while creating ip with country info, because %s", e)
				ip_address = IpAddress.objects.create(ip=self.ip)
		return ip_address
598
599
600
class AntiSpam(object):
	"""Keeps sizes of received messages and counts the oversized ones."""

	def __init__(self):
		self.spammed = 0
		self.info = {}

	def check_spam(self, json_message):
		"""
		Records the length of the message under the current time in hundredths
		of a second and rejects messages longer than MAX_MESSAGE_SIZE
		:raises ValidationError: if the message is too long
		"""
		size = len(json_message)
		self.info[int(round(time.time() * 100))] = size
		if size <= MAX_MESSAGE_SIZE:
			self.check_timed_spam()
			return
		self.spammed += 1
		raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)

	def check_timed_spam(self):
		"""Not implemented yet, does nothing"""
		# TODO implement me
		pass
		# raise ValidationError("You're chatting too much, calm down a bit!")
619
620
621
class TornadoHandler(WebSocketHandler, MessagesHandler):
622
623
	def __init__(self, *args, **kwargs):
		"""
		Initializes the parent classes, then creates the spam checker
		and marks the connection as not yet initialized
		"""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		self.anti_spam = AntiSpam()
		self.connected = False
627
628
	def data_received(self, chunk):
		"""Intentionally does nothing with the chunk"""
		return None
630
631
	def on_message(self, json_message):
		"""
		Parses a message received from the client and passes it to the handler
		registered for its action in pre_process_message. Every ValidationError,
		including the ones raised by the handler, is written back to the client as a growl
		:param json_message: JSON text; its object should contain "action"
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %s', json_message)
			try:
				message = json.loads(json_message)
			except ValueError:
				# malformed input is a client error, not a reason to fail the handler
				raise ValidationError('Message is not a valid JSON')
			event = message.get(VarNames.EVENT) if isinstance(message, dict) else None
			try:
				handler = self.pre_process_message[event]
			except (KeyError, TypeError):  # unknown or unhashable action
				raise ValidationError("event {} is unknown".format(event))
			handler(message)
		except ValidationError as e:
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.safe_write(error_message)
646
647
	def on_close(self):
		"""
		Unsubscribes from redis, deletes this connection from the online hash of every
		room channel and, if the connection was initialized, publishes a logout
		to each room where the user has no other entry left
		"""
		if self.async_redis.subscribed:
			self.async_redis.unsubscribe(self.channels)
		log_data = {}
		room_channels = [
			name for name in self.channels
			if name.startswith(RedisPrefix.ROOM_CHANNEL_PREFIX)
		]
		for channel in room_channels:
			self.sync_redis.hdel(channel, self.id)
			if not self.connected:
				continue
			# seems like async solves problem with connection lost and wrong data status
			# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
			online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
			log_data[channel] = {'online': online, 'is_online': is_online}
			if is_online:
				continue
			logout_message = self.room_online(online, Actions.LOGOUT, channel)
			self.publish(logout_message, channel)
		self.logger.info("Close connection result: %s", json.dumps(log_data))
		self.async_redis.disconnect()
664
665
	def open(self):
		"""
		Authenticates the connection by the session cookie. On success loads the user,
		writes the rooms of the user to the client, subscribes to the personal channel
		and all room channels, registers the user as online in every room and saves
		the ip in a separate thread. Otherwise closes the connection
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		raw_user_id = None
		# an absent cookie is rejected without asking the session store
		if session_key is not None and sessionStore.exists(session_key):
			# a session without an authenticated user is rejected as well
			raw_user_id = SessionStore(session_key).get("_auth_user_id")
		if raw_user_id is None:
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			# NOTE(review): 403 is outside the 1000-4999 range of websocket close codes - confirm clients handle it
			self.close(403, "Session key %s has been rejected" % session_key)
			return
		self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
		self.ip = self.get_client_ip()
		self.user_id = int(raw_user_id)
		log_params = {
			'user_id': str(self.user_id).zfill(3),
			'id': self.log_id,
			'ip': self.ip
		}
		self.logger = logging.LoggerAdapter(logger, log_params)
		self.async_redis.connect()
		user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
		self.sender_name = user_db.username
		self.sex = user_db.sex_str
		user_rooms = self.get_users_in_current_user_rooms()
		self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
		self.channels.clear()
		self.channels.append(self.channel)
		for room_id in user_rooms:
			self.channels.append(RedisPrefix.generate_room(room_id))
		self.listen(self.channels)
		for room_id in user_rooms:
			self.add_online_user(room_id)
		self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
		self.connected = True
		Thread(target=self.save_ip).start()
697
698
	def check_origin(self, origin):
		"""
		Checks whether the domain of the Origin matches the domain of the Host header.
		Ports are ignored and the comparison is case insensitive
		:param origin: value of the Origin header
		:return: True if the domains match, False otherwise or if there is no Host header
		"""
		origin_domain = urlparse(origin).netloc.split(':')[0].lower()
		browser_set = self.request.headers.get("Host")
		if not browser_set:
			return False  # nothing to compare with
		browser_domain = browser_set.split(':')[0].lower()
		return browser_domain == origin_domain
708
709
	def safe_write(self, message):
		"""
		Writes a message to the websocket. A dict is serialized to JSON first.
		A closed websocket is logged instead of raising
		:type self: MessagesHandler
		:param message: dict or text
		:raises ValueError: if the message is neither a dict nor text
		"""
		# self.logger.debug('<< THREAD %s >>', os.getppid())
		try:
			if isinstance(message, dict):
				message = json.dumps(message)
			is_text = isinstance(message, str) or (not PY3 and isinstance(message, unicode))
			if not is_text:
				raise ValueError('Wrong message type : %s' % str(message))
			self.logger.debug(">> %s", message)
			self.write_message(message)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
724
725
	def get_client_ip(self):
		"""
		:return: value of the X-Real-IP header if it is present and not empty,
		otherwise remote_ip of the request
		"""
		forwarded_ip = self.request.headers.get("X-Real-IP")
		if forwarded_ip:
			return forwarded_ip
		return self.request.remote_ip
727