Completed
Push — master ( fe916f...6a326a )
by Andrew
01:31
created

TornadoHandler.disconnect()   A

Complexity

Conditions 3

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 11
rs 9.4285
1
import json
2
import logging
3
import sys
4
import time
5
from datetime import timedelta
6
from threading import Thread
7
from urllib.request import urlopen
8
9
import tornado.gen
10
import tornado.httpclient
11
import tornado.ioloop
12
import tornado.web
13
import tornado.websocket
14
import tornadoredis
15
from django.conf import settings
16
from django.core.exceptions import ValidationError
17
from django.db import connection, OperationalError, InterfaceError, IntegrityError
18
from django.db.models import Q, F
19
from redis_sessions.session import SessionStore
20
from tornado import ioloop
21
from tornado.websocket import WebSocketHandler
22
23
from chat.utils import extract_photo
24
25
try:
26
	from urllib.parse import urlparse  # py3
27
except ImportError:
28
	from urlparse import urlparse  # py2
29
30
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM
31
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo, RoomUsers
32
33
# True when the interpreter's major version is 3 or newer. The numeric
# version tuple is compared instead of the version string, because string
# comparison is lexicographic and is not a reliable version check.
PY3 = sys.version_info[0] >= 3
# Type used for "is this a text string" isinstance checks; `basestring` is
# only evaluated on Python 2.
str_type = str if PY3 else basestring
35
api_url = getattr(settings, "IP_API_URL", None)
36
37
sessionStore = SessionStore()
38
39
base_logger = logging.getLogger(__name__)
40
41
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
42
#CONNECTION_POOL = tornadoredis.ConnectionPool(
43
#	max_connections=500,
44
#	wait_for_available=True)
45
46
47
class Actions(object):
	"""Values of the ``action`` field of messages exchanged with the client."""

	# online status
	LOGIN = 'addOnlineUser'
	LOGOUT = 'removeOnlineUser'
	REFRESH_USER = 'setOnlineUsers'
	ADD_USER = 'addUserToAll'

	# messages
	SEND_MESSAGE = 'sendMessage'
	PRINT_MESSAGE = 'printMessage'
	EDIT_MESSAGE = 'editMessage'
	DELETE_MESSAGE = 'deleteMessage'
	GET_MESSAGES = 'loadMessages'
	OFFLINE_MESSAGES = 'loadOfflineMessages'
	GROWL_MESSAGE = 'growl'

	# rooms and channels
	ROOMS = 'setRooms'
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
	CREATE_ROOM_CHANNEL = 'addRoom'
	DELETE_ROOM = 'deleteRoom'
	INVITE_USER = 'inviteUser'

	# calls
	CALL = 'call'
65
66
67
class VarNames(object):
	"""Keys used inside the message dictionaries."""

	# common message fields
	EVENT = 'action'
	CONTENT = 'content'
	TIME = 'time'
	MESSAGE_ID = 'id'
	IMG = 'image'

	# user fields
	USER = 'user'
	USER_ID = 'userId'
	GENDER = 'sex'

	# room / channel fields (CHANNEL and CHANNEL_NAME share the same key)
	CHANNEL = 'channel'
	CHANNEL_NAME = 'channel'
	ROOM_ID = 'roomId'
	ROOM_NAME = 'name'
	ROOM_USERS = 'users'
	IS_ROOM_PRIVATE = 'private'

	# message history request fields
	GET_MESSAGES_COUNT = 'count'
	GET_MESSAGES_HEADER_ID = 'headerId'

	# call fields
	CALL_TYPE = 'type'
87
88
89
class CallType(object):
	"""Values of the call ``type`` field that the server treats specially."""

	# the message that starts a call
	OFFER = 'offer'
91
92
class HandlerNames:
	"""The ``handler`` message key and the handler names stored under it."""

	# key under which the handler name is stored in a message
	NAME = 'handler'

	# handler names
	CHAT = 'chat'
	CHANNELS = 'channels'
	GROWL = 'growl'
	WEBRTC = 'webrtc'
	FILE = 'file'
99
100
101
class RedisPrefix:
	"""Prefixes and helpers for building redis channel names."""

	# prefix of the personal channel of a user
	USER_ID_CHANNEL_PREFIX = 'u'
	__ROOM_ONLINE__ = 'o:{}'

	@classmethod
	def generate_user(cls, key):
		"""
		:param key: value identifying the user (converted with str())
		:return: the user channel prefix followed by the key, e.g. "u5"
		"""
		suffix = str(key)
		return cls.USER_ID_CHANNEL_PREFIX + suffix
108
109
# Expose ALL_ROOM_ID (imported from chat.settings) as a RedisPrefix attribute.
# presumably the id of the room shared by every user - verify in chat.settings
RedisPrefix.DEFAULT_CHANNEL = ALL_ROOM_ID
110
111
112
class MessagesCreator(object):
113
114
	def __init__(self, *args, **kwargs):
115
		super(MessagesCreator, self).__init__(*args, **kwargs)
116
		self.sex = None
117
		self.sender_name = None
118
		self.user_id = 0  # anonymous by default
119
120
	def default(self, content, event, handler):
		"""
		Builds the base message structure reused by other message builders.

		:param content: value stored under the "content" key
		:param event: value stored under the "action" key
		:param handler: value stored under the "handler" key
		:return: {"action": event, "content": content, "userId": self.user_id,
		"time": get_milliseconds(), "handler": handler}
		"""
		message = {}
		message[VarNames.EVENT] = event
		message[VarNames.CONTENT] = content
		message[VarNames.USER_ID] = self.user_id
		message[VarNames.TIME] = get_milliseconds()
		message[HandlerNames.NAME] = handler
		return message
131
132
	def room_online(self, online, event, channel):
		"""
		Builds a chat message carrying the online users of a channel.

		:param online: value stored as the message content
		:param event: action of the message
		:param channel: value stored under the "channel" key
		:return: the default() message for the chat handler, extended with
		"channel", "user" (self.sender_name) and "sex" (self.sex)
		"""
		message = self.default(online, event, HandlerNames.CHAT)
		message.update({
			VarNames.CHANNEL_NAME: channel,
			VarNames.USER: self.sender_name,
			VarNames.GENDER: self.sex,
		})
		return message
141
142
	def offer_call(self, content, message_type):
		"""
		Builds a call message for the webrtc handler.

		:param content: value stored as the message content
		:param message_type: value stored under the "type" key
		:return: the default() message with action "call", extended with
		"type" and "user" (self.sender_name)
		"""
		call_message = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
		call_message.update({
			VarNames.CALL_TYPE: message_type,
			VarNames.USER: self.sender_name,
		})
		return call_message
150
151
	@classmethod
	def create_message(cls, message):
		"""
		Converts a stored message into its dictionary form.

		:param message: object exposing sender_id, content, time, id and img
		:return: {"userId": ..., "content": ..., "time": ..., "id": ...} plus
		"image" (message.img.url) when message.img.name is not empty
		"""
		payload = {}
		payload[VarNames.USER_ID] = message.sender_id
		payload[VarNames.CONTENT] = message.content
		payload[VarNames.TIME] = message.time
		payload[VarNames.MESSAGE_ID] = message.id
		image = message.img
		if image.name:
			payload[VarNames.IMG] = image.url
		return payload
162
163
	@classmethod
	def create_send_message(cls, message, event=Actions.PRINT_MESSAGE):
		"""
		Builds the message that is published to a room for a stored message.

		:param message: object accepted by create_message, also exposing room_id
		:param event: action of the resulting message, "printMessage" by default
		:return: the create_message() dictionary extended with "action",
		"channel" (message.room_id) and "handler" ("chat")
		"""
		payload = cls.create_message(message)
		payload.update({
			VarNames.EVENT: event,
			VarNames.CHANNEL: message.room_id,
			HandlerNames.NAME: HandlerNames.CHAT,
		})
		return payload
175
176
	@classmethod
	def get_messages(cls, messages, channel):
		"""
		Builds the response for a message history request.

		:param messages: iterable of objects accepted by create_message
		:param channel: value stored under the "channel" key
		:return: {"content": [converted messages], "action": "loadMessages",
		"channel": channel, "handler": "chat"}
		"""
		converted = []
		for message in messages:
			converted.append(cls.create_message(message))
		response = {}
		response[VarNames.CONTENT] = converted
		response[VarNames.EVENT] = Actions.GET_MESSAGES
		response[VarNames.CHANNEL] = channel
		response[HandlerNames.NAME] = HandlerNames.CHAT
		return response
189
190
	@property
191
	def stored_redis_user(self):
192
		return  self.user_id
193
194
	@property
	def channel(self):
		"""
		:return: name of the personal redis channel built from self.user_id
		"""
		own_channel = RedisPrefix.generate_user(self.user_id)
		return own_channel
197
198
	def subscribe_direct_channel_message(self, room_id, other_user_id):
		"""
		Builds the "addDirectChannel" message for the channels handler.

		:param room_id: value stored under the "roomId" key
		:param other_user_id: second entry of the "users" list
		:return: {"action", "roomId", "users": [self.user_id, other_user_id], "handler"}
		"""
		participants = [self.user_id, other_user_id]
		message = {}
		message[VarNames.EVENT] = Actions.CREATE_DIRECT_CHANNEL
		message[VarNames.ROOM_ID] = room_id
		message[VarNames.ROOM_USERS] = participants
		message[HandlerNames.NAME] = HandlerNames.CHANNELS
		return message
205
206
	def subscribe_room_channel_message(self, room_id, room_name):
		"""
		Builds the "addRoom" message for the channels handler.

		:param room_id: value stored under the "roomId" key
		:param room_name: value stored under the "name" key
		:return: {"action", "roomId", "users": [self.user_id], "handler", "name"}
		"""
		message = {}
		message[VarNames.EVENT] = Actions.CREATE_ROOM_CHANNEL
		message[VarNames.ROOM_ID] = room_id
		message[VarNames.ROOM_USERS] = [self.user_id]
		message[HandlerNames.NAME] = HandlerNames.CHANNELS
		message[VarNames.ROOM_NAME] = room_name
		return message
214
215
	def invite_room_channel_message(self, room_id, user_id, room_name, users):
		"""
		Builds the "inviteUser" message for the channels handler.

		:param room_id: value stored under the "roomId" key
		:param user_id: value stored under the "userId" key
		:param room_name: value stored under the "name" key
		:param users: value stored as the message content
		:return: {"action", "roomId", "userId", "handler", "name", "content"}
		"""
		message = {}
		message[VarNames.EVENT] = Actions.INVITE_USER
		message[VarNames.ROOM_ID] = room_id
		message[VarNames.USER_ID] = user_id
		message[HandlerNames.NAME] = HandlerNames.CHANNELS
		message[VarNames.ROOM_NAME] = room_name
		message[VarNames.CONTENT] = users
		return message
224
225
	def add_user_to_room(self, channel, user_id, content):
		"""
		Builds the "addUserToAll" message for the chat handler.

		:param channel: value stored under the "channel" key
		:param user_id: value stored under the "userId" key
		:param content: dictionary with the "sex" and "user" keys of the added user,
		e.g. SEX: 'Alien', USER: 'Andrew'
		:return: {"action", "channel", "userId", "handler", "sex", "user"}
		"""
		message = {}
		message[VarNames.EVENT] = Actions.ADD_USER
		message[VarNames.CHANNEL] = channel
		message[VarNames.USER_ID] = user_id
		message[HandlerNames.NAME] = HandlerNames.CHAT
		message[VarNames.GENDER] = content[VarNames.GENDER]
		message[VarNames.USER] = content[VarNames.USER]
		return message
234
235
	def unsubscribe_direct_message(self, room_id):
		"""
		Builds the "deleteRoom" message for the channels handler.

		:param room_id: value stored under the "roomId" key
		:return: {"action", "roomId", "userId": self.user_id, "handler",
		"time": get_milliseconds()}
		"""
		message = {}
		message[VarNames.EVENT] = Actions.DELETE_ROOM
		message[VarNames.ROOM_ID] = room_id
		message[VarNames.USER_ID] = self.user_id
		message[HandlerNames.NAME] = HandlerNames.CHANNELS
		message[VarNames.TIME] = get_milliseconds()
		return message
243
244
	def load_offline_message(self, offline_messages, channel_key):
		"""
		Builds the "loadOfflineMessages" message for the chat handler.

		:param offline_messages: value stored as the message content
		:param channel_key: value stored under the "channel" key
		:return: the default() message extended with "channel"
		"""
		message = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
		message.update({VarNames.CHANNEL: channel_key})
		return message
248
249
250
class MessagesHandler(MessagesCreator):
251
252
	def __init__(self, *args, **kwargs):
		"""
		Sets up the redis clients, the per-connection state and the tables
		that map an action name to the method handling it.
		"""
		# prefix marking published messages that must be decoded by the server
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__(*args, **kwargs)
		# identifier of this connection
		self.id = id(self)
		self.ip = None
		# imported here rather than at module level
		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		# channels this connection is subscribed to
		self.channels = []
		self.call_receiver_channel = None
		self._logger = None
		# client owned by this connection, used for subscriptions
		self.async_redis = tornadoredis.Client()
		# action -> method handling a message received from the client
		self.pre_process_message = dict([
			(Actions.GET_MESSAGES, self.process_get_messages),
			(Actions.SEND_MESSAGE, self.process_send_message),
			(Actions.CALL, self.process_call),
			(Actions.CREATE_DIRECT_CHANNEL, self.create_user_channel),
			(Actions.DELETE_ROOM, self.delete_channel),
			(Actions.EDIT_MESSAGE, self.edit_message),
			(Actions.CREATE_ROOM_CHANNEL, self.create_new_room),
			(Actions.INVITE_USER, self.invite_user),
		])
		# action -> method run after a decoded redis message was written to the client
		self.post_process_message = dict([
			(Actions.CREATE_DIRECT_CHANNEL, self.send_client_new_channel),
			(Actions.CREATE_ROOM_CHANNEL, self.send_client_new_channel),
			(Actions.DELETE_ROOM, self.send_client_delete_channel),
			(Actions.INVITE_USER, self.send_client_new_channel),
			(Actions.CALL, self.set_opponent_call_channel),
		])
281
282
	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribes the connection's redis client to the channels and, once the
		subscription task has finished, registers new_message as the listener.

		:param channels: channels passed to the redis subscribe call
		"""
		redis_client = self.async_redis
		yield tornado.gen.Task(redis_client.subscribe, channels)
		redis_client.listen(self.new_message)
287
288
	@property
	def logger(self):
		"""
		:return: the logger of this connection when it is set, otherwise the
		module-level logger
		"""
		return self._logger or base_logger
291
292
	@tornado.gen.engine
	def add_channel(self, channel):
		"""
		Remembers the channel and subscribes the connection's redis client to it.

		:param channel: channel to subscribe to
		"""
		self.channels.append(channel)
		subscription = tornado.gen.Task(self.async_redis.subscribe, (channel,))
		yield subscription
297
298
	def do_db(self, callback, *args, **kwargs):
		"""
		Runs a database callback and retries it once when the connection is gone.

		:param callback: callable performing the database work
		:param args: positional arguments passed to the callback
		:param kwargs: keyword arguments passed to the callback
		:return: whatever the callback returns
		:raises: any exception of the callback other than OperationalError and
		InterfaceError, and any exception raised by the second attempt
		"""
		try:
			return callback(*args, **kwargs)
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
			# the message is formatted by the logger only when the record is emitted
			self.logger.warning('%s, reconnecting', e)  # TODO
			# close the connection before the second attempt
			connection.close()
			return callback(*args, **kwargs)
305
306
	def execute_query(self, query, *args, **kwargs):
		"""
		Executes a raw query and returns its rows as dictionaries.

		:param query: query passed to cursor.execute
		:param args: extra positional arguments for cursor.execute
		:param kwargs: extra keyword arguments for cursor.execute
		:return: list with one {column name: value} dictionary per fetched row
		"""
		cursor = connection.cursor()
		try:
			cursor.execute(query, *args, **kwargs)
			# description may be None for statements that return no rows;
			# the column names are computed once instead of once per row
			columns = [col[0] for col in cursor.description or ()]
			return [dict(zip(columns, row)) for row in cursor.fetchall()]
		finally:
			# always release the cursor, even when the query fails
			cursor.close()
314
315
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
		"""
		Reads the online users of a channel from the redis hash of that channel.

		The hash maps a connection hash to the id of the user that owns the
		connection; both are expected to be utf-8 encoded bytes.

		:param channel: key of the redis hash
		:param check_user_id: when given (0 included), also report whether this
		user has an entry stored under a hash other than check_hash
		:param check_hash: hash that is ignored by the check above
		:return: list of distinct user ids when check_user_id is None,
		otherwise the tuple (list of distinct user ids, bool)
		"""
		online = self.sync_redis.hgetall(channel)
		self.logger.debug('!! channel %s redis online: %s', channel, online)
		user_ids = set()
		user_is_online = False
		# redis stores REDIS_USER_FORMAT, so parse them
		if online:
			for key_hash, raw_user_id in online.items():  # py2 iteritems
				user_id = int(raw_user_id.decode('utf-8'))
				if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
					user_is_online = True
				user_ids.add(user_id)
		result = list(user_ids)
		# "is None" rather than truthiness: user id 0 must still get the tuple,
		# because callers passing a user id always unpack two values
		if check_user_id is None:
			return result
		return result, user_is_online
333
334
	def add_online_user(self, room_id, offline_messages=None):
		"""
		Marks this connection as online in a room and notifies the clients.

		adds to redis
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }

		When the user has no other connection in the room, the new online list
		is published to the room and the offline messages, if any, are written
		to this client. Otherwise the online list is written to this client only.

		:param room_id: room the user goes online in
		:param offline_messages: messages written to the client on the first connection
		:return: None
		"""
		self.async_redis_publisher.hset(room_id, self.id, self.stored_redis_user)
		# since we add user to online first, latest trigger will always show correct online
		online, is_online = self.get_online_from_redis(room_id, self.user_id, self.id)
		if is_online:  # Send user names to self
			online_user_names_mes = self.room_online(
				online,
				Actions.REFRESH_USER,
				room_id
			)
			self.logger.info('!! Second tab, retrieving online for self')
			self.safe_write(online_user_names_mes)
			return
		# if a new tab has been opened
		# the entry written above may or may not be visible in the list just
		# read, so the user is added only when missing, to avoid a duplicate
		if self.user_id not in online:
			online.append(self.user_id)
		online_user_names_mes = self.room_online(
			online,
			Actions.LOGIN,
			room_id
		)
		self.logger.info('!! First tab, sending refresh online for all')
		self.publish(online_user_names_mes, room_id)
		if offline_messages:
			self.safe_write(self.load_offline_message(offline_messages, room_id))
362
363
	def publish(self, message, channel, parsable=False):
		"""
		Serializes a message to JSON and publishes it to a redis channel.

		:param message: JSON-serializable message
		:param channel: channel to publish to
		:param parsable: when true the serialized message is marked with encode()
		"""
		payload = json.dumps(message)
		self.logger.debug('<%s> %s', channel, payload)
		outgoing = self.encode(payload) if parsable else payload
		self.async_redis_publisher.publish(channel, outgoing)
369
370
	def encode(self, message):
		"""
		Marks a message so that the receiving handler decodes and processes it
		before sending it to the client.

		@param message: message to mark
		@return: the message preceded by self.parsable_prefix
		"""
		return ''.join((self.parsable_prefix, message))
378
379
	def decode(self, message):
		"""
		Checks whether a message should be processed by the server before it is
		written to the client.

		@param message: message to check
		@type message: str
		@return: the parsed JSON structure when the message starts with
		self.parsable_prefix, None otherwise
		"""
		prefix = self.parsable_prefix
		if message.startswith(prefix):
			# strip exactly the prefix, whatever its length
			return json.loads(message[len(prefix):])
		return None
388
389
	def new_message(self, message):
		"""
		Callback for messages coming from redis.

		String bodies are written to the client: marked messages in decoded
		form, followed by the post-processing method registered for their
		action; other strings as they are. Bodies that are not strings are ignored.

		:param message: redis message exposing the body attribute
		"""
		data = message.body
		if not isinstance(data, str_type):  # subscribe event
			return
		decoded = self.decode(data)
		self.safe_write(decoded if decoded else data)
		if decoded:
			post_process = self.post_process_message[decoded[VarNames.EVENT]]
			post_process(decoded)
398
399
	def safe_write(self, message):
		"""
		Placeholder for writing a message to the client.

		:raises NotImplementedError: always, in this class
		"""
		raise NotImplementedError('WebSocketHandler implements')
401
402
	def process_send_message(self, message):
		"""
		Stores a message sent by the client and publishes it to its room.

		:type message: dict
		:param message: must contain "channel" and "content", may contain "image"
		"""
		room_id = message[VarNames.CHANNEL]
		stored = Message(
			sender_id=self.user_id,
			content=message[VarNames.CONTENT]
		)
		stored.room_id = room_id
		if VarNames.IMG in message:
			stored.img = extract_photo(message[VarNames.IMG])
		self.do_db(stored.save)  # exit on hacked id with exception
		self.publish(self.create_send_message(stored), room_id)
417
418
	def process_call(self, in_message):
		"""
		Forwards a call message to the channel of the call receiver.

		For an offer the receiver is looked up as the other user of the
		nameless room given in "channel" and remembered in
		self.call_receiver_channel; other call types reuse the remembered channel.

		:type in_message: dict
		"""
		call_type = in_message.get(VarNames.CALL_TYPE)
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
		is_offer = call_type == CallType.OFFER
		if is_offer:
			room_id = in_message[VarNames.CHANNEL]
			opponent = User.rooms.through.objects.get(
				~Q(user_id=self.user_id), Q(room_id=room_id), Q(room__name__isnull=True))
			self.call_receiver_channel = RedisPrefix.generate_user(opponent.user_id)
			out_message[VarNames.CHANNEL] = room_id
		# TODO
		self.logger.info('!! Offering a call to user with id %s',  self.call_receiver_channel)
		# only offers are published as parsable messages
		self.publish(out_message, self.call_receiver_channel, is_offer)
434
435
	def create_new_room(self, message):
		"""
		Creates a named room, adds the current user to it and publishes the
		subscribe message to the user's own channel.

		:param message: must contain the room name under "name"
		:raises ValidationError: when the name is empty or longer than 16 characters
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = bool(room_name) and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		new_room = Room(name=room_name)
		self.do_db(new_room.save)
		membership = RoomUsers(room_id=new_room.id, user_id=self.user_id)
		membership.save()
		self.publish(self.subscribe_room_channel_message(new_room.id, room_name), self.channel, True)
444
445
	def invite_user(self, message):
		"""
		Adds another user to a room of the current user.

		The room is notified about the new user and the invited user receives
		a parsable invitation on the personal channel.

		:param message: must contain "roomId" and "userId"
		:raises ValidationError: when the room is not one of self.channels,
		the room is private, or the user is already in the room
		"""
		room_id, user_id = message[VarNames.ROOM_ID], message[VarNames.USER_ID]
		if room_id not in self.channels:
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
		room = self.do_db(Room.objects.get, id=room_id)
		if room.is_private:
			raise ValidationError("You can't add users to direct room, create a new room instead")
		try:
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		# collect every user of the room, the invited one included
		users_in_room = {}
		for member in room.users.all():
			self.set_js_user_structure(users_in_room, member.id, member.username, member.sex)
		room_message = self.add_user_to_room(room_id, user_id, users_in_room[user_id])
		self.publish(room_message, room_id)
		invitation = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
		self.publish(invitation, RedisPrefix.generate_user(user_id), True)
463
464
	def create_room(self, user_rooms, user_id):
		"""
		Finds or creates the room shared by the current user and user_id.

		An existing room is passed to update_room; when none is found a new
		room is created together with the memberships.

		:param user_rooms: rooms of the current user to search in
		:param user_id: the other user, or the current user for a room with oneself
		:return: id of the found or created room
		"""
		with_self = self.user_id == user_id
		if with_self:
			room_ids = [room['room_id'] for room in user_rooms]
			found = self.execute_query(SELECT_SELF_ROOM, [room_ids, ])
		else:
			memberships = Room.users.through.objects.filter(user_id=user_id, room__in=user_rooms)
			found = memberships.values('room__id', 'room__disabled')
		if len(found) > 0:
			existing = found[0]
			room_id = existing['room__id']
			self.update_room(room_id, existing['room__disabled'])
			return room_id
		new_room = Room()
		new_room.save()
		room_id = new_room.id
		if self.user_id == user_id:
			RoomUsers(user_id=self.user_id, room_id=room_id).save()
		else:
			RoomUsers.objects.bulk_create([
				RoomUsers(user_id=user_id, room_id=room_id),
				RoomUsers(user_id=self.user_id, room_id=room_id),
			])
		return room_id
487
488
	def update_room(self, room_id, disabled):
		"""
		Enables a disabled room again.

		:param room_id: id of the room
		:param disabled: current disabled flag of the room
		:raises ValidationError: when the room is not disabled
		"""
		if disabled:
			Room.objects.filter(id=room_id).update(disabled=False)
			return
		raise ValidationError('This room already exist')
493
494
	def create_user_channel(self, message):
		"""
		Creates a direct room with another user and publishes the subscribe
		message to the personal channels of both users (once, when both are
		the same channel).

		:param message: must contain the other user under "userId"
		"""
		other_user_id = message[VarNames.USER_ID]
		# get all self private rooms ids
		own_private_rooms = Room.users.through.objects.filter(
			user_id=self.user_id, room__name__isnull=True).values('room_id')
		# get private room that contains another user from rooms above
		room_id = self.create_room(own_private_rooms, other_user_id)
		subscribe_message = self.subscribe_direct_channel_message(room_id, other_user_id)
		self.publish(subscribe_message, self.channel, True)
		other_channel = RedisPrefix.generate_user(other_user_id)
		if other_channel != self.channel:
			self.publish(subscribe_message, other_channel, True)
505
506
	def delete_channel(self, message):
		"""
		Removes the current user from a room.

		A room without a name is disabled; for a named room the membership of
		the user is deleted and the room is told that the user went offline.
		In both cases a parsable "deleteRoom" message is published to the room.

		:param message: must contain the room under "roomId"
		:raises ValidationError: when the room is not one of self.channels, is
		ALL_ROOM_ID, or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to exit this room')
		room = self.do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else:  # if public -> leave the room, delete the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
			online = self.get_online_from_redis(room_id)
			# the user may have no online entry in redis; list.remove would
			# raise ValueError in that case and abort the whole operation
			if self.user_id in online:
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
		room.save()
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, room_id, True)
523
524
	def edit_message(self, data):
		"""
		Edits or deletes a message of the current user and publishes the result
		to the room of the message.

		A content of None marks the message as deleted, any other content
		replaces the stored one.

		:param data: must contain "id" and "content"
		:raises ValidationError: when the message belongs to another user, is
		older than 60000 milliseconds, or is already deleted
		"""
		message_id = data[VarNames.MESSAGE_ID]
		message = Message.objects.get(id=message_id)
		if message.sender_id != self.user_id:
			raise ValidationError("You can only edit your messages")
		if message.time + 60000 < get_milliseconds():
			raise ValidationError("You can only edit messages that were send not more than 1 min ago")
		if message.deleted:
			raise ValidationError("Already deleted")
		new_content = data[VarNames.CONTENT]
		message.content = new_content
		stored = Message.objects.filter(id=message_id)
		if new_content is None:
			stored.update(deleted=True)
			action = Actions.DELETE_MESSAGE
		else:
			action = Actions.EDIT_MESSAGE
			stored.update(content=new_content)
		self.publish(self.create_send_message(message, action), message.room_id)
542
543
	def send_client_new_channel(self, message):
		"""
		Subscribes this connection to the room of the message and marks the
		user as online in it.

		:param message: must contain the room under "roomId"
		"""
		new_room_id = message[VarNames.ROOM_ID]
		self.add_channel(new_room_id)
		self.add_online_user(new_room_id)
547
548
	def set_opponent_call_channel(self, message):
		"""
		Remembers the personal channel of the message's user as the call receiver.

		:param message: must contain the user under "userId"
		"""
		opponent_id = message[VarNames.USER_ID]
		self.call_receiver_channel = RedisPrefix.generate_user(opponent_id)
550
551
	def send_client_delete_channel(self, message):
		"""
		Unsubscribes this connection from the room of the message, removes its
		online entry from redis and forgets the channel.

		:param message: must contain the room under "roomId"
		"""
		removed_room_id = message[VarNames.ROOM_ID]
		self.async_redis.unsubscribe((removed_room_id,))
		self.async_redis_publisher.hdel(removed_room_id, self.id)
		self.channels.remove(removed_room_id)
556
557
	def process_get_messages(self, data):
		"""
		Writes a page of the newest non-deleted messages of a room to the client.

		:type data: dict
		:param data: must contain "channel"; "count" (10 by default) limits the
		page size and "headerId", when present, restricts the page to messages
		with a smaller id
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		room_id = data[VarNames.CHANNEL]
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		conditions = [Q(room_id=room_id), Q(deleted=False)]
		if header_id is not None:
			conditions.insert(0, Q(id__lt=header_id))
		messages = Message.objects.filter(*conditions).order_by('-pk')[:count]
		self.safe_write(self.do_db(self.get_messages, messages, room_id))
571
572
	def get_offline_messages(self):
		"""
		Collects the non-deleted messages whose id is greater than the last
		read message id stored for the current user in the room.

		:return: {room id: [create_message() dictionaries]}
		"""
		unread = Message.objects.filter(
			id__gt=F('room__roomusers__last_read_message_id'),
			deleted=False,
			room__roomusers__user_id=self.user_id
		)
		grouped = {}
		for message in unread:
			if message.room_id not in grouped:
				grouped[message.room_id] = []
			grouped[message.room_id].append(self.create_message(message))
		return grouped
582
583
	def get_users_in_current_user_rooms(self):
		"""
		Collects the enabled rooms of the current user together with their users.

		:return:
		{
			<room id>: {
				"name": <room name>,
				"users": {
					<user id>: {
						"user": <username>,
						"sex": GENDERS[<sex>]
					}
				}
			}
		}
		"""
		user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
		rooms = {}
		for room in user_rooms:
			rooms[room['id']] = {
				VarNames.ROOM_NAME: room['name'],
				VarNames.ROOM_USERS: {}
			}
		room_ids = (room_id for room_id in rooms)
		members = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id')
		for member in members:
			users_of_room = rooms[member['rooms__id']][VarNames.ROOM_USERS]
			self.set_js_user_structure(users_of_room, member['id'], member['username'], member['sex'])
		return rooms
612
613
	def set_js_user_structure(self, user_dict, user_id, name, sex):
		"""
		Stores the client representation of a user in user_dict.

		:param user_dict: dictionary that is modified in place
		:param user_id: key of the new entry
		:param name: value stored under "user"
		:param sex: key of GENDERS whose value is stored under "sex"
		"""
		entry = {}
		entry[VarNames.USER] = name
		entry[VarNames.GENDER] = GENDERS[sex]
		user_dict[user_id] = entry
618
619
	def save_ip(self):
		"""
		Records that the current user connected from self.ip, unless such a
		record already exists.
		"""
		known = UserJoinedInfo.objects.filter(Q(ip__ip=self.ip) & Q(user_id=self.user_id))
		if self.do_db(known.exists):
			return
		UserJoinedInfo.objects.create(
			ip=self.get_or_create_ip(),
			user_id=self.user_id
		)
628
629
	def get_or_create_ip(self):
		"""
		Returns the IpAddress record of self.ip, creating it when missing.

		A new record is filled with the data returned by the service behind
		api_url. When that fails for any reason (no api url, request or parsing
		error, unsuccessful status) the error is logged and a record holding
		only the ip is created instead.

		:return: the existing or created IpAddress
		"""
		try:
			return IpAddress.objects.get(ip=self.ip)
		except IpAddress.DoesNotExist:
			pass
		try:
			if not api_url:
				raise Exception('api url is absent')
			self.logger.debug("Creating ip record %s", self.ip)
			# the response is closed as soon as it has been read
			with urlopen(api_url % self.ip) as f:
				raw_response = f.read().decode("utf-8")
			response = json.loads(raw_response)
			if response['status'] != "success":
				raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
			return IpAddress.objects.create(
				ip=self.ip,
				isp=response['isp'],
				country=response['country'],
				region=response['regionName'],
				city=response['city'],
				country_code=response['countryCode']
			)
		except Exception as e:
			# deliberate best effort: the country info is optional
			self.logger.error("Error while creating ip with country info, because %s", e)
			return IpAddress.objects.create(ip=self.ip)
654
655
	def publish_logout(self, channel, log_data):
		"""
		Publishes a logout message to the channel when the user has no other
		connection in it.

		:param channel: channel to check and notify
		:param log_data: dictionary that receives the online state of the
		channel under the channel key
		:return: True when the logout was published, None otherwise
		"""
		# seems like async solves problem with connection lost and wrong data status
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
		online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
		log_data[channel] = {'online': online, 'is_online': is_online}
		if is_online:
			return
		self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
		return True
664
665
666
class AntiSpam(object):
	"""Keeps per-connection statistics used to detect spam."""

	def __init__(self):
		# message length keyed by the time the message was checked
		self.info = {}
		# number of messages that exceeded MAX_MESSAGE_SIZE
		self.spammed = 0

	def check_spam(self, json_message):
		"""
		Records the length of a message and rejects messages that are too long.

		:param json_message: raw message
		:raises ValidationError: when the message is longer than MAX_MESSAGE_SIZE
		"""
		length = len(json_message)
		# key is the current time in hundredths of a second
		self.info[int(round(time.time() * 100))] = length
		if length > MAX_MESSAGE_SIZE:
			self.spammed += 1
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
		self.check_timed_spam()

	def check_timed_spam(self):
		"""Does nothing yet."""
		# TODO implement me
		pass
		# raise ValidationError("You're chatting too much, calm down a bit!")
685
686
687
class TornadoHandler(WebSocketHandler, MessagesHandler):
688
689
	def __init__(self, *args, **kwargs):
		"""
		Initializes the parent classes, then the connection flag and the
		spam checker.
		"""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		self.anti_spam = AntiSpam()
		# set to True at the end of a successful open()
		self.connected = False
693
694
	def data_received(self, chunk):
		"""Ignores the chunk; does nothing."""
		return None
696
697
	def on_message(self, json_message):
		"""
		Handles a message received from the client.

		The message must be a JSON object whose "action" is one of the keys of
		self.pre_process_message and whose "channel", when present, is one of
		self.channels. Valid messages are passed to the method registered for
		their action; every validation failure is written back to the client
		as a growl message instead of being raised.

		:param json_message: raw message received from the websocket
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %s', json_message)
			try:
				message = json.loads(json_message)
			except ValueError:
				# the payload comes from the client and may be malformed
				raise ValidationError('Unable to parse message %s' % json_message)
			if not isinstance(message, dict):
				raise ValidationError('Message %s is not a JSON object' % json_message)
			event = message.get(VarNames.EVENT)
			# the type check also keeps unhashable values away from the dict lookup
			if not isinstance(event, str_type) or event not in self.pre_process_message:
				raise ValidationError("event {} is unknown".format(event))
			channel = message.get(VarNames.CHANNEL)
			if channel and channel not in self.channels:
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
			self.pre_process_message[event](message)
		except ValidationError as e:
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.safe_write(error_message)
715
716
	def on_close(self):
		"""
		Cleans up after the websocket has been closed.

		Unsubscribes from redis, removes the online entry of this connection
		from every integer channel, publishes logouts when the connection was
		initialized, runs UPDATE_LAST_READ_MESSAGE when the user went offline
		in at least one channel, and finally disconnects the redis client.
		"""
		if not self.async_redis.subscribed:
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
		else:
			self.logger.info("Close event, unsubscribing from %s", self.channels)
			self.async_redis.unsubscribe(self.channels)
		log_data = {}
		gone_offline = False
		# only integer channels are processed; the personal channel is a string
		for channel in (c for c in self.channels if isinstance(c, int)):
			self.sync_redis.hdel(channel, self.id)
			if self.connected and self.publish_logout(channel, log_data):
				gone_offline = True
		if gone_offline:
			updated = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
			self.logger.info("Updated %s last read message", updated)

		self.disconnect(json.dumps(log_data))
735
736
	def disconnect(self, log_data, tries=0):
		"""
		Closes the redis connection if it is not in progress, otherwise
		schedules another attempt on the io loop.
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227

		:param log_data: text written to the log when the connection is closed
		:param tries: number of attempts made so far; at 1000 the connection is
		closed regardless of its state
		"""
		still_busy = self.async_redis.connection.in_progress
		if not still_busy or tries >= 1000:  # failsafe eternal loop
			self.logger.info("Close connection result: %s", log_data)
			self.async_redis.disconnect()
			return
		self.logger.debug('Closing a connection timeouts')
		# NOTE(review): the first positional argument of timedelta is days, so
		# this delay is 0.00001 days (about 0.86 s) - confirm that is intended
		retry_delay = timedelta(days=0.00001)
		ioloop.IOLoop.instance().add_timeout(retry_delay, self.disconnect, log_data, tries+1)
747
748
	def open(self):
		"""
		Initializes the connection when the websocket is opened.

		The session cookie must reference an existing session, otherwise the
		socket is closed with code 403. For a valid session the user is loaded,
		the rooms are sent to the client, the connection subscribes to the
		personal channel and to every room, the user is marked online in every
		room and the ip is saved in a separate thread.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if not sessionStore.exists(session_key):
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			self.close(403, "Session key %s has been rejected" % session_key)
			return
		self.ip = self.get_client_ip()
		session = SessionStore(session_key)
		self.user_id = int(session["_auth_user_id"])
		# from now on every log record carries the user, connection and ip
		self._logger = logging.LoggerAdapter(base_logger, {
			'user_id': str(self.user_id).zfill(3),
			'id': self.id,
			'ip': self.ip
		})
		self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
		self.async_redis.connect()
		user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
		self.sender_name = user_db.username
		self.sex = user_db.sex_str
		user_rooms = self.get_users_in_current_user_rooms()
		self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
		# subscribe to the personal channel and to every room of the user
		del self.channels[:]
		self.channels.append(self.channel)
		self.channels.extend(user_rooms)
		self.listen(self.channels)
		# get all missed messages
		off_messages = self.get_offline_messages()
		for room_id in user_rooms:
			self.add_online_user(room_id, off_messages.get(room_id))
		self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
		self.connected = True
		Thread(target=self.save_ip).start()
782
783
	def check_origin(self, origin):
		"""
		Checks whether the domain of the Origin matches the domain of the
		Host header of the request. Ports are ignored and the comparison is
		case-insensitive.

		:param origin: value of the Origin header
		:return: True when both domains are equal, False otherwise or when the
		request has no Host header
		"""
		origin_domain = urlparse(origin).netloc.split(':')[0].lower()
		host_header = self.request.headers.get("Host")
		if not host_header:
			# nothing to compare the origin with, so reject
			return False
		# host names are case-insensitive, so both sides are lowered
		browser_domain = host_header.split(':')[0].lower()
		return browser_domain == origin_domain
793
794
	def safe_write(self, message):
		"""
		Writes a message to the websocket; a closed websocket is logged
		instead of raised.

		:type self: MessagesHandler
		:param message: dictionary (serialized to JSON first) or string
		:raises ValueError: when the message is neither a dictionary nor a string
		"""
		# self.logger.debug('<< THREAD %s >>', os.getppid())
		try:
			payload = json.dumps(message) if isinstance(message, dict) else message
			if not isinstance(payload, str_type):
				raise ValueError('Wrong message type : %s' % str(payload))
			self.logger.debug(">> %s", payload)
			self.write_message(payload)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(payload))
809
810
	def get_client_ip(self):
		"""
		:return: the value of the X-Real-IP header when it is present and not
		empty, otherwise the remote ip of the request
		"""
		forwarded_ip = self.request.headers.get("X-Real-IP")
		if forwarded_ip:
			return forwarded_ip
		return self.request.remote_ip
812