Completed
Push — master ( a6f716...7bed65 )
by Andrew
01:33
created

MessagesCreator.add_user_to_room()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 8
rs 9.4285
c 0
b 0
f 0
1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import tornado.gen
9
import tornado.httpclient
10
import tornado.ioloop
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q
18
from redis_sessions.session import SessionStore
19
from tornado.websocket import WebSocketHandler
20
21
from chat.utils import extract_photo
22
23
try:
	from urllib.parse import urlparse  # py3
except ImportError:
	from urlparse import urlparse  # py2
27
28
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, USER_ROOMS_QUERY, GENDERS, GET_DIRECT_ROOM_ID
29
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo
30
31
# True when running on Python 3 or newer. The numeric major version is
# compared because a lexical comparison of the sys.version string would
# misreport an interpreter whose version starts with "1" (e.g. "10.0").
PY3 = sys.version_info[0] >= 3
32
33
# %-format template of the IP lookup service (formatted with the client ip).
# When the setting is absent, ip records are created without location data.
api_url = getattr(settings, "IP_API_URL", None)

# Shared store used to check that a session key exists.
sessionStore = SessionStore()

logger = logging.getLogger(__name__)

# TODO https://github.com/leporo/tornado-redis#connection-pool-support
# CONNECTION_POOL = tornadoredis.ConnectionPool(
#	max_connections=500,
#	wait_for_available=True)
43
44
45
class Actions:
	"""Values carried in the "action" field (VarNames.EVENT) of a message."""

	# online state
	LOGIN = 'addOnlineUser'
	LOGOUT = 'removeOnlineUser'

	# chat messages and calls
	SEND_MESSAGE = 'sendMessage'
	PRINT_MESSAGE = 'printMessage'
	CALL = 'call'

	# room lists and notifications
	ROOMS = 'setRooms'
	REFRESH_USER = 'setOnlineUsers'
	SYSTEM_MESSAGE = 'system'
	GROWL_MESSAGE = 'growl'
	GET_MESSAGES = 'messages'

	# channel management
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
	DELETE_ROOM = 'deleteRoom'
	CREATE_ROOM_CHANNEL = 'addRoom'
	INVITE_USER = 'inviteUser'
	ADD_USER = 'addUserToAll'
61
62
63
class VarNames:
	"""Key names used inside the message dicts."""

	# sender / receiver
	RECEIVER_ID = 'receiverId'
	RECEIVER_NAME = 'receiverName'
	CALL_TYPE = 'type'
	USER = 'user'
	USER_ID = 'userId'
	GENDER = 'sex'

	# message body
	TIME = 'time'
	CONTENT = 'content'
	IMG = 'image'
	EVENT = 'action'
	MESSAGE_ID = 'id'

	# rooms and channels
	ROOM_NAME = 'name'
	ROOM_ID = 'roomId'
	ROOM_USERS = 'users'
	# CHANNEL and CHANNEL_NAME intentionally or not share the same key
	CHANNEL = 'channel'
	CHANNEL_NAME = 'channel'
	IS_ROOM_PRIVATE = 'private'
83
84
85
class HandlerNames:
	"""
	NAME is the message key; the remaining constants are the values
	that messages store under that key.
	"""

	NAME = 'handler'

	CHANNELS = 'channels'
	CHAT = 'chat'
	GROWL = 'growl'
	WEBRTC = 'webrtc'
91
92
93
class RedisPrefix:
	"""Builds redis channel names out of user and room ids."""

	USER_ID_CHANNEL_PREFIX = 'u'
	ROOM_CHANNEL_PREFIX = 'r'
	__ROOM_ONLINE__ = 'o:{}'  # not referenced by the visible code

	@classmethod
	def generate_user(cls, key):
		"""Return the channel name of user ``key`` (``5`` -> ``'u5'``)."""
		return '{}{}'.format(cls.USER_ID_CHANNEL_PREFIX, key)

	@classmethod
	def generate_room(cls, key):
		"""Return the channel name of room ``key`` (``5`` -> ``'r5'``)."""
		return '{}{}'.format(cls.ROOM_CHANNEL_PREFIX, key)
105
106
# Channel name of the room whose id is ALL_ROOM_ID.
RedisPrefix.DEFAULT_CHANNEL = RedisPrefix.generate_room(ALL_ROOM_ID)
107
108
109
class MessagesCreator(object):
	"""
	Builds the dict payloads that are written to clients or published to redis.
	Keys come from VarNames / HandlerNames, event values from Actions.
	"""

	def __init__(self, *args, **kwargs):
		super(MessagesCreator, self).__init__(*args, **kwargs)
		self.user_id = 0  # anonymous by default
		self.sender_name = None
		self.sex = None

	def default(self, content, event, handler):
		"""
		Base envelope of an outgoing message.
		:return: {"action": event, "content": content, "userId": self.user_id,
		"time": get_milliseconds(), "handler": handler}
		"""
		envelope = {}
		envelope[VarNames.EVENT] = event
		envelope[VarNames.CONTENT] = content
		envelope[VarNames.USER_ID] = self.user_id
		envelope[VarNames.TIME] = get_milliseconds()
		envelope[HandlerNames.NAME] = handler
		return envelope

	def room_online(self, online, event, channel):
		"""
		Chat-handler envelope with ``online`` as content.
		:return: the ``default`` envelope extended with "channel", "user" and "sex"
		"""
		envelope = self.default(online, event, HandlerNames.CHAT)
		envelope.update({
			VarNames.CHANNEL_NAME: channel,
			VarNames.USER: self.sender_name,
			VarNames.GENDER: self.sex,
		})
		return envelope

	def offer_call(self, content, message_type):
		"""
		Webrtc-handler envelope for a call.
		:return: the ``default`` envelope with "action": "call" plus "type": message_type
		"""
		envelope = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
		envelope[VarNames.CALL_TYPE] = message_type
		return envelope

	@classmethod
	def create_send_message(cls, message):
		"""
		Convert a stored message to a "printMessage" payload.
		:param message: object exposing sender_id, receiver_id, room_id, content,
		time, id, img and receiver
		:return: dict with "userId", "content", "time", "id", "action", "channel"
		and "handler"; "image" is added when the message has an image,
		"receiverId"/"receiverName" when it has a receiver
		:raises ValidationError: if the message has neither a receiver nor a room
		"""
		receiver_id = message.receiver_id
		if receiver_id:
			channel = RedisPrefix.generate_user(receiver_id)
		elif message.room_id:
			channel = RedisPrefix.generate_room(message.room_id)
		else:
			raise ValidationError('Channel is none')
		result = {
			VarNames.USER_ID: message.sender_id,
			VarNames.CONTENT: message.content,
			VarNames.TIME: message.time,
			VarNames.MESSAGE_ID: message.id,
			VarNames.EVENT: Actions.PRINT_MESSAGE,
			VarNames.CHANNEL: channel,
			HandlerNames.NAME: HandlerNames.CHAT
		}
		if message.img.name:
			result[VarNames.IMG] = message.img.url
		if receiver_id:
			result[VarNames.RECEIVER_ID] = message.receiver.id
			result[VarNames.RECEIVER_NAME] = message.receiver.username
		return result

	@classmethod
	def get_messages(cls, messages):
		"""
		Wrap several messages into a single "messages" payload.
		:param messages: iterable of objects accepted by ``create_send_message``
		:return: {"content": [<printMessage payload>, ...], "action": "messages"}
		"""
		payloads = list(map(cls.create_send_message, messages))
		return {
			VarNames.CONTENT: payloads,
			VarNames.EVENT: Actions.GET_MESSAGES
		}

	@property
	def stored_redis_user(self):
		"""Value stored in the redis online hash for this connection: the user id."""
		return self.user_id

	@property
	def channel(self):
		"""Name of the personal redis channel of the current user."""
		return RedisPrefix.generate_user(self.user_id)

	def subscribe_direct_channel_message(self, room_id, other_user_id):
		"""Payload announcing a direct room between the current user and ``other_user_id``."""
		members = [self.user_id, other_user_id]
		return {
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: members,
			HandlerNames.NAME: HandlerNames.CHANNELS
		}

	def subscribe_room_channel_message(self, room_id, room_name):
		"""Payload announcing a named room whose only listed user is the current one."""
		members = [self.user_id]
		return {
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: members,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.ROOM_NAME: room_name
		}

	def invite_room_channel_message(self, room_id, user_id, room_name, users):
		"""Payload telling ``user_id`` about room ``room_id``; ``users`` goes to "content"."""
		payload = {}
		payload[VarNames.EVENT] = Actions.INVITE_USER
		payload[VarNames.ROOM_ID] = room_id
		payload[VarNames.USER_ID] = user_id
		payload[HandlerNames.NAME] = HandlerNames.CHANNELS
		payload[VarNames.ROOM_NAME] = room_name
		payload[VarNames.CONTENT] = users
		return payload

	def add_user_to_room(self, channel, user_id, content):
		"""
		Payload announcing on ``channel`` that ``user_id`` was added.
		:param content: dict holding the "sex" and "user" values of the added user,
		e.g. {"sex": "Alien", "user": "Andrew"}
		"""
		payload = {}
		payload[VarNames.EVENT] = Actions.ADD_USER
		payload[VarNames.CHANNEL] = channel
		payload[VarNames.USER_ID] = user_id
		payload[HandlerNames.NAME] = HandlerNames.CHAT
		payload[VarNames.GENDER] = content[VarNames.GENDER]
		payload[VarNames.USER] = content[VarNames.USER]
		return payload

	def unsubscribe_direct_message(self, room_id):
		"""Payload telling clients that the current user dropped room ``room_id``."""
		payload = {}
		payload[VarNames.EVENT] = Actions.DELETE_ROOM
		payload[VarNames.ROOM_ID] = room_id
		payload[VarNames.USER_ID] = self.user_id
		payload[HandlerNames.NAME] = HandlerNames.CHANNELS
		payload[VarNames.TIME] = get_milliseconds()
		return payload
240
241
242
class MessagesHandler(MessagesCreator):
243
244
	def __init__(self, *args, **kwargs):
		"""
		Set up per-connection state: logging adapter, redis clients and the
		tables that map an incoming/outgoing event to its handler method.
		"""
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__(*args, **kwargs)
		self.id = id(self)
		self.log_id = str(self.id % 10000).zfill(4)
		self.ip = None
		self.channels = []
		# placeholder log context until the user is known
		self.logger = logging.LoggerAdapter(logger, {
			'user_id': '000',
			'id': self.log_id,
			'ip': 'initializing'
		})
		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		# own client for this connection, used for subscriptions
		self.async_redis = tornadoredis.Client()
		# handlers of events that arrive from the client
		self.pre_process_message = {
			Actions.GET_MESSAGES: self.process_get_messages,
			Actions.SEND_MESSAGE: self.process_send_message,
			Actions.CALL: self.process_call,
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
			Actions.DELETE_ROOM: self.delete_channel,
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
			Actions.INVITE_USER: self.invite_user,
		}
		# handlers run after a prefixed ("parsable") redis message was forwarded
		self.post_process_message = {
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
			Actions.DELETE_ROOM: self.send_client_delete_channel,
			Actions.INVITE_USER: self.send_client_new_channel
		}
276
277
	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribe this connection's redis client to ``channels`` and, once the
		subscription is done, deliver incoming messages to ``new_message``.
		"""
		subscription = tornado.gen.Task(self.async_redis.subscribe, channels)
		yield subscription
		self.async_redis.listen(self.new_message)
282
283
	@tornado.gen.engine
	def add_channel(self, channel):
		"""Remember ``channel`` and subscribe this connection's redis client to it."""
		self.channels.append(channel)
		subscription = tornado.gen.Task(self.async_redis.subscribe, channel)
		yield subscription
288
289
	def do_db(self, callback, *arg, **args):
		"""
		Call ``callback(*arg, **args)`` and return its result. If the database
		connection turns out to be broken, close it and call once more.
		"""
		try:
			result = callback(*arg, **args)
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
			self.logger.warning('%s, reconnecting' % e)  # TODO
			connection.close()
			result = callback(*arg, **args)
		return result
296
297
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
		"""
		Read the online users of ``channel`` from the redis hash stored under
		that key. The hash maps a connection id to a user id.

		:param channel: redis key of the hash
		:param check_user_id: when given, also report whether this user owns
		a connection other than ``check_hash``
		:param check_hash: connection id to ignore while checking
		:rtype: list | (list, bool)
		:return: list of distinct user ids; ``(list, user_is_online)``
		when ``check_user_id`` is not None
		"""
		online = self.sync_redis.hgetall(channel)
		self.logger.debug('!! redis online: %s', online)
		user_ids = set()
		user_is_online = False
		# keys and values are decoded from bytes and parsed as ints
		for key_hash, raw_user_id in (online or {}).items():  # py2 iteritems
			user_id = int(raw_user_id.decode('utf-8'))
			if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
				user_is_online = True
			user_ids.add(user_id)
		result = list(user_ids)
		# Compare with None instead of truthiness: user id 0 (the anonymous
		# default) is falsy, but a caller passing it still unpacks a tuple.
		return (result, user_is_online) if check_user_id is not None else result
315
316
	def add_online_user(self, room_id):
		"""
		Register this connection in the online hash of the room:
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
		If the user was not online in the room yet, the new online list is
		published to the room channel; otherwise it is written to this
		connection only.
		"""
		channel_key = RedisPrefix.generate_room(room_id)
		online = self.get_online_from_redis(channel_key)
		self.async_redis_publisher.hset(channel_key, self.id, self.stored_redis_user)
		already_online = self.user_id in online
		if already_online:  # Send user names to self
			refresh_message = self.room_online(online, Actions.REFRESH_USER, channel_key)
			self.logger.info('!! Second tab, retrieving online for self')
			self.safe_write(refresh_message)
			return
		# if a new tab has been opened
		online.append(self.user_id)
		login_message = self.room_online(online, Actions.LOGIN, channel_key)
		self.logger.info('!! First tab, sending refresh online for all')
		self.publish(login_message, channel_key)
342
343
	def publish(self, message, channel=None, parsable=False):
		"""
		Serialize ``message`` to json and publish it to a redis channel.

		:param message: json-serializable payload
		:param channel: redis channel name, required
		:param parsable: when True the json is marked with ``encode`` so that
		receiving handlers decode and post-process it
		:raises ValidationError: if ``channel`` is None
		"""
		if channel is None:
			raise ValidationError('Channel is not specified')
		jsoned_mess = json.dumps(message)
		self.logger.debug('<%s> %s', channel, jsoned_mess)
		if parsable:
			jsoned_mess = self.encode(jsoned_mess)
		self.async_redis_publisher.publish(channel, jsoned_mess)
351
352
	def encode(self, message):
		"""
		Mark a message as one that has to be decoded and processed by the
		server before it is sent to the client.
		@param message: message to mark
		@return: the message with ``parsable_prefix`` prepended
		"""
		prefix = self.parsable_prefix
		return prefix + message
360
361
	def decode(self, message):
		"""
		Check whether a message should be processed by the server before it is
		written to the client.
		@param message: message to check
		@return: parsed json of the message if it carries ``parsable_prefix``,
		None otherwise
		"""
		prefix = self.parsable_prefix
		if not message.startswith(prefix):
			return None
		# strip exactly the prefix, whatever its length is
		return json.loads(message[len(prefix):])
369
370
	def new_message(self, message):
		"""
		Callback for messages arriving from redis. Forwards the body to the
		client; a body marked as parsable is decoded first and, after being
		forwarded, passed to its ``post_process_message`` handler.
		"""
		data = message.body
		if type(data) is int:  # subscribe event, nothing to forward
			return
		decoded = self.decode(data)
		self.safe_write(decoded if decoded else data)
		if decoded:
			handler = self.post_process_message[decoded[VarNames.EVENT]]
			handler(decoded)
379
380
	def safe_write(self, message):
		"""Write ``message`` to the client; has to be provided by a subclass."""
		raise NotImplementedError('WebSocketHandler implements')
382
383
	def process_send_message(self, message):
		"""
		Store a chat message and publish it.

		The target is derived from the "channel" value: a user channel makes
		the message private, a room channel the handler is subscribed to makes
		it a room message.

		:type message: dict
		:raises ValidationError: if the channel is malformed or not allowed
		"""
		content = message[VarNames.CONTENT]
		# used for logging only; the real receiver comes from the channel
		receiver_id = message.get(VarNames.RECEIVER_ID)
		self.logger.info('!! Sending message %s to user with id %s', content, receiver_id)
		channel = message[VarNames.CHANNEL]
		try:
			channel_id = int(channel[1:])
		except (TypeError, ValueError):
			# report a broken channel to the client instead of failing with an
			# exception that on_message does not handle
			raise ValidationError('Access denied for channel {}'.format(channel))
		message_db = Message(
			sender_id=self.user_id,
			content=content
		)
		if channel.startswith(RedisPrefix.USER_ID_CHANNEL_PREFIX):
			message_db.receiver_id = channel_id
		elif channel.startswith(RedisPrefix.ROOM_CHANNEL_PREFIX) and channel in self.channels:
			message_db.room_id = channel_id
		else:
			raise ValidationError('Access denied for channel {}'.format(channel))
		if VarNames.IMG in message:
			message_db.img = extract_photo(message[VarNames.IMG])
		self.do_db(message_db.save)  # exit on hacked id with exception
		prepared_message = self.create_send_message(message_db)
		if message_db.receiver_id is None:
			self.logger.debug('!! Detected as public')
			self.publish(prepared_message, channel)
		else:
			# private: echo to the sender, then deliver to the receiver
			self.publish(prepared_message, self.channel)
			self.logger.debug('!! Detected as private, channel %s', channel)
			if channel != self.channel:
				self.publish(prepared_message, channel)
414
415
	def process_call(self, message):
		"""
		Publish a call offer to the personal channel of the receiver.

		:type message: dict
		:raises ValidationError: if the message names no receiver
		"""
		receiver_id = message.get(VarNames.RECEIVER_ID)
		if receiver_id is None:
			# without this check the offer would go to the channel of user "None"
			raise ValidationError('Call receiver is not specified')
		self.logger.info('!! Offering a call to user with id %s', receiver_id)
		message = self.offer_call(message.get(VarNames.CONTENT), message.get(VarNames.CALL_TYPE))
		self.publish(message, RedisPrefix.generate_user(receiver_id))
423
424
	def create_new_room(self, message):
		"""
		Create a named room with the current user in it and publish the
		subscribe message to the user's own channel.
		:raises ValidationError: if the name is empty or longer than 16 symbols
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = room_name and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		new_room = Room(name=room_name)
		new_room.save()
		new_room.users.add(self.user_id)
		new_room.save()
		self.publish(
			self.subscribe_room_channel_message(new_room.id, room_name),
			self.channel,
			True
		)
434
435
	def invite_user(self, message):
		"""
		Add a user to a room the current connection is subscribed to.

		Publishes an "add user" message to the room channel and an invite
		message (marked parsable) to the personal channel of the invited user.

		:raises ValidationError: if the current connection is not subscribed to
		the room, or the user is already in it
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		channel = RedisPrefix.generate_room(room_id)
		if channel not in self.channels:
			raise ValidationError("Access denied")
		try:
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		users_in_room = {}
		room = Room.objects.get(id=room_id)
		for user in room.users.all():
			self.set_js_user_structure(users_in_room, user.id, user.username, user.sex)
		self.publish(self.add_user_to_room(channel, user_id, users_in_room[user_id]), channel)
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
453
454
	def create_user_channel(self, message):
		"""
		Open a direct room between the current user and message["userId"].
		A room found by GET_DIRECT_ROOM_ID whose ``disabled`` value is not None
		is re-enabled; if none is found a new room is created. The subscribe
		message is published to the personal channels of both users.
		:raises ValidationError: if the found room has ``disabled`` equal to None
		"""
		user_id = message[VarNames.USER_ID]
		cursor = connection.cursor()
		cursor.execute(GET_DIRECT_ROOM_ID, [self.user_id, user_id])
		rows = cursor.fetchall()
		if len(rows) > 0:
			found = rows[0]
			room_id, disabled = found[0], found[1]
			if disabled is None:
				raise ValidationError('This room already exist')
			Room.objects.filter(id=room_id).update(disabled=None)
		else:
			new_room = Room()
			new_room.save()
			new_room.users.add(self.user_id, user_id)
			new_room.save()
			room_id = new_room.id
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
		own_channel = self.channel
		self.publish(subscribe_message, own_channel, True)
		other_channel = RedisPrefix.generate_user(user_id)
		# a room with oneself: do not publish the same message twice
		if own_channel != other_channel:
			self.publish(subscribe_message, other_channel, True)
478
479
	def delete_channel(self, message):
		"""
		Leave or disable the room given by message["roomId"].

		A room without a name is marked disabled. For a named room the current
		user is removed from it and the remaining online list is published to
		the room channel. In both cases a delete message (marked parsable) is
		published to the room channel afterwards.

		:raises ValidationError: if the connection is not subscribed to the
		room, the room is ALL_ROOM_ID, or the room is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		channel = RedisPrefix.generate_room(room_id)
		if channel not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to delete this room')
		room = Room.objects.get(id=room_id)
		if room.disabled is not None:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else:  # if public -> leave the room, delete the link
			room.users.remove(self.user_id)
			online = self.get_online_from_redis(channel)
			# the redis hash may not contain the user; list.remove would
			# raise ValueError and abort the whole operation
			if self.user_id in online:
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
		room.save()
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, channel, True)
497
498
	def send_client_new_channel(self, message):
		"""Subscribe to the room named in ``message`` and register as online there."""
		room_id = message[VarNames.ROOM_ID]
		self.add_channel(RedisPrefix.generate_room(room_id))
		self.add_online_user(room_id)  # TODO doesnt work if already subscribed
503
504
	def send_client_delete_channel(self, message):
		"""
		Unsubscribe from the room named in ``message``, remove this connection
		from the room's online hash and forget the channel.
		"""
		channel = RedisPrefix.generate_room(message[VarNames.ROOM_ID])
		self.async_redis.unsubscribe(channel)
		self.async_redis_publisher.hdel(channel, self.id)
		self.channels.remove(channel)
510
511
	def process_get_messages(self, data):
		"""
		Write a page of stored messages to the client, newest first.
		``data["count"]`` limits the page (10 by default); when
		``data["headerId"]`` is given only messages with a lower id are used.
		:type data: dict
		"""
		header_id = data.get('headerId', None)
		count = int(data.get('count', 10))
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		# messages without a receiver, or sent by / addressed to the current user
		visible = Q(receiver=None) | Q(sender=self.user_id) | Q(receiver=self.user_id)
		if header_id is None:
			conditions = [visible]
		else:
			conditions = [Q(id__lt=header_id), visible]
		messages = Message.objects.filter(*conditions).order_by('-pk')[:count]
		response = self.do_db(self.get_messages, messages)
		self.safe_write(response)
530
531
	def get_users_in_current_user_rooms(self):
		"""
		Run USER_ROOMS_QUERY for the current user and group its rows by room.
		Each row is read as (user id, user name, user sex, room id, room name).
		:return:
		{
			ROOM_ID: {
				"name": ROOM_NAME,
				"users": {
					USER_ID: {
						"user": USER_NAME,
						"sex": GENDERS[USER_SEX]
					}
				}
			}
		}
		"""
		cursor = connection.cursor()
		cursor.execute(USER_ROOMS_QUERY, [self.user_id])
		rooms = {}
		for row in cursor.fetchall():
			user_id, user_name, user_sex = row[0], row[1], row[2]
			room_id, room_name = row[3], row[4]
			if room_id not in rooms:
				rooms[room_id] = {
					VarNames.ROOM_NAME: room_name,
					VarNames.ROOM_USERS: {}
				}
			room_users = rooms[room_id][VarNames.ROOM_USERS]
			self.set_js_user_structure(room_users, user_id, user_name, user_sex)
		return rooms
567
568
	def set_js_user_structure(self, user_dict, user_id, name, sex):
		"""Store {"user": name, "sex": GENDERS[sex]} under ``user_id`` in ``user_dict``."""
		entry = {
			VarNames.USER: name,
			VarNames.GENDER: GENDERS[sex]
		}
		user_dict[user_id] = entry
573
574
	def save_ip(self):
		"""Record that the current user joined from ``self.ip``, unless already recorded."""
		known_pair = UserJoinedInfo.objects.filter(
			Q(ip__ip=self.ip) & Q(user_id=self.user_id))
		if self.do_db(known_pair.exists):
			return
		UserJoinedInfo.objects.create(
			ip=self.get_or_create_ip(),
			user_id=self.user_id
		)
583
584
	def get_or_create_ip(self):
		"""
		Return the IpAddress record of ``self.ip``, creating it when missing.

		A new record is filled with the data returned by the ``api_url``
		service. If that fails for any reason (no url configured, network
		error, unexpected response) the error is logged and a record holding
		only the ip is created instead.
		"""
		from contextlib import closing
		try:
			return IpAddress.objects.get(ip=self.ip)
		except IpAddress.DoesNotExist:
			pass
		try:
			if not api_url:
				raise Exception('api url is absent')
			self.logger.debug("Creating ip record %s", self.ip)
			# the timeout keeps a silent service from blocking the caller
			# forever; closing() releases the response on every path
			with closing(urlopen(api_url % self.ip, timeout=10)) as f:
				raw_response = f.read().decode("utf-8")
			response = json.loads(raw_response)
			if response['status'] != "success":
				raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
			return IpAddress.objects.create(
				ip=self.ip,
				isp=response['isp'],
				country=response['country'],
				region=response['regionName'],
				city=response['city'],
				country_code=response['countryCode']
			)
		except Exception as e:
			# deliberate best effort: location data is optional
			self.logger.error("Error while creating ip with country info, because %s", e)
			return IpAddress.objects.create(ip=self.ip)
609
610
611
class AntiSpam(object):
	"""Keeps the sizes of checked messages and rejects the ones that are too long."""

	def __init__(self):
		self.info = {}  # time key (1/100 s resolution) -> message length
		self.spammed = 0  # number of rejected messages

	def check_spam(self, json_message):
		"""
		Record the length of ``json_message`` and validate it.
		:raises ValidationError: if the message is longer than MAX_MESSAGE_SIZE
		"""
		size = len(json_message)
		self.info[int(round(time.time() * 100))] = size
		if size > MAX_MESSAGE_SIZE:
			self.spammed += 1
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
		self.check_timed_spam()

	def check_timed_spam(self):
		"""Placeholder for a rate check; does nothing yet."""
		# TODO implement me
		# raise ValidationError("You're chatting too much, calm down a bit!")
		pass
630
631
632
class TornadoHandler(WebSocketHandler, MessagesHandler):
633
634
	def __init__(self, *args, **kwargs):
		"""Initialize the parent handlers; the connection starts as not connected."""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		self.anti_spam = AntiSpam()
		self.connected = False
638
639
	def data_received(self, chunk):
		"""Ignore ``chunk``; nothing is done with it."""
		return None
641
642
	def on_message(self, json_message):
		"""
		Dispatch a message received from the client to the handler registered
		for its "action" in ``pre_process_message``.

		Every ValidationError (raised here or by the handler) is reported back
		to the client as a growl message instead of being propagated.
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %s', json_message)
			try:
				message = json.loads(json_message)
			except ValueError:
				# broken json is a client error, report it like other ones
				raise ValidationError('Message is not a valid json')
			if not isinstance(message, dict):
				raise ValidationError('Message should be a json object')
			# .get: a missing "action" is reported as an unknown event
			event = message.get(VarNames.EVENT)
			if event not in self.pre_process_message:
				raise ValidationError("event {} is unknown".format(event))
			self.pre_process_message[event](message)
		except ValidationError as e:
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.safe_write(error_message)
657
658
	def on_close(self):
		"""
		Clean up after the websocket is closed: unsubscribe from redis, remove
		this connection from the online hash of every room channel and, when
		the user has no other connection in a room, publish a logout message
		to that room.
		"""
		if self.async_redis.subscribed:
			self.async_redis.unsubscribe(self.channels)
		log_data = {}
		for channel in self.channels:
			if not channel.startswith(RedisPrefix.ROOM_CHANNEL_PREFIX):
				continue
			self.sync_redis.hdel(channel, self.id)
			if not self.connected:
				continue
			# seems like async solves problem with connection lost and wrong data status
			# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
			online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
			log_data[channel] = {'online': online, 'is_online': is_online}
			if is_online:
				continue
			logout_message = self.room_online(online, Actions.LOGOUT, channel)
			self.publish(logout_message, channel)
		self.logger.info("Close connection result: %s", json.dumps(log_data))
		self.async_redis.disconnect()
675
676
	def open(self):
		"""
		Authenticate the new websocket by its session cookie. For a known
		session: load the user, send the rooms with their users, subscribe to
		the personal and room channels, register as online in every room and
		save the ip in a separate thread. Otherwise close the connection.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if not sessionStore.exists(session_key):
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			# NOTE(review): 403 is passed as the websocket close code - confirm
			# that clients handle it as intended
			self.close(403, "Session key %s has been rejected" % session_key)
			return
		self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
		self.ip = self.get_client_ip()
		session = SessionStore(session_key)
		self.user_id = int(session["_auth_user_id"])
		# from now on log lines carry the real user id and ip
		self.logger = logging.LoggerAdapter(logger, {
			'user_id': str(self.user_id).zfill(3),
			'id': self.log_id,
			'ip': self.ip
		})
		self.async_redis.connect()
		user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
		self.sender_name = user_db.username
		self.sex = user_db.sex_str
		user_rooms = self.get_users_in_current_user_rooms()
		self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
		self.channels.clear()
		self.channels.append(self.channel)
		self.channels.extend(RedisPrefix.generate_room(room_id) for room_id in user_rooms)
		self.listen(self.channels)
		for room_id in user_rooms:
			self.add_online_user(room_id)
		self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
		self.connected = True
		Thread(target=self.save_ip).start()
708
709
	def check_origin(self, origin):
		"""
		Check whether the domain of ``origin`` matches the domain of the
		request's Host header. Ports are ignored and the comparison is
		case-insensitive.

		:param origin: value of the Origin header
		:return: True if both domains are present and equal, False otherwise
		(including a missing or malformed Host header or origin)
		"""
		host_header = self.request.headers.get("Host")
		if not host_header:
			return False
		try:
			# .hostname strips the port and lowercases the host name
			origin_domain = urlparse(origin).hostname
			browser_domain = urlparse('//' + host_header).hostname
		except ValueError:
			return False
		if not origin_domain or not browser_domain:
			return False
		return browser_domain == origin_domain
719
720
	def safe_write(self, message):
		"""
		Send ``message`` to the client. A dict is serialized to json first.
		A closed websocket is logged instead of raising; a message that is
		neither a dict nor a string raises ValueError.
		:type self: MessagesHandler
		"""
		# self.logger.debug('<< THREAD %s >>', os.getppid())
		try:
			if isinstance(message, dict):
				message = json.dumps(message)
			if not isinstance(message, str) and (PY3 or not isinstance(message, unicode)):
				raise ValueError('Wrong message type : %s' % str(message))
			self.logger.debug(">> %s", message)
			self.write_message(message)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
735
736
	def get_client_ip(self):
		"""Return the X-Real-IP request header if it is set, else the remote ip of the request."""
		real_ip = self.request.headers.get("X-Real-IP")
		if real_ip:
			return real_ip
		return self.request.remote_ip
738