Completed
Push — master ( 7a266a...e90c63 )
by Andrew
01:02
created

MessagesHandler.publish_logout()   A

Complexity

Conditions 2

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 9
rs 9.6666
1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import tornado.gen
9
import tornado.httpclient
10
import tornado.ioloop
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q, F
18
from redis_sessions.session import SessionStore
19
from tornado.websocket import WebSocketHandler
20
21
from chat.utils import extract_photo
22
23
# urlparse lives in urllib.parse on Python 3 and in the urlparse module on Python 2.
try:
	from urllib.parse import urlparse  # py3
except ImportError:
	from urlparse import urlparse  # py2
27
28
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM
29
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo, RoomUsers
30
31
# True on Python 3 or newer. The numeric version tuple is compared instead of
# the version string, because string comparison is lexicographic
# (e.g. "10.0" > "3" is False).
PY3 = sys.version_info[0] >= 3
# Type used for isinstance() checks of textual data: str on Python 3,
# basestring on Python 2 (the name is only evaluated when PY3 is False).
str_type = str if PY3 else basestring  # noqa: F821
33
# Module-level logger; MessagesHandler.logger falls back to it while no
# per-connection logger adapter has been created.
base_logger = logging.getLogger(__name__)

# Session store instance without a session key; used to check whether a
# session key from the cookie exists.
sessionStore = SessionStore()

# "%"-style URL template for the ip lookup service (formatted with the client
# ip); None when IP_API_URL is not present in the Django settings.
api_url = getattr(settings, "IP_API_URL", None)
38
39
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
40
#CONNECTION_POOL = tornadoredis.ConnectionPool(
41
#	max_connections=500,
42
#	wait_for_available=True)
43
44
45
class Actions(object):
	"""Event names carried in the 'action' field of the messages."""

	# online status
	LOGIN = 'addOnlineUser'
	LOGOUT = 'removeOnlineUser'
	REFRESH_USER = 'setOnlineUsers'
	ADD_USER = 'addUserToAll'

	# messages
	SEND_MESSAGE = 'sendMessage'
	PRINT_MESSAGE = 'printMessage'
	GET_MESSAGES = 'loadMessages'
	OFFLINE_MESSAGES = 'loadOfflineMessages'
	EDIT_MESSAGE = 'editMessage'
	DELETE_MESSAGE = 'deleteMessage'
	GROWL_MESSAGE = 'growl'

	# rooms and channels
	ROOMS = 'setRooms'
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
	CREATE_ROOM_CHANNEL = 'addRoom'
	DELETE_ROOM = 'deleteRoom'
	INVITE_USER = 'inviteUser'

	# calls
	CALL = 'call'
63
64
65
class VarNames(object):
	"""Key names used in the message dicts built and parsed by this module."""

	# generic message fields
	EVENT = 'action'
	CONTENT = 'content'
	TIME = 'time'
	MESSAGE_ID = 'id'
	IMG = 'image'
	CALL_TYPE = 'type'

	# user fields
	USER = 'user'
	USER_ID = 'userId'
	GENDER = 'sex'

	# room / channel fields
	ROOM_NAME = 'name'
	ROOM_ID = 'roomId'
	ROOM_USERS = 'users'
	IS_ROOM_PRIVATE = 'private'
	# CHANNEL and CHANNEL_NAME intentionally share the same key
	CHANNEL = 'channel'
	CHANNEL_NAME = 'channel'

	# history paging fields
	GET_MESSAGES_COUNT = 'count'
	GET_MESSAGES_HEADER_ID = 'headerId'
83
	#ROOM_NAME = 'roomName'
84
	# ROOM_ID = 'roomId'
85
86
87
class CallType(object):
	"""Values of the call 'type' field; process_call() treats OFFER specially."""

	OFFER = 'offer'
89
90
class HandlerNames:
	"""Values (and the key, NAME) of the 'handler' field put into outgoing messages."""

	# key under which the handler name is stored in a message
	NAME = 'handler'

	# available handler names
	CHANNELS = 'channels'
	CHAT = 'chat'
	GROWL = 'growl'
	WEBRTC = 'webrtc'
	FILE = 'file'
97
98
99
class RedisPrefix:
	"""Naming helpers for redis channels."""

	# prefix of the personal channel of a user
	USER_ID_CHANNEL_PREFIX = 'u'
	__ROOM_ONLINE__ = 'o:{}'

	@classmethod
	def generate_user(cls, key):
		"""
		Build the name of the personal redis channel of a user.

		:param key: user id
		:return: USER_ID_CHANNEL_PREFIX followed by str(key), e.g. 'u5'
		"""
		suffix = str(key)
		return cls.USER_ID_CHANNEL_PREFIX + suffix
106
107
# Attach the id of the global room (ALL_ROOM_ID from chat.settings) to RedisPrefix
# as an extra class attribute after the class has been created.
RedisPrefix.DEFAULT_CHANNEL = ALL_ROOM_ID
108
109
110
class MessagesCreator(object):
	"""
	Builds the message dicts that are published to redis or written to the client.

	The instance attributes ``user_id``, ``sender_name`` and ``sex`` describe
	the sender and are embedded into several of the generated messages.
	"""

	def __init__(self, *args, **kwargs):
		super(MessagesCreator, self).__init__(*args, **kwargs)
		self.user_id = 0  # anonymous by default
		self.sender_name = None
		self.sex = None

	def default(self, content, event, handler):
		"""
		Create the basic message stamped with the sender id and the current time.

		:return: {"action": event, "content": content, "userId": self.user_id,
		"time": get_milliseconds(), "handler": handler}
		"""
		envelope = {}
		envelope[VarNames.EVENT] = event
		envelope[VarNames.CONTENT] = content
		envelope[VarNames.USER_ID] = self.user_id
		envelope[VarNames.TIME] = get_milliseconds()
		envelope[HandlerNames.NAME] = handler
		return envelope

	def room_online(self, online, event, channel):
		"""
		Create a chat-handler message about the online users of a channel.

		:return: the default() message with "channel", "user" (sender name)
		and "sex" added
		"""
		status = self.default(online, event, HandlerNames.CHAT)
		status.update({
			VarNames.CHANNEL_NAME: channel,
			VarNames.USER: self.sender_name,
			VarNames.GENDER: self.sex,
		})
		return status

	def offer_call(self, content, message_type):
		"""
		Create a webrtc-handler call message.

		:return: the default() message with action "call" plus "type" and
		"user" (sender name)
		"""
		call = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
		call.update({
			VarNames.CALL_TYPE: message_type,
			VarNames.USER: self.sender_name,
		})
		return call

	@classmethod
	def create_message(cls, message):
		"""
		Convert a message object into a dict.

		:param message: object exposing sender_id, content, time, id and img
		:return: {"userId", "content", "time", "id"} plus "image" (img.url)
		when img.name is set
		"""
		payload = {
			VarNames.USER_ID: message.sender_id,
			VarNames.CONTENT: message.content,
			VarNames.TIME: message.time,
			VarNames.MESSAGE_ID: message.id,
		}
		image = message.img
		if image.name:
			payload[VarNames.IMG] = image.url
		return payload

	@classmethod
	def create_send_message(cls, message, event=Actions.PRINT_MESSAGE):
		"""
		Convert a message object into a chat-handler message for its room.

		:param message: see create_message(); must also expose room_id
		:param event: value of the "action" field
		:return: create_message() result with "action", "channel"
		(message.room_id) and "handler" added
		"""
		payload = cls.create_message(message)
		payload.update({
			VarNames.EVENT: event,
			VarNames.CHANNEL: message.room_id,
			HandlerNames.NAME: HandlerNames.CHAT,
		})
		return payload

	@classmethod
	def get_messages(cls, messages, channel):
		"""
		Wrap several messages into a single "loadMessages" message.

		:param messages: iterable of message objects (see create_message())
		:param channel: value of the "channel" field
		"""
		serialized = []
		for message in messages:
			serialized.append(cls.create_message(message))
		return {
			VarNames.CONTENT: serialized,
			VarNames.EVENT: Actions.GET_MESSAGES,
			VarNames.CHANNEL: channel,
			HandlerNames.NAME: HandlerNames.CHAT
		}

	@property
	def stored_redis_user(self):
		"""Value stored in redis for this user: the user id."""
		return self.user_id

	@property
	def channel(self):
		"""Name of the personal redis channel of this user."""
		return RedisPrefix.generate_user(self.user_id)

	def subscribe_direct_channel_message(self, room_id, other_user_id):
		"""Create the "addDirectChannel" message for a room shared with other_user_id."""
		participants = [self.user_id, other_user_id]
		return {
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: participants,
			HandlerNames.NAME: HandlerNames.CHANNELS
		}

	def subscribe_room_channel_message(self, room_id, room_name):
		"""Create the "addRoom" message for a room whose only listed user is the sender."""
		participants = [self.user_id]
		return {
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: participants,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.ROOM_NAME: room_name
		}

	def invite_room_channel_message(self, room_id, user_id, room_name, users):
		"""Create the "inviteUser" message; ``users`` is passed through as the content."""
		invitation = {}
		invitation[VarNames.EVENT] = Actions.INVITE_USER
		invitation[VarNames.ROOM_ID] = room_id
		invitation[VarNames.USER_ID] = user_id
		invitation[HandlerNames.NAME] = HandlerNames.CHANNELS
		invitation[VarNames.ROOM_NAME] = room_name
		invitation[VarNames.CONTENT] = users
		return invitation

	def add_user_to_room(self, channel, user_id, content):
		"""
		Create the "addUserToAll" message announcing a user in a channel.

		:param content: dict holding the "sex" and "user" entries of that user,
		e.g. {"sex": "Alien", "user": "Andrew"}
		"""
		gender = content[VarNames.GENDER]
		name = content[VarNames.USER]
		return {
			VarNames.EVENT: Actions.ADD_USER,
			VarNames.CHANNEL: channel,
			VarNames.USER_ID: user_id,
			HandlerNames.NAME: HandlerNames.CHAT,
			VarNames.GENDER: gender,
			VarNames.USER: name
		}

	def unsubscribe_direct_message(self, room_id):
		"""Create the "deleteRoom" message for the sender, stamped with the current time."""
		farewell = {}
		farewell[VarNames.EVENT] = Actions.DELETE_ROOM
		farewell[VarNames.ROOM_ID] = room_id
		farewell[VarNames.USER_ID] = self.user_id
		farewell[HandlerNames.NAME] = HandlerNames.CHANNELS
		farewell[VarNames.TIME] = get_milliseconds()
		return farewell

	def load_offline_message(self, offline_messages, channel_key):
		"""Create the "loadOfflineMessages" message for channel_key."""
		bundle = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
		bundle[VarNames.CHANNEL] = channel_key
		return bundle
246
247
248
class MessagesHandler(MessagesCreator):
249
250
	def __init__(self, *args, **kwargs):
		"""
		Initialise the per-connection state, the redis clients and the
		dispatch tables mapping an action name to the method handling it.
		"""
		# one-character marker: encode() prepends it, decode() looks for it
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__(*args, **kwargs)
		self.id = id(self)
		self.ip = None
		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		self.channels = []
		self.call_receiver_channel = None
		self._logger = None
		self.async_redis = tornadoredis.Client()
		# handlers of the actions received from the client
		incoming_handlers = {}
		incoming_handlers[Actions.GET_MESSAGES] = self.process_get_messages
		incoming_handlers[Actions.SEND_MESSAGE] = self.process_send_message
		incoming_handlers[Actions.CALL] = self.process_call
		incoming_handlers[Actions.CREATE_DIRECT_CHANNEL] = self.create_user_channel
		incoming_handlers[Actions.DELETE_ROOM] = self.delete_channel
		incoming_handlers[Actions.EDIT_MESSAGE] = self.edit_message
		incoming_handlers[Actions.CREATE_ROOM_CHANNEL] = self.create_new_room
		incoming_handlers[Actions.INVITE_USER] = self.invite_user
		self.pre_process_message = incoming_handlers
		# handlers run by new_message() for prefixed (parsable) redis messages
		outgoing_handlers = {}
		outgoing_handlers[Actions.CREATE_DIRECT_CHANNEL] = self.send_client_new_channel
		outgoing_handlers[Actions.CREATE_ROOM_CHANNEL] = self.send_client_new_channel
		outgoing_handlers[Actions.DELETE_ROOM] = self.send_client_delete_channel
		outgoing_handlers[Actions.INVITE_USER] = self.send_client_new_channel
		outgoing_handlers[Actions.CALL] = self.set_opponent_call_channel
		self.post_process_message = outgoing_handlers
279
280
	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribe the connection's redis client to ``channels`` and, once
		subscribed, route every incoming message to new_message().
		"""
		subscription = tornado.gen.Task(self.async_redis.subscribe, channels)
		yield subscription
		self.async_redis.listen(self.new_message)
285
286
	@property
	def logger(self):
		"""Per-connection logger when one is set, the module logger otherwise."""
		return self._logger or base_logger
289
290
	@tornado.gen.engine
	def add_channel(self, channel):
		"""Remember ``channel`` in self.channels and subscribe the redis client to it."""
		self.channels.append(channel)
		subscription = tornado.gen.Task(self.async_redis.subscribe, (channel,))
		yield subscription
295
296
	def do_db(self, callback, *args, **kwargs):
		"""
		Call ``callback(*args, **kwargs)`` and retry it once after a database
		connection error.

		:param callback: callable performing the database work
		:return: whatever the callback returns
		:raises: any exception of the callback other than OperationalError /
		InterfaceError, and any exception raised by the second attempt
		"""
		try:
			return callback(*args, **kwargs)
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
			# lazy %-args, like the other log calls of this module
			self.logger.warning('%s, reconnecting', e)  # TODO
			# drop the broken connection before the retry
			connection.close()
			return callback(*args, **kwargs)
303
304
	def execute_query(self, query, *args, **kwargs):
		"""
		Execute a raw SQL query and return its rows as dicts.

		:param query: SQL text; extra arguments are forwarded to cursor.execute()
		:return: list of {column name: value} dicts, one per fetched row;
		an empty list when the statement produced no result set
		"""
		cursor = connection.cursor()
		try:
			cursor.execute(query, *args, **kwargs)
			desc = cursor.description
			if desc is None:
				# statements without a result set (e.g. UPDATE) have no description
				return []
			# column names are the same for every row, so build them once
			columns = [col[0] for col in desc]
			return [dict(zip(columns, row)) for row in cursor.fetchall()]
		finally:
			# always release the cursor, also when the query fails
			cursor.close()
312
313
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
		"""
		Read the ids of the users stored in the redis hash ``channel``.

		The hash is read as {connection hash: user id}; both are decoded from
		utf-8 bytes and converted to int.

		:param channel: redis key of the hash
		:param check_user_id: when not None, additionally report whether this
		user owns an entry whose connection hash differs from check_hash
		:param check_hash: connection hash to ignore while searching for check_user_id
		:return: list of distinct user ids, or ``(list, bool)`` when
		check_user_id is given
		"""
		online = self.sync_redis.hgetall(channel)
		self.logger.debug('!! channel %s redis online: %s', channel, online)
		result = set()
		user_is_online = False
		# redis stores REDIS_USER_FORMAT, so parse them
		if online:
			for key_hash, raw_user_id in online.items():  # py2 iteritems
				user_id = int(raw_user_id.decode('utf-8'))
				if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
					user_is_online = True
				result.add(user_id)
		result = list(result)
		# test against None, not truthiness: user id 0 is a legal argument and
		# its callers unpack a tuple
		return (result, user_is_online) if check_user_id is not None else result
331
332
	def add_online_user(self, room_id, offline_messages=None):
		"""
		Register this connection as online in ``room_id`` and notify about it.

		The connection is stored in the redis hash of the room as
		{connection hash (self.id): stored_redis_user}. When the user has no
		other connection in the room, a LOGIN message is published to the room
		and the offline messages (if any) are written to this client;
		otherwise only this client receives the current online list.

		:param room_id: id of the room, also used as redis key and channel
		:param offline_messages: messages to send to this client on first login
		"""
		self.async_redis_publisher.hset(room_id, self.id, self.stored_redis_user)
		# since we add user to online first, latest trigger will always show correct online
		online, is_online = self.get_online_from_redis(room_id, self.user_id, self.id)
		if not is_online:  # no other connection of this user: first tab
			# the hash read above may already contain this connection,
			# so only add the id when it is missing to avoid a duplicate
			if self.user_id not in online:
				online.append(self.user_id)
			online_user_names_mes = self.room_online(
				online,
				Actions.LOGIN,
				room_id
			)
			self.logger.info('!! First tab, sending refresh online for all')
			self.publish(online_user_names_mes, room_id)
			if offline_messages:
				self.safe_write(self.load_offline_message(offline_messages, room_id))
		else:  # Send user names to self
			online_user_names_mes = self.room_online(
				online,
				Actions.REFRESH_USER,
				room_id
			)
			self.logger.info('!! Second tab, retrieving online for self')
			self.safe_write(online_user_names_mes)
360
361
	def publish(self, message, channel, parsable=False):
		"""
		Serialise ``message`` to JSON and publish it to the redis ``channel``.

		:param parsable: when true the JSON is marked with encode(), so that
		the receiving handler post-processes it in new_message()
		"""
		payload = json.dumps(message)
		self.logger.debug('<%s> %s', channel, payload)
		outgoing = self.encode(payload) if parsable else payload
		self.async_redis_publisher.publish(channel, outgoing)
367
368
	def encode(self, message):
		"""
		Mark a message with the parsable prefix, which tells the receiver to
		decode and process it before it is sent to the client.

		@param message: message to mark
		@return: marked message
		"""
		return ''.join((self.parsable_prefix, message))
376
377
	def decode(self, message):
		"""
		Check whether a message has to be processed by the server before it is
		written to the client.

		@param message: message to check
		@type message: str
		@return: parsed JSON structure of the message when it starts with the
		parsable prefix, None otherwise
		"""
		prefix = self.parsable_prefix
		if message.startswith(prefix):
			# strip exactly the prefix, whatever its length is
			return json.loads(message[len(prefix):])
		return None
386
387
	def new_message(self, message):
		"""
		Callback for messages arriving from the subscribed redis channels.

		String bodies are written to the client; when the body carried the
		parsable prefix, the decoded structure is written instead and the
		matching post_process_message handler is run afterwards. Bodies of
		any other type are ignored.
		"""
		body = message.body
		if not isinstance(body, str_type):  # e.g. subscribe event
			return
		decoded = self.decode(body)
		self.safe_write(decoded if decoded else body)
		if decoded:
			post_process = self.post_process_message[decoded[VarNames.EVENT]]
			post_process(decoded)
396
397
	def safe_write(self, message):
		"""
		Write ``message`` to the client; has to be overridden by the websocket handler.

		:raises NotImplementedError: always, in this base implementation
		"""
		raise NotImplementedError('WebSocketHandler implements')
399
400
	def process_send_message(self, message):
		"""
		Store a chat message sent by the client and publish it to its room.

		:type message: dict
		:param message: must hold "channel" and "content", may hold "image"
		"""
		room_id = message[VarNames.CHANNEL]
		stored = Message(
			sender_id=self.user_id,
			content=message[VarNames.CONTENT]
		)
		stored.room_id = room_id
		if VarNames.IMG in message:
			stored.img = extract_photo(message[VarNames.IMG])
		# original author's note: exit on hacked id with exception
		self.do_db(stored.save)
		outgoing = self.create_send_message(stored)
		self.publish(outgoing, room_id)
415
416
	def process_call(self, in_message):
		"""
		Forward a call message to the call receiver's personal channel.

		For an "offer" the receiver is looked up as the other member of the
		unnamed room given in "channel" and remembered in
		self.call_receiver_channel; the message is then published as parsable.
		Other call types are published to the remembered channel.

		:type in_message: dict
		"""
		call_type = in_message.get(VarNames.CALL_TYPE)
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
		is_offer = call_type == CallType.OFFER
		if is_offer:
			room_id = in_message[VarNames.CHANNEL]
			opponent = User.rooms.through.objects.get(
				~Q(user_id=self.user_id),
				Q(room_id=room_id),
				Q(room__name__isnull=True)
			)
			self.call_receiver_channel = RedisPrefix.generate_user(opponent.user_id)
			out_message[VarNames.CHANNEL] = room_id
		# TODO
		self.logger.info('!! Offering a call to user with id %s',  self.call_receiver_channel)
		self.publish(out_message, self.call_receiver_channel, is_offer)
432
433
	def create_new_room(self, message):
		"""
		Create a named room, join the current user to it and announce it on
		the user's personal channel.

		:raises ValidationError: when the room name is empty or longer than 16 characters
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = bool(room_name) and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		new_room = Room(name=room_name)
		self.do_db(new_room.save)
		membership = RoomUsers(room_id=new_room.id, user_id=self.user_id)
		membership.save()
		announcement = self.subscribe_room_channel_message(new_room.id, room_name)
		self.publish(announcement, self.channel, True)
442
443
	def invite_user(self, message):
		"""
		Add another user to a room of the current user.

		The room receives an "addUserToAll" message, the invited user receives
		a parsable "inviteUser" message on the personal channel.

		:raises ValidationError: when the room is not among self.channels, the
		room is private, or the user already is a member
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		if room_id not in self.channels:
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
		room = self.do_db(Room.objects.get, id=room_id)
		if room.is_private:
			raise ValidationError("You can't add users to direct room, create a new room instead")
		memberships = Room.users.through.objects
		try:
			memberships.create(room_id=room_id, user_id=user_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		# {user id: {"user": name, "sex": gender}} for every member of the room
		users_in_room = {}
		for member in room.users.all():
			self.set_js_user_structure(users_in_room, member.id, member.username, member.sex)
		room_notice = self.add_user_to_room(room_id, user_id, users_in_room[user_id])
		self.publish(room_notice, room_id)
		invitation = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
		self.publish(invitation, RedisPrefix.generate_user(user_id), True)
461
462
	def create_room(self, user_rooms, user_id):
		"""
		Find or create the room shared by the current user and ``user_id``.

		An existing room is passed to update_room(); otherwise a new unnamed
		room is created and both users (or only the current user, when
		user_id is the current user) are linked to it.

		:param user_rooms: rows with a 'room_id' entry, the candidate rooms of the current user
		:return: id of the found or created room
		"""
		talks_to_self = self.user_id == user_id
		if talks_to_self:
			candidate_ids = [entry['room_id'] for entry in user_rooms]
			found = self.execute_query(SELECT_SELF_ROOM, [candidate_ids, ])
		else:
			shared = Room.users.through.objects.filter(user_id=user_id, room__in=user_rooms)
			found = shared.values('room__id', 'room__disabled')
		if len(found) > 0:
			existing = found[0]
			room_id = existing['room__id']
			self.update_room(room_id, existing['room__disabled'])
			return room_id
		new_room = Room()
		new_room.save()
		room_id = new_room.id
		if talks_to_self:
			RoomUsers(user_id=self.user_id, room_id=room_id).save()
		else:
			RoomUsers.objects.bulk_create([
				RoomUsers(user_id=user_id, room_id=room_id),
				RoomUsers(user_id=self.user_id, room_id=room_id),
			])
		return room_id
485
486
	def update_room(self, room_id, disabled):
		"""
		Re-enable a disabled room.

		:raises ValidationError: when the room is not disabled
		"""
		if not disabled:
			raise ValidationError('This room already exist')
		Room.objects.filter(id=room_id).update(disabled=False)
491
492
	def create_user_channel(self, message):
		"""
		Open a direct room with the user given in the message and announce it
		on the personal channels of both users (once, when they are the same).
		"""
		other_user_id = message[VarNames.USER_ID]
		# rooms of the current user that have no name
		own_unnamed_rooms = Room.users.through.objects.filter(
			user_id=self.user_id,
			room__name__isnull=True
		).values('room_id')
		# reuse the room shared with the other user or create a new one
		room_id = self.create_room(own_unnamed_rooms, other_user_id)
		announcement = self.subscribe_direct_channel_message(room_id, other_user_id)
		self.publish(announcement, self.channel, True)
		other_channel = RedisPrefix.generate_user(other_user_id)
		if other_channel != self.channel:
			self.publish(announcement, other_channel, True)
503
504
	def delete_channel(self, message):
		"""
		Leave (named room) or disable (unnamed room) the room given in the message.

		For a named room the membership row of the current user is deleted and
		a LOGOUT message with the remaining online users is published. In both
		cases a parsable "deleteRoom" message is published to the room.

		:raises ValidationError: when the room is not among self.channels, is
		the global room (ALL_ROOM_ID), or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to exit this room')
		room = self.do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else:  # if public -> leave the room, delete the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
			online = self.get_online_from_redis(room_id)
			# the redis hash may not list this user; list.remove() would raise
			# ValueError and abort after the membership was already deleted
			if self.user_id in online:
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
		room.save()
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, room_id, True)
521
522
	def edit_message(self, data):
		"""
		Edit or delete one of the current user's own messages.

		A ``None`` content marks the message as deleted ("deleteMessage"), any
		other content replaces the stored one ("editMessage"); the result is
		published to the room of the message.

		:raises ValidationError: when the message belongs to another user, is
		older than 60000 ms, or is already deleted
		"""
		message_id = data[VarNames.MESSAGE_ID]
		message = Message.objects.get(id=message_id)
		if message.sender_id != self.user_id:
			raise ValidationError("You can only edit your messages")
		if get_milliseconds() > message.time + 60000:
			raise ValidationError("You can only edit messages that were send not more than 1 min ago")
		if message.deleted:
			raise ValidationError("Already deleted")
		message.content = data[VarNames.CONTENT]
		stored_rows = Message.objects.filter(id=message_id)
		if message.content is not None:
			action = Actions.EDIT_MESSAGE
			stored_rows.update(content=message.content)
		else:
			stored_rows.update(deleted=True)
			action = Actions.DELETE_MESSAGE
		notice = self.create_send_message(message, action)
		self.publish(notice, message.room_id)
540
541
	def send_client_new_channel(self, message):
		"""Subscribe this connection to the room of the message and mark the user online in it."""
		new_room_id = message[VarNames.ROOM_ID]
		self.add_channel(new_room_id)
		self.add_online_user(new_room_id)
545
546
	def set_opponent_call_channel(self, message):
		"""Remember the personal channel of the message's sender as the call receiver."""
		caller_id = message[VarNames.USER_ID]
		self.call_receiver_channel = RedisPrefix.generate_user(caller_id)
548
549
	def send_client_delete_channel(self, message):
		"""
		Detach this connection from the room of the message: unsubscribe from
		the redis channel, drop the connection from the room's online hash and
		forget the room in self.channels.
		"""
		removed_room_id = message[VarNames.ROOM_ID]
		self.async_redis.unsubscribe((removed_room_id,))
		self.async_redis_publisher.hdel(removed_room_id, self.id)
		self.channels.remove(removed_room_id)
554
555
	def process_get_messages(self, data):
		"""
		Write a page of not deleted messages of a room to the client, newest first.

		:type data: dict
		:param data: "channel" is the room id; "count" (default 10) limits the
		page size; "headerId", when present, restricts the page to messages
		with a smaller id
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		room_id = data[VarNames.CHANNEL]
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		conditions = [Q(room_id=room_id), Q(deleted=False)]
		if header_id is not None:
			conditions.insert(0, Q(id__lt=header_id))
		page = Message.objects.filter(*conditions).order_by('-pk')[:count]
		response = self.do_db(self.get_messages, page, room_id)
		self.safe_write(response)
569
570
	def get_offline_messages(self):
		"""
		Collect the not deleted messages whose id is greater than the current
		user's last read message id of the room.

		:return: {room id: [message dict, ...]}
		"""
		unread = Message.objects.filter(
			id__gt=F('room__roomusers__last_read_message_id'),
			deleted=False,
			room__roomusers__user_id=self.user_id
		)
		by_room = {}
		for message in unread:
			if message.room_id not in by_room:
				by_room[message.room_id] = []
			by_room[message.room_id].append(self.create_message(message))
		return by_room
580
581
	def get_users_in_current_user_rooms(self):
		"""
		Describe every enabled room of the current user together with its members.

		:return: dict of the form
		{
			ROOM_ID: {
				"name": ROOM_NAME,
				"users": {
					USER_ID: {
						"user": USER_NAME,
						"sex": GENDERS[sex]
					}
				}
			}
		}
		"""
		own_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
		rooms = {}
		for room in own_rooms:
			rooms[room['id']] = {
				VarNames.ROOM_NAME: room['name'],
				VarNames.ROOM_USERS: {}
			}
		members = User.objects.filter(rooms__in=list(rooms)).values('id', 'username', 'sex', 'rooms__id')
		for member in members:
			room_users = rooms[member['rooms__id']][VarNames.ROOM_USERS]
			self.set_js_user_structure(room_users, member['id'], member['username'], member['sex'])
		return rooms
610
611
	def set_js_user_structure(self, user_dict, user_id, name, sex):
		"""
		Store the description of a user in ``user_dict`` under ``user_id``.

		:param sex: key into GENDERS; the mapped value is what gets stored
		"""
		description = {}
		description[VarNames.USER] = name
		description[VarNames.GENDER] = GENDERS[sex]
		user_dict[user_id] = description
616
617
	def save_ip(self):
		"""
		Record that the current user connected from self.ip, unless such a
		record already exists.
		"""
		known_pairs = UserJoinedInfo.objects.filter(
			Q(ip__ip=self.ip) & Q(user_id=self.user_id))
		already_recorded = self.do_db(known_pairs.exists)
		if already_recorded:
			return
		address = self.get_or_create_ip()
		UserJoinedInfo.objects.create(
			ip=address,
			user_id=self.user_id
		)
626
627
	def get_or_create_ip(self):
		"""
		Return the IpAddress record of self.ip, creating it when missing.

		A new record is filled with the isp / country / region / city data
		returned by the service at ``api_url``. When the url is not
		configured, the request fails or the service does not answer with
		status "success", the error is logged and a record holding only the
		ip is created instead.

		:return: IpAddress instance
		"""
		try:
			ip_address = IpAddress.objects.get(ip=self.ip)
		except IpAddress.DoesNotExist:
			try:
				if not api_url:
					raise Exception('api url is absent')
				self.logger.debug("Creating ip record %s", self.ip)
				# the context manager closes the HTTP response once the body is read
				with urlopen(api_url % self.ip) as f:
					raw_response = f.read().decode("utf-8")
				response = json.loads(raw_response)
				if response['status'] != "success":
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
				ip_address = IpAddress.objects.create(
					ip=self.ip,
					isp=response['isp'],
					country=response['country'],
					region=response['regionName'],
					city=response['city'],
					country_code=response['countryCode']
				)
			except Exception as e:
				# best effort: fall back to a record without location data
				self.logger.error("Error while creating ip with country info, because %s", e)
				ip_address = IpAddress.objects.create(ip=self.ip)
		return ip_address
652
653
	def publish_logout(self, channel, log_data):
		"""
		Publish a LOGOUT message to ``channel`` when the current user has no
		other connection registered in it.

		:param channel: room id, used as redis hash key and publish channel
		:param log_data: dict that receives, under ``channel``, the online
		list and the is_online flag that were read
		:return: True when the LOGOUT message was published, False otherwise
		"""
		# seems like async solves problem with connection lost and wrong data status
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
		online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
		log_data[channel] = {'online': online, 'is_online': is_online}
		if is_online:
			# another connection of this user is still registered in the channel
			return False
		message = self.room_online(online, Actions.LOGOUT, channel)
		self.publish(message, channel)
		return True
662
663
664
class AntiSpam(object):
	"""Keeps per-connection statistics about message sizes and rejects oversized messages."""

	def __init__(self):
		# {timestamp in hundredths of a second: length of the message checked at that time}
		self.info = {}
		# number of messages rejected for exceeding MAX_MESSAGE_SIZE
		self.spammed = 0

	def check_spam(self, json_message):
		"""
		Record the length of ``json_message`` and validate it.

		:raises ValidationError: when the message is longer than MAX_MESSAGE_SIZE
		"""
		size = len(json_message)
		timestamp = int(round(time.time() * 100))
		self.info[timestamp] = size
		if size > MAX_MESSAGE_SIZE:
			self.spammed += 1
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
		self.check_timed_spam()

	def check_timed_spam(self):
		"""Placeholder for a rate check; currently does nothing."""
		# TODO implement me
		return None
682
		# raise ValidationError("You're chatting too much, calm down a bit!")
683
684
685
class TornadoHandler(WebSocketHandler, MessagesHandler):
686
687
	def __init__(self, *args, **kwargs):
		"""Initialise the parent handlers, then the connection flag and the spam checker."""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		# becomes True at the end of a successful open()
		self.connected = False
		self.anti_spam = AntiSpam()
691
692
	def data_received(self, chunk):
		"""Ignore the chunk; this handler does nothing with it."""
		return None
694
695
	def on_message(self, json_message):
		"""
		Dispatch a JSON message received from the client to the handler
		registered for its "action" in pre_process_message.

		A ValidationError raised during validation or by the handler is
		reported back to the client as a growl message.
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %s', json_message)
			message = json.loads(json_message)
			event = message[VarNames.EVENT]
			if event not in self.pre_process_message:
				raise ValidationError("event {} is unknown".format(event))
			channel = message.get(VarNames.CHANNEL)
			# a message addressed to a channel is only accepted for subscribed channels
			if channel and channel not in self.channels:
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels))
			handler = self.pre_process_message[event]
			handler(message)
		except ValidationError as e:
			growl = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.safe_write(growl)
713
714
	def on_close(self):
		"""
		Clean up after the websocket is closed.

		Unsubscribes from redis, removes this connection from the online hash
		of every integer (room) channel and, when the connection had been
		opened, publishes logout messages. If the user went offline in at
		least one room, the UPDATE_LAST_READ_MESSAGE query is executed.
		"""
		if self.async_redis.subscribed:
			self.logger.info("Close event, unsubscribing from %s", self.channels)
			self.async_redis.unsubscribe(self.channels)
		else:
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
		log_data = {}
		gone_offline = False
		# only integer channels are processed here; other entries are skipped
		room_channels = [channel for channel in self.channels if isinstance(channel, int)]
		for room_id in room_channels:
			self.sync_redis.hdel(room_id, self.id)
			if self.connected and self.publish_logout(room_id, log_data):
				gone_offline = True
		if gone_offline:
			res = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
			self.logger.info("Updated %s last read message", res)

		self.logger.info("Close connection result: %s", json.dumps(log_data))
		self.async_redis.disconnect()
734
735
	def open(self):
		"""
		Handle a new websocket connection.

		When the session cookie refers to an existing session, the user is
		loaded, the rooms are sent to the client, the redis channels are
		subscribed, the user is marked online in every room and the ip is
		saved in a background thread. Otherwise the connection is closed
		with code 403.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if not sessionStore.exists(session_key):
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			self.close(403, "Session key %s has been rejected" % session_key)
			return
		self.ip = self.get_client_ip()
		session = SessionStore(session_key)
		self.user_id = int(session["_auth_user_id"])
		# extra fields attached to every log record of this connection
		log_params = {
			'user_id': str(self.user_id).zfill(3),
			'id': self.id,
			'ip': self.ip
		}
		self._logger = logging.LoggerAdapter(base_logger, log_params)
		self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
		self.async_redis.connect()
		user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
		self.sender_name = user_db.username
		self.sex = user_db.sex_str
		user_rooms = self.get_users_in_current_user_rooms()
		rooms_message = self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS)
		self.safe_write(rooms_message)
		# personal channel first, then every room of the user
		self.channels.clear()
		self.channels.append(self.channel)
		self.channels.extend(user_rooms)
		self.listen(self.channels)
		# get all missed messages
		off_messages = self.get_offline_messages()
		for room_id in user_rooms:
			self.add_online_user(room_id, off_messages.get(room_id))
		self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
		self.connected = True
		Thread(target=self.save_ip).start()
769
770
	def check_origin(self, origin):
		"""
		Check whether the domain of the Origin matches the domain of the
		request's Host header. Ports are ignored and the comparison is case
		insensitive.

		:param origin: value of the Origin header
		:return: True when the domains match; False otherwise, including when
		the Host header is missing
		"""
		origin_domain = urlparse(origin).netloc.split(':')[0].lower()
		browser_set = self.request.headers.get("Host")
		if not browser_set:
			# without a Host header there is nothing to compare against
			return False
		# host names are case insensitive, so normalise both sides
		browser_domain = browser_set.split(':')[0].lower()
		return browser_domain == origin_domain
780
781
	def safe_write(self, message):
		"""
		Send a message to the client; a closed websocket is logged instead of raised.

		:param message: dict (serialised to JSON first) or string
		:raises ValueError: when the message is neither a dict nor a string
		"""
		# self.logger.debug('<< THREAD %s >>', os.getppid())
		try:
			outgoing = json.dumps(message) if isinstance(message, dict) else message
			# keep the name used by the error log below pointing at the serialised form
			message = outgoing
			if not isinstance(outgoing, str_type):
				raise ValueError('Wrong message type : %s' % str(outgoing))
			self.logger.debug(">> %s", outgoing)
			self.write_message(outgoing)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
796
797
	def get_client_ip(self):
		"""Return the X-Real-IP header when it is set and non-empty, else the remote ip of the request."""
		forwarded_ip = self.request.headers.get("X-Real-IP")
		return forwarded_ip or self.request.remote_ip
799