Completed
Push — master ( c23c94...d89389 )
by Andrew
01:28
created

MessagesHandler.invite_user()   D

Complexity

Conditions 5

Size

Total Lines 18

Duplication

Lines 1
Ratio 5.56 %

Importance

Changes 4
Bugs 0 Features 0
Metric Value
cc 5
c 4
b 0
f 0
dl 1
loc 18
rs 4.9473
1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import tornado.gen
9
import tornado.httpclient
10
import tornado.ioloop
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q, F
18
from redis_sessions.session import SessionStore
19
from tornado.websocket import WebSocketHandler
20
21
from chat.utils import extract_photo
22
23
try:
24
	from urllib.parse import urlparse  # py3
25
except ImportError:
26
	from urlparse import urlparse  # py2
27
28
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM
29
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo, RoomUsers
30
31
# Compare the numeric major version: comparing the version *string* with '3'
# is lexicographic and would misclassify a version string such as '10.0'.
PY3 = sys.version_info[0] >= 3
# Base text type used for isinstance checks; ``basestring`` exists only on Python 2.
str_type = str if PY3 else basestring
33
api_url = getattr(settings, "IP_API_URL", None)
34
35
sessionStore = SessionStore()
36
37
base_logger = logging.getLogger(__name__)
38
39
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
40
#CONNECTION_POOL = tornadoredis.ConnectionPool(
41
#	max_connections=500,
42
#	wait_for_available=True)
43
44
45
class Actions(object):
46
	LOGIN = 'addOnlineUser'
47
	LOGOUT = 'removeOnlineUser'
48
	SEND_MESSAGE = 'sendMessage'
49
	PRINT_MESSAGE = 'printMessage'
50
	CALL = 'call'
51
	ROOMS = 'setRooms'
52
	REFRESH_USER = 'setOnlineUsers'
53
	GROWL_MESSAGE = 'growl'
54
	GET_MESSAGES = 'loadMessages'
55
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
56
	DELETE_ROOM = 'deleteRoom'
57
	EDIT_MESSAGE = 'editMessage'
58
	DELETE_MESSAGE = 'deleteMessage'
59
	CREATE_ROOM_CHANNEL = 'addRoom'
60
	INVITE_USER = 'inviteUser'
61
	ADD_USER = 'addUserToAll'
62
	OFFLINE_MESSAGES = 'loadOfflineMessages'
63
64
65
class VarNames(object):
66
	CALL_TYPE = 'type'
67
	USER = 'user'
68
	USER_ID = 'userId'
69
	TIME = 'time'
70
	CONTENT = 'content'
71
	IMG = 'image'
72
	EVENT = 'action'
73
	MESSAGE_ID = 'id'
74
	GENDER = 'sex'
75
	ROOM_NAME = 'name'
76
	ROOM_ID = 'roomId'
77
	ROOM_USERS = 'users'
78
	CHANNEL = 'channel'
79
	GET_MESSAGES_COUNT = 'count'
80
	GET_MESSAGES_HEADER_ID = 'headerId'
81
	CHANNEL_NAME = 'channel'
82
	IS_ROOM_PRIVATE = 'private'
83
	#ROOM_NAME = 'roomName'
84
	# ROOM_ID = 'roomId'
85
86
87
class CallType(object):
88
	OFFER = 'offer'
89
90
class HandlerNames:
91
	NAME = 'handler'
92
	CHANNELS = 'channels'
93
	CHAT = 'chat'
94
	GROWL = 'growl'
95
	WEBRTC = 'webrtc'
96
	FILE = 'file'
97
98
99
class RedisPrefix:
	"""Naming helpers for the redis channels used by the chat handlers."""
	# prepended to a user id to build that user's personal channel name
	USER_ID_CHANNEL_PREFIX = 'u'
	# format template; not referenced anywhere else in the visible code
	__ROOM_ONLINE__ = 'o:{}'

	@classmethod
	def generate_user(cls, key):
		"""
		:param key: user id (anything accepted by ``str``)
		:return: personal channel name, e.g. ``'u5'`` for key ``5``
		"""
		return ''.join((cls.USER_ID_CHANNEL_PREFIX, str(key)))
106
107
RedisPrefix.DEFAULT_CHANNEL = ALL_ROOM_ID
108
109
110
class MessagesCreator(object):
111
112
	def __init__(self, *args, **kwargs):
113
		super(MessagesCreator, self).__init__(*args, **kwargs)
114
		self.sex = None
115
		self.sender_name = None
116
		self.user_id = 0  # anonymous by default
117
118
	def default(self, content, event, handler):
119
		"""
120
		:return: {"action": event, "content": content, "time": "20:48:57"}
121
		"""
122
		return {
123
			VarNames.EVENT: event,
124
			VarNames.CONTENT: content,
125
			VarNames.USER_ID: self.user_id,
126
			VarNames.TIME: get_milliseconds(),
127
			HandlerNames.NAME: handler
128
		}
129
130
	def room_online(self, online, event, channel):
		"""
		Builds a chat-handler message that carries the online list of a room.

		:param online: value stored under "content" (callers pass a list of user ids)
		:param event: action name stored under "action"
		:param channel: value stored under "channel"
		:return: the dict produced by ``default`` extended with "channel", "user" and "sex"
		"""
		message = self.default(online, event, HandlerNames.CHAT)
		message.update({
			VarNames.CHANNEL_NAME: channel,
			VarNames.USER: self.sender_name,
			VarNames.GENDER: self.sex,
		})
		return message
139
140
	def offer_call(self, content, message_type):
141
		"""
142
		:return: {"action": "call", "content": content, "time": "20:48:57"}
143
		"""
144
		message = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
145
		message[VarNames.CALL_TYPE] = message_type
146
		message[VarNames.USER] = self.sender_name
147
		return message
148
149
	@classmethod
150
	def create_message(cls, message):
151
		res = {
152
			VarNames.USER_ID: message.sender_id,
153
			VarNames.CONTENT: message.content,
154
			VarNames.TIME: message.time,
155
			VarNames.MESSAGE_ID: message.id,
156
		}
157
		if message.img.name:
158
			res[VarNames.IMG] = message.img.url
159
		return res
160
161
	@classmethod
	def create_send_message(cls, message, event=Actions.PRINT_MESSAGE):
		"""
		Serializes a message for broadcasting through the chat handler.

		:param message: object exposing the attributes read by ``create_message`` plus ``room_id``
		:param event: action name stored under "action"
		:return: the ``create_message`` dict extended with "action", "channel" and "handler"
		"""
		payload = cls.create_message(message)
		payload.update({
			VarNames.EVENT: event,
			VarNames.CHANNEL: message.room_id,
			HandlerNames.NAME: HandlerNames.CHAT,
		})
		return payload
173
174
	@classmethod
175
	def get_messages(cls, messages, channel):
176
		"""
177
		:type messages: list[Messages]
178
		:type channel: str
179
		:type messages: QuerySet[Messages]
180
		"""
181
		return {
182
			VarNames.CONTENT: [cls.create_message(message) for message in messages],
183
			VarNames.EVENT: Actions.GET_MESSAGES,
184
			VarNames.CHANNEL: channel,
185
			HandlerNames.NAME: HandlerNames.CHAT
186
		}
187
188
	@property
189
	def stored_redis_user(self):
190
		return  self.user_id
191
192
	@property
193
	def channel(self):
194
		return RedisPrefix.generate_user(self.user_id)
195
196
	def subscribe_direct_channel_message(self, room_id, other_user_id):
197
		return {
198
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
199
			VarNames.ROOM_ID: room_id,
200
			VarNames.ROOM_USERS: [self.user_id, other_user_id],
201
			HandlerNames.NAME: HandlerNames.CHANNELS
202
		}
203
204
	def subscribe_room_channel_message(self, room_id, room_name):
205
		return {
206
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
207
			VarNames.ROOM_ID: room_id,
208
			VarNames.ROOM_USERS: [self.user_id],
209
			HandlerNames.NAME: HandlerNames.CHANNELS,
210
			VarNames.ROOM_NAME: room_name
211
		}
212
213
	def invite_room_channel_message(self, room_id, user_id, room_name, users):
214
		return {
215
			VarNames.EVENT: Actions.INVITE_USER,
216
			VarNames.ROOM_ID: room_id,
217
			VarNames.USER_ID: user_id,
218
			HandlerNames.NAME: HandlerNames.CHANNELS,
219
			VarNames.ROOM_NAME: room_name,
220
			VarNames.CONTENT: users
221
		}
222
223
	def add_user_to_room(self, channel, user_id, content):
224
		return {
225
			VarNames.EVENT: Actions.ADD_USER,
226
			VarNames.CHANNEL: channel,
227
			VarNames.USER_ID: user_id,
228
			HandlerNames.NAME: HandlerNames.CHAT,
229
			VarNames.GENDER: content[VarNames.GENDER], # SEX: 'Alien', USER: 'Andrew'
230
			VarNames.USER: content[VarNames.USER] # SEX: 'Alien', USER: 'Andrew'
231
		}
232
233
	def unsubscribe_direct_message(self, room_id):
234
		return {
235
			VarNames.EVENT: Actions.DELETE_ROOM,
236
			VarNames.ROOM_ID: room_id,
237
			VarNames.USER_ID: self.user_id,
238
			HandlerNames.NAME: HandlerNames.CHANNELS,
239
			VarNames.TIME: get_milliseconds()
240
		}
241
242
	def load_offline_message(self, offline_messages, channel_key):
243
		res = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
244
		res[VarNames.CHANNEL] = channel_key
245
		return res
246
247
248
class MessagesHandler(MessagesCreator):
249
250
	def __init__(self, *args, **kwargs):
251
		self.parsable_prefix = 'p'
252
		super(MessagesHandler, self).__init__(*args, **kwargs)
253
		self.id = id(self)
254
		self.log_id = str(self.id % 10000).rjust(4, '0')
255
		self.ip = None
256
		from chat import global_redis
257
		self.async_redis_publisher = global_redis.async_redis_publisher
258
		self.sync_redis = global_redis.sync_redis
259
		self.channels = []
260
		self.call_receiver_channel = None
261
		self._logger = None
262
		self.async_redis = tornadoredis.Client()
263
		self.pre_process_message = {
264
			Actions.GET_MESSAGES: self.process_get_messages,
265
			Actions.SEND_MESSAGE: self.process_send_message,
266
			Actions.CALL: self.process_call,
267
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
268
			Actions.DELETE_ROOM: self.delete_channel,
269
			Actions.EDIT_MESSAGE: self.edit_message,
270
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
271
			Actions.INVITE_USER: self.invite_user,
272
		}
273
		self.post_process_message = {
274
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
275
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
276
			Actions.DELETE_ROOM: self.send_client_delete_channel,
277
			Actions.INVITE_USER: self.send_client_new_channel,
278
			Actions.CALL: self.set_opponent_call_channel
279
		}
280
281
	@tornado.gen.engine
282
	def listen(self, channels):
283
		yield tornado.gen.Task(
284
			self.async_redis.subscribe, channels)
285
		self.async_redis.listen(self.new_message)
286
287
	@property
288
	def logger(self):
289
		return self._logger if self._logger else base_logger
290
291
	@tornado.gen.engine
292
	def add_channel(self, channel):
293
		self.channels.append(channel)
294
		yield tornado.gen.Task(
295
			self.async_redis.subscribe, (channel,))
296
297
	def do_db(self, callback, *args, **kwargs):
		"""
		Runs ``callback(*args, **kwargs)`` and retries it exactly once when the first
		attempt fails with OperationalError or InterfaceError (connection has gone away).
		Before the retry the failure is logged and the django connection is closed.

		:param callback: callable performing the database work
		:return: whatever ``callback`` returns
		"""
		try:
			outcome = callback(*args, **kwargs)
		except (OperationalError, InterfaceError) as db_error:
			self.logger.warning('%s, reconnecting' % db_error)  # TODO
			connection.close()
			outcome = callback(*args, **kwargs)
		return outcome
304
305
	def execute_query(self, query, *args, **kwargs):
306
		cursor = connection.cursor()
307
		cursor.execute(query, *args, **kwargs)
308
		return cursor.fetchall()
309
310
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
		"""
		Reads the redis hash stored under ``channel`` (connection hash -> user id)
		and returns the distinct user ids found in it.

		:param channel: redis key of the hash
		:param check_user_id: when given, also report whether this user has an entry
		under a connection hash different from ``check_hash``
		:param check_hash: connection hash ignored by that check
		:return: list of user ids, or ``(list, bool)`` if ``check_user_id`` is given
		"""
		online = self.sync_redis.hgetall(channel)
		self.logger.debug('!! channel %s redis online: %s', channel, online)
		result = set()
		user_is_online = False
		# redis returns raw bytes for both keys and values, so parse them
		if online:
			for key_hash, raw_user_id in online.items():  # py2 iteritems
				user_id = int(raw_user_id.decode('utf-8'))
				if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
					user_is_online = True
				result.add(user_id)
		result = list(result)
		# Compare with None explicitly: user id 0 is falsy, and callers that pass an
		# id always unpack a (list, bool) tuple.
		return (result, user_is_online) if check_user_id is not None else result
328
329
	def add_online_user(self, room_id, offline_messages=None):
		"""
		Registers this connection in the room's online hash in redis and notifies clients.

		The hash has the shape
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }

		When no other connection of this user is registered for the room, a LOGIN message
		with the online list is published to the room and the offline messages (if any)
		are written to this connection. Otherwise the online list is written to this
		connection only.

		:param room_id: redis key of the room's online hash, also the channel published to
		:param offline_messages: content passed to ``load_offline_message``; skipped when falsy
		"""
		self.async_redis_publisher.hset(room_id, self.id, self.stored_redis_user)
		# since we add user to online first, latest trigger will always show correct online
		online, is_online = self.get_online_from_redis(room_id, self.user_id, self.id)
		if not is_online:  # no other connection of this user is registered for the room
			# the hset above may already be reflected in the hash just read,
			# so add ourselves only when missing to avoid a duplicated id
			if self.user_id not in online:
				online.append(self.user_id)
			online_user_names_mes = self.room_online(
				online,
				Actions.LOGIN,
				room_id
			)
			self.logger.info('!! First tab, sending refresh online for all')
			self.publish(online_user_names_mes, room_id)
			if offline_messages:
				self.safe_write(self.load_offline_message(offline_messages, room_id))
		else:  # Send user names to self
			online_user_names_mes = self.room_online(
				online,
				Actions.REFRESH_USER,
				room_id
			)
			self.logger.info('!! Second tab, retrieving online for self')
			self.safe_write(online_user_names_mes)
357
358
	def publish(self, message, channel, parsable=False):
		"""
		Serializes ``message`` to json and publishes it to ``channel`` through the
		redis publisher.

		:param message: json-serializable object
		:param channel: redis channel to publish to
		:param parsable: when true the payload is marked with ``encode`` first
		"""
		serialized = json.dumps(message)
		self.logger.debug('<%s> %s', channel, serialized)
		payload = self.encode(serialized) if parsable else serialized
		self.async_redis_publisher.publish(channel, payload)
364
365
	def encode(self, message):
		"""
		Marks message with prefix to specify that
		it should be decoded and processed before sending to client
		@param message: message to mark
		@return: marked message
		"""
		return ''.join((self.parsable_prefix, message))
373
374
	def decode(self, message):
		"""
		Check if message should be processed by server before writing to client
		@param message: message to check
		@type message: str
		@return: Object structure of message if it should be processed, None if not
		"""
		prefix = self.parsable_prefix
		if not message.startswith(prefix):
			return None
		# strip exactly the prefix, whatever its length, before parsing
		return json.loads(message[len(prefix):])
383
384
	def new_message(self, message):
385
		data = message.body
386
		if isinstance(data, str_type):  # subscribe event
387
			decoded = self.decode(data)
388
			if decoded:
389
				data = decoded
390
			self.safe_write(data)
391
			if decoded:
392
				self.post_process_message[decoded[VarNames.EVENT]](decoded)
393
394
	def safe_write(self, message):
395
		raise NotImplementedError('WebSocketHandler implements')
396
397
	def process_send_message(self, message):
398
		"""
399
		:type message: dict
400
		"""
401
		channel = message[VarNames.CHANNEL]
402
		message_db = Message(
403
			sender_id=self.user_id,
404
			content=message[VarNames.CONTENT]
405
		)
406
		message_db.room_id = channel
407
		if VarNames.IMG in message:
408
			message_db.img = extract_photo(message[VarNames.IMG])
409
		self.do_db(message_db.save)  # exit on hacked id with exception
410
		prepared_message = self.create_send_message(message_db)
411
		self.publish(prepared_message, channel)
412
413
	def process_call(self, in_message):
414
		"""
415
		:type in_message: dict
416
		"""
417
		call_type = in_message.get(VarNames.CALL_TYPE)
418
		set_opponent_channel = False
419
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
420
		if call_type == CallType.OFFER:
421
			room_id = in_message[VarNames.CHANNEL]
422
			user = User.rooms.through.objects.get(~Q(user_id=self.user_id), Q(room_id=room_id), Q(room__name__isnull=True))
423
			self.call_receiver_channel = RedisPrefix.generate_user(user.user_id)
424
			set_opponent_channel = True
425
			out_message[VarNames.CHANNEL] = room_id
426
		# TODO
427
		self.logger.info('!! Offering a call to user with id %s',  self.call_receiver_channel)
428
		self.publish(out_message, self.call_receiver_channel, set_opponent_channel)
429
430
	def create_new_room(self, message):
431
		room_name = message[VarNames.ROOM_NAME]
432
		if not room_name or len(room_name) > 16:
433
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
434
		room = Room(name=room_name)
435
		self.do_db(room.save)
436
		RoomUsers(room_id=room.id, user_id=self.user_id).save()
437
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
438
		self.publish(subscribe_message, self.channel, True)
439
440
	def invite_user(self, message):
		"""
		Adds another user to a named room the current connection is subscribed to.

		Publishes an "add user" message to the room and a parsable invitation to the
		invited user's personal channel.

		:param message: dict holding the room id and the id of the invited user
		:raises ValidationError: when the room is not among this connection's channels,
		the room is private, or the user is already linked to the room
		"""
		target_room_id = message[VarNames.ROOM_ID]
		invited_id = message[VarNames.USER_ID]
		if target_room_id not in self.channels:
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
		room = self.do_db(Room.objects.get, id=target_room_id)
		if room.is_private:
			raise ValidationError("You can't add users to direct room, create a new room instead")
		membership = Room.users.through.objects
		try:
			membership.create(room_id=target_room_id, user_id=invited_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		# collect every member of the room (the invited user included) for the client
		members = {}
		for member in room.users.all():
			self.set_js_user_structure(members, member.id, member.username, member.sex)
		joined_notice = self.add_user_to_room(target_room_id, invited_id, members[invited_id])
		self.publish(joined_notice, target_room_id)
		invitation = self.invite_room_channel_message(target_room_id, invited_id, room.name, members)
		self.publish(invitation, RedisPrefix.generate_user(invited_id), True)
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
458
459
	def create_self_room(self, user_rooms):
		"""
		Returns the id of the current user's room with himself, creating it if needed.

		Runs SELECT_SELF_ROOM with the ids taken from ``user_rooms``. When a row comes
		back, its first column is used as the room id and its second column is passed to
		``update_room`` as the disabled flag. Otherwise a new Room is saved and linked
		to the current user.

		:param user_rooms: iterable of dicts holding a 'room_id' key
		:return: room id
		"""
		candidate_ids = [entry['room_id'] for entry in user_rooms]
		rows = self.execute_query(SELECT_SELF_ROOM, [candidate_ids])
		if len(rows) == 0:
			new_room = Room()
			new_room.save()
			RoomUsers(user_id=self.user_id, room_id=new_room.id).save()
			return new_room.id
		found_id, found_disabled = rows[0][0], rows[0][1]
		self.update_room(found_id, found_disabled)
		return found_id
472
473
	def create_other_room(self, user_rooms, user_id):
474
		query_res = Room.users.through.objects.filter(user_id=user_id, room__in=user_rooms).values('room__id', 'room__disabled')
475
		if len(query_res) > 0:
476
			room = query_res[0]
477
			room_id = room['room__id']
478
			self.update_room(room_id, room['room__disabled'])
479
		else:
480
			room = Room()
481
			room.save()
482
			room_id = room.id
483
			RoomUsers.objects.bulk_create([
484
				RoomUsers(user_id=user_id, room_id=room_id),
485
				RoomUsers(user_id=self.user_id, room_id=room_id),
486
			])
487
		return room_id
488
489
	def update_room(self, room_id, disabled):
490
		if not disabled:
491
			raise ValidationError('This room already exist')
492
		else:
493
			Room.objects.filter(id=room_id).update(disabled=False)
494
495
	def create_user_channel(self, message):
496
		user_id = message[VarNames.USER_ID]
497
		# get all self private rooms ids
498
		user_rooms = Room.users.through.objects.filter(user_id=self.user_id, room__name__isnull=True).values('room_id')
499
		# get private room that contains another user from rooms above
500
		if self.user_id == user_id:
501
			room_id = self.create_self_room(user_rooms)
502
		else:
503
			room_id = self.create_other_room(user_rooms, user_id)
504
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
505
		self.publish(subscribe_message, self.channel, True)
506
		other_channel = RedisPrefix.generate_user(user_id)
507
		if self.channel != other_channel:
508
			self.publish(subscribe_message, other_channel, True)
509
510
	def delete_channel(self, message):
		"""
		Makes the current user leave a room.

		A room without a name (private) is marked as disabled. For a named room the
		link between the user and the room is deleted and a LOGOUT message with the
		remaining online users is published. In both cases a parsable unsubscribe
		message is published to the room afterwards.

		:param message: dict holding the room id
		:raises ValidationError: when the room is not among this connection's channels,
		is the ALL room, or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to exit this room')
		room = self.do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else: # if public -> leave the room, delete the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
			online = self.get_online_from_redis(room_id)
			# the redis hash may not contain this user; list.remove would raise
			# ValueError after the link is already deleted and abort the unsubscribe
			if self.user_id in online:
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
		room.save()
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, room_id, True)
527
528
	def edit_message(self, data):
529
		message_id = data[VarNames.MESSAGE_ID]
530
		message = Message.objects.get(id=message_id)
531
		if message.sender_id != self.user_id:
532
			raise ValidationError("You can only edit your messages")
533
		if message.time + 60000 < get_milliseconds():
534
			raise ValidationError("You can only edit messages that were send not more than 1 min ago")
535
		if message.deleted:
536
			raise ValidationError("Already deleted")
537
		message.content = data[VarNames.CONTENT]
538
		selector = Message.objects.filter(id=message_id)
539
		if message.content is None:
540
			selector.update(deleted=True)
541
			action = Actions.DELETE_MESSAGE
542
		else:
543
			action = Actions.EDIT_MESSAGE
544
			selector.update(content=message.content)
545
		self.publish(self.create_send_message(message, action), message.room_id)
546
547
	def send_client_new_channel(self, message):
548
		room_id = message[VarNames.ROOM_ID]
549
		self.add_channel(room_id)
550
		self.add_online_user(room_id)
551
552
	def set_opponent_call_channel(self, message):
553
		self.call_receiver_channel = RedisPrefix.generate_user(message[VarNames.USER_ID])
554
555
	def send_client_delete_channel(self, message):
556
		room_id = message[VarNames.ROOM_ID]
557
		self.async_redis.unsubscribe((room_id,))
558
		self.async_redis_publisher.hdel(room_id, self.id)
559
		self.channels.remove(room_id)
560
561
	def process_get_messages(self, data):
562
		"""
563
		:type data: dict
564
		"""
565
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
566
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
567
		room_id = data[VarNames.CHANNEL]
568
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
569
		if header_id is None:
570
			messages = Message.objects.filter(Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
571
		else:
572
			messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
573
		response = self.do_db(self.get_messages, messages, room_id)
574
		self.safe_write(response)
575
576
	def get_offline_messages(self):
577
		res = {}
578
		offline_messages = Message.objects.filter(
579
			id__gt=F('room__roomusers__last_read_message_id'),
580
			deleted=False,
581
			room__roomusers__user_id=self.user_id
582
		)
583
		for message in offline_messages:
584
			res.setdefault(message.room_id, []).append(self.create_message(message))
585
		return res
586
587
	def get_users_in_current_user_rooms(self):
588
		"""
589
		{
590
			"ROOM_ID:1": {
591
				"name": "All",
592
				"users": {
593
					"USER_ID:admin": {
594
						"name": "USER_NAME:admin",
595
						"sex": "SEX:Secret"
596
					},
597
					"USER_ID_2": {
598
						"name": "USER_NAME:Mike",
599
						"sex": "Male"
600
					}
601
				},
602
				"isPrivate": true
603
			}
604
		}
605
		"""
606
		user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
607
		res = {room['id']: {
608
				VarNames.ROOM_NAME: room['name'],
609
				VarNames.ROOM_USERS: {}
610
			} for room in user_rooms}
611
		room_ids = (room_id for room_id in res)
612
		rooms_users = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id')
613
		for user in rooms_users:
614
			self.set_js_user_structure(res[user['rooms__id']][VarNames.ROOM_USERS], user['id'], user['username'], user['sex'])
615
		return res
616
617
	def set_js_user_structure(self, user_dict, user_id, name, sex):
618
		user_dict[user_id] = {
619
			VarNames.USER: name,
620
			VarNames.GENDER: GENDERS[sex]
621
		}
622
623
	def save_ip(self):
624
		if (self.do_db(UserJoinedInfo.objects.filter(
625
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
626
			return
627
		ip_address = self.get_or_create_ip()
628
		UserJoinedInfo.objects.create(
629
			ip=ip_address,
630
			user_id=self.user_id
631
		)
632
633
	def get_or_create_ip(self):
		"""
		Returns the IpAddress record for ``self.ip``, creating it when it does not exist.

		For a new ip the location details are requested from ``api_url``. When that
		lookup fails for any reason the error is logged and a record holding only the
		ip is created instead.

		:return: IpAddress instance for ``self.ip``
		"""
		from contextlib import closing  # local import keeps this block self-contained
		try:
			ip_address = IpAddress.objects.get(ip=self.ip)
		except IpAddress.DoesNotExist:
			try:
				if not api_url:
					raise Exception('api url is absent')
				self.logger.debug("Creating ip record %s", self.ip)
				# closing() releases the http response even when reading fails
				with closing(urlopen(api_url % self.ip)) as f:
					raw_response = f.read().decode("utf-8")
				response = json.loads(raw_response)
				if response['status'] != "success":
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
				ip_address = IpAddress.objects.create(
					ip=self.ip,
					isp=response['isp'],
					country=response['country'],
					region=response['regionName'],
					city=response['city'],
					country_code=response['countryCode']
				)
			except Exception as e:
				# deliberate best effort: fall back to a record without location info
				self.logger.error("Error while creating ip with country info, because %s", e)
				ip_address = IpAddress.objects.create(ip=self.ip)
		return ip_address
658
659
660
class AntiSpam(object):
	"""Records the size of incoming messages and rejects oversized ones."""

	def __init__(self):
		# number of messages that exceeded MAX_MESSAGE_SIZE
		self.spammed = 0
		# time key (hundredths of a second since the epoch) -> message length
		self.info = {}

	def check_spam(self, json_message):
		"""
		Records the length of ``json_message`` and validates it.

		:param json_message: raw message; only its length is inspected
		:raises ValidationError: when the message is longer than MAX_MESSAGE_SIZE
		"""
		size = len(json_message)
		self.info[int(round(time.time() * 100))] = size
		if size > MAX_MESSAGE_SIZE:
			self.spammed += 1
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
		self.check_timed_spam()

	def check_timed_spam(self):
		# TODO implement me
		pass
		# raise ValidationError("You're chatting too much, calm down a bit!")
679
680
681
class TornadoHandler(WebSocketHandler, MessagesHandler):
682
683
	def __init__(self, *args, **kwargs):
684
		super(TornadoHandler, self).__init__(*args, **kwargs)
685
		self.connected = False
686
		self.anti_spam = AntiSpam()
687
688
	def data_received(self, chunk):
689
		pass
690
691
	def on_message(self, json_message):
		"""
		Validates an incoming websocket frame and dispatches it to the handler
		registered in ``pre_process_message`` for its "action".

		Every validation failure, including malformed json, a payload that is not a
		json object, or a missing/unknown action, is reported back to the client as a
		growl message instead of escaping as an unhandled exception.

		:param json_message: raw text received from the client
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %s', json_message)
			try:
				message = json.loads(json_message)
			except ValueError:  # json decode errors are ValueError (sub)classes
				raise ValidationError('Message is not a valid json')
			if not isinstance(message, dict):
				raise ValidationError('Message should be a json object')
			event = message.get(VarNames.EVENT)
			# the isinstance guard keeps unhashable values away from the dict lookup
			if not isinstance(event, str_type) or event not in self.pre_process_message:
				raise ValidationError("event {} is unknown".format(event))
			channel = message.get(VarNames.CHANNEL)
			if channel and channel not in self.channels:
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
			self.pre_process_message[event](message)
		except ValidationError as e:
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.safe_write(error_message)
709
710
	def on_close(self):
711
		if self.async_redis.subscribed:
712
			self.logger.info("Close event, unsubscribing from %s", self.channels)
713
			self.async_redis.unsubscribe(self.channels)
714
		else:
715
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
716
		log_data = {}
717
		gone_offline = False
718
		for channel in self.channels:
719
			if not isinstance(channel, int):
720
				continue
721
			self.sync_redis.hdel(channel, self.id)
722
			if self.connected:
723
				# seems like async solves problem with connection lost and wrong data status
724
				# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
725
				online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
726
				log_data[channel] = {'online': online, 'is_online': is_online}
727
				if not is_online:
728
					message = self.room_online(online, Actions.LOGOUT, channel)
729
					self.publish(message, channel)
730
					gone_offline = True
731
		if gone_offline:
732
			res = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
733
			self.logger.info("Updated %s last read message", res)
734
735
		self.logger.info("Close connection result: %s", json.dumps(log_data))
736
		self.async_redis.disconnect()
737
738
	def open(self):
739
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
740
		if sessionStore.exists(session_key):
741
			self.ip = self.get_client_ip()
742
			session = SessionStore(session_key)
743
			self.user_id = int(session["_auth_user_id"])
744
			log_params = {
745
				'user_id': str(self.user_id).zfill(3),
746
				'id': self.log_id,
747
				'ip': self.ip
748
			}
749
			self._logger = logging.LoggerAdapter(base_logger, log_params)
750
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
751
			self.async_redis.connect()
752
			user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
753
			self.sender_name = user_db.username
754
			self.sex = user_db.sex_str
755
			user_rooms = self.get_users_in_current_user_rooms()
756
			self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
757
			# get all missed messages
758
			self.channels.clear()
759
			self.channels.append(self.channel)
760
			for room_id in user_rooms:
761
				self.channels.append(room_id)
762
			self.listen(self.channels)
763
			off_messages = self.get_offline_messages()
764
			for room_id in user_rooms:
765
				self.add_online_user(room_id, off_messages.get(room_id))
766
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
767
			self.connected = True
768
			Thread(target=self.save_ip).start()
769
		else:
770
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
771
			self.close(403, "Session key %s has been rejected" % session_key)
772
773
	def check_origin(self, origin):
		"""
		check whether browser set domain matches origin

		:param origin: value of the Origin header sent by the browser
		:return: True when the host part of ``origin`` equals the host part of the
		request's Host header (ports ignored, case-insensitive), False otherwise
		"""
		origin_domain = urlparse(origin).netloc.split(':')[0].lower()
		host_header = self.request.headers.get("Host")
		if not host_header:
			# without a Host header there is nothing to compare against: reject
			return False
		# host names are case-insensitive, so normalise both sides
		browser_domain = host_header.split(':')[0].lower()
		return browser_domain == origin_domain
783
784
	def safe_write(self, message):
785
		"""
786
		Tries to send message, doesn't throw exception outside
787
		:type self: MessagesHandler
788
		"""
789
		# self.logger.debug('<< THREAD %s >>', os.getppid())
790
		try:
791
			if isinstance(message, dict):
792
				message = json.dumps(message)
793
			if not isinstance(message, str_type):
794
				raise ValueError('Wrong message type : %s' % str(message))
795
			self.logger.debug(">> %s", message)
796
			self.write_message(message)
797
		except tornado.websocket.WebSocketClosedError as e:
798
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
799
800
	def get_client_ip(self):
		"""
		:return: the "X-Real-IP" request header when it is present and non-empty,
		otherwise the request's ``remote_ip``
		"""
		forwarded_ip = self.request.headers.get("X-Real-IP")
		if forwarded_ip:
			return forwarded_ip
		return self.request.remote_ip
802