Completed
Push — master ( 5850a4...5772bc )
by Andrew
01:00
created

AntiSpam.__init__()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 3
rs 10
1
import json
2
import logging
3
import sys
4
import time
5
from datetime import timedelta
6
from numbers import Number
7
from threading import Thread
8
9
import tornado.gen
10
import tornado.httpclient
11
import tornado.ioloop
12
import tornado.web
13
import tornado.websocket
14
import tornadoredis
15
from django.conf import settings
16
from django.core.exceptions import ValidationError
17
from django.db import connection, OperationalError, InterfaceError, IntegrityError
18
from django.db.models import Q, F
19
from redis_sessions.session import SessionStore
20
from tornado import ioloop
21
from tornado.websocket import WebSocketHandler
22
23
from chat.utils import extract_photo, get_or_create_ip
24
25
try:  # py2
26
	from urlparse import urlparse
27
except ImportError:  # py3
28
	from urllib.parse import urlparse
29
30
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM
31
from chat.models import User, Message, Room, get_milliseconds, UserJoinedInfo, RoomUsers
32
33
# True when running on Python 3 or newer. The parsed version tuple is compared
# instead of the version string: a lexicographic string comparison would
# misorder a two-digit major version such as '10'.
PY3 = sys.version_info[0] >= 3
# Type used for "is this a text string" checks. `basestring` exists only on
# py2; on py3 that branch of the conditional is never evaluated.
str_type = str if PY3 else basestring
35
36
37
sessionStore = SessionStore()
38
39
base_logger = logging.getLogger(__name__)
40
41
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
42
#CONNECTION_POOL = tornadoredis.ConnectionPool(
43
#	max_connections=500,
44
#	wait_for_available=True)
45
46
47
class Actions(object):
	"""
	Values of the 'action' field (VarNames.EVENT) of chat messages.
	Incoming actions are dispatched through MessagesHandler.pre_process_message.
	"""
	LOGIN = 'addOnlineUser'  # published by add_online_user for the first connection of a user in a room
	LOGOUT = 'removeOnlineUser'  # published by publish_logout / delete_channel
	SEND_MESSAGE = 'sendMessage'
	PRINT_MESSAGE = 'printMessage'  # default event of create_send_message
	CALL = 'call'
	ROOMS = 'setRooms'  # sent by TornadoHandler.open with the rooms of the user
	REFRESH_USER = 'setOnlineUsers'  # sent only to the connection itself by add_online_user
	GROWL_MESSAGE = 'growl'  # carries validation errors back to the client
	GET_MESSAGES = 'loadMessages'
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
	DELETE_ROOM = 'deleteRoom'
	EDIT_MESSAGE = 'editMessage'
	DELETE_MESSAGE = 'deleteMessage'  # published by edit_message when the new content is None
	CREATE_ROOM_CHANNEL = 'addRoom'
	INVITE_USER = 'inviteUser'
	ADD_USER = 'addUserToAll'
	OFFLINE_MESSAGES = 'loadOfflineMessages'
65
66
67
class VarNames(object):
	"""
	Keys of the message dicts that are built by MessagesCreator and read from client messages.
	"""
	CALL_TYPE = 'type'
	USER = 'user'
	USER_ID = 'userId'
	TIME = 'time'
	CONTENT = 'content'
	IMG = 'image'
	EVENT = 'action'  # holds one of the Actions values
	MESSAGE_ID = 'id'
	GENDER = 'sex'
	ROOM_NAME = 'name'
	FILE_NAME = 'filename'
	ROOM_ID = 'roomId'
	ROOM_USERS = 'users'
	CHANNEL = 'channel'
	GET_MESSAGES_COUNT = 'count'
	GET_MESSAGES_HEADER_ID = 'headerId'
	CHANNEL_NAME = 'channel'  # same literal as CHANNEL, both end up in the same dict key
	IS_ROOM_PRIVATE = 'private'
	#ROOM_NAME = 'roomName'
	# ROOM_ID = 'roomId'
88
89
90
class CallType(object):
	"""
	Values of the VarNames.CALL_TYPE field of call messages.
	"""
	OFFER = 'offer'  # process_call resolves the receiver of the call only for this type
92
93
class HandlerNames(object):
	"""
	Values of the 'handler' field of outgoing messages.
	Derives from object like the other constant holders, so it is a new-style class on py2 too.
	"""
	NAME = 'handler'  # the dict key itself, the remaining attributes are its possible values
	CHANNELS = 'channels'
	CHAT = 'chat'
	GROWL = 'growl'
	WEBRTC = 'webrtc'
	FILE = 'file'
100
101
102
class RedisPrefix(object):
	"""
	Naming rules for redis channels.
	Derives from object like the other constant holders, so it is a new-style class on py2 too.
	"""
	USER_ID_CHANNEL_PREFIX = 'u'
	__ROOM_ONLINE__ = 'o:{}'

	@classmethod
	def generate_user(cls, key):
		"""
		:param key: user id, anything that str() accepts
		:return: name of the personal channel of that user, e.g. 'u5' for key 5
		"""
		return cls.USER_ID_CHANNEL_PREFIX + str(key)
109
110
# Alias for ALL_ROOM_ID, attached after the class body; patch_tornadoredis dumps the online users of this channel when a redis read fails.
RedisPrefix.DEFAULT_CHANNEL = ALL_ROOM_ID
111
112
113
class MessagesCreator(object):
	"""
	Builds the dict payloads that are written to clients or published to redis.
	Also keeps the identity (id, name, gender) of the user the payloads are built for.
	"""

	def __init__(self, *args, **kwargs):
		# pass the arguments further up the MRO, the class is combined with other bases
		super(MessagesCreator, self).__init__(*args, **kwargs)
		self.user_id = 0  # anonymous by default
		self.sender_name = None
		self.sex = None

	def default(self, content, event, handler):
		"""
		Base payload that most of the other messages extend.
		:return: {"action": event, "content": content, "userId": self.user_id,
		"time": get_milliseconds(), "handler": handler}
		"""
		payload = dict()
		payload[VarNames.EVENT] = event
		payload[VarNames.CONTENT] = content
		payload[VarNames.USER_ID] = self.user_id
		payload[VarNames.TIME] = get_milliseconds()
		payload[HandlerNames.NAME] = handler
		return payload

	def room_online(self, online, event, channel):
		"""
		:param online: content of the message, callers pass the ids of online users
		:return: default() payload for the chat handler plus "channel", "user" and "sex" of the current user
		"""
		payload = self.default(online, event, HandlerNames.CHAT)
		payload.update({
			VarNames.CHANNEL_NAME: channel,
			VarNames.USER: self.sender_name,
			VarNames.GENDER: self.sex,
		})
		return payload

	def offer_call(self, content, message_type):
		"""
		:return: default() payload with action "call" for the webrtc handler plus "type" and "user"
		"""
		payload = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
		payload.update({
			VarNames.CALL_TYPE: message_type,
			VarNames.USER: self.sender_name,
		})
		return payload

	@classmethod
	def create_message(cls, message):
		"""
		Converts a stored message to a dict.
		:param message: object with sender_id, content, time, id and img attributes
		:return: {"userId", "content", "time", "id"} and "image" if the message has an image
		"""
		payload = {
			VarNames.USER_ID: message.sender_id,
			VarNames.CONTENT: message.content,
			VarNames.TIME: message.time,
			VarNames.MESSAGE_ID: message.id,
		}
		image = message.img
		if image.name:
			payload[VarNames.IMG] = image.url
		return payload

	@classmethod
	def create_send_message(cls, message, event=Actions.PRINT_MESSAGE):
		"""
		:param message: same object as create_message accepts, room_id is read as well
		:param event: action to put into the payload
		:return: create_message() payload plus "action", "channel" and "handler": "chat"
		"""
		payload = cls.create_message(message)
		payload.update({
			VarNames.EVENT: event,
			VarNames.CHANNEL: message.room_id,
			HandlerNames.NAME: HandlerNames.CHAT,
		})
		return payload

	@classmethod
	def get_messages(cls, messages, channel):
		"""
		:param messages: iterable of objects that create_message accepts
		:param channel: value of the "channel" field
		:return: payload with action "loadMessages" whose content is the list of converted messages
		"""
		converted = [cls.create_message(item) for item in messages]
		return {
			VarNames.CONTENT: converted,
			VarNames.EVENT: Actions.GET_MESSAGES,
			VarNames.CHANNEL: channel,
			HandlerNames.NAME: HandlerNames.CHAT
		}

	@property
	def stored_redis_user(self):
		"""Value that is stored in redis for this connection, currently the plain user id"""
		return self.user_id

	@property
	def channel(self):
		"""Name of the personal redis channel of the current user"""
		own_channel = RedisPrefix.generate_user(self.user_id)
		return own_channel

	def subscribe_direct_channel_message(self, room_id, other_user_id):
		"""
		:return: "addDirectChannel" payload for the channels handler, users are the current and the other one
		"""
		participants = [self.user_id, other_user_id]
		payload = {
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: participants,
			HandlerNames.NAME: HandlerNames.CHANNELS
		}
		return payload

	def subscribe_room_channel_message(self, room_id, room_name):
		"""
		:return: "addRoom" payload for the channels handler, the current user is the only user
		"""
		payload = {
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: [self.user_id],
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.ROOM_NAME: room_name
		}
		return payload

	def invite_room_channel_message(self, room_id, user_id, room_name, users):
		"""
		:param users: put into the payload as its content
		:return: "inviteUser" payload for the channels handler
		"""
		payload = {
			VarNames.EVENT: Actions.INVITE_USER,
			VarNames.ROOM_ID: room_id,
			VarNames.USER_ID: user_id,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.ROOM_NAME: room_name,
			VarNames.CONTENT: users
		}
		return payload

	def add_user_to_room(self, channel, user_id, content):
		"""
		:param content: dict that holds "sex" and "user" of the added user, e.g. SEX: 'Alien', USER: 'Andrew'
		:return: "addUserToAll" payload for the chat handler
		"""
		gender = content[VarNames.GENDER]
		user_name = content[VarNames.USER]
		payload = {
			VarNames.EVENT: Actions.ADD_USER,
			VarNames.CHANNEL: channel,
			VarNames.USER_ID: user_id,
			HandlerNames.NAME: HandlerNames.CHAT,
			VarNames.GENDER: gender,
			VarNames.USER: user_name
		}
		return payload

	def unsubscribe_direct_message(self, room_id):
		"""
		:return: "deleteRoom" payload for the channels handler with the current user and time
		"""
		payload = {
			VarNames.EVENT: Actions.DELETE_ROOM,
			VarNames.ROOM_ID: room_id,
			VarNames.USER_ID: self.user_id,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.TIME: get_milliseconds()
		}
		return payload

	def load_offline_message(self, offline_messages, channel_key):
		"""
		:return: default() payload with action "loadOfflineMessages" for the chat handler plus "channel"
		"""
		payload = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
		payload[VarNames.CHANNEL] = channel_key
		return payload
249
250
251
class MessagesHandler(MessagesCreator):
	"""
	Chat logic behind one client connection: executes the actions received from the client,
	stores the results through the django models and distributes them via redis pub/sub.
	Subclasses have to provide safe_write and the connected property.
	"""

	def __init__(self, *args, **kwargs):
		self.closed_channels = None
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__(*args, **kwargs)
		self.id = id(self)  # identifies this connection in the redis online hashes
		self.ip = None
		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		self.channels = []
		self.call_receiver_channel = None
		self._logger = None
		self.async_redis = tornadoredis.Client()
		self.patch_tornadoredis()
		# handlers of the actions that arrive from the client
		self.pre_process_message = {
			Actions.GET_MESSAGES: self.process_get_messages,
			Actions.SEND_MESSAGE: self.process_send_message,
			Actions.CALL: self.process_call,
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
			Actions.DELETE_ROOM: self.delete_channel,
			Actions.EDIT_MESSAGE: self.edit_message,
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
			Actions.INVITE_USER: self.invite_user,
		}
		# handlers that new_message runs for prefixed redis messages after forwarding them to the client
		self.post_process_message = {
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
			Actions.DELETE_ROOM: self.send_client_delete_channel,
			Actions.INVITE_USER: self.send_client_new_channel,
			Actions.CALL: self.set_opponent_call_channel
		}

	def patch_tornadoredis(self):  # TODO remove this
		"""
		Wraps the read method of the redis connection, so that every exception it raises
		is logged together with the state of this handler and then re-raised.
		"""
		self.async_redis.connection.old_read = self.async_redis.connection.read
		def new_read(instance, *args, **kwargs):
			try:
				return instance.old_read(*args, **kwargs)
			except Exception as e:
				# log the failure itself first, so it isn't lost if collecting the details below fails
				self.logger.error(e)
				current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
				self.logger.error(
					"Exception info: "
					"self.connected = '%s';;; "
					"Redis default channel online = '%s';;; "
					"self.channels = '%s';;; "
					"self.closed_channels  = '%s';;;",
					self.connected, current_online, self.channels, self.closed_channels
				)
				raise  # bare raise keeps the original traceback on py2 as well
		fabric = type(self.async_redis.connection.read)
		self.async_redis.connection.read = fabric(new_read, self.async_redis.connection)

	@property
	def connected(self):
		"""Whether the connection is initialized, subclasses must override"""
		# NotImplemented is a comparison sentinel, raising it produces a TypeError
		raise NotImplementedError

	@connected.setter
	def connected(self, value):
		raise NotImplementedError

	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribes to the channels and starts to pass incoming redis messages to new_message
		"""
		yield tornado.gen.Task(
			self.async_redis.subscribe, channels)
		self.async_redis.listen(self.new_message)

	@property
	def logger(self):
		"""Per-connection logger if it has been set, the module logger otherwise"""
		return self._logger if self._logger else base_logger

	@tornado.gen.engine
	def add_channel(self, channel):
		"""
		Remembers the channel and subscribes to it
		"""
		self.channels.append(channel)
		yield tornado.gen.Task(
			self.async_redis.subscribe, (channel,))

	def do_db(self, callback, *args, **kwargs):
		"""
		Calls callback(*args, **kwargs); if the db connection has gone away, closes it and retries once.
		:return: whatever the callback returns
		"""
		try:
			return callback(*args, **kwargs)
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
			self.logger.warning('%s, reconnecting', e)  # TODO
			connection.close()
			return callback(*args, **kwargs)

	def execute_query(self, query, *args, **kwargs):
		"""
		Executes raw sql.
		:return: list of rows, every row is a dict {column name: value};
		empty list if the statement produces no result set
		"""
		cursor = connection.cursor()
		try:
			cursor.execute(query, *args, **kwargs)
			desc = cursor.description
			if desc is None:  # e.g. an UPDATE, there is nothing to fetch
				return []
			columns = [col[0] for col in desc]
			return [dict(zip(columns, row)) for row in cursor.fetchall()]
		finally:
			cursor.close()

	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
		"""
		Reads the hash {connection hash: user id} stored in redis under the channel key.
		:param check_user_id: if given, it's also checked whether this user has
		a connection different from check_hash
		:return: list of unique online user ids;
		(list, bool) if check_user_id is not None
		"""
		online = self.sync_redis.hgetall(channel)
		self.logger.debug('!! channel %s redis online: %s', channel, online)
		result = set()
		user_is_online = False
		# redis stores REDIS_USER_FORMAT, so parse them
		if online:
			for key_hash, raw_user_id in online.items():  # py2 iteritems
				user_id = int(raw_user_id.decode('utf-8'))
				if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
					user_is_online = True
				result.add(user_id)
		result = list(result)
		# compare with None: user id 0 (anonymous) is falsy, but callers still unpack a tuple
		return (result, user_is_online) if check_user_id is not None else result

	def add_online_user(self, room_id, offline_messages=None):
		"""
		adds to redis
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
		If it's the first connection of the user in this room, everybody in the room is notified,
		otherwise the list of online users is sent only to this connection.
		:return:
		"""
		self.async_redis_publisher.hset(room_id, self.id, self.stored_redis_user)
		# since we add user to online first, latest trigger will always show correct online
		online, is_online = self.get_online_from_redis(room_id, self.user_id, self.id)
		if not is_online:  # if a new tab has been opened
			# the hset above may or may not be visible to the read yet, don't list the user twice
			if self.user_id not in online:
				online.append(self.user_id)
			online_user_names_mes = self.room_online(
				online,
				Actions.LOGIN,
				room_id
			)
			self.logger.info('!! First tab, sending refresh online for all')
			self.publish(online_user_names_mes, room_id)
			if offline_messages:
				self.safe_write(self.load_offline_message(offline_messages, room_id))
		else:  # Send user names to self
			online_user_names_mes = self.room_online(
				online,
				Actions.REFRESH_USER,
				room_id
			)
			self.logger.info('!! Second tab, retrieving online for self')
			self.safe_write(online_user_names_mes)

	def publish(self, message, channel, parsable=False):
		"""
		Serializes the message to json and publishes it to the redis channel.
		:param parsable: if True the message is marked with the prefix, see encode
		"""
		jsoned_mess = json.dumps(message)
		self.logger.debug('<%s> %s', channel, jsoned_mess)
		if parsable:
			jsoned_mess = self.encode(jsoned_mess)
		self.async_redis_publisher.publish(channel, jsoned_mess)

	def encode(self, message):
		"""
		Marks message with prefix to specify that
		it should be decoded and processed before sending to client
		@param message: message to mark
		@return: marked message
		"""
		return self.parsable_prefix + message

	def remove_parsable_prefix(self, message):
		"""
		@return: message without the prefix, None if the message is not marked
		"""
		if message.startswith(self.parsable_prefix):
			return message[len(self.parsable_prefix):]
		return None

	def new_message(self, message):
		"""
		Callback for redis messages: forwards string bodies to the client,
		marked messages are additionally handled by post_process_message
		"""
		data = message.body
		if isinstance(data, str_type):  # subscribe event
			prefixless_str = self.remove_parsable_prefix(data)
			if prefixless_str:
				self.safe_write(prefixless_str)
				dict_message = json.loads(prefixless_str)
				self.post_process_message[dict_message[VarNames.EVENT]](dict_message)
			else:
				self.safe_write(data)

	def safe_write(self, message):
		raise NotImplementedError('WebSocketHandler implements')

	def process_send_message(self, message):
		"""
		Saves the message to the db and publishes it to its channel
		:type message: dict
		"""
		channel = message[VarNames.CHANNEL]
		message_db = Message(
			sender_id=self.user_id,
			content=message[VarNames.CONTENT]
		)
		message_db.room_id = channel
		if VarNames.IMG in message:
			message_db.img = extract_photo(message[VarNames.IMG], message.get(VarNames.FILE_NAME))
		self.do_db(message_db.save)  # exit on hacked id with exception
		prepared_message = self.create_send_message(message_db)
		self.publish(prepared_message, channel)

	def process_call(self, in_message):
		"""
		Publishes a call message to the channel of the call receiver.
		For an offer the receiver is the other user of the nameless room,
		for other types it's the receiver that has been stored before.
		:type in_message: dict
		"""
		call_type = in_message.get(VarNames.CALL_TYPE)
		set_opponent_channel = False
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
		if call_type == CallType.OFFER:
			room_id = in_message[VarNames.CHANNEL]
			user = User.rooms.through.objects.get(~Q(user_id=self.user_id), Q(room_id=room_id), Q(room__name__isnull=True))
			self.call_receiver_channel = RedisPrefix.generate_user(user.user_id)
			set_opponent_channel = True
			out_message[VarNames.CHANNEL] = room_id
		# TODO
		self.logger.info('!! Offering a call to user with id %s',  self.call_receiver_channel)
		self.publish(out_message, self.call_receiver_channel, set_opponent_channel)

	def create_new_room(self, message):
		"""
		Creates a named room with the current user in it and notifies the user's own channel
		:raises ValidationError: if the name is empty or longer than 16 symbols
		"""
		room_name = message[VarNames.ROOM_NAME]
		if not room_name or len(room_name) > 16:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		room = Room(name=room_name)
		self.do_db(room.save)
		RoomUsers(room_id=room.id, user_id=self.user_id).save()
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
		self.publish(subscribe_message, self.channel, True)

	def invite_user(self, message):
		"""
		Adds a user to a room of the current user, notifies the room and the invited user
		:raises ValidationError: if the room isn't among own channels, is private
		or the user is already in it
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		if room_id not in self.channels:
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
		room = self.do_db(Room.objects.get, id=room_id)
		if room.is_private:
			raise ValidationError("You can't add users to direct room, create a new room instead")
		try:
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		users_in_room = {}
		for user in room.users.all():
			self.set_js_user_structure(users_in_room, user.id, user.username, user.sex)
		self.publish(self.add_user_to_room(room_id, user_id, users_in_room[user_id]), room_id)
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)

	def create_room(self, user_rooms, user_id):
		"""
		Finds a direct room of the current user with user_id among user_rooms and re-enables it,
		creates a new room if there is none.
		:param user_rooms: rows with 'room_id' of the nameless rooms of the current user
		:return: id of the room
		:raises ValidationError: if the room exists and isn't disabled
		"""
		if self.user_id == user_id:
			room_ids = [room['room_id'] for room in user_rooms]
			query_res = self.execute_query(SELECT_SELF_ROOM, [room_ids, ])
		else:
			rooms_query = Room.users.through.objects.filter(user_id=user_id, room__in=user_rooms)
			query_res = rooms_query.values('room__id', 'room__disabled')
		if query_res:
			room = query_res[0]
			room_id = room['room__id']
			self.update_room(room_id, room['room__disabled'])
		else:
			room = Room()
			room.save()
			room_id = room.id
			if self.user_id == user_id:
				RoomUsers(user_id=self.user_id, room_id=room_id).save()
			else:
				RoomUsers.objects.bulk_create([
					RoomUsers(user_id=user_id, room_id=room_id),
					RoomUsers(user_id=self.user_id, room_id=room_id),
				])
		return room_id

	def update_room(self, room_id, disabled):
		"""
		Enables a disabled room
		:raises ValidationError: if the room isn't disabled
		"""
		if not disabled:
			raise ValidationError('This room already exist')
		Room.objects.filter(id=room_id).update(disabled=False)

	def create_user_channel(self, message):
		"""
		Creates (or re-enables) a direct room with another user and notifies both users
		"""
		user_id = message[VarNames.USER_ID]
		# get all self private rooms ids
		user_rooms = Room.users.through.objects.filter(user_id=self.user_id, room__name__isnull=True).values('room_id')
		# get private room that contains another user from rooms above
		room_id = self.create_room(user_rooms, user_id)
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
		self.publish(subscribe_message, self.channel, True)
		other_channel = RedisPrefix.generate_user(user_id)
		if self.channel != other_channel:
			self.publish(subscribe_message, other_channel, True)

	def delete_channel(self, message):
		"""
		Disables a nameless room or leaves a named one, then publishes the delete message to the room
		:raises ValidationError: if the room isn't among own channels, is ALL_ROOM_ID or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to exit this room')
		room = self.do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else:  # if public -> leave the room, delete the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
			online = self.get_online_from_redis(room_id)
			if self.user_id in online:  # the list comes from redis and may already lack the user
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
		room.save()
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, room_id, True)

	def edit_message(self, data):
		"""
		Changes the content of an own message; content None marks the message as deleted
		:raises ValidationError: if the message belongs to another user,
		is older than 1 minute or is already deleted
		"""
		message_id = data[VarNames.MESSAGE_ID]
		message = Message.objects.get(id=message_id)
		if message.sender_id != self.user_id:
			raise ValidationError("You can only edit your messages")
		if message.time + 60000 < get_milliseconds():
			raise ValidationError("You can only edit messages that were send not more than 1 min ago")
		if message.deleted:
			raise ValidationError("Already deleted")
		message.content = data[VarNames.CONTENT]
		selector = Message.objects.filter(id=message_id)
		if message.content is None:
			selector.update(deleted=True)
			action = Actions.DELETE_MESSAGE
		else:
			action = Actions.EDIT_MESSAGE
			selector.update(content=message.content)
		self.publish(self.create_send_message(message, action), message.room_id)

	def send_client_new_channel(self, message):
		"""
		Post-processor: subscribes this connection to the new room and marks the user online there
		"""
		room_id = message[VarNames.ROOM_ID]
		self.add_channel(room_id)
		self.add_online_user(room_id)

	def set_opponent_call_channel(self, message):
		"""
		Post-processor: remembers the channel of the message sender as the call receiver
		"""
		self.call_receiver_channel = RedisPrefix.generate_user(message[VarNames.USER_ID])

	def send_client_delete_channel(self, message):
		"""
		Post-processor: unsubscribes this connection from the room and removes it from the room's online hash
		"""
		room_id = message[VarNames.ROOM_ID]
		self.async_redis.unsubscribe((room_id,))
		self.async_redis_publisher.hdel(room_id, self.id)
		if room_id in self.channels:  # don't fail if the channel has been dropped already
			self.channels.remove(room_id)

	def process_get_messages(self, data):
		"""
		Sends to the client up to "count" (10 by default) newest messages of the room,
		only those with id lower than "headerId" if it's given
		:type data: dict
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		room_id = data[VarNames.CHANNEL]
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		if header_id is None:
			messages = Message.objects.filter(Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
		else:
			messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
		response = self.do_db(self.get_messages, messages, room_id)
		self.safe_write(response)

	def get_offline_messages(self):
		"""
		:return: {room id: [message dict, ...]} of not deleted messages with id greater than
		the last read message id of the current user in that room
		"""
		res = {}
		offline_messages = Message.objects.filter(
			id__gt=F('room__roomusers__last_read_message_id'),
			deleted=False,
			room__roomusers__user_id=self.user_id
		)
		for message in offline_messages:
			res.setdefault(message.room_id, []).append(self.create_message(message))
		return res

	def get_users_in_current_user_rooms(self):
		"""
		:return: enabled rooms of the current user with their users:
		{
			ROOM_ID: {
				"name": ROOM_NAME,
				"users": {
					USER_ID: {
						"user": USER_NAME,
						"sex": GENDERS value
					},
					USER_ID_2: {
						"user": USER_NAME_2,
						"sex": GENDERS value
					}
				}
			}
		}
		"""
		user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
		res = {room['id']: {
				VarNames.ROOM_NAME: room['name'],
				VarNames.ROOM_USERS: {}
			} for room in user_rooms}
		room_ids = list(res)
		rooms_users = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id')
		for user in rooms_users:
			self.set_js_user_structure(res[user['rooms__id']][VarNames.ROOM_USERS], user['id'], user['username'], user['sex'])
		return res

	def set_js_user_structure(self, user_dict, user_id, name, sex):
		"""
		Puts {"user": name, "sex": GENDERS[sex]} into user_dict under the user_id key
		"""
		user_dict[user_id] = {
			VarNames.USER: name,
			VarNames.GENDER: GENDERS[sex]
		}

	def save_ip(self):
		"""
		Stores that the current user has joined from self.ip, does nothing if it's stored already
		"""
		if (self.do_db(UserJoinedInfo.objects.filter(
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
			return
		# resolve the client address (the value the check above filters on), not the connection hash self.id
		ip_address = get_or_create_ip(self.ip, self.logger)
		UserJoinedInfo.objects.create(
			ip=ip_address,
			user_id=self.user_id
		)

	def publish_logout(self, channel, log_data):
		"""
		Notifies the channel that the user went offline, if the user has no other connections there.
		:param log_data: dict, the online state of the channel is stored in it under the channel key
		:return: True if the logout has been published, False otherwise
		"""
		# seems like async solves problem with connection lost and wrong data status
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
		online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
		log_data[channel] = {'online': online, 'is_online': is_online}
		if not is_online:
			message = self.room_online(online, Actions.LOGOUT, channel)
			self.publish(message, channel)
			return True
		return False
664
665
666
class AntiSpam(object):
	"""
	Keeps the sizes of received messages and counts the oversized ones.
	"""

	def __init__(self):
		self.info = {}  # {time in hundredths of a second: message length}
		self.spammed = 0  # amount of rejected messages

	def check_spam(self, json_message):
		"""
		Remembers the length of the message and rejects it if it's too long.
		:raises ValidationError: if the message is longer than MAX_MESSAGE_SIZE
		"""
		size = len(json_message)
		self.info[int(round(time.time() * 100))] = size
		if size <= MAX_MESSAGE_SIZE:
			self.check_timed_spam()
			return
		self.spammed += 1
		raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)

	def check_timed_spam(self):
		# TODO implement me
		# raise ValidationError("You're chatting too much, calm down a bit!")
		return None
685
686
687
class TornadoHandler(WebSocketHandler, MessagesHandler):
	"""
	Websocket transport for MessagesHandler: authenticates the connection by the session cookie,
	passes incoming frames to the action handlers and cleans up on close.
	"""

	def __init__(self, *args, **kwargs):
		super(TornadoHandler, self).__init__(*args, **kwargs)
		self.__connected__ = False
		self.anti_spam = AntiSpam()

	@property
	def connected(self):
		"""True after open() has finished successfully and until disconnect() is called"""
		return self.__connected__

	@connected.setter
	def connected(self, value):
		self.__connected__ = value

	def data_received(self, chunk):
		pass

	def on_message(self, json_message):
		"""
		Dispatches a message from the client to the handler of its action.
		Validation errors are sent back to the client as growl messages.
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %.1000s', json_message)
			message = json.loads(json_message)
			if message[VarNames.EVENT] not in self.pre_process_message:
				raise ValidationError("event {} is unknown".format(message[VarNames.EVENT]))
			channel = message.get(VarNames.CHANNEL)
			if channel and channel not in self.channels:
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
			self.pre_process_message[message[VarNames.EVENT]](message)
		except ValidationError as e:
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.safe_write(error_message)

	def on_close(self):
		"""
		Unsubscribes from redis, removes this connection from the online hashes of the rooms,
		publishes logout where it was the last connection of the user and disconnects from redis
		"""
		if self.async_redis.subscribed:
			self.logger.info("Close event, unsubscribing from %s", self.channels)
			self.async_redis.unsubscribe(self.channels)
		else:
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
		log_data = {}
		gone_offline = False
		for channel in self.channels:
			if not isinstance(channel, Number):  # skips the string channel of the user itself
				continue
			self.sync_redis.hdel(channel, self.id)
			if self.connected:
				gone_offline = self.publish_logout(channel, log_data) or gone_offline
		if gone_offline:
			res = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
			self.logger.info("Updated %s last read message", res)
		self.disconnect(json.dumps(log_data))

	def disconnect(self, log_data, tries=0):
		"""
		Closes a connection if it's not in progress, otherwise reschedules the closing
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227
		:param log_data: string that is logged when the connection gets closed
		:param tries: number of the attempt, used to stop rescheduling
		"""
		self.connected = False
		if tries == 0:
			# remember the channels only on the first attempt, on retries self.channels is already empty
			self.closed_channels = self.channels
		self.channels = []
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
			self.logger.debug('Closing a connection timeouts')
			# NOTE the first positional argument of timedelta is days, so the delay is 0.00001 of a day
			ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, log_data, tries+1)
		else:
			self.logger.info("Close connection result: %s", log_data)
			self.async_redis.disconnect()

	def open(self):
		"""
		Authenticates the connection by the session cookie. On success sends the rooms to the client,
		subscribes to the channels of the user and marks the user online, otherwise closes the connection.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		# a missing cookie gives None, reject it without asking the session store
		if session_key and sessionStore.exists(session_key):
			self.ip = self.get_client_ip()
			session = SessionStore(session_key)
			self.user_id = int(session["_auth_user_id"])
			log_params = {
				'user_id': str(self.user_id).zfill(3),
				'id': self.id,
				'ip': self.ip
			}
			self._logger = logging.LoggerAdapter(base_logger, log_params)
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
			self.async_redis.connect()
			user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
			self.sender_name = user_db.username
			self.sex = user_db.sex_str
			user_rooms = self.get_users_in_current_user_rooms()
			self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
			# get all missed messages
			self.channels = []  # py2 doesn't support clear()
			self.channels.append(self.channel)
			for room_id in user_rooms:
				self.channels.append(room_id)
			self.listen(self.channels)
			off_messages = self.get_offline_messages()
			for room_id in user_rooms:
				self.add_online_user(room_id, off_messages.get(room_id))
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
			self.connected = True
			Thread(target=self.save_ip).start()
		else:
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			self.close(403, "Session key %s has been rejected" % session_key)

	def check_origin(self, origin):
		"""
		check whether browser set domain matches origin
		Ports are ignored, the comparison is case-insensitive.
		:return: False if the domains differ or the Host header is missing
		"""
		origin_domain = urlparse(origin).netloc.split(':')[0].lower()
		browser_set = self.request.headers.get("Host")
		if not browser_set:
			return False
		# host names are case-insensitive, so lower both sides
		browser_domain = browser_set.split(':')[0].lower()
		return browser_domain == origin_domain

	def safe_write(self, message):
		"""
		Sends a message to the client; a closed websocket is logged instead of raising.
		:param message: dict (dumped to json) or string
		:raises ValueError: if the message is neither a dict nor a string
		"""
		# self.logger.debug('<< THREAD %s >>', os.getppid())
		try:
			if isinstance(message, dict):
				message = json.dumps(message)
			if not isinstance(message, str_type):
				raise ValueError('Wrong message type : %s' % str(message))
			self.logger.debug(">> %.1000s", message)
			self.write_message(message)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(message))

	def get_client_ip(self):
		"""
		:return: value of the X-Real-IP header if it's set, the remote ip of the request otherwise
		"""
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip
822