Completed
Push — master ( 1b17de...c38f1d )
by Andrew
02:33
created

MessagesHandler.decode()   A

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 8
rs 9.4285
1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import tornado.gen
9
import tornado.httpclient
10
import tornado.ioloop
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q
18
from redis_sessions.session import SessionStore
19
from tornado.websocket import WebSocketHandler
20
21
from chat.utils import extract_photo
22
23
try:
24
	from urllib.parse import urlparse  # py2
25
except ImportError:
26
	from urlparse import urlparse  # py3
27
28
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, USER_ROOMS_QUERY, GENDERS, GET_DIRECT_ROOM_ID
29
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo
30
31
PY3 = sys.version > '3'
32
33
api_url = getattr(settings, "IP_API_URL", None)
34
35
sessionStore = SessionStore()
36
37
logger = logging.getLogger(__name__)
38
39
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
40
#CONNECTION_POOL = tornadoredis.ConnectionPool(
41
#	max_connections=500,
42
#	wait_for_available=True)
43
44
45
class Actions:
46
	LOGIN = 'addOnlineUser'
47
	LOGOUT = 'removeOnlineUser'
48
	SEND_MESSAGE = 'sendMessage'
49
	PRINT_MESSAGE = 'printMessage'
50
	CALL = 'call'
51
	ROOMS = 'setRooms'
52
	REFRESH_USER = 'setOnlineUsers'
53
	SYSTEM_MESSAGE = 'system'
54
	GROWL_MESSAGE = 'growl'
55
	GET_MESSAGES = 'messages'
56
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
57
	DELETE_ROOM = 'deleteRoom'
58
	CREATE_ROOM_CHANNEL = 'addRoom'
59
	INVITE_USER = 'inviteUser'
60
	ADD_USER = 'addUserToAll'
61
62
63
class VarNames:
64
	RECEIVER_ID = 'receiverId'
65
	RECEIVER_NAME = 'receiverName'
66
	CALL_TYPE = 'type'
67
	USER = 'user'
68
	USER_ID =  'userId'
69
	TIME = 'time'
70
	CONTENT = 'content'
71
	IMG = 'image'
72
	EVENT = 'action'
73
	MESSAGE_ID = 'id'
74
	GENDER = 'sex'
75
	ROOM_NAME = 'name'
76
	ROOM_ID = 'roomId'
77
	ROOM_USERS = 'users'
78
	CHANNEL = 'channel'
79
	CHANNEL_NAME = 'channel'
80
	IS_ROOM_PRIVATE = 'private'
81
	#ROOM_NAME = 'roomName'
82
	# ROOM_ID = 'roomId'
83
84
85
class HandlerNames:
86
	NAME = 'handler'
87
	CHANNELS = 'channels'
88
	CHAT = 'chat'
89
	GROWL = 'growl'
90
	WEBRTC = 'webrtc'
91
92
93
class RedisPrefix:
94
	USER_ID_CHANNEL_PREFIX = 'u'
95
	ROOM_CHANNEL_PREFIX = 'r'
96
	__ROOM_ONLINE__ = 'o:{}'
97
98
	@classmethod
99
	def generate_user(cls, key):
100
		return cls.USER_ID_CHANNEL_PREFIX + str(key)
101
102
	@classmethod
103
	def generate_room(cls, key):
104
		return cls.ROOM_CHANNEL_PREFIX + str(key)
105
106
RedisPrefix.DEFAULT_CHANNEL = RedisPrefix.generate_room(ALL_ROOM_ID)
107
108
109
class MessagesCreator(object):
110
111
	def __init__(self, *args, **kwargs):
112
		super(MessagesCreator, self).__init__(*args, **kwargs)
113
		self.sex = None
114
		self.sender_name = None
115
		self.user_id = 0  # anonymous by default
116
117
	def default(self, content, event, handler):
118
		"""
119
		:return: {"action": event, "content": content, "time": "20:48:57"}
120
		"""
121
		return {
122
			VarNames.EVENT: event,
123
			VarNames.CONTENT: content,
124
			VarNames.USER_ID: self.user_id,
125
			VarNames.TIME: get_milliseconds(),
126
			HandlerNames.NAME: handler
127
		}
128
129
	def room_online(self, online, event, channel):
130
		"""
131
		:return: {"action": event, "content": content, "time": "20:48:57"}
132
		"""
133
		room_less = self.default(online, event, HandlerNames.CHAT)
134
		room_less[VarNames.CHANNEL_NAME] = channel
135
		room_less[VarNames.USER] = self.sender_name
136
		room_less[VarNames.GENDER] = self.sex
137
		return room_less
138
139
	def offer_call(self, content, message_type):
140
		"""
141
		:return: {"action": "call", "content": content, "time": "20:48:57"}
142
		"""
143
		message = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
144
		message[VarNames.CALL_TYPE] = message_type
145
		return message
146
147
	@classmethod
148
	def create_send_message(cls, message):
149
		"""
150
		:param message:
151
		:return: "action": "joined", "content": {"v5bQwtWp": "alien", "tRD6emzs": "Alien"},
152
		"sex": "Alien", "user": "tRD6emzs", "time": "20:48:57"}
153
		"""
154
		if message.receiver_id:
155
			channel = RedisPrefix.generate_user(message.receiver_id)
156
		elif message.room_id:
157
			channel = RedisPrefix.generate_room(message.room_id)
158
		else:
159
			raise ValidationError('Channel is none')
160
		result = {
161
			VarNames.USER_ID: message.sender_id,
162
			VarNames.CONTENT: message.content,
163
			VarNames.TIME: message.time,
164
			VarNames.MESSAGE_ID: message.id,
165
			VarNames.EVENT: Actions.PRINT_MESSAGE,
166
			VarNames.CHANNEL: channel,
167
			HandlerNames.NAME: HandlerNames.CHAT
168
		}
169
		if message.img.name:
170
			result[VarNames.IMG] = message.img.url
171
		if message.receiver_id:
172
			result[VarNames.RECEIVER_ID] = message.receiver.id
173
			result[VarNames.RECEIVER_NAME] = message.receiver.username
174
		return result
175
176
	@classmethod
177
	def get_messages(cls, messages):
178
		"""
179
		:type messages: list[Messages]
180
		:type messages: QuerySet[Messages]
181
		"""
182
		return {
183
			VarNames.CONTENT: [cls.create_send_message(message) for message in messages],
184
			VarNames.EVENT: Actions.GET_MESSAGES
185
		}
186
187
	@property
188
	def stored_redis_user(self):
189
		return  self.user_id
190
191
	@property
192
	def channel(self):
193
		return RedisPrefix.generate_user(self.user_id)
194
195
	def subscribe_direct_channel_message(self, room_id, other_user_id):
196
		return {
197
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
198
			VarNames.ROOM_ID: room_id,
199
			VarNames.ROOM_USERS: [self.user_id, other_user_id],
200
			HandlerNames.NAME: HandlerNames.CHANNELS
201
		}
202
203
	def subscribe_room_channel_message(self, room_id, room_name):
204
		return {
205
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
206
			VarNames.ROOM_ID: room_id,
207
			VarNames.ROOM_USERS: [self.user_id],
208
			HandlerNames.NAME: HandlerNames.CHANNELS,
209
			VarNames.ROOM_NAME: room_name
210
		}
211
212
	def invite_room_channel_message(self, room_id, user_id, room_name, users):
213
		return {
214
			VarNames.EVENT: Actions.INVITE_USER,
215
			VarNames.ROOM_ID: room_id,
216
			VarNames.USER_ID: user_id,
217
			HandlerNames.NAME: HandlerNames.CHANNELS,
218
			VarNames.ROOM_NAME: room_name,
219
			VarNames.CONTENT: users
220
		}
221
222
	def add_user_to_room(self, channel, user_id, content):
223
		return {
224
			VarNames.EVENT: Actions.ADD_USER,
225
			VarNames.CHANNEL: channel,
226
			VarNames.USER_ID: user_id,
227
			HandlerNames.NAME: HandlerNames.CHAT,
228
			VarNames.GENDER: content[VarNames.GENDER], # SEX: 'Alien', USER: 'Andrew'
229
			VarNames.USER: content[VarNames.USER] # SEX: 'Alien', USER: 'Andrew'
230
		}
231
232
	def unsubscribe_direct_message(self, room_id):
233
		return {
234
			VarNames.EVENT: Actions.DELETE_ROOM,
235
			VarNames.ROOM_ID: room_id,
236
			VarNames.USER_ID: self.user_id,
237
			HandlerNames.NAME: HandlerNames.CHANNELS,
238
			VarNames.TIME: get_milliseconds()
239
		}
240
241
242
class MessagesHandler(MessagesCreator):
243
244
	def __init__(self, *args, **kwargs):
245
		self.parsable_prefix = 'p'
246
		super(MessagesHandler, self).__init__(*args, **kwargs)
247
		self.id = id(self)
248
		self.log_id = str(self.id % 10000).rjust(4, '0')
249
		self.ip = None
250
		log_params = {
251
			'user_id': '000',
252
			'id': self.log_id,
253
			'ip': 'initializing'
254
		}
255
		from chat import global_redis
256
		self.async_redis_publisher = global_redis.async_redis_publisher
257
		self.sync_redis = global_redis.sync_redis
258
		self.channels = []
259
		self.logger = logging.LoggerAdapter(logger, log_params)
260
		self.async_redis = tornadoredis.Client()
261
		self.pre_process_message = {
262
			Actions.GET_MESSAGES: self.process_get_messages,
263
			Actions.SEND_MESSAGE: self.process_send_message,
264
			Actions.CALL: self.process_call,
265
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
266
			Actions.DELETE_ROOM: self.delete_channel,
267
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
268
			Actions.INVITE_USER: self.invite_user,
269
		}
270
		self.post_process_message = {
271
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
272
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
273
			Actions.DELETE_ROOM: self.send_client_delete_channel,
274
			Actions.INVITE_USER: self.send_client_new_channel
275
		}
276
277
	@tornado.gen.engine
278
	def listen(self, channels):
279
		yield tornado.gen.Task(
280
			self.async_redis.subscribe, channels)
281
		self.async_redis.listen(self.new_message)
282
283
	@tornado.gen.engine
284
	def add_channel(self, channel):
285
		self.channels.append(channel)
286
		yield tornado.gen.Task(
287
			self.async_redis.subscribe, channel)
288
289
	def do_db(self, callback, *arg, **args):
290
		try:
291
			return callback(*arg, **args)
292
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
293
			self.logger.warning('%s, reconnecting' % e)  # TODO
294
			connection.close()
295
			return callback(*arg, **args)
296
297
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
298
		"""
299
		:rtype : dict
300
		returns (dict, bool) if check_type is present
301
		"""
302
		online = self.sync_redis.hgetall(channel)
303
		self.logger.debug('!! redis online: %s', online)
304
		result = set()
305
		user_is_online = False
306
		# redis stores REDIS_USER_FORMAT, so parse them
307
		if online:
308
			for key_hash, raw_user_id in online.items():  # py2 iteritems
309
				user_id = int(raw_user_id.decode('utf-8'))
310
				if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
311
					user_is_online = True
312
				result.add(user_id)
313
		result = list(result)
314
		return (result, user_is_online) if check_user_id else result
315
316
	def add_online_user(self, room_id):
317
		"""
318
		adds to redis
319
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
320
		:return:
321
		"""
322
		channel_key = RedisPrefix.generate_room(room_id)
323
		online = self.get_online_from_redis(channel_key)
324
		self.async_redis_publisher.hset(channel_key, self.id, self.stored_redis_user)
325
		if self.user_id not in online:  # if a new tab has been opened
326
			online.append(self.user_id)
327
			online_user_names_mes = self.room_online(
328
				online,
329
				Actions.LOGIN,
330
				channel_key
331
			)
332
			self.logger.info('!! First tab, sending refresh online for all')
333
			self.publish(online_user_names_mes, channel_key)
334
		else:  # Send user names to self
335
			online_user_names_mes = self.room_online(
336
				online,
337
				Actions.REFRESH_USER,
338
				channel_key
339
			)
340
			self.logger.info('!! Second tab, retrieving online for self')
341
			self.safe_write(online_user_names_mes)
342
343
	def publish(self, message, channel=None, parsable=False):
344
		if channel is None:
345
			raise ValidationError('lolol')
346
		jsoned_mess = json.dumps(message)
347
		self.logger.debug('<%s> %s', channel, jsoned_mess)
348
		if parsable:
349
			jsoned_mess = self.encode(jsoned_mess)
350
		self.async_redis_publisher.publish(channel, jsoned_mess)
351
352
	def encode(self, message):
353
		"""
354
		Marks message with prefix to specify that
355
		it should be decoded and proccesed before sending to client
356
		@param message: message to mark
357
		@return: marked message
358
		"""
359
		return self.parsable_prefix + message
360
361
	def decode(self, message):
362
		"""
363
		Check if message should be proccessed by server before writing to client
364
		@param message: message to check
365
		@return: Object structure of message if it should be processed, None if not
366
		"""
367
		if message.startswith(self.parsable_prefix):
368
			return json.loads(message[1:])
369
370
	def new_message(self, message):
371
		data = message.body
372
		if type(data) is not int:  # subscribe event
373
			decoded = self.decode(data)
374
			if decoded:
375
				data = decoded
376
			self.safe_write(data)
377
			if decoded:
378
				self.post_process_message[decoded[VarNames.EVENT]](decoded)
379
380
	def safe_write(self, message):
381
		raise NotImplementedError('WebSocketHandler implements')
382
383
	def process_send_message(self, message):
384
		"""
385
		:type message: dict
386
		"""
387
		content = message[VarNames.CONTENT]
388
		receiver_id = message.get(VarNames.RECEIVER_ID)  # if receiver_id is None then its a private message
389
		self.logger.info('!! Sending message %s to user with id %s', content, receiver_id)
390
		channel = message[VarNames.CHANNEL]
391
		message_db = Message(
392
			sender_id=self.user_id,
393
			content=content
394
		)
395
		channel_id = int(channel[1:])
396
		if channel.startswith(RedisPrefix.USER_ID_CHANNEL_PREFIX):
397
			message_db.receiver_id = channel_id
398
		elif channel.startswith(RedisPrefix.ROOM_CHANNEL_PREFIX) and channel in self.channels:
399
			message_db.room_id = channel_id
400
		else:
401
			raise ValidationError('Access denied for channel {}'.format(channel))
402
		if VarNames.IMG in message:
403
			message_db.img = extract_photo(message[VarNames.IMG])
404
		self.do_db(message_db.save)  # exit on hacked id with exception
405
		prepared_message = self.create_send_message(message_db)
406
		if message_db.receiver_id is None:
407
			self.logger.debug('!! Detected as public')
408
			self.publish(prepared_message, channel)
409
		else:
410
			self.publish(prepared_message, self.channel)
411
			self.logger.debug('!! Detected as private, channel %s', channel)
412
			if channel != self.channel:
413
				self.publish(prepared_message, channel)
414
415
	def process_call(self, message):
416
		"""
417
		:type message: dict
418
		"""
419
		receiver_id = message.get(VarNames.RECEIVER_ID)  # if receiver_id is None then its a private message
420
		self.logger.info('!! Offering a call to user with id %s',  receiver_id)
421
		message = self.offer_call(message.get(VarNames.CONTENT), message.get(VarNames.CALL_TYPE))
422
		self.publish(message, RedisPrefix.generate_user(receiver_id))
423
424
	def create_new_room(self, message):
425
		room_name = message[VarNames.ROOM_NAME]
426
		if not room_name or len(room_name) > 16:
427
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
428
		room = Room(name=room_name)
429
		room.save()
430
		room.users.add(self.user_id)
431
		room.save()
432
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
433
		self.publish(subscribe_message, self.channel, True)
434
435
	def invite_user(self, message):
436
		print('asd')
437
		room_id = message[VarNames.ROOM_ID]
438
		user_id = message[VarNames.USER_ID]
439
		channel = RedisPrefix.generate_room(room_id)
440
		if channel not in self.channels:
441
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
442
		room = Room.objects.get(id=room_id)
443
		if room.is_private:
444
			raise ValidationError("You can't add users to direct room, create a new room instead")
445
		try:
446
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
447
		except IntegrityError:
448
			raise ValidationError("User is already in channel")
449
		users_in_room = {}
450
		for user in room.users.all():
451
			self.set_js_user_structure(users_in_room, user.id, user.username, user.sex)
452
		self.publish(self.add_user_to_room(channel, user_id, users_in_room[user_id]), channel)
453
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
454
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
455
456
	def create_user_channel(self, message):
457
		user_id = message[VarNames.USER_ID]
458
		cursor = connection.cursor()
459
		cursor.execute(GET_DIRECT_ROOM_ID, [self.user_id, user_id])
460
		query_res = cursor.fetchall()
461
		if len(query_res) > 0:
462
			result = query_res[0]
463
			room_id = result[0]
464
			disabled = result[1]
465
			if disabled is None:
466
				raise ValidationError('This room already exist')
467
			else:
468
				Room.objects.filter(id=room_id).update(disabled=None)
469
		else:
470
			room = Room()
471
			room.save()
472
			room.users.add(self.user_id, user_id)
473
			room.save()
474
			room_id = room.id
475
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
476
		self.publish(subscribe_message, self.channel, True)
477
		other_channel = RedisPrefix.generate_user(user_id)
478
		if self.channel != other_channel:
479
			self.publish(subscribe_message, other_channel, True)
480
481
	def delete_channel(self, message):
482
		room_id = message[VarNames.ROOM_ID]
483
		channel = RedisPrefix.generate_room(room_id)
484
		if channel not in self.channels or room_id == ALL_ROOM_ID:
485
			raise ValidationError('You are not allowed to delete this room')
486
		room = Room.objects.get(id=room_id)
487
		if room.disabled is not None:
488
			raise ValidationError('Room is already deleted')
489
		if room.name is None:  # if private then disable
490
			room.disabled = True
491
		else: # if public -> leave the room, delete the link
492
			room.users.remove(self.user_id)
493
			online = self.get_online_from_redis(channel)
494
			online.remove(self.user_id)
495
			self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
496
		room.save()
497
		message = self.unsubscribe_direct_message(room_id)
498
		self.publish(message, channel, True)
499
500
	def send_client_new_channel(self, message):
501
		room_id = message[VarNames.ROOM_ID]
502
		channel = RedisPrefix.generate_room(room_id)
503
		self.add_channel(channel)
504
		self.add_online_user(room_id)# TODO doesnt work if already subscribed
505
506
	def send_client_delete_channel(self, message):
507
		room_id = message[VarNames.ROOM_ID]
508
		channel = RedisPrefix.generate_room(room_id)
509
		self.async_redis.unsubscribe(channel)
510
		self.async_redis_publisher.hdel(channel, self.id)
511
		self.channels.remove(channel)
512
513
	def process_get_messages(self, data):
514
		"""
515
		:type data: dict
516
		"""
517
		header_id = data.get('headerId', None)
518
		count = int(data.get('count', 10))
519
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
520
		if header_id is None:
521
			messages = Message.objects.filter(
522
				# Only public or private or private
523
				Q(receiver=None) | Q(sender=self.user_id) | Q(receiver=self.user_id)
524
			).order_by('-pk')[:count]
525
		else:
526
			messages = Message.objects.filter(
527
				Q(id__lt=header_id),
528
				Q(receiver=None) | Q(sender=self.user_id) | Q(receiver=self.user_id)
529
			).order_by('-pk')[:count]
530
		response = self.do_db(self.get_messages, messages)
531
		self.safe_write(response)
532
533
	def get_users_in_current_user_rooms(self):
534
		"""
535
		{
536
			"ROOM_ID:1": {
537
				"name": "All",
538
				"users": {
539
					"USER_ID:admin": {
540
						"name": "USER_NAME:admin",
541
						"sex": "SEX:Secret"
542
					},
543
					"USER_ID_2": {
544
						"name": "USER_NAME:Mike",
545
						"sex": "Male"
546
					}
547
				},
548
				"isPrivate": true
549
			}
550
		}
551
		"""
552
		cursor = connection.cursor()
553
		cursor.execute(USER_ROOMS_QUERY, [self.user_id])
554
		query_res = cursor.fetchall()
555
		res = {}
556
		for user in query_res:
557
			user_id = user[0]
558
			user_name = user[1]
559
			user_sex = user[2]
560
			room_id = user[3]
561
			room_name = user[4]
562
			if room_id not in res:
563
				res[room_id] = {
564
					VarNames.ROOM_NAME: room_name,
565
					VarNames.ROOM_USERS: {}
566
				}
567
			self.set_js_user_structure(res[room_id][VarNames.ROOM_USERS], user_id, user_name, user_sex)
568
		return res
569
570
	def set_js_user_structure(self, user_dict, user_id, name, sex):
571
		user_dict[user_id] = {
572
			VarNames.USER: name,
573
			VarNames.GENDER: GENDERS[sex]
574
		}
575
576
	def save_ip(self):
577
		if (self.do_db(UserJoinedInfo.objects.filter(
578
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
579
			return
580
		ip_address = self.get_or_create_ip()
581
		UserJoinedInfo.objects.create(
582
			ip=ip_address,
583
			user_id=self.user_id
584
		)
585
586
	def get_or_create_ip(self):
587
		try:
588
			ip_address = IpAddress.objects.get(ip=self.ip)
589
		except IpAddress.DoesNotExist:
590
			try:
591
				if not api_url:
592
					raise Exception('api url is absent')
593
				self.logger.debug("Creating ip record %s", self.ip)
594
				f = urlopen(api_url % self.ip)
595
				raw_response = f.read().decode("utf-8")
596
				response = json.loads(raw_response)
597
				if response['status'] != "success":
598
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
599
				ip_address = IpAddress.objects.create(
600
					ip=self.ip,
601
					isp=response['isp'],
602
					country=response['country'],
603
					region=response['regionName'],
604
					city=response['city'],
605
					country_code=response['countryCode']
606
				)
607
			except Exception as e:
608
				self.logger.error("Error while creating ip with country info, because %s", e)
609
				ip_address = IpAddress.objects.create(ip=self.ip)
610
		return ip_address
611
612
613
class AntiSpam(object):
614
615
	def __init__(self):
616
		self.spammed = 0
617
		self.info = {}
618
619
	def check_spam(self, json_message):
620
		message_length = len(json_message)
621
		info_key = int(round(time.time() * 100))
622
		self.info[info_key] = message_length
623
		if message_length > MAX_MESSAGE_SIZE:
624
			self.spammed += 1
625
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
626
		self.check_timed_spam()
627
628
	def check_timed_spam(self):
629
		# TODO implement me
630
		pass
631
		# raise ValidationError("You're chatting too much, calm down a bit!")
632
633
634
class TornadoHandler(WebSocketHandler, MessagesHandler):
635
636
	def __init__(self, *args, **kwargs):
637
		super(TornadoHandler, self).__init__(*args, **kwargs)
638
		self.connected = False
639
		self.anti_spam = AntiSpam()
640
641
	def data_received(self, chunk):
642
		pass
643
644
	def on_message(self, json_message):
645
		try:
646
			if not self.connected:
647
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
648
			if not json_message:
649
				raise ValidationError('Skipping null message')
650
			# self.anti_spam.check_spam(json_message)
651
			self.logger.debug('<< %s', json_message)
652
			message = json.loads(json_message)
653
			if message[VarNames.EVENT] not in self.pre_process_message:
654
				raise ValidationError("event {} is unknown".format(message[VarNames.EVENT]))
655
			self.pre_process_message[message[VarNames.EVENT]](message)
656
		except ValidationError as e:
657
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
658
			self.safe_write(error_message)
659
660
	def on_close(self):
661
		if self.async_redis.subscribed:
662
			self.async_redis.unsubscribe(self.channels)
663
		log_data = {}
664
		for channel in self.channels:
665
			if channel.startswith(RedisPrefix.ROOM_CHANNEL_PREFIX):
666
				self.sync_redis.hdel(channel, self.id)
667
				if self.connected:
668
					# seems like async solves problem with connection lost and wrong data status
669
					# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
670
					online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
671
					log_data[channel] = {'online': online, 'is_online': is_online}
672
					if not is_online:
673
						message = self.room_online(online, Actions.LOGOUT, channel)
674
						self.publish(message, channel)
675
		self.logger.info("Close connection result: %s", json.dumps(log_data))
676
		self.async_redis.disconnect()
677
678
	def open(self):
679
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
680
		if sessionStore.exists(session_key):
681
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
682
			self.ip = self.get_client_ip()
683
			session = SessionStore(session_key)
684
			self.user_id = int(session["_auth_user_id"])
685
			log_params = {
686
				'user_id': str(self.user_id).zfill(3),
687
				'id': self.log_id,
688
				'ip': self.ip
689
			}
690
			self.logger = logging.LoggerAdapter(logger, log_params)
691
			self.async_redis.connect()
692
			user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
693
			self.sender_name = user_db.username
694
			self.sex = user_db.sex_str
695
			user_rooms = self.get_users_in_current_user_rooms()
696
			self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
697
			self.channels.clear()
698
			self.channels.append(self.channel)
699
			for room_id in user_rooms:
700
				self.channels.append(RedisPrefix.generate_room(room_id))
701
			self.listen(self.channels)
702
			for room_id in user_rooms:
703
				self.add_online_user(room_id)
704
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
705
			self.connected = True
706
			Thread(target=self.save_ip).start()
707
		else:
708
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
709
			self.close(403, "Session key %s has been rejected" % session_key)
710
711
	def check_origin(self, origin):
712
		"""
713
		check whether browser set domain matches origin
714
		"""
715
		parsed_origin = urlparse(origin)
716
		origin = parsed_origin.netloc
717
		origin_domain = origin.split(':')[0].lower()
718
		browser_set = self.request.headers.get("Host")
719
		browser_domain = browser_set.split(':')[0]
720
		return browser_domain == origin_domain
721
722
	def safe_write(self, message):
723
		"""
724
		Tries to send message, doesn't throw exception outside
725
		:type self: MessagesHandler
726
		"""
727
		# self.logger.debug('<< THREAD %s >>', os.getppid())
728
		try:
729
			if isinstance(message, dict):
730
				message = json.dumps(message)
731
			if not (isinstance(message, str) or (not PY3 and isinstance(message, unicode))):
732
				raise ValueError('Wrong message type : %s' % str(message))
733
			self.logger.debug(">> %s", message)
734
			self.write_message(message)
735
		except tornado.websocket.WebSocketClosedError as e:
736
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
737
738
	def get_client_ip(self):
739
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip
740