Completed
Push — master ( d1e052...03b8fe )
by Andrew
01:16
created

MessagesHandler.update_room()   A

Complexity

Conditions 2

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 5
rs 9.4285
1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import datetime
9
import tornado.gen
10
import tornado.httpclient
11
import tornado.ioloop
12
import tornado.web
13
import tornado.websocket
14
import tornadoredis
15
from django.conf import settings
16
from django.core.exceptions import ValidationError
17
from django.db import connection, OperationalError, InterfaceError, IntegrityError
18
from django.db.models import Q, F
19
from redis_sessions.session import SessionStore
20
from tornado.websocket import WebSocketHandler
21
22
from chat.utils import extract_photo
23
24
try:
25
	from urllib.parse import urlparse  # py2
26
except ImportError:
27
	from urlparse import urlparse  # py3
28
29
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM
30
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo, RoomUsers
31
32
PY3 = sys.version > '3'
33
34
api_url = getattr(settings, "IP_API_URL", None)
35
36
sessionStore = SessionStore()
37
38
logger = logging.getLogger(__name__)
39
40
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
41
#CONNECTION_POOL = tornadoredis.ConnectionPool(
42
#	max_connections=500,
43
#	wait_for_available=True)
44
45
46
class Actions:
47
	LOGIN = 'addOnlineUser'
48
	LOGOUT = 'removeOnlineUser'
49
	SEND_MESSAGE = 'sendMessage'
50
	PRINT_MESSAGE = 'printMessage'
51
	CALL = 'call'
52
	ROOMS = 'setRooms'
53
	REFRESH_USER = 'setOnlineUsers'
54
	GROWL_MESSAGE = 'growl'
55
	GET_MESSAGES = 'loadMessages'
56
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
57
	DELETE_ROOM = 'deleteRoom'
58
	EDIT_MESSAGE = 'editMessage'
59
	DELETE_MESSAGE = 'deleteMessage'
60
	CREATE_ROOM_CHANNEL = 'addRoom'
61
	INVITE_USER = 'inviteUser'
62
	ADD_USER = 'addUserToAll'
63
	OFFLINE_MESSAGES = 'loadOfflineMessages'
64
65
66
class VarNames:
67
	CALL_TYPE = 'type'
68
	USER = 'user'
69
	USER_ID = 'userId'
70
	TIME = 'time'
71
	CONTENT = 'content'
72
	IMG = 'image'
73
	EVENT = 'action'
74
	MESSAGE_ID = 'id'
75
	GENDER = 'sex'
76
	ROOM_NAME = 'name'
77
	ROOM_ID = 'roomId'
78
	ROOM_USERS = 'users'
79
	CHANNEL = 'channel'
80
	GET_MESSAGES_COUNT = 'count'
81
	GET_MESSAGES_HEADER_ID = 'headerId'
82
	CHANNEL_NAME = 'channel'
83
	IS_ROOM_PRIVATE = 'private'
84
	#ROOM_NAME = 'roomName'
85
	# ROOM_ID = 'roomId'
86
87
88
class CallType:
89
	OFFER = 'offer'
90
91
class HandlerNames:
92
	NAME = 'handler'
93
	CHANNELS = 'channels'
94
	CHAT = 'chat'
95
	GROWL = 'growl'
96
	WEBRTC = 'webrtc'
97
	FILE = 'file'
98
99
100
class RedisPrefix:
101
	USER_ID_CHANNEL_PREFIX = 'u'
102
	__ROOM_ONLINE__ = 'o:{}'
103
104
	@classmethod
105
	def generate_user(cls, key):
106
		return cls.USER_ID_CHANNEL_PREFIX + str(key)
107
108
RedisPrefix.DEFAULT_CHANNEL = ALL_ROOM_ID
109
110
111
class MessagesCreator(object):
112
113
	def __init__(self, *args, **kwargs):
114
		super(MessagesCreator, self).__init__(*args, **kwargs)
115
		self.sex = None
116
		self.sender_name = None
117
		self.user_id = 0  # anonymous by default
118
119
	def default(self, content, event, handler):
120
		"""
121
		:return: {"action": event, "content": content, "time": "20:48:57"}
122
		"""
123
		return {
124
			VarNames.EVENT: event,
125
			VarNames.CONTENT: content,
126
			VarNames.USER_ID: self.user_id,
127
			VarNames.TIME: get_milliseconds(),
128
			HandlerNames.NAME: handler
129
		}
130
131
	def room_online(self, online, event, channel):
132
		"""
133
		:return: {"action": event, "content": content, "time": "20:48:57"}
134
		"""
135
		room_less = self.default(online, event, HandlerNames.CHAT)
136
		room_less[VarNames.CHANNEL_NAME] = channel
137
		room_less[VarNames.USER] = self.sender_name
138
		room_less[VarNames.GENDER] = self.sex
139
		return room_less
140
141
	def offer_call(self, content, message_type):
142
		"""
143
		:return: {"action": "call", "content": content, "time": "20:48:57"}
144
		"""
145
		message = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
146
		message[VarNames.CALL_TYPE] = message_type
147
		message[VarNames.USER] = self.sender_name
148
		return message
149
150
	@classmethod
151
	def create_message(cls, message):
152
		res = {
153
			VarNames.USER_ID: message.sender_id,
154
			VarNames.CONTENT: message.content,
155
			VarNames.TIME: message.time,
156
			VarNames.MESSAGE_ID: message.id,
157
		}
158
		if message.img.name:
159
			res[VarNames.IMG] = message.img.url
160
		return res
161
162
	@classmethod
163
	def create_send_message(cls, message, event=Actions.PRINT_MESSAGE):
164
		"""
165
		:param message:
166
		:return: "action": "joined", "content": {"v5bQwtWp": "alien", "tRD6emzs": "Alien"},
167
		"sex": "Alien", "user": "tRD6emzs", "time": "20:48:57"}
168
		"""
169
		res = cls.create_message(message)
170
		res[VarNames.EVENT] = event
171
		res[VarNames.CHANNEL] = message.room_id
172
		res[HandlerNames.NAME] = HandlerNames.CHAT
173
		return res
174
175
	@classmethod
176
	def get_messages(cls, messages, channel):
177
		"""
178
		:type messages: list[Messages]
179
		:type channel: str
180
		:type messages: QuerySet[Messages]
181
		"""
182
		return {
183
			VarNames.CONTENT: [cls.create_message(message) for message in messages],
184
			VarNames.EVENT: Actions.GET_MESSAGES,
185
			VarNames.CHANNEL: channel,
186
			HandlerNames.NAME: HandlerNames.CHAT
187
		}
188
189
	@property
190
	def stored_redis_user(self):
191
		return  self.user_id
192
193
	@property
194
	def channel(self):
195
		return RedisPrefix.generate_user(self.user_id)
196
197
	def subscribe_direct_channel_message(self, room_id, other_user_id):
198
		return {
199
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
200
			VarNames.ROOM_ID: room_id,
201
			VarNames.ROOM_USERS: [self.user_id, other_user_id],
202
			HandlerNames.NAME: HandlerNames.CHANNELS
203
		}
204
205
	def subscribe_room_channel_message(self, room_id, room_name):
206
		return {
207
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
208
			VarNames.ROOM_ID: room_id,
209
			VarNames.ROOM_USERS: [self.user_id],
210
			HandlerNames.NAME: HandlerNames.CHANNELS,
211
			VarNames.ROOM_NAME: room_name
212
		}
213
214
	def invite_room_channel_message(self, room_id, user_id, room_name, users):
215
		return {
216
			VarNames.EVENT: Actions.INVITE_USER,
217
			VarNames.ROOM_ID: room_id,
218
			VarNames.USER_ID: user_id,
219
			HandlerNames.NAME: HandlerNames.CHANNELS,
220
			VarNames.ROOM_NAME: room_name,
221
			VarNames.CONTENT: users
222
		}
223
224
	def add_user_to_room(self, channel, user_id, content):
225
		return {
226
			VarNames.EVENT: Actions.ADD_USER,
227
			VarNames.CHANNEL: channel,
228
			VarNames.USER_ID: user_id,
229
			HandlerNames.NAME: HandlerNames.CHAT,
230
			VarNames.GENDER: content[VarNames.GENDER], # SEX: 'Alien', USER: 'Andrew'
231
			VarNames.USER: content[VarNames.USER] # SEX: 'Alien', USER: 'Andrew'
232
		}
233
234
	def unsubscribe_direct_message(self, room_id):
235
		return {
236
			VarNames.EVENT: Actions.DELETE_ROOM,
237
			VarNames.ROOM_ID: room_id,
238
			VarNames.USER_ID: self.user_id,
239
			HandlerNames.NAME: HandlerNames.CHANNELS,
240
			VarNames.TIME: get_milliseconds()
241
		}
242
243
	def load_offline_message(self, offline_messages, channel_key):
244
		res = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
245
		res[VarNames.CHANNEL] = channel_key
246
		return res
247
248
249
class MessagesHandler(MessagesCreator):
250
251
	def __init__(self, *args, **kwargs):
252
		self.parsable_prefix = 'p'
253
		super(MessagesHandler, self).__init__(*args, **kwargs)
254
		self.id = id(self)
255
		self.log_id = str(self.id % 10000).rjust(4, '0')
256
		self.ip = None
257
		from chat import global_redis
258
		self.async_redis_publisher = global_redis.async_redis_publisher
259
		self.sync_redis = global_redis.sync_redis
260
		self.channels = []
261
		self.call_receiver_channel = None
262
		self.logger = None
263
		self.async_redis = tornadoredis.Client()
264
		self.pre_process_message = {
265
			Actions.GET_MESSAGES: self.process_get_messages,
266
			Actions.SEND_MESSAGE: self.process_send_message,
267
			Actions.CALL: self.process_call,
268
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
269
			Actions.DELETE_ROOM: self.delete_channel,
270
			Actions.EDIT_MESSAGE: self.edit_message,
271
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
272
			Actions.INVITE_USER: self.invite_user,
273
		}
274
		self.post_process_message = {
275
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
276
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
277
			Actions.DELETE_ROOM: self.send_client_delete_channel,
278
			Actions.INVITE_USER: self.send_client_new_channel,
279
			Actions.CALL: self.set_opponent_call_channel
280
		}
281
282
	@tornado.gen.engine
283
	def listen(self, channels):
284
		yield tornado.gen.Task(
285
			self.async_redis.subscribe, channels)
286
		self.async_redis.listen(self.new_message)
287
288
	@tornado.gen.engine
289
	def add_channel(self, channel):
290
		self.channels.append(channel)
291
		yield tornado.gen.Task(
292
			self.async_redis.subscribe, (channel,))
293
294
	def do_db(self, callback, *args, **kwargs):
295
		try:
296
			return callback(*args, **kwargs)
297
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
298
			self.logger.warning('%s, reconnecting' % e)  # TODO
299
			connection.close()
300
			return callback(*args, **kwargs)
301
302
	def execute_query(self, query, *args, **kwargs):
303
		cursor = connection.cursor()
304
		cursor.execute(query, *args, **kwargs)
305
		return cursor.fetchall()
306
307
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
308
		"""
309
		:rtype : dict
310
		returns (dict, bool) if check_type is present
311
		"""
312
		online = self.sync_redis.hgetall(channel)
313
		self.logger.debug('!! channel %s redis online: %s', channel, online)
314
		result = set()
315
		user_is_online = False
316
		# redis stores REDIS_USER_FORMAT, so parse them
317
		if online:
318
			for key_hash, raw_user_id in online.items():  # py2 iteritems
319
				user_id = int(raw_user_id.decode('utf-8'))
320
				if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
321
					user_is_online = True
322
				result.add(user_id)
323
		result = list(result)
324
		return (result, user_is_online) if check_user_id else result
325
326
	def add_online_user(self, room_id, offline_messages=None):
327
		"""
328
		adds to redis
329
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
330
		:return:
331
		"""
332
		online = self.get_online_from_redis(room_id)
333
		self.async_redis_publisher.hset(room_id, self.id, self.stored_redis_user)
334
		if self.user_id not in online:  # if a new tab has been opened
335
			online.append(self.user_id)
336
			online_user_names_mes = self.room_online(
337
				online,
338
				Actions.LOGIN,
339
				room_id
340
			)
341
			self.logger.info('!! First tab, sending refresh online for all')
342
			self.publish(online_user_names_mes, room_id)
343
			if offline_messages:
344
				self.safe_write(self.load_offline_message(offline_messages, room_id))
345
		else:  # Send user names to self
346
			online_user_names_mes = self.room_online(
347
				online,
348
				Actions.REFRESH_USER,
349
				room_id
350
			)
351
			self.logger.info('!! Second tab, retrieving online for self')
352
			self.safe_write(online_user_names_mes)
353
354
	def publish(self, message, channel, parsable=False):
355
		jsoned_mess = json.dumps(message)
356
		self.logger.debug('<%s> %s', channel, jsoned_mess)
357
		if parsable:
358
			jsoned_mess = self.encode(jsoned_mess)
359
		self.async_redis_publisher.publish(channel, jsoned_mess)
360
361
	def encode(self, message):
362
		"""
363
		Marks message with prefix to specify that
364
		it should be decoded and proccesed before sending to client
365
		@param message: message to mark
366
		@return: marked message
367
		"""
368
		return self.parsable_prefix + message
369
370
	def decode(self, message):
371
		"""
372
		Check if message should be proccessed by server before writing to client
373
		@param message: message to check
374
		@return: Object structure of message if it should be processed, None if not
375
		"""
376
		if message.startswith(self.parsable_prefix):
377
			return json.loads(message[1:])
378
379
	def new_message(self, message):
380
		data = message.body
381
		if type(data) is not int:  # subscribe event
382
			decoded = self.decode(data)
383
			if decoded:
384
				data = decoded
385
			self.safe_write(data)
386
			if decoded:
387
				self.post_process_message[decoded[VarNames.EVENT]](decoded)
388
389
	def safe_write(self, message):
390
		raise NotImplementedError('WebSocketHandler implements')
391
392
	def process_send_message(self, message):
393
		"""
394
		:type message: dict
395
		"""
396
		channel = message[VarNames.CHANNEL]
397
		message_db = Message(
398
			sender_id=self.user_id,
399
			content=message[VarNames.CONTENT]
400
		)
401
		message_db.room_id = channel
402
		if VarNames.IMG in message:
403
			message_db.img = extract_photo(message[VarNames.IMG])
404
		self.do_db(message_db.save)  # exit on hacked id with exception
405
		prepared_message = self.create_send_message(message_db)
406
		self.publish(prepared_message, channel)
407
408
	def process_call(self, in_message):
409
		"""
410
		:type in_message: dict
411
		"""
412
		call_type = in_message.get(VarNames.CALL_TYPE)
413
		set_opponent_channel = False
414
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
415
		if call_type == CallType.OFFER:
416
			room_id = in_message[VarNames.CHANNEL]
417
			user = User.rooms.through.objects.get(~Q(user_id=self.user_id), Q(room_id=room_id), Q(room__name__isnull=True))
418
			self.call_receiver_channel = RedisPrefix.generate_user(user.user_id)
419
			set_opponent_channel = True
420
			out_message[VarNames.CHANNEL] = room_id
421
		# TODO
422
		self.logger.info('!! Offering a call to user with id %s',  self.call_receiver_channel)
423
		self.publish(out_message, self.call_receiver_channel, set_opponent_channel)
424
425
	def create_new_room(self, message):
426
		room_name = message[VarNames.ROOM_NAME]
427
		if not room_name or len(room_name) > 16:
428
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
429
		room = Room(name=room_name)
430
		self.do_db(room.save)
431
		RoomUsers(room_id=room.id, user_id=self.user_id).save()
432
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
433
		self.publish(subscribe_message, self.channel, True)
434
435
	def invite_user(self, message):
436
		room_id = message[VarNames.ROOM_ID]
437
		user_id = message[VarNames.USER_ID]
438
		if room_id not in self.channels:
439
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
440
		room = self.do_db(Room.objects.get, id=room_id)
441
		if room.is_private:
442
			raise ValidationError("You can't add users to direct room, create a new room instead")
443
		try:
444
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
445
		except IntegrityError:
446
			raise ValidationError("User is already in channel")
447
		users_in_room = {}
448
		for user in room.users.all():
449
			self.set_js_user_structure(users_in_room, user.id, user.username, user.sex)
450
		self.publish(self.add_user_to_room(room_id, user_id, users_in_room[user_id]), room_id)
451
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
452
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
453
454
	def create_self_room(self, user_rooms):
455
		rooms_ids = list([room['room_id'] for room in user_rooms])
456
		query_res = self.execute_query(SELECT_SELF_ROOM, [rooms_ids,])
457 View Code Duplication
		if len(query_res) > 0:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
458
			room = query_res[0]
459
			room_id = room[0]
460
			self.update_room(room_id, room[1])
461
		else:
462
			room = Room()
463
			room.save()
464
			room_id = room.id
465
			RoomUsers(user_id=self.user_id, room_id=room_id).save()
466
		return room_id
467
468
	def create_other_room(self, user_rooms, user_id):
469
		query_res = Room.users.through.objects.filter(user_id=user_id, room__in=user_rooms).values('room__id', 'room__disabled')
470 View Code Duplication
		if len(query_res) > 0:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
471
			room = query_res[0]
472
			room_id = room['room__id']
473
			self.update_room(room_id, room['room__disabled'])
474
		else:
475
			room = Room()
476
			room.save()
477
			room_id = room.id
478
			RoomUsers.objects.bulk_create([
479
				RoomUsers(user_id=user_id, room_id=room_id),
480
				RoomUsers(user_id=self.user_id, room_id=room_id),
481
			])
482
		return room_id
483
484
	def update_room(self, room_id, disabled):
485
		if not disabled:
486
			raise ValidationError('This room already exist')
487
		else:
488
			Room.objects.filter(id=room_id).update(disabled=False)
489
490
	def create_user_channel(self, message):
491
		user_id = message[VarNames.USER_ID]
492
		# get all self private rooms ids
493
		user_rooms = Room.users.through.objects.filter(user_id=self.user_id, room__name__isnull=True).values('room_id')
494
		# get private room that contains another user from rooms above
495
		if self.user_id == user_id:
496
			room_id = self.create_self_room(user_rooms)
497
		else:
498
			room_id = self.create_other_room(user_rooms, user_id)
499
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
500
		self.publish(subscribe_message, self.channel, True)
501
		other_channel = RedisPrefix.generate_user(user_id)
502
		if self.channel != other_channel:
503
			self.publish(subscribe_message, other_channel, True)
504
505
	def delete_channel(self, message):
506
		room_id = message[VarNames.ROOM_ID]
507
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
508
			raise ValidationError('You are not allowed to exit this room')
509
		room = self.do_db(Room.objects.get, id=room_id)
510
		if room.disabled:
511
			raise ValidationError('Room is already deleted')
512
		if room.name is None:  # if private then disable
513
			room.disabled = True
514
		else: # if public -> leave the room, delete the link
515
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
516
			online = self.get_online_from_redis(room_id)
517
			online.remove(self.user_id)
518
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
519
		room.save()
520
		message = self.unsubscribe_direct_message(room_id)
521
		self.publish(message, room_id, True)
522
523
	def edit_message(self, data):
524
		message_id = data[VarNames.MESSAGE_ID]
525
		message = Message.objects.get(id=message_id)
526
		if message.sender_id != self.user_id:
527
			raise ValidationError("You can only edit your messages")
528
		if message.time + 60000 < get_milliseconds():
529
			raise ValidationError("You can only edit messages that were send not more than 1 min ago")
530
		if message.deleted:
531
			raise ValidationError("Already deleted")
532
		message.content = data[VarNames.CONTENT]
533
		selector = Message.objects.filter(id=message_id)
534
		if message.content is None:
535
			selector.update(deleted=True)
536
			action = Actions.DELETE_MESSAGE
537
		else:
538
			action = Actions.EDIT_MESSAGE
539
			selector.update(content=message.content)
540
		self.publish(self.create_send_message(message, action), message.room_id)
541
542
	def send_client_new_channel(self, message):
543
		room_id = message[VarNames.ROOM_ID]
544
		self.add_channel(room_id)
545
		self.add_online_user(room_id)# TODO doesnt work if already subscribed
546
547
	def set_opponent_call_channel(self, message):
548
		self.call_receiver_channel = RedisPrefix.generate_user(message[VarNames.USER_ID])
549
550
	def send_client_delete_channel(self, message):
551
		room_id = message[VarNames.ROOM_ID]
552
		self.async_redis.unsubscribe((room_id,))
553
		self.async_redis_publisher.hdel(room_id, self.id)
554
		self.channels.remove(room_id)
555
556
	def process_get_messages(self, data):
557
		"""
558
		:type data: dict
559
		"""
560
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
561
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
562
		room_id = data[VarNames.CHANNEL]
563
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
564
		if header_id is None:
565
			messages = Message.objects.filter(Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
566
		else:
567
			messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
568
		response = self.do_db(self.get_messages, messages, room_id)
569
		self.safe_write(response)
570
571
	def get_offline_messages(self):
572
		res = {}
573
		offline_messages = Message.objects.filter(
574
			id__gt=F('room__roomusers__last_read_message_id'),
575
			deleted=False,
576
			room__roomusers__user_id=self.user_id
577
		)
578
		for message in offline_messages:
579
			res.setdefault(message.room_id, []).append(self.create_message(message))
580
		return res
581
582
	def get_users_in_current_user_rooms(self):
583
		"""
584
		{
585
			"ROOM_ID:1": {
586
				"name": "All",
587
				"users": {
588
					"USER_ID:admin": {
589
						"name": "USER_NAME:admin",
590
						"sex": "SEX:Secret"
591
					},
592
					"USER_ID_2": {
593
						"name": "USER_NAME:Mike",
594
						"sex": "Male"
595
					}
596
				},
597
				"isPrivate": true
598
			}
599
		}
600
		"""
601
		user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
602
		res = {room['id']: {
603
				VarNames.ROOM_NAME: room['name'],
604
				VarNames.ROOM_USERS: {}
605
			} for room in user_rooms}
606
		room_ids = (room_id for room_id in res)
607
		rooms_users = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id')
608
		for user in rooms_users:
609
			self.set_js_user_structure(res[user['rooms__id']][VarNames.ROOM_USERS], user['id'], user['username'], user['sex'])
610
		return res
611
612
	def set_js_user_structure(self, user_dict, user_id, name, sex):
613
		user_dict[user_id] = {
614
			VarNames.USER: name,
615
			VarNames.GENDER: GENDERS[sex]
616
		}
617
618
	def save_ip(self):
619
		if (self.do_db(UserJoinedInfo.objects.filter(
620
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
621
			return
622
		ip_address = self.get_or_create_ip()
623
		UserJoinedInfo.objects.create(
624
			ip=ip_address,
625
			user_id=self.user_id
626
		)
627
628
	def get_or_create_ip(self):
629
		try:
630
			ip_address = IpAddress.objects.get(ip=self.ip)
631
		except IpAddress.DoesNotExist:
632
			try:
633
				if not api_url:
634
					raise Exception('api url is absent')
635
				self.logger.debug("Creating ip record %s", self.ip)
636
				f = urlopen(api_url % self.ip)
637
				raw_response = f.read().decode("utf-8")
638
				response = json.loads(raw_response)
639
				if response['status'] != "success":
640
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
641
				ip_address = IpAddress.objects.create(
642
					ip=self.ip,
643
					isp=response['isp'],
644
					country=response['country'],
645
					region=response['regionName'],
646
					city=response['city'],
647
					country_code=response['countryCode']
648
				)
649
			except Exception as e:
650
				self.logger.error("Error while creating ip with country info, because %s", e)
651
				ip_address = IpAddress.objects.create(ip=self.ip)
652
		return ip_address
653
654
655
class AntiSpam(object):
656
657
	def __init__(self):
658
		self.spammed = 0
659
		self.info = {}
660
661
	def check_spam(self, json_message):
662
		message_length = len(json_message)
663
		info_key = int(round(time.time() * 100))
664
		self.info[info_key] = message_length
665
		if message_length > MAX_MESSAGE_SIZE:
666
			self.spammed += 1
667
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
668
		self.check_timed_spam()
669
670
	def check_timed_spam(self):
671
		# TODO implement me
672
		pass
673
		# raise ValidationError("You're chatting too much, calm down a bit!")
674
675
676
class TornadoHandler(WebSocketHandler, MessagesHandler):
677
678
	def __init__(self, *args, **kwargs):
679
		super(TornadoHandler, self).__init__(*args, **kwargs)
680
		self.connected = False
681
		self.anti_spam = AntiSpam()
682
683
	def data_received(self, chunk):
684
		pass
685
686
	def on_message(self, json_message):
687
		try:
688
			if not self.connected:
689
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
690
			if not json_message:
691
				raise ValidationError('Skipping null message')
692
			# self.anti_spam.check_spam(json_message)
693
			self.logger.debug('<< %s', json_message)
694
			message = json.loads(json_message)
695
			if message[VarNames.EVENT] not in self.pre_process_message:
696
				raise ValidationError("event {} is unknown".format(message[VarNames.EVENT]))
697
			channel = message.get(VarNames.CHANNEL)
698
			if channel and channel not in self.channels:
699
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
700
			self.pre_process_message[message[VarNames.EVENT]](message)
701
		except ValidationError as e:
702
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
703
			self.safe_write(error_message)
704
705
	def on_close(self):
706
		if self.async_redis.subscribed:
707
			self.logger.info("Close event, unsubscribing from %s", self.channels)
708
			self.async_redis.unsubscribe(self.channels)
709
		else:
710
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
711
		log_data = {}
712
		gone_offline = False
713
		for channel in self.channels:
714
			if not isinstance(channel, int):
715
				continue
716
			self.sync_redis.hdel(channel, self.id)
717
			if self.connected:
718
				# seems like async solves problem with connection lost and wrong data status
719
				# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
720
				online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
721
				log_data[channel] = {'online': online, 'is_online': is_online}
722
				if not is_online:
723
					message = self.room_online(online, Actions.LOGOUT, channel)
724
					self.publish(message, channel)
725
					gone_offline = True
726
		if gone_offline:
727
			res = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
728
			logger.info("Updated %s last read message", res)
729
730
		self.logger.info("Close connection result: %s", json.dumps(log_data))
731
		self.async_redis.disconnect()
732
733
	def open(self):
734
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
735
		if sessionStore.exists(session_key):
736
			self.ip = self.get_client_ip()
737
			session = SessionStore(session_key)
738
			self.user_id = int(session["_auth_user_id"])
739
			log_params = {
740
				'user_id': str(self.user_id).zfill(3),
741
				'id': self.log_id,
742
				'ip': self.ip
743
			}
744
			self.logger = logging.LoggerAdapter(logger, log_params)
745
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
746
			self.async_redis.connect()
747
			user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
748
			self.sender_name = user_db.username
749
			self.sex = user_db.sex_str
750
			user_rooms = self.get_users_in_current_user_rooms()
751
			self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
752
			# get all missed messages
753
			self.channels.clear()
754
			self.channels.append(self.channel)
755
			for room_id in user_rooms:
756
				self.channels.append(room_id)
757
			self.listen(self.channels)
758
			off_messages = self.get_offline_messages()
759
			for room_id in user_rooms:
760
				self.add_online_user(room_id, off_messages.get(room_id))
761
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
762
			self.connected = True
763
			Thread(target=self.save_ip).start()
764
		else:
765
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
766
			self.close(403, "Session key %s has been rejected" % session_key)
767
768
	def check_origin(self, origin):
769
		"""
770
		check whether browser set domain matches origin
771
		"""
772
		parsed_origin = urlparse(origin)
773
		origin = parsed_origin.netloc
774
		origin_domain = origin.split(':')[0].lower()
775
		browser_set = self.request.headers.get("Host")
776
		browser_domain = browser_set.split(':')[0]
777
		return browser_domain == origin_domain
778
779
	def safe_write(self, message):
780
		"""
781
		Tries to send message, doesn't throw exception outside
782
		:type self: MessagesHandler
783
		"""
784
		# self.logger.debug('<< THREAD %s >>', os.getppid())
785
		try:
786
			if isinstance(message, dict):
787
				message = json.dumps(message)
788
			if not (isinstance(message, str) or (not PY3 and isinstance(message, unicode))):
789
				raise ValueError('Wrong message type : %s' % str(message))
790
			self.logger.debug(">> %s", message)
791
			self.write_message(message)
792
		except tornado.websocket.WebSocketClosedError as e:
793
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
794
795
	def get_client_ip(self):
796
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip
797