Completed
Push — master ( 0882b1...17f835 )
by Andrew
01:11
created

MessagesHandler   D

Complexity

Total Complexity 61

Size/Duplication

Total Lines 362
Duplicated Lines 0 %

Importance

Changes 15
Bugs 3 Features 0
Metric Value
c 15
b 3
f 0
dl 0
loc 362
rs 4.054
wmc 61

25 Methods

Rating   Name   Duplication   Size   Complexity  
A set_js_user_structure() 0 4 1
A save_ip() 0 8 2
B get_or_create_ip() 0 25 5
B get_users_in_current_user_rooms() 0 36 3
B __init__() 0 33 1
A publish() 0 8 3
B add_online_user() 0 26 2
A encode() 0 8 1
A listen() 0 5 1
A new_message() 0 9 4
A safe_write() 0 2 1
A do_db() 0 7 2
A add_channel() 0 5 1
A decode() 0 8 2
B get_online_from_redis() 0 18 6
B create_user_channel() 0 24 4
A set_opponent_call_channel() 0 2 1
A create_new_room() 0 10 3
A send_client_delete_channel() 0 6 1
A process_send_message() 0 16 2
B delete_channel() 0 18 5
A send_client_new_channel() 0 5 1
B invite_user() 0 19 5
A process_get_messages() 0 15 2
A process_call() 0 16 2

How to fix   Complexity   

Complex Class

Complex classes like MessagesHandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import tornado.gen
9
import tornado.httpclient
10
import tornado.ioloop
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q
18
from redis_sessions.session import SessionStore
19
from tornado.websocket import WebSocketHandler
20
21
from chat.utils import extract_photo
22
23
try:
24
	from urllib.parse import urlparse  # py2
25
except ImportError:
26
	from urlparse import urlparse  # py3
27
28
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, USER_ROOMS_QUERY, GENDERS, GET_DIRECT_ROOM_ID
29
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo
30
31
PY3 = sys.version > '3'
32
33
api_url = getattr(settings, "IP_API_URL", None)
34
35
sessionStore = SessionStore()
36
37
logger = logging.getLogger(__name__)
38
39
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
40
#CONNECTION_POOL = tornadoredis.ConnectionPool(
41
#	max_connections=500,
42
#	wait_for_available=True)
43
44
45
class Actions:
46
	LOGIN = 'addOnlineUser'
47
	LOGOUT = 'removeOnlineUser'
48
	SEND_MESSAGE = 'sendMessage'
49
	PRINT_MESSAGE = 'printMessage'
50
	CALL = 'call'
51
	ROOMS = 'setRooms'
52
	REFRESH_USER = 'setOnlineUsers'
53
	SYSTEM_MESSAGE = 'system'
54
	GROWL_MESSAGE = 'growl'
55
	GET_MESSAGES = 'loadMessages'
56
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
57
	DELETE_ROOM = 'deleteRoom'
58
	CREATE_ROOM_CHANNEL = 'addRoom'
59
	INVITE_USER = 'inviteUser'
60
	ADD_USER = 'addUserToAll'
61
62
63
class VarNames:
64
	CALL_TYPE = 'type'
65
	USER = 'user'
66
	USER_ID =  'userId'
67
	TIME = 'time'
68
	CONTENT = 'content'
69
	IMG = 'image'
70
	EVENT = 'action'
71
	MESSAGE_ID = 'id'
72
	GENDER = 'sex'
73
	ROOM_NAME = 'name'
74
	ROOM_ID = 'roomId'
75
	ROOM_USERS = 'users'
76
	CHANNEL = 'channel'
77
	GET_MESSAGES_COUNT = 'count'
78
	GET_MESSAGES_HEADER_ID = 'headerId'
79
	CHANNEL_NAME = 'channel'
80
	IS_ROOM_PRIVATE = 'private'
81
	#ROOM_NAME = 'roomName'
82
	# ROOM_ID = 'roomId'
83
84
class CallType:
85
	OFFER = 'offer'
86
87
class HandlerNames:
88
	NAME = 'handler'
89
	CHANNELS = 'channels'
90
	CHAT = 'chat'
91
	GROWL = 'growl'
92
	WEBRTC = 'webrtc'
93
94
95
class RedisPrefix:
96
	USER_ID_CHANNEL_PREFIX = 'u'
97
	ROOM_CHANNEL_PREFIX = 'r'
98
	__ROOM_ONLINE__ = 'o:{}'
99
100
	@classmethod
101
	def generate_user(cls, key):
102
		return cls.USER_ID_CHANNEL_PREFIX + str(key)
103
104
	@classmethod
105
	def generate_room(cls, key):
106
		return cls.ROOM_CHANNEL_PREFIX + str(key)
107
108
	@classmethod
109
	def extract_id(cls, channel):
110
		return int(channel[1:])
111
112
RedisPrefix.DEFAULT_CHANNEL = RedisPrefix.generate_room(ALL_ROOM_ID)
113
114
115
class MessagesCreator(object):
116
117
	def __init__(self, *args, **kwargs):
118
		super(MessagesCreator, self).__init__(*args, **kwargs)
119
		self.sex = None
120
		self.sender_name = None
121
		self.user_id = 0  # anonymous by default
122
123
	def default(self, content, event, handler):
124
		"""
125
		:return: {"action": event, "content": content, "time": "20:48:57"}
126
		"""
127
		return {
128
			VarNames.EVENT: event,
129
			VarNames.CONTENT: content,
130
			VarNames.USER_ID: self.user_id,
131
			VarNames.TIME: get_milliseconds(),
132
			HandlerNames.NAME: handler
133
		}
134
135
	def room_online(self, online, event, channel):
136
		"""
137
		:return: {"action": event, "content": content, "time": "20:48:57"}
138
		"""
139
		room_less = self.default(online, event, HandlerNames.CHAT)
140
		room_less[VarNames.CHANNEL_NAME] = channel
141
		room_less[VarNames.USER] = self.sender_name
142
		room_less[VarNames.GENDER] = self.sex
143
		return room_less
144
145
	def offer_call(self, content, message_type):
146
		"""
147
		:return: {"action": "call", "content": content, "time": "20:48:57"}
148
		"""
149
		message = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
150
		message[VarNames.CALL_TYPE] = message_type
151
		message[VarNames.USER] = self.sender_name
152
		return message
153
154
	@classmethod
155
	def create_message(cls, message):
156
		res = {
157
			VarNames.USER_ID: message.sender_id,
158
			VarNames.CONTENT: message.content,
159
			VarNames.TIME: message.time,
160
			VarNames.MESSAGE_ID: message.id,
161
		}
162
		if message.img.name:
163
			res[VarNames.IMG] = message.img.url
164
		return res
165
166
	@classmethod
167
	def create_send_message(cls, message):
168
		"""
169
		:param message:
170
		:return: "action": "joined", "content": {"v5bQwtWp": "alien", "tRD6emzs": "Alien"},
171
		"sex": "Alien", "user": "tRD6emzs", "time": "20:48:57"}
172
		"""
173
		channel = RedisPrefix.generate_room(message.room_id)
174
		res = cls.create_message(message)
175
		res[VarNames.EVENT] = Actions.PRINT_MESSAGE,
176
		res[VarNames.CHANNEL] = channel
177
		res[HandlerNames.NAME] = HandlerNames.CHAT
178
		return res
179
180
	@classmethod
181
	def get_messages(cls, messages, channel):
182
		"""
183
		:type messages: list[Messages]
184
		:type channel: str
185
		:type messages: QuerySet[Messages]
186
		"""
187
		return {
188
			VarNames.CONTENT: [cls.create_message(message) for message in messages],
189
			VarNames.EVENT: Actions.GET_MESSAGES,
190
			VarNames.CHANNEL: channel,
191
			HandlerNames.NAME: HandlerNames.CHAT
192
		}
193
194
	@property
195
	def stored_redis_user(self):
196
		return  self.user_id
197
198
	@property
199
	def channel(self):
200
		return RedisPrefix.generate_user(self.user_id)
201
202
	def subscribe_direct_channel_message(self, room_id, other_user_id):
203
		return {
204
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
205
			VarNames.ROOM_ID: room_id,
206
			VarNames.ROOM_USERS: [self.user_id, other_user_id],
207
			HandlerNames.NAME: HandlerNames.CHANNELS
208
		}
209
210
	def subscribe_room_channel_message(self, room_id, room_name):
211
		return {
212
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
213
			VarNames.ROOM_ID: room_id,
214
			VarNames.ROOM_USERS: [self.user_id],
215
			HandlerNames.NAME: HandlerNames.CHANNELS,
216
			VarNames.ROOM_NAME: room_name
217
		}
218
219
	def invite_room_channel_message(self, room_id, user_id, room_name, users):
220
		return {
221
			VarNames.EVENT: Actions.INVITE_USER,
222
			VarNames.ROOM_ID: room_id,
223
			VarNames.USER_ID: user_id,
224
			HandlerNames.NAME: HandlerNames.CHANNELS,
225
			VarNames.ROOM_NAME: room_name,
226
			VarNames.CONTENT: users
227
		}
228
229
	def add_user_to_room(self, channel, user_id, content):
230
		return {
231
			VarNames.EVENT: Actions.ADD_USER,
232
			VarNames.CHANNEL: channel,
233
			VarNames.USER_ID: user_id,
234
			HandlerNames.NAME: HandlerNames.CHAT,
235
			VarNames.GENDER: content[VarNames.GENDER], # SEX: 'Alien', USER: 'Andrew'
236
			VarNames.USER: content[VarNames.USER] # SEX: 'Alien', USER: 'Andrew'
237
		}
238
239
	def unsubscribe_direct_message(self, room_id):
240
		return {
241
			VarNames.EVENT: Actions.DELETE_ROOM,
242
			VarNames.ROOM_ID: room_id,
243
			VarNames.USER_ID: self.user_id,
244
			HandlerNames.NAME: HandlerNames.CHANNELS,
245
			VarNames.TIME: get_milliseconds()
246
		}
247
248
249
class MessagesHandler(MessagesCreator):
250
251
	def __init__(self, *args, **kwargs):
252
		self.parsable_prefix = 'p'
253
		super(MessagesHandler, self).__init__(*args, **kwargs)
254
		self.id = id(self)
255
		self.log_id = str(self.id % 10000).rjust(4, '0')
256
		self.ip = None
257
		log_params = {
258
			'user_id': '000',
259
			'id': self.log_id,
260
			'ip': 'initializing'
261
		}
262
		from chat import global_redis
263
		self.async_redis_publisher = global_redis.async_redis_publisher
264
		self.sync_redis = global_redis.sync_redis
265
		self.channels = []
266
		self.call_receiver_channel = None
267
		self.logger = logging.LoggerAdapter(logger, log_params)
268
		self.async_redis = tornadoredis.Client()
269
		self.pre_process_message = {
270
			Actions.GET_MESSAGES: self.process_get_messages,
271
			Actions.SEND_MESSAGE: self.process_send_message,
272
			Actions.CALL: self.process_call,
273
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
274
			Actions.DELETE_ROOM: self.delete_channel,
275
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
276
			Actions.INVITE_USER: self.invite_user,
277
		}
278
		self.post_process_message = {
279
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
280
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
281
			Actions.DELETE_ROOM: self.send_client_delete_channel,
282
			Actions.INVITE_USER: self.send_client_new_channel,
283
			Actions.CALL: self.set_opponent_call_channel
284
		}
285
286
	@tornado.gen.engine
287
	def listen(self, channels):
288
		yield tornado.gen.Task(
289
			self.async_redis.subscribe, channels)
290
		self.async_redis.listen(self.new_message)
291
292
	@tornado.gen.engine
293
	def add_channel(self, channel):
294
		self.channels.append(channel)
295
		yield tornado.gen.Task(
296
			self.async_redis.subscribe, channel)
297
298
	def do_db(self, callback, *arg, **args):
299
		try:
300
			return callback(*arg, **args)
301
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
302
			self.logger.warning('%s, reconnecting' % e)  # TODO
303
			connection.close()
304
			return callback(*arg, **args)
305
306
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
307
		"""
308
		:rtype : dict
309
		returns (dict, bool) if check_type is present
310
		"""
311
		online = self.sync_redis.hgetall(channel)
312
		self.logger.debug('!! redis online: %s', online)
313
		result = set()
314
		user_is_online = False
315
		# redis stores REDIS_USER_FORMAT, so parse them
316
		if online:
317
			for key_hash, raw_user_id in online.items():  # py2 iteritems
318
				user_id = int(raw_user_id.decode('utf-8'))
319
				if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
320
					user_is_online = True
321
				result.add(user_id)
322
		result = list(result)
323
		return (result, user_is_online) if check_user_id else result
324
325
	def add_online_user(self, room_id):
326
		"""
327
		adds to redis
328
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
329
		:return:
330
		"""
331
		channel_key = RedisPrefix.generate_room(room_id)
332
		online = self.get_online_from_redis(channel_key)
333
		self.async_redis_publisher.hset(channel_key, self.id, self.stored_redis_user)
334
		if self.user_id not in online:  # if a new tab has been opened
335
			online.append(self.user_id)
336
			online_user_names_mes = self.room_online(
337
				online,
338
				Actions.LOGIN,
339
				channel_key
340
			)
341
			self.logger.info('!! First tab, sending refresh online for all')
342
			self.publish(online_user_names_mes, channel_key)
343
		else:  # Send user names to self
344
			online_user_names_mes = self.room_online(
345
				online,
346
				Actions.REFRESH_USER,
347
				channel_key
348
			)
349
			self.logger.info('!! Second tab, retrieving online for self')
350
			self.safe_write(online_user_names_mes)
351
352
	def publish(self, message, channel=None, parsable=False):
353
		if channel is None:
354
			raise ValidationError('lolol')
355
		jsoned_mess = json.dumps(message)
356
		self.logger.debug('<%s> %s', channel, jsoned_mess)
357
		if parsable:
358
			jsoned_mess = self.encode(jsoned_mess)
359
		self.async_redis_publisher.publish(channel, jsoned_mess)
360
361
	def encode(self, message):
362
		"""
363
		Marks message with prefix to specify that
364
		it should be decoded and proccesed before sending to client
365
		@param message: message to mark
366
		@return: marked message
367
		"""
368
		return self.parsable_prefix + message
369
370
	def decode(self, message):
371
		"""
372
		Check if message should be proccessed by server before writing to client
373
		@param message: message to check
374
		@return: Object structure of message if it should be processed, None if not
375
		"""
376
		if message.startswith(self.parsable_prefix):
377
			return json.loads(message[1:])
378
379
	def new_message(self, message):
380
		data = message.body
381
		if type(data) is not int:  # subscribe event
382
			decoded = self.decode(data)
383
			if decoded:
384
				data = decoded
385
			self.safe_write(data)
386
			if decoded:
387
				self.post_process_message[decoded[VarNames.EVENT]](decoded)
388
389
	def safe_write(self, message):
390
		raise NotImplementedError('WebSocketHandler implements')
391
392
	def process_send_message(self, message):
393
		"""
394
		:type message: dict
395
		"""
396
		channel = message[VarNames.CHANNEL]
397
		message_db = Message(
398
			sender_id=self.user_id,
399
			content=message[VarNames.CONTENT]
400
		)
401
		channel_id = RedisPrefix.extract_id(channel)
402
		message_db.room_id = channel_id
403
		if VarNames.IMG in message:
404
			message_db.img = extract_photo(message[VarNames.IMG])
405
		self.do_db(message_db.save)  # exit on hacked id with exception
406
		prepared_message = self.create_send_message(message_db)
407
		self.publish(prepared_message, channel)
408
409
	def process_call(self, in_message):
410
		"""
411
		:type in_message: dict
412
		"""
413
		call_type = in_message.get(VarNames.CALL_TYPE)
414
		set_opponent_channel = False
415
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
416
		if call_type == CallType.OFFER:
417
			to_channel = in_message[VarNames.CHANNEL]
418
			room_id = RedisPrefix.extract_id(to_channel)
419
			user = User.rooms.through.objects.get(~Q(user_id=self.user_id), Q(room_id=room_id), Q(room__name__isnull=True))
420
			self.call_receiver_channel = RedisPrefix.generate_user(user.user_id)
421
			set_opponent_channel = True
422
			out_message[VarNames.CHANNEL] = to_channel
423
		self.logger.info('!! Offering a call to user with id %s',  self.call_receiver_channel)
424
		self.publish(out_message, self.call_receiver_channel, set_opponent_channel)
425
426
	def create_new_room(self, message):
427
		room_name = message[VarNames.ROOM_NAME]
428
		if not room_name or len(room_name) > 16:
429
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
430
		room = Room(name=room_name)
431
		room.save()
432
		room.users.add(self.user_id)
433
		room.save()
434
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
435
		self.publish(subscribe_message, self.channel, True)
436
437
	def invite_user(self, message):
438
		room_id = message[VarNames.ROOM_ID]
439
		user_id = message[VarNames.USER_ID]
440
		channel = RedisPrefix.generate_room(room_id)
441
		if channel not in self.channels:
442
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
443
		room = Room.objects.get(id=room_id)
444
		if room.is_private:
445
			raise ValidationError("You can't add users to direct room, create a new room instead")
446
		try:
447
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
448
		except IntegrityError:
449
			raise ValidationError("User is already in channel")
450
		users_in_room = {}
451
		for user in room.users.all():
452
			self.set_js_user_structure(users_in_room, user.id, user.username, user.sex)
453
		self.publish(self.add_user_to_room(channel, user_id, users_in_room[user_id]), channel)
454
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
455
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
456
457
	def create_user_channel(self, message):
458
		user_id = message[VarNames.USER_ID]
459
		cursor = connection.cursor()
460
		cursor.execute(GET_DIRECT_ROOM_ID, [self.user_id, user_id])
461
		query_res = cursor.fetchall()
462
		if len(query_res) > 0:
463
			result = query_res[0]
464
			room_id = result[0]
465
			disabled = result[1]
466
			if disabled is None:
467
				raise ValidationError('This room already exist')
468
			else:
469
				Room.objects.filter(id=room_id).update(disabled=None)
470
		else:
471
			room = Room()
472
			room.save()
473
			room.users.add(self.user_id, user_id)
474
			room.save()
475
			room_id = room.id
476
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
477
		self.publish(subscribe_message, self.channel, True)
478
		other_channel = RedisPrefix.generate_user(user_id)
479
		if self.channel != other_channel:
480
			self.publish(subscribe_message, other_channel, True)
481
482
	def delete_channel(self, message):
483
		room_id = message[VarNames.ROOM_ID]
484
		channel = RedisPrefix.generate_room(room_id)
485
		if channel not in self.channels or room_id == ALL_ROOM_ID:
486
			raise ValidationError('You are not allowed to delete this room')
487
		room = Room.objects.get(id=room_id)
488
		if room.disabled is not None:
489
			raise ValidationError('Room is already deleted')
490
		if room.name is None:  # if private then disable
491
			room.disabled = True
492
		else: # if public -> leave the room, delete the link
493
			room.users.remove(self.user_id)
494
			online = self.get_online_from_redis(channel)
495
			online.remove(self.user_id)
496
			self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
497
		room.save()
498
		message = self.unsubscribe_direct_message(room_id)
499
		self.publish(message, channel, True)
500
501
	def send_client_new_channel(self, message):
502
		room_id = message[VarNames.ROOM_ID]
503
		channel = RedisPrefix.generate_room(room_id)
504
		self.add_channel(channel)
505
		self.add_online_user(room_id)# TODO doesnt work if already subscribed
506
507
	def set_opponent_call_channel(self, message):
508
		self.call_receiver_channel = RedisPrefix.generate_user(message[VarNames.USER_ID])
509
510
	def send_client_delete_channel(self, message):
511
		room_id = message[VarNames.ROOM_ID]
512
		channel = RedisPrefix.generate_room(room_id)
513
		self.async_redis.unsubscribe(channel)
514
		self.async_redis_publisher.hdel(channel, self.id)
515
		self.channels.remove(channel)
516
517
	def process_get_messages(self, data):
518
		"""
519
		:type data: dict
520
		"""
521
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
522
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
523
		channel = data[VarNames.CHANNEL]
524
		room_id = RedisPrefix.extract_id(channel)
525
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
526
		if header_id is None:
527
			messages = Message.objects.filter(Q(room_id=room_id)).order_by('-pk')[:count]
528
		else:
529
			messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id)).order_by('-pk')[:count]
530
		response = self.do_db(self.get_messages, messages, channel)
531
		self.safe_write(response)
532
533
	def get_users_in_current_user_rooms(self):
534
		"""
535
		{
536
			"ROOM_ID:1": {
537
				"name": "All",
538
				"users": {
539
					"USER_ID:admin": {
540
						"name": "USER_NAME:admin",
541
						"sex": "SEX:Secret"
542
					},
543
					"USER_ID_2": {
544
						"name": "USER_NAME:Mike",
545
						"sex": "Male"
546
					}
547
				},
548
				"isPrivate": true
549
			}
550
		}
551
		"""
552
		cursor = connection.cursor()
553
		cursor.execute(USER_ROOMS_QUERY, [self.user_id])
554
		query_res = cursor.fetchall()
555
		res = {}
556
		for user in query_res:
557
			user_id = user[0]
558
			user_name = user[1]
559
			user_sex = user[2]
560
			room_id = user[3]
561
			room_name = user[4]
562
			if room_id not in res:
563
				res[room_id] = {
564
					VarNames.ROOM_NAME: room_name,
565
					VarNames.ROOM_USERS: {}
566
				}
567
			self.set_js_user_structure(res[room_id][VarNames.ROOM_USERS], user_id, user_name, user_sex)
568
		return res
569
570
	def set_js_user_structure(self, user_dict, user_id, name, sex):
571
		user_dict[user_id] = {
572
			VarNames.USER: name,
573
			VarNames.GENDER: GENDERS[sex]
574
		}
575
576
	def save_ip(self):
577
		if (self.do_db(UserJoinedInfo.objects.filter(
578
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
579
			return
580
		ip_address = self.get_or_create_ip()
581
		UserJoinedInfo.objects.create(
582
			ip=ip_address,
583
			user_id=self.user_id
584
		)
585
586
	def get_or_create_ip(self):
587
		try:
588
			ip_address = IpAddress.objects.get(ip=self.ip)
589
		except IpAddress.DoesNotExist:
590
			try:
591
				if not api_url:
592
					raise Exception('api url is absent')
593
				self.logger.debug("Creating ip record %s", self.ip)
594
				f = urlopen(api_url % self.ip)
595
				raw_response = f.read().decode("utf-8")
596
				response = json.loads(raw_response)
597
				if response['status'] != "success":
598
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
599
				ip_address = IpAddress.objects.create(
600
					ip=self.ip,
601
					isp=response['isp'],
602
					country=response['country'],
603
					region=response['regionName'],
604
					city=response['city'],
605
					country_code=response['countryCode']
606
				)
607
			except Exception as e:
608
				self.logger.error("Error while creating ip with country info, because %s", e)
609
				ip_address = IpAddress.objects.create(ip=self.ip)
610
		return ip_address
611
612
613
class AntiSpam(object):
614
615
	def __init__(self):
616
		self.spammed = 0
617
		self.info = {}
618
619
	def check_spam(self, json_message):
620
		message_length = len(json_message)
621
		info_key = int(round(time.time() * 100))
622
		self.info[info_key] = message_length
623
		if message_length > MAX_MESSAGE_SIZE:
624
			self.spammed += 1
625
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
626
		self.check_timed_spam()
627
628
	def check_timed_spam(self):
629
		# TODO implement me
630
		pass
631
		# raise ValidationError("You're chatting too much, calm down a bit!")
632
633
634
class TornadoHandler(WebSocketHandler, MessagesHandler):
635
636
	def __init__(self, *args, **kwargs):
637
		super(TornadoHandler, self).__init__(*args, **kwargs)
638
		self.connected = False
639
		self.anti_spam = AntiSpam()
640
641
	def data_received(self, chunk):
642
		pass
643
644
	def on_message(self, json_message):
645
		try:
646
			if not self.connected:
647
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
648
			if not json_message:
649
				raise ValidationError('Skipping null message')
650
			# self.anti_spam.check_spam(json_message)
651
			self.logger.debug('<< %s', json_message)
652
			message = json.loads(json_message)
653
			if message[VarNames.EVENT] not in self.pre_process_message:
654
				raise ValidationError("event {} is unknown".format(message[VarNames.EVENT]))
655
			channel = message.get(VarNames.CHANNEL)
656
			if channel and channel not in self.channels:
657
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
658
			self.pre_process_message[message[VarNames.EVENT]](message)
659
		except ValidationError as e:
660
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
661
			self.safe_write(error_message)
662
663
	def on_close(self):
664
		if self.async_redis.subscribed:
665
			self.async_redis.unsubscribe(self.channels)
666
		log_data = {}
667
		for channel in self.channels:
668
			if channel.startswith(RedisPrefix.ROOM_CHANNEL_PREFIX):
669
				self.sync_redis.hdel(channel, self.id)
670
				if self.connected:
671
					# seems like async solves problem with connection lost and wrong data status
672
					# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
673
					online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
674
					log_data[channel] = {'online': online, 'is_online': is_online}
675
					if not is_online:
676
						message = self.room_online(online, Actions.LOGOUT, channel)
677
						self.publish(message, channel)
678
		self.logger.info("Close connection result: %s", json.dumps(log_data))
679
		self.async_redis.disconnect()
680
681
	def open(self):
682
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
683
		if sessionStore.exists(session_key):
684
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
685
			self.ip = self.get_client_ip()
686
			session = SessionStore(session_key)
687
			self.user_id = int(session["_auth_user_id"])
688
			log_params = {
689
				'user_id': str(self.user_id).zfill(3),
690
				'id': self.log_id,
691
				'ip': self.ip
692
			}
693
			self.logger = logging.LoggerAdapter(logger, log_params)
694
			self.async_redis.connect()
695
			user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
696
			self.sender_name = user_db.username
697
			self.sex = user_db.sex_str
698
			user_rooms = self.get_users_in_current_user_rooms()
699
			self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
700
			self.channels.clear()
701
			self.channels.append(self.channel)
702
			for room_id in user_rooms:
703
				self.channels.append(RedisPrefix.generate_room(room_id))
704
			self.listen(self.channels)
705
			for room_id in user_rooms:
706
				self.add_online_user(room_id)
707
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
708
			self.connected = True
709
			Thread(target=self.save_ip).start()
710
		else:
711
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
712
			self.close(403, "Session key %s has been rejected" % session_key)
713
714
	def check_origin(self, origin):
715
		"""
716
		check whether browser set domain matches origin
717
		"""
718
		parsed_origin = urlparse(origin)
719
		origin = parsed_origin.netloc
720
		origin_domain = origin.split(':')[0].lower()
721
		browser_set = self.request.headers.get("Host")
722
		browser_domain = browser_set.split(':')[0]
723
		return browser_domain == origin_domain
724
725
	def safe_write(self, message):
726
		"""
727
		Tries to send message, doesn't throw exception outside
728
		:type self: MessagesHandler
729
		"""
730
		# self.logger.debug('<< THREAD %s >>', os.getppid())
731
		try:
732
			if isinstance(message, dict):
733
				message = json.dumps(message)
734
			if not (isinstance(message, str) or (not PY3 and isinstance(message, unicode))):
735
				raise ValueError('Wrong message type : %s' % str(message))
736
			self.logger.debug(">> %s", message)
737
			self.write_message(message)
738
		except tornado.websocket.WebSocketClosedError as e:
739
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
740
741
	def get_client_ip(self):
742
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip
743