Completed
Push — master ( d89389...f98f38 )
by Andrew
01:02
created

MessagesHandler.invite_user()   B

Complexity

Conditions 5

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 4
Bugs 0 Features 0
Metric Value
cc 5
c 4
b 0
f 0
dl 0
loc 18
rs 8.5454
1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import tornado.gen
9
import tornado.httpclient
10
import tornado.ioloop
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q, F
18
from redis_sessions.session import SessionStore
19
from tornado.websocket import WebSocketHandler
20
21
from chat.utils import extract_photo
22
23
try:
24
	from urllib.parse import urlparse  # py3
25
except ImportError:
26
	from urlparse import urlparse  # py2
27
28
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM
29
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo, RoomUsers
30
31
# Compare the numeric major version: comparing the version *string* is
# lexicographic and breaks once a major version has two digits ('10.0' < '3').
PY3 = sys.version_info[0] >= 3
# `basestring` exists on Python 2 only; the conditional never evaluates it on Python 3.
str_type = str if PY3 else basestring
33
api_url = getattr(settings, "IP_API_URL", None)
34
35
sessionStore = SessionStore()
36
37
base_logger = logging.getLogger(__name__)
38
39
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
40
#CONNECTION_POOL = tornadoredis.ConnectionPool(
41
#	max_connections=500,
42
#	wait_for_available=True)
43
44
45
class Actions(object):
	"""Values used for the 'action' field (VarNames.EVENT) of chat messages."""

	# online status
	LOGIN = 'addOnlineUser'
	LOGOUT = 'removeOnlineUser'
	REFRESH_USER = 'setOnlineUsers'
	ADD_USER = 'addUserToAll'

	# messages
	SEND_MESSAGE = 'sendMessage'
	PRINT_MESSAGE = 'printMessage'
	EDIT_MESSAGE = 'editMessage'
	DELETE_MESSAGE = 'deleteMessage'
	GET_MESSAGES = 'loadMessages'
	OFFLINE_MESSAGES = 'loadOfflineMessages'
	GROWL_MESSAGE = 'growl'

	# rooms / channels
	ROOMS = 'setRooms'
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
	CREATE_ROOM_CHANNEL = 'addRoom'
	DELETE_ROOM = 'deleteRoom'
	INVITE_USER = 'inviteUser'

	# calls
	CALL = 'call'
63
64
65
class VarNames(object):
	"""Names of the keys used inside message dicts."""

	# generic message fields
	EVENT = 'action'
	CONTENT = 'content'
	TIME = 'time'
	MESSAGE_ID = 'id'
	IMG = 'image'
	CALL_TYPE = 'type'

	# user fields
	USER = 'user'
	USER_ID = 'userId'
	GENDER = 'sex'

	# room / channel fields
	ROOM_NAME = 'name'
	ROOM_ID = 'roomId'
	ROOM_USERS = 'users'
	IS_ROOM_PRIVATE = 'private'
	# CHANNEL and CHANNEL_NAME deliberately share the same key
	CHANNEL = 'channel'
	CHANNEL_NAME = 'channel'

	# message history request fields
	GET_MESSAGES_COUNT = 'count'
	GET_MESSAGES_HEADER_ID = 'headerId'
85
86
87
class CallType(object):
	"""Values of the call message 'type' field (VarNames.CALL_TYPE)."""

	OFFER = 'offer'
89
90
class HandlerNames:
	"""Key ('handler') and the values that are put under it in outgoing messages."""

	# key of the message dict
	NAME = 'handler'

	# values stored under NAME
	CHANNELS = 'channels'
	CHAT = 'chat'
	GROWL = 'growl'
	WEBRTC = 'webrtc'
	FILE = 'file'
97
98
99
class RedisPrefix:
	"""Helpers for building redis channel names."""

	USER_ID_CHANNEL_PREFIX = 'u'
	__ROOM_ONLINE__ = 'o:{}'

	@classmethod
	def generate_user(cls, key):
		"""
		:param key: user id
		:return: name of the personal channel of the user, e.g. 'u5' for key 5
		"""
		return "".join((cls.USER_ID_CHANNEL_PREFIX, str(key)))
106
107
RedisPrefix.DEFAULT_CHANNEL = ALL_ROOM_ID
108
109
110
class MessagesCreator(object):
	"""
	Builds the dict payloads that are written to clients or published to redis.
	Instance methods stamp payloads with the id, name and gender stored on self.
	"""

	def __init__(self, *args, **kwargs):
		super(MessagesCreator, self).__init__(*args, **kwargs)
		self.user_id = 0  # anonymous by default
		self.sender_name = None
		self.sex = None

	def default(self, content, event, handler):
		"""
		:return: {"action": event, "content": content, "userId": self.user_id,
		"time": get_milliseconds(), "handler": handler}
		"""
		payload = {
			VarNames.EVENT: event,
			VarNames.CONTENT: content,
			VarNames.USER_ID: self.user_id,
			VarNames.TIME: get_milliseconds(),
			HandlerNames.NAME: handler
		}
		return payload

	def room_online(self, online, event, channel):
		"""
		:param online: content of the message (callers pass the list of online user ids)
		:return: default() chat payload extended with "channel", "user" and "sex"
		"""
		payload = self.default(online, event, HandlerNames.CHAT)
		payload.update({
			VarNames.CHANNEL_NAME: channel,
			VarNames.USER: self.sender_name,
			VarNames.GENDER: self.sex,
		})
		return payload

	def offer_call(self, content, message_type):
		"""
		:return: default() webrtc payload with action "call", extended with "type" and "user"
		"""
		payload = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
		payload.update({
			VarNames.CALL_TYPE: message_type,
			VarNames.USER: self.sender_name,
		})
		return payload

	@classmethod
	def create_message(cls, message):
		"""
		:param message: object exposing sender_id, content, time, id and img
		:return: {"userId", "content", "time", "id"} plus "image" when the message has an image
		"""
		payload = {
			VarNames.USER_ID: message.sender_id,
			VarNames.CONTENT: message.content,
			VarNames.TIME: message.time,
			VarNames.MESSAGE_ID: message.id,
		}
		image = message.img
		if image.name:
			payload[VarNames.IMG] = image.url
		return payload

	@classmethod
	def create_send_message(cls, message, event=Actions.PRINT_MESSAGE):
		"""
		:param message: same object as accepted by create_message()
		:param event: value of the "action" field
		:return: create_message() payload extended with "action", "channel" (room id
		of the message) and the chat handler name
		"""
		payload = cls.create_message(message)
		payload.update({
			VarNames.EVENT: event,
			VarNames.CHANNEL: message.room_id,
			HandlerNames.NAME: HandlerNames.CHAT,
		})
		return payload

	@classmethod
	def get_messages(cls, messages, channel):
		"""
		:param messages: iterable of message objects (list or QuerySet)
		:param channel: channel the messages belong to
		:return: "loadMessages" chat payload whose content is a list of create_message() dicts
		"""
		serialized = list(map(cls.create_message, messages))
		return {
			VarNames.CONTENT: serialized,
			VarNames.EVENT: Actions.GET_MESSAGES,
			VarNames.CHANNEL: channel,
			HandlerNames.NAME: HandlerNames.CHAT
		}

	@property
	def stored_redis_user(self):
		"""Value stored in the redis online hash for this connection: the user id."""
		return self.user_id

	@property
	def channel(self):
		"""Name of the personal redis channel of the current user."""
		return RedisPrefix.generate_user(self.user_id)

	def subscribe_direct_channel_message(self, room_id, other_user_id):
		"""
		:return: "addDirectChannel" payload listing the current and the other user id
		"""
		participants = [self.user_id, other_user_id]
		return {
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: participants,
			HandlerNames.NAME: HandlerNames.CHANNELS
		}

	def subscribe_room_channel_message(self, room_id, room_name):
		"""
		:return: "addRoom" payload with the current user as the only room user
		"""
		participants = [self.user_id]
		return {
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: participants,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.ROOM_NAME: room_name
		}

	def invite_room_channel_message(self, room_id, user_id, room_name, users):
		"""
		:param users: put into the payload as its content
		:return: "inviteUser" payload for the channels handler
		"""
		return {
			VarNames.EVENT: Actions.INVITE_USER,
			VarNames.ROOM_ID: room_id,
			VarNames.USER_ID: user_id,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.ROOM_NAME: room_name,
			VarNames.CONTENT: users
		}

	def add_user_to_room(self, channel, user_id, content):
		"""
		:param content: dict holding "sex" and "user" of the added user,
		e.g. {"sex": "Alien", "user": "Andrew"}
		:return: "addUserToAll" chat payload
		"""
		gender = content[VarNames.GENDER]
		user_name = content[VarNames.USER]
		return {
			VarNames.EVENT: Actions.ADD_USER,
			VarNames.CHANNEL: channel,
			VarNames.USER_ID: user_id,
			HandlerNames.NAME: HandlerNames.CHAT,
			VarNames.GENDER: gender,
			VarNames.USER: user_name
		}

	def unsubscribe_direct_message(self, room_id):
		"""
		:return: "deleteRoom" payload stamped with the current user id and time
		"""
		return {
			VarNames.EVENT: Actions.DELETE_ROOM,
			VarNames.ROOM_ID: room_id,
			VarNames.USER_ID: self.user_id,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.TIME: get_milliseconds()
		}

	def load_offline_message(self, offline_messages, channel_key):
		"""
		:return: default() "loadOfflineMessages" chat payload extended with "channel"
		"""
		payload = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
		payload[VarNames.CHANNEL] = channel_key
		return payload
246
247
248
class MessagesHandler(MessagesCreator):
249
250
	def __init__(self, *args, **kwargs):
		"""
		Sets up redis clients, the list of subscribed channels and the two
		dispatch tables keyed by action name:
		pre_process_message - handlers for messages received from the client,
		post_process_message - hooks run after a prefixed redis message was written to the client.
		"""
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__(*args, **kwargs)
		from chat import global_redis
		self.id = id(self)
		self.ip = None
		self._logger = None
		self.channels = []
		self.call_receiver_channel = None
		# shared clients
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		# client created per handler instance, used for subscriptions
		self.async_redis = tornadoredis.Client()
		self.pre_process_message = dict([
			(Actions.GET_MESSAGES, self.process_get_messages),
			(Actions.SEND_MESSAGE, self.process_send_message),
			(Actions.CALL, self.process_call),
			(Actions.CREATE_DIRECT_CHANNEL, self.create_user_channel),
			(Actions.DELETE_ROOM, self.delete_channel),
			(Actions.EDIT_MESSAGE, self.edit_message),
			(Actions.CREATE_ROOM_CHANNEL, self.create_new_room),
			(Actions.INVITE_USER, self.invite_user),
		])
		self.post_process_message = dict([
			(Actions.CREATE_DIRECT_CHANNEL, self.send_client_new_channel),
			(Actions.CREATE_ROOM_CHANNEL, self.send_client_new_channel),
			(Actions.DELETE_ROOM, self.send_client_delete_channel),
			(Actions.INVITE_USER, self.send_client_new_channel),
			(Actions.CALL, self.set_opponent_call_channel),
		])
279
280
	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribes the handler's own redis client to `channels` and, once the
		subscribe task is done, registers new_message as the message callback.
		"""
		subscribe_task = tornado.gen.Task(self.async_redis.subscribe, channels)
		yield subscribe_task
		self.async_redis.listen(self.new_message)
285
286
	@property
	def logger(self):
		"""Logger assigned to this handler, the module logger until one is set."""
		return self._logger or base_logger
289
290
	@tornado.gen.engine
	def add_channel(self, channel):
		"""Remembers `channel` in self.channels and subscribes the redis client to it."""
		self.channels.append(channel)
		subscribe_task = tornado.gen.Task(self.async_redis.subscribe, (channel,))
		yield subscribe_task
295
296
	def do_db(self, callback, *args, **kwargs):
		"""
		Calls callback(*args, **kwargs); when it fails with OperationalError or
		InterfaceError the db connection is closed and the call is repeated once.
		:return: result of the callback
		"""
		try:
			result = callback(*args, **kwargs)
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
			self.logger.warning('%s, reconnecting', e)  # TODO
			connection.close()
			result = callback(*args, **kwargs)
		return result
303
304
	def execute_query(self, query, *args, **kwargs):
		"""
		Executes a raw sql query on the default django connection.
		:param query: sql text, extra arguments are passed to cursor.execute as is
		:return: all fetched rows
		"""
		cursor = connection.cursor()
		try:
			cursor.execute(query, *args, **kwargs)
			return cursor.fetchall()
		finally:
			# release the cursor even when execute/fetchall raises
			cursor.close()
308
309
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
		"""
		Reads the redis hash {connection_hash: user_id} stored under `channel`.
		:param check_user_id: when given, also report whether this user has
		a connection other than `check_hash` in the hash
		:param check_hash: connection hash (self.id) to ignore during the check
		:return: list of unique user ids; (list, bool) if check_user_id is given
		"""
		online = self.sync_redis.hgetall(channel)
		self.logger.debug('!! channel %s redis online: %s', channel, online)
		user_ids = set()
		user_is_online = False
		# redis returns raw bytes for both keys and values, so parse them
		for key_hash, raw_user_id in (online or {}).items():
			user_id = int(raw_user_id.decode('utf-8'))
			if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
				user_is_online = True
			user_ids.add(user_id)
		result = list(user_ids)
		# `is None` rather than truthiness: user id 0 (anonymous) must still get the
		# tuple form, otherwise callers that unpack two values break
		if check_user_id is None:
			return result
		return result, user_is_online
327
328
	def add_online_user(self, room_id, offline_messages=None):
		"""
		Registers this connection in the redis online hash of the room:
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
		If the user has no other connection in the room, the new online list is
		published to the room and `offline_messages` (if any) are written to the
		client; otherwise the online list is written to this client only.
		"""
		self.async_redis_publisher.hset(room_id, self.id, self.stored_redis_user)
		# since we add user to online first, latest trigger will always show correct online
		online, is_online = self.get_online_from_redis(room_id, self.user_id, self.id)
		if is_online:  # user already has another connection, send user names to self
			refresh_message = self.room_online(online, Actions.REFRESH_USER, room_id)
			self.logger.info('!! Second tab, retrieving online for self')
			self.safe_write(refresh_message)
			return
		# the hset above may or may not be visible in the snapshot already,
		# so only append when the id is missing to avoid a duplicate entry
		if self.user_id not in online:
			online.append(self.user_id)
		login_message = self.room_online(online, Actions.LOGIN, room_id)
		self.logger.info('!! First tab, sending refresh online for all')
		self.publish(login_message, room_id)
		if offline_messages:
			self.safe_write(self.load_offline_message(offline_messages, room_id))
356
357
	def publish(self, message, channel, parsable=False):
		"""
		Serializes `message` to json and publishes it to the redis `channel`.
		:param parsable: when True the json is marked with encode() first
		"""
		payload = json.dumps(message)
		self.logger.debug('<%s> %s', channel, payload)
		outgoing = self.encode(payload) if parsable else payload
		self.async_redis_publisher.publish(channel, outgoing)
363
364
	def encode(self, message):
		"""
		Marks message with the parsable prefix, which tells the receiving
		handler to decode and process it before sending it to the client
		@param message: message to mark
		@return: marked message
		"""
		return "".join((self.parsable_prefix, message))
372
373
	def decode(self, message):
		"""
		Check if message should be processed by server before writing to client
		@param message: message to check
		@type message: str
		@return: Object structure of message if it should be processed, None if not
		"""
		prefix = self.parsable_prefix
		if message.startswith(prefix):
			# strip exactly the prefix, whatever its length is
			return json.loads(message[len(prefix):])
		return None
382
383
	def new_message(self, message):
		"""
		Callback for messages coming from redis. String bodies are written to the
		client; bodies marked with the parsable prefix are decoded first and, after
		being written, passed to the post-processor registered for their action.
		"""
		data = message.body
		if not isinstance(data, str_type):  # subscribe event
			return
		decoded = self.decode(data)
		self.safe_write(decoded if decoded else data)
		if decoded:
			post_process = self.post_process_message[decoded[VarNames.EVENT]]
			post_process(decoded)
392
393
	def safe_write(self, message):
		"""Writes message to the client; must be overridden by the websocket handler."""
		raise NotImplementedError('WebSocketHandler implements')
395
396
	def process_send_message(self, message):
		"""
		Saves the incoming chat message and publishes it to its channel.
		:type message: dict
		"""
		room_id = message[VarNames.CHANNEL]
		new_message = Message(
			sender_id=self.user_id,
			content=message[VarNames.CONTENT]
		)
		new_message.room_id = room_id
		if VarNames.IMG in message:
			new_message.img = extract_photo(message[VarNames.IMG])
		self.do_db(new_message.save)  # exit on hacked id with exception
		self.publish(self.create_send_message(new_message), room_id)
411
412
	def process_call(self, in_message):
		"""
		Forwards a call message to the opponent's personal channel. For an offer the
		opponent is looked up as the other user of the unnamed room given in "channel".
		:type in_message: dict
		"""
		call_type = in_message.get(VarNames.CALL_TYPE)
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
		is_offer = call_type == CallType.OFFER
		if is_offer:
			room_id = in_message[VarNames.CHANNEL]
			opponent = User.rooms.through.objects.get(
				~Q(user_id=self.user_id),
				Q(room_id=room_id),
				Q(room__name__isnull=True)
			)
			self.call_receiver_channel = RedisPrefix.generate_user(opponent.user_id)
			out_message[VarNames.CHANNEL] = room_id
		# TODO
		# NOTE(review): for non-offer messages the previously stored receiver channel
		# is used; it is None until an offer was sent or received - confirm this is handled
		self.logger.info('!! Offering a call to user with id %s',  self.call_receiver_channel)
		# only offers are published as parsable messages
		self.publish(out_message, self.call_receiver_channel, is_offer)
428
429
	def create_new_room(self, message):
		"""
		Creates a named room with the current user as its member and publishes
		the subscribe message to the user's personal channel.
		:raises ValidationError: if the name is empty or longer than 16 symbols
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = bool(room_name) and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		room = Room(name=room_name)
		self.do_db(room.save)
		membership = RoomUsers(room_id=room.id, user_id=self.user_id)
		membership.save()
		self.publish(self.subscribe_room_channel_message(room.id, room_name), self.channel, True)
438
439
	def invite_user(self, message):
		"""
		Adds the user from message["userId"] to the room message["roomId"], notifies
		the room about the new member and publishes the invitation (with the
		room users) to the personal channel of the invited user.
		:raises ValidationError: if the current user is not subscribed to the room,
		the room is private, or the room-user link can't be created
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		if room_id not in self.channels:
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
		room = self.do_db(Room.objects.get, id=room_id)
		if room.is_private:
			raise ValidationError("You can't add users to direct room, create a new room instead")
		memberships = Room.users.through.objects
		try:
			memberships.create(room_id=room_id, user_id=user_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		users_in_room = {}
		for member in room.users.all():
			self.set_js_user_structure(users_in_room, member.id, member.username, member.sex)
		invited_user = users_in_room[user_id]
		# tell the room about the new member
		self.publish(self.add_user_to_room(room_id, user_id, invited_user), room_id)
		# tell the invited user to subscribe to the room
		invitation = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
		self.publish(invitation, RedisPrefix.generate_user(user_id), True)
457 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
458
	def create_self_room(self, user_rooms):
		"""
		Finds the room returned by the SELECT_SELF_ROOM query for the given rooms
		and passes it to update_room(); creates a new room with the current user
		as its only member when the query returns nothing.
		:param user_rooms: iterable of dicts with 'room_id' key
		:return: id of the room
		"""
		rooms_ids = [room['room_id'] for room in user_rooms]
		query_res = self.execute_query(SELECT_SELF_ROOM, [rooms_ids, ])
		if query_res:
			found = query_res[0]
			room_id = found[0]
			self.update_room(room_id, found[1])
			return room_id
		room = Room()
		room.save()
		room_id = room.id
		RoomUsers(user_id=self.user_id, room_id=room_id).save()
		return room_id
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
471
472
	def create_other_room(self, user_rooms, user_id):
		"""
		Finds a room from `user_rooms` that `user_id` belongs to and passes it to
		update_room(); creates a new room linked to both users when there is none.
		:return: id of the room
		"""
		links = Room.users.through.objects.filter(user_id=user_id, room__in=user_rooms)
		query_res = links.values('room__id', 'room__disabled')
		if len(query_res) > 0:
			found = query_res[0]
			room_id = found['room__id']
			self.update_room(room_id, found['room__disabled'])
			return room_id
		room = Room()
		room.save()
		room_id = room.id
		members = [
			RoomUsers(user_id=user_id, room_id=room_id),
			RoomUsers(user_id=self.user_id, room_id=room_id),
		]
		RoomUsers.objects.bulk_create(members)
		return room_id
487
488
	def update_room(self, room_id, disabled):
		"""
		Re-enables a disabled room.
		:raises ValidationError: if the room is not disabled
		"""
		if not disabled:
			raise ValidationError('This room already exist')
		Room.objects.filter(id=room_id).update(disabled=False)
493
494
	def create_user_channel(self, message):
		"""
		Creates (or re-enables) the direct room between the current user and
		message["userId"] and publishes the subscribe message to both personal channels.
		"""
		user_id = message[VarNames.USER_ID]
		# get all self private rooms ids
		user_rooms = Room.users.through.objects.filter(
			user_id=self.user_id,
			room__name__isnull=True
		).values('room_id')
		# get private room that contains another user from rooms above
		talking_to_self = self.user_id == user_id
		if talking_to_self:
			room_id = self.create_self_room(user_rooms)
		else:
			room_id = self.create_other_room(user_rooms, user_id)
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
		own_channel = self.channel
		self.publish(subscribe_message, own_channel, True)
		other_channel = RedisPrefix.generate_user(user_id)
		if other_channel != own_channel:
			self.publish(subscribe_message, other_channel, True)
508
509
	def delete_channel(self, message):
		"""
		Leaves the room message["roomId"]: an unnamed (private) room is disabled,
		for a named room the room-user link is removed and the logout is published.
		Afterwards the delete message is published to the room channel.
		:raises ValidationError: if the user is not subscribed to the room, the
		room is ALL_ROOM_ID or the room is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to exit this room')
		room = self.do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else:  # if public -> leave the room, delete the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
			online = self.get_online_from_redis(room_id)
			# the redis snapshot may not contain the user, list.remove would raise then
			if self.user_id in online:
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
		room.save()
		# NOTE(review): published to the whole room channel, so every subscribed handler
		# runs the deleteRoom post-processor - confirm this is intended for named rooms
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, room_id, True)
526
527
	def edit_message(self, data):
		"""
		Edits the message data["id"], or marks it deleted when the new content
		is None, and publishes the result to the room of the message.
		:raises ValidationError: if the message belongs to another user, is older
		than 60000 ms or is already deleted
		"""
		message_id = data[VarNames.MESSAGE_ID]
		message = Message.objects.get(id=message_id)
		if message.sender_id != self.user_id:
			raise ValidationError("You can only edit your messages")
		if message.time + 60000 < get_milliseconds():
			raise ValidationError("You can only edit messages that were send not more than 1 min ago")
		if message.deleted:
			raise ValidationError("Already deleted")
		new_content = data[VarNames.CONTENT]
		message.content = new_content
		stored = Message.objects.filter(id=message_id)
		if new_content is None:
			stored.update(deleted=True)
			action = Actions.DELETE_MESSAGE
		else:
			action = Actions.EDIT_MESSAGE
			stored.update(content=new_content)
		self.publish(self.create_send_message(message, action), message.room_id)
545
546
	def send_client_new_channel(self, message):
		"""Subscribes to the room from message["roomId"] and registers the user as online in it."""
		new_room_id = message[VarNames.ROOM_ID]
		self.add_channel(new_room_id)
		self.add_online_user(new_room_id)
550
551
	def set_opponent_call_channel(self, message):
		"""Stores the personal channel of message["userId"] as the call receiver channel."""
		opponent_id = message[VarNames.USER_ID]
		self.call_receiver_channel = RedisPrefix.generate_user(opponent_id)
553
554
	def send_client_delete_channel(self, message):
		"""
		Unsubscribes from the room message["roomId"], removes this connection
		from the room's redis online hash and forgets the channel.
		"""
		removed_room_id = message[VarNames.ROOM_ID]
		self.async_redis.unsubscribe((removed_room_id,))
		self.async_redis_publisher.hdel(removed_room_id, self.id)
		self.channels.remove(removed_room_id)
559
560
	def process_get_messages(self, data):
		"""
		Writes to the client up to "count" (10 by default) latest not deleted messages
		of the room data["channel"]; when "headerId" is given only messages with
		a smaller id are loaded.
		:type data: dict
		:raises ValidationError: if "count" is not a non-negative integer
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		raw_count = data.get(VarNames.GET_MESSAGES_COUNT, 10)
		# count comes from the client: report bad values to the user instead of
		# failing later with an uncaught error (negative slices are rejected by the ORM)
		try:
			count = int(raw_count)
		except (TypeError, ValueError):
			raise ValidationError('Incorrect messages count "{}"'.format(raw_count))
		if count < 0:
			raise ValidationError('Incorrect messages count "{}"'.format(raw_count))
		room_id = data[VarNames.CHANNEL]
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		filters = [Q(room_id=room_id), Q(deleted=False)]
		if header_id is not None:
			filters.insert(0, Q(id__lt=header_id))
		messages = Message.objects.filter(*filters).order_by('-pk')[:count]
		# the queryset is lazy, it is evaluated inside do_db while building the response
		response = self.do_db(self.get_messages, messages, room_id)
		self.safe_write(response)
574
575
	def get_offline_messages(self):
		"""
		Loads not deleted messages with id greater than the last read message id
		stored in the current user's room links.
		:return: {room_id: [create_message() dict, ...]}
		"""
		unread = Message.objects.filter(
			id__gt=F('room__roomusers__last_read_message_id'),
			deleted=False,
			room__roomusers__user_id=self.user_id
		)
		by_room = {}
		for message in unread:
			if message.room_id not in by_room:
				by_room[message.room_id] = []
			by_room[message.room_id].append(self.create_message(message))
		return by_room
585
586
	def get_users_in_current_user_rooms(self):
		"""
		Collects not disabled rooms of the current user together with their users.
		:return:
		{
			ROOM_ID: {
				"name": ROOM_NAME,
				"users": {
					USER_ID: {
						"user": USER_NAME,
						"sex": GENDERS[sex]
					},
					...
				}
			},
			...
		}
		"""
		user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
		res = {}
		for room in user_rooms:
			res[room['id']] = {
				VarNames.ROOM_NAME: room['name'],
				VarNames.ROOM_USERS: {}
			}
		rooms_users = User.objects.filter(rooms__in=list(res)).values('id', 'username', 'sex', 'rooms__id')
		for user in rooms_users:
			room_users = res[user['rooms__id']][VarNames.ROOM_USERS]
			self.set_js_user_structure(room_users, user['id'], user['username'], user['sex'])
		return res
615
616
	def set_js_user_structure(self, user_dict, user_id, name, sex):
		"""
		Stores {"user": name, "sex": GENDERS[sex]} under `user_id` in `user_dict`.
		:param user_dict: dict that is modified in place
		"""
		user_info = {
			VarNames.USER: name,
			VarNames.GENDER: GENDERS[sex]
		}
		user_dict[user_id] = user_info
621
622
	def save_ip(self):
		"""Creates a UserJoinedInfo record for the current user and ip unless one exists."""
		known = UserJoinedInfo.objects.filter(Q(ip__ip=self.ip) & Q(user_id=self.user_id))
		if self.do_db(known.exists):
			return
		ip_address = self.get_or_create_ip()
		UserJoinedInfo.objects.create(ip=ip_address, user_id=self.user_id)
631
632
	def get_or_create_ip(self):
		"""
		Returns the IpAddress record for self.ip. A missing record is created with
		the info loaded from the ip api (IP_API_URL setting); if that fails for any
		reason the error is logged and a record with the bare ip is created instead.
		:return: IpAddress instance
		"""
		try:
			ip_address = IpAddress.objects.get(ip=self.ip)
		except IpAddress.DoesNotExist:
			try:
				if not api_url:
					raise Exception('api url is absent')
				self.logger.debug("Creating ip record %s", self.ip)
				f = urlopen(api_url % self.ip)
				try:
					raw_response = f.read().decode("utf-8")
				finally:
					# always release the http connection
					f.close()
				response = json.loads(raw_response)
				if response['status'] != "success":
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
				ip_address = IpAddress.objects.create(
					ip=self.ip,
					isp=response['isp'],
					country=response['country'],
					region=response['regionName'],
					city=response['city'],
					country_code=response['countryCode']
				)
			except Exception as e:
				# best effort: the ip is saved even without the country info
				self.logger.error("Error while creating ip with country info, because %s", e)
				ip_address = IpAddress.objects.create(ip=self.ip)
		return ip_address
657
658
659
class AntiSpam(object):
	"""Keeps per-connection statistics of message sizes and rejects oversized messages."""

	def __init__(self):
		self.info = {}  # {time.time() * 100 rounded to int: message length}
		self.spammed = 0  # amount of rejected messages

	def check_spam(self, json_message):
		"""
		Records the length of the message and validates it.
		:raises ValidationError: if the message is longer than MAX_MESSAGE_SIZE
		"""
		size = len(json_message)
		stamp = int(round(time.time() * 100))
		self.info[stamp] = size
		if size <= MAX_MESSAGE_SIZE:
			self.check_timed_spam()
			return
		self.spammed += 1
		raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)

	def check_timed_spam(self):
		"""Placeholder for the frequency check, does nothing yet."""
		# TODO implement me
		# raise ValidationError("You're chatting too much, calm down a bit!")
		pass
678
679
680
class TornadoHandler(WebSocketHandler, MessagesHandler):
681
682
	def __init__(self, *args, **kwargs):
		"""Initializes parent handlers; the connection is marked as not connected until open() succeeds."""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		self.anti_spam = AntiSpam()
		self.connected = False
686
687
	def data_received(self, chunk):
		"""Intentionally does nothing."""
		pass
689
690
	def on_message(self, json_message):
		"""
		Validates a message received from the client and passes it to the handler
		registered in pre_process_message for its action. Every ValidationError is
		sent back to the client as a growl message.
		:param json_message: raw json text received from the websocket
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			# anti spam check is disabled:
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %s', json_message)
			# the payload comes from the client: malformed input must end up in the
			# growl branch below instead of raising ValueError/TypeError/KeyError
			try:
				message = json.loads(json_message)
			except ValueError:
				raise ValidationError('Skipping message %s, as it is not a valid json' % json_message)
			if not isinstance(message, dict):
				raise ValidationError('Skipping message %s, as it is not a json object' % json_message)
			event = message.get(VarNames.EVENT)
			if not isinstance(event, str_type) or event not in self.pre_process_message:
				raise ValidationError("event {} is unknown".format(event))
			channel = message.get(VarNames.CHANNEL)
			if channel and channel not in self.channels:
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
			self.pre_process_message[event](message)
		except ValidationError as e:
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.safe_write(error_message)
708
709
	def on_close(self):
		"""
		Unsubscribes from redis, removes this connection from the online hash of
		every room channel (integer channels) and, if the user has no other
		connection in a room, publishes the logout to it. When the user went
		offline in at least one room the UPDATE_LAST_READ_MESSAGE query is executed.
		"""
		if self.async_redis.subscribed:
			self.logger.info("Close event, unsubscribing from %s", self.channels)
			self.async_redis.unsubscribe(self.channels)
		else:
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
		log_data = {}
		gone_offline = False
		# personal channels are strings, only integer channels are rooms
		room_channels = [channel for channel in self.channels if isinstance(channel, int)]
		for channel in room_channels:
			self.sync_redis.hdel(channel, self.id)
			if not self.connected:
				continue
			# seems like async solves problem with connection lost and wrong data status
			# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
			online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
			log_data[channel] = {'online': online, 'is_online': is_online}
			if is_online:
				continue
			self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
			gone_offline = True
		if gone_offline:
			res = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
			self.logger.info("Updated %s last read message", res)

		self.logger.info("Close connection result: %s", json.dumps(log_data))
		self.async_redis.disconnect()
736
737
	def open(self):
		"""
		Authenticates the connection by the django session cookie. For a known
		session: loads the user, sends him his rooms, subscribes to the personal
		and room channels, registers him online in every room (delivering offline
		messages) and saves the ip in a separate thread. Otherwise the connection
		is closed with code 403.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if not sessionStore.exists(session_key):
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			self.close(403, "Session key %s has been rejected" % session_key)
			return
		self.ip = self.get_client_ip()
		session = SessionStore(session_key)
		self.user_id = int(session["_auth_user_id"])
		self._logger = logging.LoggerAdapter(base_logger, {
			'user_id': str(self.user_id).zfill(3),
			'id': self.id,
			'ip': self.ip
		})
		self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
		self.async_redis.connect()
		user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
		self.sender_name = user_db.username
		self.sex = user_db.sex_str
		user_rooms = self.get_users_in_current_user_rooms()
		self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
		# personal channel goes first, then the ids of all user rooms
		self.channels.clear()
		self.channels.append(self.channel)
		self.channels.extend(user_rooms)
		self.listen(self.channels)
		# get all missed messages
		off_messages = self.get_offline_messages()
		for room_id in user_rooms:
			self.add_online_user(room_id, off_messages.get(room_id))
		self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
		self.connected = True
		Thread(target=self.save_ip).start()
771
772
	def check_origin(self, origin):
		"""
		Check whether the domain of the Origin header matches the domain of the
		Host header; ports are ignored and the comparison is case-insensitive.
		:param origin: value of the Origin header
		:return: True if the domains match, False otherwise or if Host is absent
		"""
		origin_domain = urlparse(origin).netloc.split(':')[0].lower()
		host = self.request.headers.get("Host")
		if not host:
			# nothing to compare with, reject instead of failing on None
			return False
		# domain names are case-insensitive, so normalize both sides
		host_domain = host.split(':')[0].lower()
		return host_domain == origin_domain
782
783
	def safe_write(self, message):
		"""
		Sends message to the client; dicts are serialized to json first.
		WebSocketClosedError is logged and not propagated.
		:type self: MessagesHandler
		:raises ValueError: if message is neither a dict nor a string
		"""
		# self.logger.debug('<< THREAD %s >>', os.getppid())
		try:
			payload = json.dumps(message) if isinstance(message, dict) else message
			if not isinstance(payload, str_type):
				raise ValueError('Wrong message type : %s' % str(payload))
			self.logger.debug(">> %s", payload)
			self.write_message(payload)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(payload))
798
799
	def get_client_ip(self):
		"""Returns the X-Real-IP header value if it's set, the remote ip of the request otherwise."""
		forwarded_ip = self.request.headers.get("X-Real-IP")
		if forwarded_ip:
			return forwarded_ip
		return self.request.remote_ip
801