Completed
Push — master ( da0343...48c94d )
by Andrew
01:03
created

MessagesHandler.connected()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 3
rs 10
1
import json
2
import logging
3
import sys
4
import time
5
from datetime import timedelta
6
from threading import Thread
7
from urllib.request import urlopen
8
9
import tornado.gen
10
import tornado.httpclient
11
import tornado.ioloop
12
import tornado.web
13
import tornado.websocket
14
import tornadoredis
15
from django.conf import settings
16
from django.core.exceptions import ValidationError
17
from django.db import connection, OperationalError, InterfaceError, IntegrityError
18
from django.db.models import Q, F
19
from redis_sessions.session import SessionStore
20
from tornado import ioloop
21
from tornado.websocket import WebSocketHandler
22
23
from chat.utils import extract_photo
24
25
try:
26
	from urllib.parse import urlparse  # py2
27
except ImportError:
28
	from urlparse import urlparse  # py3
29
30
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM
31
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo, RoomUsers
32
33
PY3 = sys.version > '3'
34
str_type = str if PY3 else basestring
35
api_url = getattr(settings, "IP_API_URL", None)
36
37
sessionStore = SessionStore()
38
39
base_logger = logging.getLogger(__name__)
40
41
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
42
#CONNECTION_POOL = tornadoredis.ConnectionPool(
43
#	max_connections=500,
44
#	wait_for_available=True)
45
46
47
class Actions(object):
48
	LOGIN = 'addOnlineUser'
49
	LOGOUT = 'removeOnlineUser'
50
	SEND_MESSAGE = 'sendMessage'
51
	PRINT_MESSAGE = 'printMessage'
52
	CALL = 'call'
53
	ROOMS = 'setRooms'
54
	REFRESH_USER = 'setOnlineUsers'
55
	GROWL_MESSAGE = 'growl'
56
	GET_MESSAGES = 'loadMessages'
57
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
58
	DELETE_ROOM = 'deleteRoom'
59
	EDIT_MESSAGE = 'editMessage'
60
	DELETE_MESSAGE = 'deleteMessage'
61
	CREATE_ROOM_CHANNEL = 'addRoom'
62
	INVITE_USER = 'inviteUser'
63
	ADD_USER = 'addUserToAll'
64
	OFFLINE_MESSAGES = 'loadOfflineMessages'
65
66
67
class VarNames(object):
68
	CALL_TYPE = 'type'
69
	USER = 'user'
70
	USER_ID = 'userId'
71
	TIME = 'time'
72
	CONTENT = 'content'
73
	IMG = 'image'
74
	EVENT = 'action'
75
	MESSAGE_ID = 'id'
76
	GENDER = 'sex'
77
	ROOM_NAME = 'name'
78
	ROOM_ID = 'roomId'
79
	ROOM_USERS = 'users'
80
	CHANNEL = 'channel'
81
	GET_MESSAGES_COUNT = 'count'
82
	GET_MESSAGES_HEADER_ID = 'headerId'
83
	CHANNEL_NAME = 'channel'
84
	IS_ROOM_PRIVATE = 'private'
85
	#ROOM_NAME = 'roomName'
86
	# ROOM_ID = 'roomId'
87
88
89
class CallType(object):
90
	OFFER = 'offer'
91
92
class HandlerNames:
93
	NAME = 'handler'
94
	CHANNELS = 'channels'
95
	CHAT = 'chat'
96
	GROWL = 'growl'
97
	WEBRTC = 'webrtc'
98
	FILE = 'file'
99
100
101
class RedisPrefix:
102
	USER_ID_CHANNEL_PREFIX = 'u'
103
	__ROOM_ONLINE__ = 'o:{}'
104
105
	@classmethod
106
	def generate_user(cls, key):
107
		return cls.USER_ID_CHANNEL_PREFIX + str(key)
108
109
RedisPrefix.DEFAULT_CHANNEL = ALL_ROOM_ID
110
111
112
class MessagesCreator(object):
113
114
	def __init__(self, *args, **kwargs):
115
		super(MessagesCreator, self).__init__(*args, **kwargs)
116
		self.sex = None
117
		self.sender_name = None
118
		self.user_id = 0  # anonymous by default
119
120
	def default(self, content, event, handler):
121
		"""
122
		:return: {"action": event, "content": content, "time": "20:48:57"}
123
		"""
124
		return {
125
			VarNames.EVENT: event,
126
			VarNames.CONTENT: content,
127
			VarNames.USER_ID: self.user_id,
128
			VarNames.TIME: get_milliseconds(),
129
			HandlerNames.NAME: handler
130
		}
131
132
	def room_online(self, online, event, channel):
133
		"""
134
		:return: {"action": event, "content": content, "time": "20:48:57"}
135
		"""
136
		room_less = self.default(online, event, HandlerNames.CHAT)
137
		room_less[VarNames.CHANNEL_NAME] = channel
138
		room_less[VarNames.USER] = self.sender_name
139
		room_less[VarNames.GENDER] = self.sex
140
		return room_less
141
142
	def offer_call(self, content, message_type):
143
		"""
144
		:return: {"action": "call", "content": content, "time": "20:48:57"}
145
		"""
146
		message = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
147
		message[VarNames.CALL_TYPE] = message_type
148
		message[VarNames.USER] = self.sender_name
149
		return message
150
151
	@classmethod
152
	def create_message(cls, message):
153
		res = {
154
			VarNames.USER_ID: message.sender_id,
155
			VarNames.CONTENT: message.content,
156
			VarNames.TIME: message.time,
157
			VarNames.MESSAGE_ID: message.id,
158
		}
159
		if message.img.name:
160
			res[VarNames.IMG] = message.img.url
161
		return res
162
163
	@classmethod
164
	def create_send_message(cls, message, event=Actions.PRINT_MESSAGE):
165
		"""
166
		:param message:
167
		:return: "action": "joined", "content": {"v5bQwtWp": "alien", "tRD6emzs": "Alien"},
168
		"sex": "Alien", "user": "tRD6emzs", "time": "20:48:57"}
169
		"""
170
		res = cls.create_message(message)
171
		res[VarNames.EVENT] = event
172
		res[VarNames.CHANNEL] = message.room_id
173
		res[HandlerNames.NAME] = HandlerNames.CHAT
174
		return res
175
176
	@classmethod
177
	def get_messages(cls, messages, channel):
178
		"""
179
		:type messages: list[Messages]
180
		:type channel: str
181
		:type messages: QuerySet[Messages]
182
		"""
183
		return {
184
			VarNames.CONTENT: [cls.create_message(message) for message in messages],
185
			VarNames.EVENT: Actions.GET_MESSAGES,
186
			VarNames.CHANNEL: channel,
187
			HandlerNames.NAME: HandlerNames.CHAT
188
		}
189
190
	@property
191
	def stored_redis_user(self):
192
		return  self.user_id
193
194
	@property
195
	def channel(self):
196
		return RedisPrefix.generate_user(self.user_id)
197
198
	def subscribe_direct_channel_message(self, room_id, other_user_id):
199
		return {
200
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
201
			VarNames.ROOM_ID: room_id,
202
			VarNames.ROOM_USERS: [self.user_id, other_user_id],
203
			HandlerNames.NAME: HandlerNames.CHANNELS
204
		}
205
206
	def subscribe_room_channel_message(self, room_id, room_name):
207
		return {
208
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
209
			VarNames.ROOM_ID: room_id,
210
			VarNames.ROOM_USERS: [self.user_id],
211
			HandlerNames.NAME: HandlerNames.CHANNELS,
212
			VarNames.ROOM_NAME: room_name
213
		}
214
215
	def invite_room_channel_message(self, room_id, user_id, room_name, users):
216
		return {
217
			VarNames.EVENT: Actions.INVITE_USER,
218
			VarNames.ROOM_ID: room_id,
219
			VarNames.USER_ID: user_id,
220
			HandlerNames.NAME: HandlerNames.CHANNELS,
221
			VarNames.ROOM_NAME: room_name,
222
			VarNames.CONTENT: users
223
		}
224
225
	def add_user_to_room(self, channel, user_id, content):
226
		return {
227
			VarNames.EVENT: Actions.ADD_USER,
228
			VarNames.CHANNEL: channel,
229
			VarNames.USER_ID: user_id,
230
			HandlerNames.NAME: HandlerNames.CHAT,
231
			VarNames.GENDER: content[VarNames.GENDER], # SEX: 'Alien', USER: 'Andrew'
232
			VarNames.USER: content[VarNames.USER] # SEX: 'Alien', USER: 'Andrew'
233
		}
234
235
	def unsubscribe_direct_message(self, room_id):
236
		return {
237
			VarNames.EVENT: Actions.DELETE_ROOM,
238
			VarNames.ROOM_ID: room_id,
239
			VarNames.USER_ID: self.user_id,
240
			HandlerNames.NAME: HandlerNames.CHANNELS,
241
			VarNames.TIME: get_milliseconds()
242
		}
243
244
	def load_offline_message(self, offline_messages, channel_key):
245
		res = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
246
		res[VarNames.CHANNEL] = channel_key
247
		return res
248
249
250
class MessagesHandler(MessagesCreator):
251
252
	def __init__(self, *args, **kwargs):
253
		self.closed_channels = None
254
		self.parsable_prefix = 'p'
255
		super(MessagesHandler, self).__init__(*args, **kwargs)
256
		self.id = id(self)
257
		self.ip = None
258
		from chat import global_redis
259
		self.async_redis_publisher = global_redis.async_redis_publisher
260
		self.sync_redis = global_redis.sync_redis
261
		self.channels = []
262
		self.call_receiver_channel = None
263
		self._logger = None
264
		self.async_redis = tornadoredis.Client()
265
		self.patch_tornadoredis()
266
		self.pre_process_message = {
267
			Actions.GET_MESSAGES: self.process_get_messages,
268
			Actions.SEND_MESSAGE: self.process_send_message,
269
			Actions.CALL: self.process_call,
270
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
271
			Actions.DELETE_ROOM: self.delete_channel,
272
			Actions.EDIT_MESSAGE: self.edit_message,
273
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
274
			Actions.INVITE_USER: self.invite_user,
275
		}
276
		self.post_process_message = {
277
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
278
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
279
			Actions.DELETE_ROOM: self.send_client_delete_channel,
280
			Actions.INVITE_USER: self.send_client_new_channel,
281
			Actions.CALL: self.set_opponent_call_channel
282
		}
283
284
	def patch_tornadoredis(self):  # TODO remove this
285
		self.async_redis.connection.old_read = self.async_redis.connection.read
286
		def new_read(instance, *args, **kwargs):
287
			try:
288
				return instance.old_read(*args, **kwargs)
289
			except Exception as e:
290
				current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
291
				self.logger.error(e)
292
				self.logger.error(
293
					"Exception info: "
294
					"self.connected = '%s';;; "
295
					"Redis default channel online = '%s';;; "
296
					"self.channels = '%s';;; "
297
					"self.closed_channels  = '%s';;;",
298
					self.connected, current_online, self.channels, self.closed_channels
299
				)
300
				raise e
301
		fabric = type(self.async_redis.connection.read)
302
		self.async_redis.connection.read = fabric(new_read, self.async_redis.connection)
303
304
	@property
305
	def connected(self):
306
		raise NotImplemented
307
308
	@connected.setter
309
	def connected(self, value):
310
		raise NotImplemented
311
312
	@tornado.gen.engine
313
	def listen(self, channels):
314
		yield tornado.gen.Task(
315
			self.async_redis.subscribe, channels)
316
		self.async_redis.listen(self.new_message)
317
318
	@property
319
	def logger(self):
320
		return self._logger if self._logger else base_logger
321
322
	@tornado.gen.engine
323
	def add_channel(self, channel):
324
		self.channels.append(channel)
325
		yield tornado.gen.Task(
326
			self.async_redis.subscribe, (channel,))
327
328
	def do_db(self, callback, *args, **kwargs):
329
		try:
330
			return callback(*args, **kwargs)
331
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
332
			self.logger.warning('%s, reconnecting' % e)  # TODO
333
			connection.close()
334
			return callback(*args, **kwargs)
335
336
	def execute_query(self, query, *args, **kwargs):
337
		cursor = connection.cursor()
338
		cursor.execute(query, *args, **kwargs)
339
		desc = cursor.description
340
		return [
341
			dict(zip([col[0] for col in desc], row))
342
			for row in cursor.fetchall()
343
			]
344
345
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
346
		"""
347
		:rtype : dict
348
		returns (dict, bool) if check_type is present
349
		"""
350
		online = self.sync_redis.hgetall(channel)
351
		self.logger.debug('!! channel %s redis online: %s', channel, online)
352
		result = set()
353
		user_is_online = False
354
		# redis stores REDIS_USER_FORMAT, so parse them
355
		if online:
356
			for key_hash, raw_user_id in online.items():  # py2 iteritems
357
				user_id = int(raw_user_id.decode('utf-8'))
358
				if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
359
					user_is_online = True
360
				result.add(user_id)
361
		result = list(result)
362
		return (result, user_is_online) if check_user_id else result
363
364
	def add_online_user(self, room_id, offline_messages=None):
365
		"""
366
		adds to redis
367
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
368
		:return:
369
		"""
370
		self.async_redis_publisher.hset(room_id, self.id, self.stored_redis_user)
371
		# since we add user to online first, latest trigger will always show correct online
372
		online, is_online = self.get_online_from_redis(room_id, self.user_id, self.id)
373
		if not is_online:  # if a new tab has been opened
374
			online.append(self.user_id)
375
			online_user_names_mes = self.room_online(
376
				online,
377
				Actions.LOGIN,
378
				room_id
379
			)
380
			self.logger.info('!! First tab, sending refresh online for all')
381
			self.publish(online_user_names_mes, room_id)
382
			if offline_messages:
383
				self.safe_write(self.load_offline_message(offline_messages, room_id))
384
		else:  # Send user names to self
385
			online_user_names_mes = self.room_online(
386
				online,
387
				Actions.REFRESH_USER,
388
				room_id
389
			)
390
			self.logger.info('!! Second tab, retrieving online for self')
391
			self.safe_write(online_user_names_mes)
392
393
	def publish(self, message, channel, parsable=False):
394
		jsoned_mess = json.dumps(message)
395
		self.logger.debug('<%s> %s', channel, jsoned_mess)
396
		if parsable:
397
			jsoned_mess = self.encode(jsoned_mess)
398
		self.async_redis_publisher.publish(channel, jsoned_mess)
399
400
	def encode(self, message):
401
		"""
402
		Marks message with prefix to specify that
403
		it should be decoded and proccesed before sending to client
404
		@param message: message to mark
405
		@return: marked message
406
		"""
407
		return self.parsable_prefix + message
408
409
	def decode(self, message):
410
		"""
411
		Check if message should be proccessed by server before writing to client
412
		@param message: message to check
413
		@type message: str
414
		@return: Object structure of message if it should be processed, None if not
415
		"""
416
		if message.startswith(self.parsable_prefix):
417
			return json.loads(message[1:])
418
419
	def new_message(self, message):
420
		data = message.body
421
		if isinstance(data, str_type):  # subscribe event
422
			decoded = self.decode(data)
423
			if decoded:
424
				data = decoded
425
			self.safe_write(data)
426
			if decoded:
427
				self.post_process_message[decoded[VarNames.EVENT]](decoded)
428
429
	def safe_write(self, message):
430
		raise NotImplementedError('WebSocketHandler implements')
431
432
	def process_send_message(self, message):
433
		"""
434
		:type message: dict
435
		"""
436
		channel = message[VarNames.CHANNEL]
437
		message_db = Message(
438
			sender_id=self.user_id,
439
			content=message[VarNames.CONTENT]
440
		)
441
		message_db.room_id = channel
442
		if VarNames.IMG in message:
443
			message_db.img = extract_photo(message[VarNames.IMG])
444
		self.do_db(message_db.save)  # exit on hacked id with exception
445
		prepared_message = self.create_send_message(message_db)
446
		self.publish(prepared_message, channel)
447
448
	def process_call(self, in_message):
449
		"""
450
		:type in_message: dict
451
		"""
452
		call_type = in_message.get(VarNames.CALL_TYPE)
453
		set_opponent_channel = False
454
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
455
		if call_type == CallType.OFFER:
456
			room_id = in_message[VarNames.CHANNEL]
457
			user = User.rooms.through.objects.get(~Q(user_id=self.user_id), Q(room_id=room_id), Q(room__name__isnull=True))
458
			self.call_receiver_channel = RedisPrefix.generate_user(user.user_id)
459
			set_opponent_channel = True
460
			out_message[VarNames.CHANNEL] = room_id
461
		# TODO
462
		self.logger.info('!! Offering a call to user with id %s',  self.call_receiver_channel)
463
		self.publish(out_message, self.call_receiver_channel, set_opponent_channel)
464
465
	def create_new_room(self, message):
466
		room_name = message[VarNames.ROOM_NAME]
467
		if not room_name or len(room_name) > 16:
468
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
469
		room = Room(name=room_name)
470
		self.do_db(room.save)
471
		RoomUsers(room_id=room.id, user_id=self.user_id).save()
472
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
473
		self.publish(subscribe_message, self.channel, True)
474
475
	def invite_user(self, message):
476
		room_id = message[VarNames.ROOM_ID]
477
		user_id = message[VarNames.USER_ID]
478
		if room_id not in self.channels:
479
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
480
		room = self.do_db(Room.objects.get, id=room_id)
481
		if room.is_private:
482
			raise ValidationError("You can't add users to direct room, create a new room instead")
483
		try:
484
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
485
		except IntegrityError:
486
			raise ValidationError("User is already in channel")
487
		users_in_room = {}
488
		for user in room.users.all():
489
			self.set_js_user_structure(users_in_room, user.id, user.username, user.sex)
490
		self.publish(self.add_user_to_room(room_id, user_id, users_in_room[user_id]), room_id)
491
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
492
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
493
494
	def create_room(self, user_rooms, user_id):
495
		if self.user_id == user_id:
496
			room_ids = list([room['room_id'] for room in user_rooms])
497
			query_res = self.execute_query(SELECT_SELF_ROOM, [room_ids, ])
498
		else:
499
			rooms_query = Room.users.through.objects.filter(user_id=user_id, room__in=user_rooms)
500
			query_res = rooms_query.values('room__id', 'room__disabled')
501
		if len(query_res) > 0:
502
			room = query_res[0]
503
			room_id = room['room__id']
504
			self.update_room(room_id, room['room__disabled'])
505
		else:
506
			room = Room()
507
			room.save()
508
			room_id = room.id
509
			if self.user_id == user_id:
510
				RoomUsers(user_id=self.user_id, room_id=room_id).save()
511
			else:
512
				RoomUsers.objects.bulk_create([
513
					RoomUsers(user_id=user_id, room_id=room_id),
514
					RoomUsers(user_id=self.user_id, room_id=room_id),
515
				])
516
		return room_id
517
518
	def update_room(self, room_id, disabled):
519
		if not disabled:
520
			raise ValidationError('This room already exist')
521
		else:
522
			Room.objects.filter(id=room_id).update(disabled=False)
523
524
	def create_user_channel(self, message):
525
		user_id = message[VarNames.USER_ID]
526
		# get all self private rooms ids
527
		user_rooms = Room.users.through.objects.filter(user_id=self.user_id, room__name__isnull=True).values('room_id')
528
		# get private room that contains another user from rooms above
529
		room_id = self.create_room(user_rooms, user_id)
530
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
531
		self.publish(subscribe_message, self.channel, True)
532
		other_channel = RedisPrefix.generate_user(user_id)
533
		if self.channel != other_channel:
534
			self.publish(subscribe_message, other_channel, True)
535
536
	def delete_channel(self, message):
537
		room_id = message[VarNames.ROOM_ID]
538
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
539
			raise ValidationError('You are not allowed to exit this room')
540
		room = self.do_db(Room.objects.get, id=room_id)
541
		if room.disabled:
542
			raise ValidationError('Room is already deleted')
543
		if room.name is None:  # if private then disable
544
			room.disabled = True
545
		else: # if public -> leave the room, delete the link
546
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
547
			online = self.get_online_from_redis(room_id)
548
			online.remove(self.user_id)
549
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
550
		room.save()
551
		message = self.unsubscribe_direct_message(room_id)
552
		self.publish(message, room_id, True)
553
554
	def edit_message(self, data):
555
		message_id = data[VarNames.MESSAGE_ID]
556
		message = Message.objects.get(id=message_id)
557
		if message.sender_id != self.user_id:
558
			raise ValidationError("You can only edit your messages")
559
		if message.time + 60000 < get_milliseconds():
560
			raise ValidationError("You can only edit messages that were send not more than 1 min ago")
561
		if message.deleted:
562
			raise ValidationError("Already deleted")
563
		message.content = data[VarNames.CONTENT]
564
		selector = Message.objects.filter(id=message_id)
565
		if message.content is None:
566
			selector.update(deleted=True)
567
			action = Actions.DELETE_MESSAGE
568
		else:
569
			action = Actions.EDIT_MESSAGE
570
			selector.update(content=message.content)
571
		self.publish(self.create_send_message(message, action), message.room_id)
572
573
	def send_client_new_channel(self, message):
574
		room_id = message[VarNames.ROOM_ID]
575
		self.add_channel(room_id)
576
		self.add_online_user(room_id)
577
578
	def set_opponent_call_channel(self, message):
579
		self.call_receiver_channel = RedisPrefix.generate_user(message[VarNames.USER_ID])
580
581
	def send_client_delete_channel(self, message):
582
		room_id = message[VarNames.ROOM_ID]
583
		self.async_redis.unsubscribe((room_id,))
584
		self.async_redis_publisher.hdel(room_id, self.id)
585
		self.channels.remove(room_id)
586
587
	def process_get_messages(self, data):
588
		"""
589
		:type data: dict
590
		"""
591
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
592
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
593
		room_id = data[VarNames.CHANNEL]
594
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
595
		if header_id is None:
596
			messages = Message.objects.filter(Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
597
		else:
598
			messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
599
		response = self.do_db(self.get_messages, messages, room_id)
600
		self.safe_write(response)
601
602
	def get_offline_messages(self):
603
		res = {}
604
		offline_messages = Message.objects.filter(
605
			id__gt=F('room__roomusers__last_read_message_id'),
606
			deleted=False,
607
			room__roomusers__user_id=self.user_id
608
		)
609
		for message in offline_messages:
610
			res.setdefault(message.room_id, []).append(self.create_message(message))
611
		return res
612
613
	def get_users_in_current_user_rooms(self):
614
		"""
615
		{
616
			"ROOM_ID:1": {
617
				"name": "All",
618
				"users": {
619
					"USER_ID:admin": {
620
						"name": "USER_NAME:admin",
621
						"sex": "SEX:Secret"
622
					},
623
					"USER_ID_2": {
624
						"name": "USER_NAME:Mike",
625
						"sex": "Male"
626
					}
627
				},
628
				"isPrivate": true
629
			}
630
		}
631
		"""
632
		user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
633
		res = {room['id']: {
634
				VarNames.ROOM_NAME: room['name'],
635
				VarNames.ROOM_USERS: {}
636
			} for room in user_rooms}
637
		room_ids = (room_id for room_id in res)
638
		rooms_users = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id')
639
		for user in rooms_users:
640
			self.set_js_user_structure(res[user['rooms__id']][VarNames.ROOM_USERS], user['id'], user['username'], user['sex'])
641
		return res
642
643
	def set_js_user_structure(self, user_dict, user_id, name, sex):
644
		user_dict[user_id] = {
645
			VarNames.USER: name,
646
			VarNames.GENDER: GENDERS[sex]
647
		}
648
649
	def save_ip(self):
650
		if (self.do_db(UserJoinedInfo.objects.filter(
651
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
652
			return
653
		ip_address = self.get_or_create_ip()
654
		UserJoinedInfo.objects.create(
655
			ip=ip_address,
656
			user_id=self.user_id
657
		)
658
659
	def get_or_create_ip(self):
660
		try:
661
			ip_address = IpAddress.objects.get(ip=self.ip)
662
		except IpAddress.DoesNotExist:
663
			try:
664
				if not api_url:
665
					raise Exception('api url is absent')
666
				self.logger.debug("Creating ip record %s", self.ip)
667
				f = urlopen(api_url % self.ip)
668
				raw_response = f.read().decode("utf-8")
669
				response = json.loads(raw_response)
670
				if response['status'] != "success":
671
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
672
				ip_address = IpAddress.objects.create(
673
					ip=self.ip,
674
					isp=response['isp'],
675
					country=response['country'],
676
					region=response['regionName'],
677
					city=response['city'],
678
					country_code=response['countryCode']
679
				)
680
			except Exception as e:
681
				self.logger.error("Error while creating ip with country info, because %s", e)
682
				ip_address = IpAddress.objects.create(ip=self.ip)
683
		return ip_address
684
685
	def publish_logout(self, channel, log_data):
686
		# seems like async solves problem with connection lost and wrong data status
687
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
688
		online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
689
		log_data[channel] = {'online': online, 'is_online': is_online}
690
		if not is_online:
691
			message = self.room_online(online, Actions.LOGOUT, channel)
692
			self.publish(message, channel)
693
			return True
694
695
696
class AntiSpam(object):
697
698
	def __init__(self):
699
		self.spammed = 0
700
		self.info = {}
701
702
	def check_spam(self, json_message):
703
		message_length = len(json_message)
704
		info_key = int(round(time.time() * 100))
705
		self.info[info_key] = message_length
706
		if message_length > MAX_MESSAGE_SIZE:
707
			self.spammed += 1
708
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
709
		self.check_timed_spam()
710
711
	def check_timed_spam(self):
712
		# TODO implement me
713
		pass
714
		# raise ValidationError("You're chatting too much, calm down a bit!")
715
716
717
class TornadoHandler(WebSocketHandler, MessagesHandler):
718
719
	def __init__(self, *args, **kwargs):
720
		super(TornadoHandler, self).__init__(*args, **kwargs)
721
		self.__connected__ = False
722
		self.anti_spam = AntiSpam()
723
724
	@property
725
	def connected(self):
726
		return self.__connected__
727
728
	@connected.setter
729
	def connected(self, value):
730
		self.__connected__ = value
731
732
	def data_received(self, chunk):
733
		pass
734
735
	def on_message(self, json_message):
736
		try:
737
			if not self.connected:
738
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
739
			if not json_message:
740
				raise ValidationError('Skipping null message')
741
			# self.anti_spam.check_spam(json_message)
742
			self.logger.debug('<< %s', json_message)
743
			message = json.loads(json_message)
744
			if message[VarNames.EVENT] not in self.pre_process_message:
745
				raise ValidationError("event {} is unknown".format(message[VarNames.EVENT]))
746
			channel = message.get(VarNames.CHANNEL)
747
			if channel and channel not in self.channels:
748
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
749
			self.pre_process_message[message[VarNames.EVENT]](message)
750
		except ValidationError as e:
751
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
752
			self.safe_write(error_message)
753
754
	def on_close(self):
755
		if self.async_redis.subscribed:
756
			self.logger.info("Close event, unsubscribing from %s", self.channels)
757
			self.async_redis.unsubscribe(self.channels)
758
		else:
759
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
760
		log_data = {}
761
		gone_offline = False
762
		for channel in self.channels:
763
			if not isinstance(channel, int):
764
				continue
765
			self.sync_redis.hdel(channel, self.id)
766
			if self.connected:
767
				gone_offline = self.publish_logout(channel, log_data) or gone_offline
768
		if gone_offline:
769
			res = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
770
			self.logger.info("Updated %s last read message", res)
771
		self.disconnect(json.dumps(log_data))
772
773
	def disconnect(self, log_data, tries=0):
774
		"""
775
		Closes a connection if it's not in proggress, otherwice timeouts closing
776
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227
777
		"""
778
		self.connected = False
779
		self.closed_channels = self.channels
780
		self.channels = []
781
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
782
			self.logger.debug('Closing a connection timeouts')
783
			ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, log_data, tries+1)
784
		else:
785
			self.logger.info("Close connection result: %s", log_data)
786
			self.async_redis.disconnect()
787
788
	def open(self):
789
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
790
		if sessionStore.exists(session_key):
791
			self.ip = self.get_client_ip()
792
			session = SessionStore(session_key)
793
			self.user_id = int(session["_auth_user_id"])
794
			log_params = {
795
				'user_id': str(self.user_id).zfill(3),
796
				'id': self.id,
797
				'ip': self.ip
798
			}
799
			self._logger = logging.LoggerAdapter(base_logger, log_params)
800
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
801
			self.async_redis.connect()
802
			user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
803
			self.sender_name = user_db.username
804
			self.sex = user_db.sex_str
805
			user_rooms = self.get_users_in_current_user_rooms()
806
			self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
807
			# get all missed messages
808
			self.channels.clear()
809
			self.channels.append(self.channel)
810
			for room_id in user_rooms:
811
				self.channels.append(room_id)
812
			self.listen(self.channels)
813
			off_messages = self.get_offline_messages()
814
			for room_id in user_rooms:
815
				self.add_online_user(room_id, off_messages.get(room_id))
816
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
817
			self.connected = True
818
			Thread(target=self.save_ip).start()
819
		else:
820
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
821
			self.close(403, "Session key %s has been rejected" % session_key)
822
823
	def check_origin(self, origin):
824
		"""
825
		check whether browser set domain matches origin
826
		"""
827
		parsed_origin = urlparse(origin)
828
		origin = parsed_origin.netloc
829
		origin_domain = origin.split(':')[0].lower()
830
		browser_set = self.request.headers.get("Host")
831
		browser_domain = browser_set.split(':')[0]
832
		return browser_domain == origin_domain
833
834
	def safe_write(self, message):
835
		"""
836
		Tries to send message, doesn't throw exception outside
837
		:type self: MessagesHandler
838
		"""
839
		# self.logger.debug('<< THREAD %s >>', os.getppid())
840
		try:
841
			if isinstance(message, dict):
842
				message = json.dumps(message)
843
			if not isinstance(message, str_type):
844
				raise ValueError('Wrong message type : %s' % str(message))
845
			self.logger.debug(">> %s", message)
846
			self.write_message(message)
847
		except tornado.websocket.WebSocketClosedError as e:
848
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
849
850
	def get_client_ip(self):
851
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip
852