Completed
Push — master ( f51498...da0343 )
by Andrew
01:04
created

MessagesHandler.patch_tornadoredis()   A

Complexity

Conditions 3

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 12
rs 9.4285

1 Method

Rating   Name   Duplication   Size   Complexity  
A MessagesHandler.new_read() 0 8 2
1
import json
2
import logging
3
import sys
4
import time
5
from datetime import timedelta
6
from threading import Thread
7
from urllib.request import urlopen
8
9
import tornado.gen
10
import tornado.httpclient
11
import tornado.ioloop
12
import tornado.web
13
import tornado.websocket
14
import tornadoredis
15
from django.conf import settings
16
from django.core.exceptions import ValidationError
17
from django.db import connection, OperationalError, InterfaceError, IntegrityError
18
from django.db.models import Q, F
19
from redis_sessions.session import SessionStore
20
from tornado import ioloop
21
from tornado.websocket import WebSocketHandler
22
23
from chat.utils import extract_photo
24
25
try:
26
	from urllib.parse import urlparse  # py3
27
except ImportError:
28
	from urlparse import urlparse  # py2
29
30
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM
31
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo, RoomUsers
32
33
# True on Python 3 and later. The numeric major version is compared instead of
# the free-form ``sys.version`` string: a lexicographic comparison against '3'
# would misclassify a two-digit major version such as '10.0'.
PY3 = sys.version_info[0] >= 3
# Base type used to recognise text values; ``basestring`` exists only on Python 2.
str_type = str if PY3 else basestring
35
# Module-wide logger; handlers fall back to it until a per-connection adapter is set.
base_logger = logging.getLogger(__name__)

# Shared session store, used to check that a session key exists.
sessionStore = SessionStore()

# Optional ``IP_API_URL`` Django setting (used as a ``%`` template with the ip);
# None when the setting is absent.
api_url = getattr(settings, "IP_API_URL", None)
40
41
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
42
#CONNECTION_POOL = tornadoredis.ConnectionPool(
43
#	max_connections=500,
44
#	wait_for_available=True)
45
46
47
class Actions(object):
	"""Values of the "action" field (see VarNames.EVENT) of chat messages."""

	# online status
	LOGIN = 'addOnlineUser'
	LOGOUT = 'removeOnlineUser'
	REFRESH_USER = 'setOnlineUsers'
	ADD_USER = 'addUserToAll'

	# messages
	SEND_MESSAGE = 'sendMessage'
	PRINT_MESSAGE = 'printMessage'
	EDIT_MESSAGE = 'editMessage'
	DELETE_MESSAGE = 'deleteMessage'
	GET_MESSAGES = 'loadMessages'
	OFFLINE_MESSAGES = 'loadOfflineMessages'
	GROWL_MESSAGE = 'growl'

	# rooms and channels
	ROOMS = 'setRooms'
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
	CREATE_ROOM_CHANNEL = 'addRoom'
	DELETE_ROOM = 'deleteRoom'
	INVITE_USER = 'inviteUser'

	# calls
	CALL = 'call'
65
66
67
class VarNames(object):
	"""Key names used inside message dictionaries."""

	# message envelope
	EVENT = 'action'
	CONTENT = 'content'
	TIME = 'time'
	MESSAGE_ID = 'id'
	IMG = 'image'

	# sender
	USER = 'user'
	USER_ID = 'userId'
	GENDER = 'sex'

	# rooms; CHANNEL and CHANNEL_NAME intentionally share the same key
	ROOM_NAME = 'name'
	ROOM_ID = 'roomId'
	ROOM_USERS = 'users'
	CHANNEL = 'channel'
	CHANNEL_NAME = 'channel'
	IS_ROOM_PRIVATE = 'private'

	# history requests
	GET_MESSAGES_COUNT = 'count'
	GET_MESSAGES_HEADER_ID = 'headerId'

	# calls
	CALL_TYPE = 'type'
87
88
89
class CallType(object):
	"""Values of the "type" field (VarNames.CALL_TYPE) of call messages."""

	OFFER = 'offer'
91
92
class HandlerNames(object):
	"""
	Values stored under the "handler" key of outgoing messages.

	Declared as a new-style class, like the other constant holders in this
	module, so it behaves the same on Python 2 and Python 3.
	"""

	# key under which one of the handler names below is stored
	NAME = 'handler'

	CHANNELS = 'channels'
	CHAT = 'chat'
	GROWL = 'growl'
	WEBRTC = 'webrtc'
	FILE = 'file'
99
100
101
class RedisPrefix(object):
	"""
	Naming helpers for redis channels.

	Declared as a new-style class, like the other constant holders in this
	module, so it behaves the same on Python 2 and Python 3.
	"""

	USER_ID_CHANNEL_PREFIX = 'u'
	__ROOM_ONLINE__ = 'o:{}'

	@classmethod
	def generate_user(cls, key):
		"""
		:param key: user id (anything convertible with ``str``)
		:return: channel name of that user, e.g. 'u5' for key 5
		"""
		return cls.USER_ID_CHANNEL_PREFIX + str(key)
108
109
# Default channel id, taken from chat.settings.ALL_ROOM_ID and attached to the
# class after its body as a plain class attribute.
RedisPrefix.DEFAULT_CHANNEL = ALL_ROOM_ID
110
111
112
class MessagesCreator(object):
	"""
	Builds the dictionaries that are serialized and sent to clients.

	The instance attributes ``sex``, ``sender_name`` and ``user_id`` describe
	the sender and are embedded into some of the generated messages.
	"""

	def __init__(self, *args, **kwargs):
		super(MessagesCreator, self).__init__(*args, **kwargs)
		self.user_id = 0  # anonymous by default
		self.sender_name = None
		self.sex = None

	def default(self, content, event, handler):
		"""
		Base message stamped with the sender id and the current time.

		:return: {"action": event, "content": content, "userId": self.user_id,
		"time": get_milliseconds(), "handler": handler}
		"""
		message = {}
		message[VarNames.EVENT] = event
		message[VarNames.CONTENT] = content
		message[VarNames.USER_ID] = self.user_id
		message[VarNames.TIME] = get_milliseconds()
		message[HandlerNames.NAME] = handler
		return message

	def room_online(self, online, event, channel):
		"""
		Chat message with ``online`` as content, extended with the channel,
		the sender name and the sender sex.

		:return: default() message plus "channel", "user" and "sex" keys
		"""
		message = self.default(online, event, HandlerNames.CHAT)
		message[VarNames.CHANNEL_NAME] = channel
		message[VarNames.USER] = self.sender_name
		message[VarNames.GENDER] = self.sex
		return message

	def offer_call(self, content, message_type):
		"""
		Webrtc "call" message.

		:return: default() message plus "type" and "user" keys
		"""
		call_message = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
		call_message[VarNames.CALL_TYPE] = message_type
		call_message[VarNames.USER] = self.sender_name
		return call_message

	@classmethod
	def create_message(cls, message):
		"""
		Converts a stored message object into a dict.

		:param message: object exposing sender_id, content, time, id and img
		:return: dict with "userId", "content", "time", "id" and, when the
		message has an image name, "image"
		"""
		payload = {
			VarNames.USER_ID: message.sender_id,
			VarNames.CONTENT: message.content,
			VarNames.TIME: message.time,
			VarNames.MESSAGE_ID: message.id,
		}
		image = message.img
		if image.name:
			payload[VarNames.IMG] = image.url
		return payload

	@classmethod
	def create_send_message(cls, message, event=Actions.PRINT_MESSAGE):
		"""
		:param message: stored message object, see create_message()
		:param event: value of the "action" key, "printMessage" by default
		:return: create_message() dict plus "action", "channel" (the room id
		of the message) and "handler" keys
		"""
		payload = cls.create_message(message)
		payload[VarNames.EVENT] = event
		payload[VarNames.CHANNEL] = message.room_id
		payload[HandlerNames.NAME] = HandlerNames.CHAT
		return payload

	@classmethod
	def get_messages(cls, messages, channel):
		"""
		:param messages: iterable of stored message objects
		:param channel: channel the messages belong to
		:return: "loadMessages" chat message whose content is the list of
		create_message() dicts
		"""
		history = [cls.create_message(item) for item in messages]
		return {
			VarNames.CONTENT: history,
			VarNames.EVENT: Actions.GET_MESSAGES,
			VarNames.CHANNEL: channel,
			HandlerNames.NAME: HandlerNames.CHAT
		}

	@property
	def stored_redis_user(self):
		"""Value written to redis for this connection: the user id."""
		return self.user_id

	@property
	def channel(self):
		"""Name of the personal redis channel of the current user."""
		return RedisPrefix.generate_user(self.user_id)

	def subscribe_direct_channel_message(self, room_id, other_user_id):
		"""
		:return: "addDirectChannel" channels message listing both user ids
		"""
		members = [self.user_id, other_user_id]
		return {
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: members,
			HandlerNames.NAME: HandlerNames.CHANNELS
		}

	def subscribe_room_channel_message(self, room_id, room_name):
		"""
		:return: "addRoom" channels message with the current user as the only member
		"""
		members = [self.user_id]
		return {
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: members,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.ROOM_NAME: room_name
		}

	def invite_room_channel_message(self, room_id, user_id, room_name, users):
		"""
		:return: "inviteUser" channels message; ``users`` is passed through as content
		"""
		invitation = {
			VarNames.EVENT: Actions.INVITE_USER,
			VarNames.ROOM_ID: room_id,
			VarNames.USER_ID: user_id,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.ROOM_NAME: room_name,
			VarNames.CONTENT: users
		}
		return invitation

	def add_user_to_room(self, channel, user_id, content):
		"""
		:param content: dict holding the "sex" and "user" keys of the added user,
		e.g. {"sex": "Alien", "user": "Andrew"}
		:return: "addUserToAll" chat message
		"""
		gender = content[VarNames.GENDER]
		user_name = content[VarNames.USER]
		return {
			VarNames.EVENT: Actions.ADD_USER,
			VarNames.CHANNEL: channel,
			VarNames.USER_ID: user_id,
			HandlerNames.NAME: HandlerNames.CHAT,
			VarNames.GENDER: gender,
			VarNames.USER: user_name
		}

	def unsubscribe_direct_message(self, room_id):
		"""
		:return: "deleteRoom" channels message stamped with the current time
		"""
		farewell = {
			VarNames.EVENT: Actions.DELETE_ROOM,
			VarNames.ROOM_ID: room_id,
			VarNames.USER_ID: self.user_id,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.TIME: get_milliseconds()
		}
		return farewell

	def load_offline_message(self, offline_messages, channel_key):
		"""
		:return: "loadOfflineMessages" chat message for the given channel
		"""
		message = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
		message[VarNames.CHANNEL] = channel_key
		return message
248
249
250
class MessagesHandler(MessagesCreator):
251
252
	def __init__(self, *args, **kwargs):
		"""
		Prepares redis clients, the channel list and the dispatch tables
		that map an action name to the method handling it.
		"""
		# marker prepended to published messages that need server-side processing
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__(*args, **kwargs)
		self.id = id(self)
		self.ip = None
		self.channels = []
		self.call_receiver_channel = None
		self._logger = None
		# imported here rather than at module level
		from chat import global_redis
		self.sync_redis = global_redis.sync_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		# per-handler client used for subscriptions
		self.async_redis = tornadoredis.Client()
		self.patch_tornadoredis()
		# handlers of messages received from the client, keyed by action
		self.pre_process_message = {
			Actions.GET_MESSAGES: self.process_get_messages,
			Actions.SEND_MESSAGE: self.process_send_message,
			Actions.CALL: self.process_call,
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
			Actions.DELETE_ROOM: self.delete_channel,
			Actions.EDIT_MESSAGE: self.edit_message,
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
			Actions.INVITE_USER: self.invite_user,
		}
		# handlers run after a parsable redis message was written to the client
		self.post_process_message = {
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
			Actions.DELETE_ROOM: self.send_client_delete_channel,
			Actions.INVITE_USER: self.send_client_new_channel,
			Actions.CALL: self.set_opponent_call_channel
		}
282
283
	def patch_tornadoredis(self):  # TODO remove this
284
		self.async_redis.connection.old_read = self.async_redis.connection.read
285
		def new_read(instance, *args, **kwargs):
286
			try:
287
				return instance.old_read(*args, **kwargs)
288
			except Exception as e:
289
				current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
290
				self.logger.error(e)
291
				self.logger.error("Current online" + str(current_online))
292
				raise e
293
		fabric = type(self.async_redis.connection.read)
294
		self.async_redis.connection.read = fabric(new_read, self.async_redis.connection)
295
296
	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribes the async redis client to ``channels`` and, once the
		subscribe task has finished, passes incoming messages to new_message().
		"""
		subscription = tornado.gen.Task(self.async_redis.subscribe, channels)
		yield subscription
		self.async_redis.listen(self.new_message)
301
302
	@property
303
	def logger(self):
304
		return self._logger if self._logger else base_logger
305
306
	@tornado.gen.engine
	def add_channel(self, channel):
		"""
		Remembers ``channel`` in self.channels and subscribes the async
		redis client to it.
		"""
		self.channels.append(channel)
		subscription = tornado.gen.Task(self.async_redis.subscribe, (channel,))
		yield subscription
311
312
	def do_db(self, callback, *args, **kwargs):
313
		try:
314
			return callback(*args, **kwargs)
315
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
316
			self.logger.warning('%s, reconnecting' % e)  # TODO
317
			connection.close()
318
			return callback(*args, **kwargs)
319
320
	def execute_query(self, query, *args, **kwargs):
		"""
		Runs a raw SQL query on the django connection.

		:param query: SQL text; extra arguments are passed on to ``cursor.execute``
		:return: list of dicts mapping column name to value, one per row;
		an empty list when the statement produces no result set
		"""
		cursor = connection.cursor()
		try:
			cursor.execute(query, *args, **kwargs)
			description = cursor.description
			if description is None:
				# e.g. an UPDATE: there is nothing to fetch
				return []
			columns = [col[0] for col in description]
			return [dict(zip(columns, row)) for row in cursor.fetchall()]
		finally:
			# release the cursor even if the query fails
			cursor.close()
328
329
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
330
		"""
331
		:rtype : dict
332
		returns (dict, bool) if check_type is present
333
		"""
334
		online = self.sync_redis.hgetall(channel)
335
		self.logger.debug('!! channel %s redis online: %s', channel, online)
336
		result = set()
337
		user_is_online = False
338
		# redis stores REDIS_USER_FORMAT, so parse them
339
		if online:
340
			for key_hash, raw_user_id in online.items():  # py2 iteritems
341
				user_id = int(raw_user_id.decode('utf-8'))
342
				if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
343
					user_is_online = True
344
				result.add(user_id)
345
		result = list(result)
346
		return (result, user_is_online) if check_user_id else result
347
348
	def add_online_user(self, room_id, offline_messages=None):
		"""
		Registers this connection in the redis hash of the room
		(connection hash -> stored_redis_user) and announces the result.

		If the user has no other connection in the room, the online list is
		published to the whole room with the LOGIN action and the offline
		messages, if any, are written to this client. Otherwise the list is
		written only to this client with the REFRESH_USER action.

		:param room_id: room (and redis key) to register in
		:param offline_messages: optional content for load_offline_message()
		"""
		self.async_redis_publisher.hset(room_id, self.id, self.stored_redis_user)
		# since we add user to online first, latest trigger will always show correct online
		online, is_online = self.get_online_from_redis(room_id, self.user_id, self.id)
		if not is_online:  # if a new tab has been opened
			# the hset above may already be visible; never list the user twice
			if self.user_id not in online:
				online.append(self.user_id)
			online_user_names_mes = self.room_online(
				online,
				Actions.LOGIN,
				room_id
			)
			self.logger.info('!! First tab, sending refresh online for all')
			self.publish(online_user_names_mes, room_id)
			if offline_messages:
				self.safe_write(self.load_offline_message(offline_messages, room_id))
		else:  # Send user names to self
			online_user_names_mes = self.room_online(
				online,
				Actions.REFRESH_USER,
				room_id
			)
			self.logger.info('!! Second tab, retrieving online for self')
			self.safe_write(online_user_names_mes)
376
377
	def publish(self, message, channel, parsable=False):
378
		jsoned_mess = json.dumps(message)
379
		self.logger.debug('<%s> %s', channel, jsoned_mess)
380
		if parsable:
381
			jsoned_mess = self.encode(jsoned_mess)
382
		self.async_redis_publisher.publish(channel, jsoned_mess)
383
384
	def encode(self, message):
385
		"""
386
		Marks message with prefix to specify that
387
		it should be decoded and proccesed before sending to client
388
		@param message: message to mark
389
		@return: marked message
390
		"""
391
		return self.parsable_prefix + message
392
393
	def decode(self, message):
394
		"""
395
		Check if message should be proccessed by server before writing to client
396
		@param message: message to check
397
		@type message: str
398
		@return: Object structure of message if it should be processed, None if not
399
		"""
400
		if message.startswith(self.parsable_prefix):
401
			return json.loads(message[1:])
402
403
	def new_message(self, message):
		"""
		Callback for messages coming from redis.

		String bodies are written to the client; a body marked as parsable
		is decoded first and, after being written, passed to the matching
		post_process_message handler. Bodies of any other type are ignored.
		"""
		body = message.body
		if not isinstance(body, str_type):  # subscribe event
			return
		parsed = self.decode(body)
		self.safe_write(parsed if parsed else body)
		if parsed:
			handler = self.post_process_message[parsed[VarNames.EVENT]]
			handler(parsed)
412
413
	def safe_write(self, message):
414
		raise NotImplementedError('WebSocketHandler implements')
415
416
	def process_send_message(self, message):
		"""
		Stores a chat message sent by the current user and publishes it to
		its channel.

		:type message: dict
		"""
		room_id = message[VarNames.CHANNEL]
		stored = Message(
			sender_id=self.user_id,
			content=message[VarNames.CONTENT]
		)
		stored.room_id = room_id
		if VarNames.IMG in message:
			stored.img = extract_photo(message[VarNames.IMG])
		self.do_db(stored.save)  # exit on hacked id with exception
		outgoing = self.create_send_message(stored)
		self.publish(outgoing, room_id)
431
432
	def process_call(self, in_message):
		"""
		Forwards a call message to the personal channel of the call receiver.

		For an offer the receiver is looked up as the other user of the given
		room (rooms without a name only) and remembered in
		``self.call_receiver_channel``; the message is then published as
		parsable. Other call types go to the previously remembered receiver.

		:type in_message: dict
		:raises ValidationError: when there is no remembered receiver
		"""
		call_type = in_message.get(VarNames.CALL_TYPE)
		set_opponent_channel = False
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
		if call_type == CallType.OFFER:
			room_id = in_message[VarNames.CHANNEL]
			user = User.rooms.through.objects.get(~Q(user_id=self.user_id), Q(room_id=room_id), Q(room__name__isnull=True))
			self.call_receiver_channel = RedisPrefix.generate_user(user.user_id)
			set_opponent_channel = True
			out_message[VarNames.CHANNEL] = room_id
		# TODO
		if self.call_receiver_channel is None:
			# nothing to publish to: no offer has been made or received yet
			raise ValidationError('Call receiver is unknown, offer a call first')
		self.logger.info('!! Offering a call to user with id %s',  self.call_receiver_channel)
		self.publish(out_message, self.call_receiver_channel, set_opponent_channel)
448
449
	def create_new_room(self, message):
		"""
		Creates a named room with the current user as its member and
		publishes the subscribe message to the user's own channel.

		:raises ValidationError: if the name is empty or longer than 16 symbols
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = room_name and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		new_room = Room(name=room_name)
		self.do_db(new_room.save)
		membership = RoomUsers(room_id=new_room.id, user_id=self.user_id)
		membership.save()
		notification = self.subscribe_room_channel_message(new_room.id, room_name)
		self.publish(notification, self.channel, True)
458
459
	def invite_user(self, message):
		"""
		Adds a user to a room the current connection is subscribed to.

		The room is told about the new user, and the invited user receives
		a parsable invitation on the personal channel.

		:raises ValidationError: if the room is not among self.channels, is
		private, or already contains the user
		"""
		invited_id = message[VarNames.USER_ID]
		room_id = message[VarNames.ROOM_ID]
		if room_id not in self.channels:
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
		room = self.do_db(Room.objects.get, id=room_id)
		if room.is_private:
			raise ValidationError("You can't add users to direct room, create a new room instead")
		try:
			Room.users.through.objects.create(room_id=room_id, user_id=invited_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		# user id -> {"user": name, "sex": gender} for every member of the room
		members = {}
		for member in room.users.all():
			self.set_js_user_structure(members, member.id, member.username, member.sex)
		room_notice = self.add_user_to_room(room_id, invited_id, members[invited_id])
		self.publish(room_notice, room_id)
		invitation = self.invite_room_channel_message(room_id, invited_id, room.name, members)
		self.publish(invitation, RedisPrefix.generate_user(invited_id), True)
477
478
	def create_room(self, user_rooms, user_id):
		"""
		Finds or creates the room shared by the current user and ``user_id``.

		An existing room is passed to update_room(); otherwise a new room is
		created together with its RoomUsers rows.

		:param user_rooms: rows holding the 'room_id' of the current user's rooms
		:param user_id: the other participant, may equal self.user_id
		:return: id of the room
		"""
		is_self_room = self.user_id == user_id
		if is_self_room:
			room_ids = [room['room_id'] for room in user_rooms]
			query_res = self.execute_query(SELECT_SELF_ROOM, [room_ids, ])
		else:
			rooms_query = Room.users.through.objects.filter(user_id=user_id, room__in=user_rooms)
			query_res = rooms_query.values('room__id', 'room__disabled')
		if len(query_res) > 0:
			existing = query_res[0]
			room_id = existing['room__id']
			self.update_room(room_id, existing['room__disabled'])
			return room_id
		room = Room()
		room.save()
		room_id = room.id
		if is_self_room:
			RoomUsers(user_id=self.user_id, room_id=room_id).save()
		else:
			RoomUsers.objects.bulk_create([
				RoomUsers(user_id=user_id, room_id=room_id),
				RoomUsers(user_id=self.user_id, room_id=room_id),
			])
		return room_id
501
502
	def update_room(self, room_id, disabled):
		"""
		Re-enables a disabled room.

		:raises ValidationError: if the room is not disabled
		"""
		if not disabled:
			raise ValidationError('This room already exist')
		Room.objects.filter(id=room_id).update(disabled=False)
507
508
	def create_user_channel(self, message):
		"""
		Opens a direct channel with the user given in the message and
		publishes the subscribe message to both personal channels (only once
		when the user opens a channel with themselves).
		"""
		other_user_id = message[VarNames.USER_ID]
		# get all self private rooms ids
		own_rooms = Room.users.through.objects.filter(user_id=self.user_id, room__name__isnull=True).values('room_id')
		# get private room that contains another user from rooms above
		room_id = self.create_room(own_rooms, other_user_id)
		notification = self.subscribe_direct_channel_message(room_id, other_user_id)
		own_channel = self.channel
		self.publish(notification, own_channel, True)
		other_channel = RedisPrefix.generate_user(other_user_id)
		if other_channel != own_channel:
			self.publish(notification, other_channel, True)
519
520
	def delete_channel(self, message):
		"""
		Leaves (named room) or disables (room without a name) the given room
		and publishes the parsable unsubscribe message to the room channel.

		:raises ValidationError: if the room is not among self.channels, is
		the ALL room, or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to exit this room')
		room = self.do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else: # if public -> leave the room, delete the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
			online = self.get_online_from_redis(room_id)
			# the user may be missing from the redis hash; list.remove() would raise then
			if self.user_id in online:
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
		room.save()
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, room_id, True)
537
538
	def edit_message(self, data):
		"""
		Edits a message of the current user, or marks it deleted when the
		new content is None, and publishes the change to the message's room.

		:raises ValidationError: if the message belongs to another user, is
		older than 60000 ms or is already deleted
		"""
		message_id = data[VarNames.MESSAGE_ID]
		message = Message.objects.get(id=message_id)
		if message.sender_id != self.user_id:
			raise ValidationError("You can only edit your messages")
		edit_deadline = message.time + 60000
		if edit_deadline < get_milliseconds():
			raise ValidationError("You can only edit messages that were send not more than 1 min ago")
		if message.deleted:
			raise ValidationError("Already deleted")
		new_content = data[VarNames.CONTENT]
		message.content = new_content
		selector = Message.objects.filter(id=message_id)
		if new_content is None:
			selector.update(deleted=True)
			action = Actions.DELETE_MESSAGE
		else:
			action = Actions.EDIT_MESSAGE
			selector.update(content=new_content)
		self.publish(self.create_send_message(message, action), message.room_id)
556
557
	def send_client_new_channel(self, message):
		"""Subscribes to the room named in the message and registers the user as online there."""
		new_room_id = message[VarNames.ROOM_ID]
		self.add_channel(new_room_id)
		self.add_online_user(new_room_id)
561
562
	def set_opponent_call_channel(self, message):
		"""Remembers the personal channel of the message's sender as the call receiver."""
		caller_id = message[VarNames.USER_ID]
		self.call_receiver_channel = RedisPrefix.generate_user(caller_id)
564
565
	def send_client_delete_channel(self, message):
		"""
		Unsubscribes from the room named in the message, removes this
		connection from the room's redis hash and forgets the channel.
		"""
		removed_room_id = message[VarNames.ROOM_ID]
		self.async_redis.unsubscribe((removed_room_id,))
		self.async_redis_publisher.hdel(removed_room_id, self.id)
		self.channels.remove(removed_room_id)
570
571
	def process_get_messages(self, data):
		"""
		Writes to the client up to ``count`` (10 by default) not deleted
		messages of a room, newest first, optionally only those with an id
		lower than ``headerId``.

		:type data: dict
		:raises ValidationError: if ``count`` is not a non-negative integer
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		# count comes from the client, so reject anything that is not usable as a slice bound
		try:
			count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		except (TypeError, ValueError):
			raise ValidationError('Messages count should be a number')
		if count < 0:
			raise ValidationError('Messages count should not be negative')
		room_id = data[VarNames.CHANNEL]
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		if header_id is None:
			messages = Message.objects.filter(Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
		else:
			messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
		# the queryset is consumed inside get_messages, so do_db covers the query
		response = self.do_db(self.get_messages, messages, room_id)
		self.safe_write(response)
585
586
	def get_offline_messages(self):
		"""
		Collects not deleted messages of the current user's rooms whose id
		is greater than the user's last read message id in that room.

		:return: dict room id -> list of create_message() dicts
		"""
		unread = Message.objects.filter(
			id__gt=F('room__roomusers__last_read_message_id'),
			deleted=False,
			room__roomusers__user_id=self.user_id
		)
		by_room = {}
		for message in unread:
			room_id = message.room_id
			if room_id not in by_room:
				by_room[room_id] = []
			by_room[room_id].append(self.create_message(message))
		return by_room
596
597
	def get_users_in_current_user_rooms(self):
		"""
		Describes every enabled room of the current user with its members:
		{
			ROOM_ID: {
				"name": ROOM_NAME,
				"users": {
					USER_ID: {
						"user": USER_NAME,
						"sex": GENDERS[sex]
					},
					...
				}
			},
			...
		}
		"""
		user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
		res = {}
		for room in user_rooms:
			res[room['id']] = {
				VarNames.ROOM_NAME: room['name'],
				VarNames.ROOM_USERS: {}
			}
		room_ids = (room_id for room_id in res)
		rooms_users = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id')
		for user in rooms_users:
			room_members = res[user['rooms__id']][VarNames.ROOM_USERS]
			self.set_js_user_structure(room_members, user['id'], user['username'], user['sex'])
		return res
626
627
	def set_js_user_structure(self, user_dict, user_id, name, sex):
		"""
		Stores {"user": name, "sex": GENDERS[sex]} under ``user_id`` in
		``user_dict`` (modified in place).
		"""
		entry = {
			VarNames.USER: name,
			VarNames.GENDER: GENDERS[sex]
		}
		user_dict[user_id] = entry
632
633
	def save_ip(self):
		"""
		Records that the current user joined from self.ip, unless such a
		record already exists.
		"""
		known = UserJoinedInfo.objects.filter(
			Q(ip__ip=self.ip) & Q(user_id=self.user_id))
		already_saved = self.do_db(known.exists)
		if already_saved:
			return
		UserJoinedInfo.objects.create(
			ip=self.get_or_create_ip(),
			user_id=self.user_id
		)
642
643
	def get_or_create_ip(self):
		"""
		Returns the IpAddress record of self.ip, creating it when missing.

		A new record is filled with the data returned by the ``api_url``
		service; if that fails for any reason (no url configured, request
		error, unsuccessful status) a record with only the ip is created.

		:return: IpAddress instance
		"""
		try:
			ip_address = IpAddress.objects.get(ip=self.ip)
		except IpAddress.DoesNotExist:
			try:
				if not api_url:
					raise Exception('api url is absent')
				self.logger.debug("Creating ip record %s", self.ip)
				f = urlopen(api_url % self.ip)
				try:
					raw_response = f.read().decode("utf-8")
				finally:
					# always release the http connection
					f.close()
				response = json.loads(raw_response)
				if response['status'] != "success":
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
				ip_address = IpAddress.objects.create(
					ip=self.ip,
					isp=response['isp'],
					country=response['country'],
					region=response['regionName'],
					city=response['city'],
					country_code=response['countryCode']
				)
			except Exception as e:
				# best effort: fall back to a record without location info
				self.logger.error("Error while creating ip with country info, because %s", e)
				ip_address = IpAddress.objects.create(ip=self.ip)
		return ip_address
668
669
	def publish_logout(self, channel, log_data):
670
		# seems like async solves problem with connection lost and wrong data status
671
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
672
		online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
673
		log_data[channel] = {'online': online, 'is_online': is_online}
674
		if not is_online:
675
			message = self.room_online(online, Actions.LOGOUT, channel)
676
			self.publish(message, channel)
677
			return True
678
679
680
class AntiSpam(object):
	"""Tracks message sizes per connection and rejects oversized messages."""

	def __init__(self):
		self.info = {}  # time key (hundredths of a second) -> message length
		self.spammed = 0  # number of oversized messages seen

	def check_spam(self, json_message):
		"""
		Records the length of the message and validates it.

		:raises ValidationError: if the message is longer than MAX_MESSAGE_SIZE
		"""
		size = len(json_message)
		self.info[int(round(time.time() * 100))] = size
		if size <= MAX_MESSAGE_SIZE:
			self.check_timed_spam()
			return
		self.spammed += 1
		raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)

	def check_timed_spam(self):
		"""Placeholder for a rate check; does nothing yet."""
		# TODO implement me
		# raise ValidationError("You're chatting too much, calm down a bit!")
		pass
699
700
701
class TornadoHandler(WebSocketHandler, MessagesHandler):
702
703
	def __init__(self, *args, **kwargs):
		"""Initializes the parent handlers, then the per-connection state."""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		self.anti_spam = AntiSpam()
		# set to True at the end of a successful open()
		self.connected = False
707
708
	def data_received(self, chunk):
709
		pass
710
711
	def on_message(self, json_message):
		"""
		Validates a message received from the client and dispatches it to the
		handler registered in pre_process_message for its action.

		Any ValidationError, including the ones raised for malformed input,
		is reported back to the client as a growl message.
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %s', json_message)
			# the payload comes from the client: never let malformed input escape as a raw error
			try:
				message = json.loads(json_message)
			except ValueError:
				raise ValidationError('Message is not a valid JSON')
			if not isinstance(message, dict) or VarNames.EVENT not in message:
				raise ValidationError('Message should be an object with "{}" field'.format(VarNames.EVENT))
			event = message[VarNames.EVENT]
			# a non-string event (e.g. a list) cannot be used as a dictionary key
			if not isinstance(event, str_type) or event not in self.pre_process_message:
				raise ValidationError("event {} is unknown".format(event))
			channel = message.get(VarNames.CHANNEL)
			if channel and channel not in self.channels:
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
			self.pre_process_message[event](message)
		except ValidationError as e:
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.safe_write(error_message)
729
730
	def on_close(self):
731
		if self.async_redis.subscribed:
732
			self.logger.info("Close event, unsubscribing from %s", self.channels)
733
			self.async_redis.unsubscribe(self.channels)
734
		else:
735
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
736
		log_data = {}
737
		gone_offline = False
738
		for channel in self.channels:
739
			if not isinstance(channel, int):
740
				continue
741
			self.sync_redis.hdel(channel, self.id)
742
			if self.connected:
743
				gone_offline = self.publish_logout(channel, log_data) or gone_offline
744
		if gone_offline:
745
			res = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
746
			self.logger.info("Updated %s last read message", res)
747
748
		self.disconnect(json.dumps(log_data))
749
750
	def disconnect(self, log_data, tries=0):
751
		"""
752
		Closes a connection if it's not in proggress, otherwice timeouts closing
753
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227
754
		"""
755
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
756
			self.logger.debug('Closing a connection timeouts')
757
			ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, log_data, tries+1)
758
		else:
759
			self.logger.info("Close connection result: %s", log_data)
760
			self.async_redis.disconnect()
761
762
	def open(self):
		"""
		Authenticates a new websocket by its session cookie.

		For a known session: loads the user, sends the rooms to the client,
		subscribes to the personal and room channels, registers the user as
		online in every room (delivering offline messages) and stores the ip
		in a background thread. An unknown session is closed with code 403.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if not sessionStore.exists(session_key):
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			self.close(403, "Session key %s has been rejected" % session_key)
			return
		self.ip = self.get_client_ip()
		session = SessionStore(session_key)
		self.user_id = int(session["_auth_user_id"])
		# extra fields attached to every log record of this connection
		log_params = {
			'user_id': str(self.user_id).zfill(3),
			'id': self.id,
			'ip': self.ip
		}
		self._logger = logging.LoggerAdapter(base_logger, log_params)
		self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
		self.async_redis.connect()
		user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
		self.sender_name = user_db.username
		self.sex = user_db.sex_str
		user_rooms = self.get_users_in_current_user_rooms()
		rooms_message = self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS)
		self.safe_write(rooms_message)
		# personal channel first, then one channel per room
		self.channels.clear()
		self.channels.append(self.channel)
		for room_id in user_rooms:
			self.channels.append(room_id)
		self.listen(self.channels)
		# get all missed messages
		off_messages = self.get_offline_messages()
		for room_id in user_rooms:
			self.add_online_user(room_id, off_messages.get(room_id))
		self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
		self.connected = True
		ip_saver = Thread(target=self.save_ip)
		ip_saver.start()
796
797
	def check_origin(self, origin):
798
		"""
799
		check whether browser set domain matches origin
800
		"""
801
		parsed_origin = urlparse(origin)
802
		origin = parsed_origin.netloc
803
		origin_domain = origin.split(':')[0].lower()
804
		browser_set = self.request.headers.get("Host")
805
		browser_domain = browser_set.split(':')[0]
806
		return browser_domain == origin_domain
807
808
	def safe_write(self, message):
		"""
		Sends a message to the client; a dict is serialized to JSON first.

		A closed websocket is only logged. A message that is neither a dict
		nor a string raises ValueError.
		:type self: MessagesHandler
		"""
		# self.logger.debug('<< THREAD %s >>', os.getppid())
		try:
			if isinstance(message, dict):
				message = json.dumps(message)
			is_text = isinstance(message, str_type)
			if not is_text:
				raise ValueError('Wrong message type : %s' % str(message))
			self.logger.debug(">> %s", message)
			self.write_message(message)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
823
824
	def get_client_ip(self):
825
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip
826