Completed
Push — master ( 592fe1...450c92 )
by Andrew
01:25
created

TornadoHandler   A

Complexity

Total Complexity 28

Size/Duplication

Total Lines 121
Duplicated Lines 0 %

Importance

Changes 6
Bugs 1 Features 0
Metric Value
c 6
b 1
f 0
dl 0
loc 121
rs 10
wmc 28

8 Methods

Rating   Name   Duplication   Size   Complexity  
A data_received() 0 2 1
A __init__() 0 4 1
B on_message() 0 18 7
C on_close() 0 27 7
A check_origin() 0 10 1
A get_client_ip() 0 2 1
B safe_write() 0 15 6
B open() 0 34 4
1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import tornado.gen
9
import tornado.httpclient
10
import tornado.ioloop
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q, F
18
from redis_sessions.session import SessionStore
19
from tornado.websocket import WebSocketHandler
20
21
from chat.utils import extract_photo
22
23
try:
24
	from urllib.parse import urlparse  # Python 3 location of urlparse
25
except ImportError:
26
	from urlparse import urlparse  # Python 2 fallback module
27
28
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE
29
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo, RoomUsers
30
31
# True when running under Python 3 or newer. The numeric major version is
# compared instead of the version *string*, because a string comparison such
# as "10.0" > "3" is False.
PY3 = sys.version_info[0] >= 3

# URL template of the IP information service; it is %-formatted with the client
# IP in get_or_create_ip(). None when the IP_API_URL setting is absent.
api_url = getattr(settings, "IP_API_URL", None)

# Key-less session store; this file uses it to test whether a session key exists.
sessionStore = SessionStore()

logger = logging.getLogger(__name__)
38
39
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
40
#CONNECTION_POOL = tornadoredis.ConnectionPool(
41
#	max_connections=500,
42
#	wait_for_available=True)
43
44
45
class Actions:
	"""Values of the ``action`` field carried by chat messages."""

	# --- online state of users ---
	LOGIN = 'addOnlineUser'
	LOGOUT = 'removeOnlineUser'
	REFRESH_USER = 'setOnlineUsers'
	ADD_USER = 'addUserToAll'

	# --- messages ---
	SEND_MESSAGE = 'sendMessage'
	PRINT_MESSAGE = 'printMessage'
	GET_MESSAGES = 'loadMessages'
	OFFLINE_MESSAGES = 'loadOfflineMessages'
	GROWL_MESSAGE = 'growl'

	# --- rooms / channels ---
	ROOMS = 'setRooms'
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
	CREATE_ROOM_CHANNEL = 'addRoom'
	DELETE_ROOM = 'deleteRoom'
	INVITE_USER = 'inviteUser'

	# --- calls ---
	CALL = 'call'
61
62
63
class VarNames:
	"""Key names used inside the message dictionaries."""

	# --- envelope ---
	EVENT = 'action'
	CONTENT = 'content'
	TIME = 'time'
	MESSAGE_ID = 'id'
	IMG = 'image'

	# --- user ---
	USER = 'user'
	USER_ID = 'userId'
	GENDER = 'sex'

	# --- room / channel ---
	ROOM_NAME = 'name'
	ROOM_ID = 'roomId'
	ROOM_USERS = 'users'
	IS_ROOM_PRIVATE = 'private'
	# CHANNEL and CHANNEL_NAME deliberately share the same key string.
	CHANNEL = 'channel'
	CHANNEL_NAME = 'channel'

	# --- message history request ---
	GET_MESSAGES_COUNT = 'count'
	GET_MESSAGES_HEADER_ID = 'headerId'

	# --- calls ---
	CALL_TYPE = 'type'
83
84
85
class CallType:
	"""Values of the ``type`` field of a call message."""

	# process_call() treats this type specially: it resolves the receiver channel first.
	OFFER = 'offer'
87
88
class HandlerNames:
	"""Values of the ``handler`` field, plus the name of that field itself."""

	# Key under which one of the handler values below is stored in a message.
	NAME = 'handler'

	# Handler values.
	CHAT = 'chat'
	CHANNELS = 'channels'
	GROWL = 'growl'
	WEBRTC = 'webrtc'
	FILE = 'file'
95
96
97
class RedisPrefix:
	"""Naming helpers for Redis channels."""

	# Put in front of a user id to form that user's personal channel name.
	USER_ID_CHANNEL_PREFIX = 'u'
	# Template with a single placeholder; not referenced by the visible code.
	__ROOM_ONLINE__ = 'o:{}'

	@classmethod
	def generate_user(cls, key):
		"""Return the personal channel name for ``key``, e.g. ``5`` -> ``'u5'``."""
		prefix = cls.USER_ID_CHANNEL_PREFIX
		return prefix + str(key)
104
105
# Attached after the class body: the default channel is ALL_ROOM_ID from chat.settings.
RedisPrefix.DEFAULT_CHANNEL = ALL_ROOM_ID
106
107
108
class MessagesCreator(object):
	"""
	Builds the dictionaries that are serialized and delivered to chat clients.

	The instance holds the identity of the current sender (``user_id``,
	``sender_name``, ``sex``); several builders copy it into the payload.
	"""

	def __init__(self, *args, **kwargs):
		super(MessagesCreator, self).__init__(*args, **kwargs)
		self.user_id = 0  # anonymous by default
		self.sender_name = None
		self.sex = None

	def default(self, content, event, handler):
		"""
		Base payload reused by most builders.

		:return: {"action": event, "content": content, "userId": self.user_id,
		"time": get_milliseconds(), "handler": handler}
		"""
		payload = {}
		payload[VarNames.EVENT] = event
		payload[VarNames.CONTENT] = content
		payload[VarNames.USER_ID] = self.user_id
		payload[VarNames.TIME] = get_milliseconds()
		payload[HandlerNames.NAME] = handler
		return payload

	def room_online(self, online, event, channel):
		"""
		Chat payload describing who is online in ``channel``.

		:return: default() payload with ``online`` as content, extended with the
		channel and the sender's name and sex
		"""
		payload = self.default(online, event, HandlerNames.CHAT)
		extra = (
			(VarNames.CHANNEL_NAME, channel),
			(VarNames.USER, self.sender_name),
			(VarNames.GENDER, self.sex),
		)
		for key, value in extra:
			payload[key] = value
		return payload

	def offer_call(self, content, message_type):
		"""
		:return: default() payload with action "call" and handler "webrtc",
		extended with the call type and the sender's name
		"""
		payload = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
		payload[VarNames.CALL_TYPE] = message_type
		payload[VarNames.USER] = self.sender_name
		return payload

	@classmethod
	def create_message(cls, message):
		"""
		Convert a stored message object into a dictionary.

		:param message: object exposing sender_id, content, time, id and img
		:return: dict with user id, content, time and message id; the image url
		is added only when ``message.img.name`` is truthy
		"""
		payload = {
			VarNames.USER_ID: message.sender_id,
			VarNames.CONTENT: message.content,
			VarNames.TIME: message.time,
			VarNames.MESSAGE_ID: message.id,
		}
		image = message.img
		if image.name:
			payload[VarNames.IMG] = image.url
		return payload

	@classmethod
	def create_send_message(cls, message):
		"""
		:param message: stored message object, see create_message()
		:return: create_message() payload extended with action "printMessage",
		the message's room id as channel and handler "chat"
		"""
		payload = cls.create_message(message)
		payload[VarNames.EVENT] = Actions.PRINT_MESSAGE
		payload[VarNames.CHANNEL] = message.room_id
		payload[HandlerNames.NAME] = HandlerNames.CHAT
		return payload

	@classmethod
	def get_messages(cls, messages, channel):
		"""
		:param messages: iterable of stored message objects
		:param channel: channel the messages belong to
		:return: "loadMessages" payload whose content is the list of converted messages
		"""
		converted = list(map(cls.create_message, messages))
		return {
			VarNames.CONTENT: converted,
			VarNames.EVENT: Actions.GET_MESSAGES,
			VarNames.CHANNEL: channel,
			HandlerNames.NAME: HandlerNames.CHAT
		}

	@property
	def stored_redis_user(self):
		"""Value kept in Redis for this connection: the user id."""
		return self.user_id

	@property
	def channel(self):
		"""Name of the personal channel of the current user."""
		return RedisPrefix.generate_user(self.user_id)

	def subscribe_direct_channel_message(self, room_id, other_user_id):
		"""Payload announcing a direct room between the current and the other user."""
		participants = [self.user_id, other_user_id]
		return {
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: participants,
			HandlerNames.NAME: HandlerNames.CHANNELS
		}

	def subscribe_room_channel_message(self, room_id, room_name):
		"""Payload announcing a new named room whose only listed user is the current one."""
		participants = [self.user_id]
		return {
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: participants,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.ROOM_NAME: room_name
		}

	def invite_room_channel_message(self, room_id, user_id, room_name, users):
		"""Payload telling ``user_id`` about room ``room_id``; ``users`` becomes the content."""
		payload = {
			VarNames.EVENT: Actions.INVITE_USER,
			VarNames.ROOM_ID: room_id,
			VarNames.USER_ID: user_id,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.ROOM_NAME: room_name,
			VarNames.CONTENT: users
		}
		return payload

	def add_user_to_room(self, channel, user_id, content):
		"""
		Payload announcing that ``user_id`` joined ``channel``.

		:param content: dict holding the user's sex and name under the "sex" and "user" keys
		"""
		sex = content[VarNames.GENDER]
		name = content[VarNames.USER]
		return {
			VarNames.EVENT: Actions.ADD_USER,
			VarNames.CHANNEL: channel,
			VarNames.USER_ID: user_id,
			HandlerNames.NAME: HandlerNames.CHAT,
			VarNames.GENDER: sex,
			VarNames.USER: name
		}

	def unsubscribe_direct_message(self, room_id):
		"""Payload with action "deleteRoom" for ``room_id``, stamped with the current user and time."""
		payload = {
			VarNames.EVENT: Actions.DELETE_ROOM,
			VarNames.ROOM_ID: room_id,
			VarNames.USER_ID: self.user_id,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.TIME: get_milliseconds()
		}
		return payload

	def load_offline_message(self, offline_messages, channel_key):
		"""default() payload with action "loadOfflineMessages", extended with the channel."""
		payload = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
		payload[VarNames.CHANNEL] = channel_key
		return payload
244
245
246
class MessagesHandler(MessagesCreator):
	"""
	Chat logic behind a single client connection.

	Dispatches client actions (``pre_process_message``), stores data through the
	Django models, publishes the resulting messages to Redis and reacts to
	messages received from Redis (``new_message`` / ``post_process_message``).
	``safe_write`` is abstract here and must be supplied by the subclass.
	"""

	def __init__(self, *args, **kwargs):
		# Marker put in front of a published JSON string when receivers must
		# decode and post-process it, see encode()/decode().
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__(*args, **kwargs)
		self.id = id(self)
		self.log_id = str(self.id % 10000).rjust(4, '0')
		self.ip = None
		# Imported at call time, not at module level.
		# NOTE(review): presumably to avoid an import cycle with the chat package - confirm.
		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		self.channels = []
		self.call_receiver_channel = None
		self.logger = None
		self.async_redis = tornadoredis.Client()
		# action received from the client -> method that handles it
		self.pre_process_message = {
			Actions.GET_MESSAGES: self.process_get_messages,
			Actions.SEND_MESSAGE: self.process_send_message,
			Actions.CALL: self.process_call,
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
			Actions.DELETE_ROOM: self.delete_channel,
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
			Actions.INVITE_USER: self.invite_user,
		}
		# action of a prefixed message received from redis -> follow-up method
		self.post_process_message = {
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
			Actions.DELETE_ROOM: self.send_client_delete_channel,
			Actions.INVITE_USER: self.send_client_new_channel,
			Actions.CALL: self.set_opponent_call_channel
		}

	@tornado.gen.engine
	def listen(self, channels):
		"""Subscribe to ``channels`` and route every incoming message to new_message()."""
		yield tornado.gen.Task(
			self.async_redis.subscribe, channels)
		self.async_redis.listen(self.new_message)

	@tornado.gen.engine
	def add_channel(self, channel):
		"""Remember ``channel`` and subscribe the redis client to it."""
		self.channels.append(channel)
		yield tornado.gen.Task(
			self.async_redis.subscribe, (channel,))

	def do_db(self, callback, *args, **kwargs):
		"""
		Call ``callback(*args, **kwargs)``; on a lost database connection close
		the connection and call it one more time.

		:return: whatever ``callback`` returns
		"""
		try:
			return callback(*args, **kwargs)
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
			self.logger.warning('%s, reconnecting', e)  # TODO
			connection.close()
			return callback(*args, **kwargs)

	def execute_query(self, query, *args, **kwargs):
		"""
		Run a raw SQL ``query`` and return all fetched rows.

		The cursor is closed even when execution or fetching fails.
		"""
		cursor = connection.cursor()
		try:
			cursor.execute(query, *args, **kwargs)
			return cursor.fetchall()
		finally:
			cursor.close()

	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
		"""
		Read the users stored in the redis hash ``channel``.

		:param check_user_id: when given, also report whether this user has an
		entry whose key differs from ``check_hash``
		:return: list of unique user ids, or ``(list, bool)`` when
		``check_user_id`` is given
		"""
		online = self.sync_redis.hgetall(channel)
		self.logger.debug('!! channel %s redis online: %s', channel, online)
		result = set()
		user_is_online = False
		# redis stores REDIS_USER_FORMAT, so parse them
		if online:
			for key_hash, raw_user_id in online.items():  # py2 iteritems
				user_id = int(raw_user_id.decode('utf-8'))
				if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
					user_is_online = True
				result.add(user_id)
		result = list(result)
		# `is not None` rather than truthiness: user id 0 is a valid id to check
		# and callers unpack a tuple whenever they pass one.
		if check_user_id is not None:
			return result, user_is_online
		return result

	def add_online_user(self, room_id, offline_messages=None):
		"""
		adds to redis
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }

		If the user was not yet listed for the room, the login is published to
		the room (and offline messages, if any, are written to this client);
		otherwise the online list is written to this client only.
		:return:
		"""
		online = self.get_online_from_redis(room_id)
		self.async_redis_publisher.hset(room_id, self.id, self.stored_redis_user)
		if self.user_id not in online:  # if a new tab has been opened
			online.append(self.user_id)
			online_user_names_mes = self.room_online(
				online,
				Actions.LOGIN,
				room_id
			)
			self.logger.info('!! First tab, sending refresh online for all')
			self.publish(online_user_names_mes, room_id)
			if offline_messages:
				self.safe_write(self.load_offline_message(offline_messages, room_id))
		else:  # Send user names to self
			online_user_names_mes = self.room_online(
				online,
				Actions.REFRESH_USER,
				room_id
			)
			self.logger.info('!! Second tab, retrieving online for self')
			self.safe_write(online_user_names_mes)

	def publish(self, message, channel=None, parsable=False):
		"""
		Serialize ``message`` to JSON and publish it to ``channel``.

		:param parsable: when true the JSON is marked with the parsable prefix
		:raises ValidationError: when ``channel`` is None
		"""
		if channel is None:
			# This text is shown to the user as a growl, so keep it meaningful.
			raise ValidationError('Unable to send the message: channel is not specified')
		jsoned_mess = json.dumps(message)
		self.logger.debug('<%s> %s', channel, jsoned_mess)
		if parsable:
			jsoned_mess = self.encode(jsoned_mess)
		self.async_redis_publisher.publish(channel, jsoned_mess)

	def encode(self, message):
		"""
		Marks message with prefix to specify that
		it should be decoded and processed before sending to client
		@param message: message to mark
		@return: marked message
		"""
		return self.parsable_prefix + message

	def decode(self, message):
		"""
		Check if message should be processed by server before writing to client
		@param message: message to check
		@return: Object structure of message if it should be processed, None if not
		"""
		if message.startswith(self.parsable_prefix):
			# Strip exactly the prefix, whatever its length is.
			return json.loads(message[len(self.parsable_prefix):])
		return None

	def new_message(self, message):
		"""
		Callback for messages arriving from redis.

		Integer bodies are ignored. Anything else is written to the client;
		prefixed messages are decoded first and, after writing, handed to the
		post-processor registered for their action.
		"""
		data = message.body
		if isinstance(data, int):  # subscribe event
			return
		decoded = self.decode(data)
		if decoded:
			data = decoded
		self.safe_write(data)
		if decoded:
			event = decoded[VarNames.EVENT]
			post_processor = self.post_process_message.get(event)
			if post_processor is None:
				# A prefixed message without a registered follow-up is not fatal.
				self.logger.warning('!! No post processor for event %s', event)
			else:
				post_processor(decoded)

	def safe_write(self, message):
		"""Write ``message`` to the client; implemented by the websocket subclass."""
		raise NotImplementedError('WebSocketHandler implements')

	def process_send_message(self, message):
		"""
		Store a chat message (with an optional image) and publish it to its channel.

		:type message: dict
		"""
		channel = message[VarNames.CHANNEL]
		message_db = Message(
			sender_id=self.user_id,
			content=message[VarNames.CONTENT]
		)
		message_db.room_id = channel
		if VarNames.IMG in message:
			message_db.img = extract_photo(message[VarNames.IMG])
		self.do_db(message_db.save)  # exit on hacked id with exception
		prepared_message = self.create_send_message(message_db)
		self.publish(prepared_message, channel)

	def process_call(self, in_message):
		"""
		Forward a call message to the call receiver's channel.

		For an "offer" the receiver is looked up as the other user of the
		unnamed room given in the message and remembered for later messages.

		:type in_message: dict
		"""
		call_type = in_message.get(VarNames.CALL_TYPE)
		set_opponent_channel = False
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
		if call_type == CallType.OFFER:
			room_id = in_message[VarNames.CHANNEL]
			user = User.rooms.through.objects.get(~Q(user_id=self.user_id), Q(room_id=room_id), Q(room__name__isnull=True))
			self.call_receiver_channel = RedisPrefix.generate_user(user.user_id)
			set_opponent_channel = True
			out_message[VarNames.CHANNEL] = room_id
		# TODO
		self.logger.info('!! Offering a call to user with id %s',  self.call_receiver_channel)
		# publish() raises ValidationError when no receiver channel is known yet
		self.publish(out_message, self.call_receiver_channel, set_opponent_channel)

	def create_new_room(self, message):
		"""
		Create a named room, add the current user to it and announce it on the
		user's own channel.

		:raises ValidationError: when the name is empty or longer than 16 characters
		"""
		room_name = message[VarNames.ROOM_NAME]
		if not room_name or len(room_name) > 16:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		room = Room(name=room_name)
		self.do_db(room.save)
		RoomUsers(room_id=room.id, user_id=self.user_id).save()
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
		self.publish(subscribe_message, self.channel, True)

	def invite_user(self, message):
		"""
		Add another user to a room the current connection is subscribed to.

		The room is told about the new user and the invited user's own channel
		receives the room description.

		:raises ValidationError: when the room is not among this connection's
		channels, is private, or the membership row cannot be created
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		if room_id not in self.channels:
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
		room = self.do_db(Room.objects.get, id=room_id)
		if room.is_private:
			raise ValidationError("You can't add users to direct room, create a new room instead")
		try:
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		users_in_room = {}
		for user in room.users.all():
			self.set_js_user_structure(users_in_room, user.id, user.username, user.sex)
		self.publish(self.add_user_to_room(room_id, user_id, users_in_room[user_id]), room_id)
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)

	def create_user_channel(self, message):
		"""
		Open a direct (unnamed) room with another user.

		A disabled room shared by both users is re-enabled; otherwise a new room
		is created. Both users' channels are notified.

		:raises ValidationError: when an enabled direct room already exists
		"""
		user_id = message[VarNames.USER_ID]
		# get all self private rooms ids
		user_rooms = Room.users.through.objects.filter(user_id=self.user_id, room__name__isnull=True).values('room_id')
		# get private room that contains another user from rooms above
		query_res = Room.users.through.objects.filter(user_id=user_id, room__in=user_rooms).values('room__id', 'room__disabled')
		if len(query_res) > 0:
			room = query_res[0]
			room_id = room['room__id']
			disabled = room['room__disabled']
			if not disabled:
				raise ValidationError('This room already exist')
			else:
				Room.objects.filter(id=room_id).update(disabled=False)
		else:
			room = Room()
			room.save()
			room_id = room.id
			RoomUsers.objects.bulk_create([
				RoomUsers(user_id=user_id, room_id=room_id),
				RoomUsers(user_id=self.user_id, room_id=room_id),
			])
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
		self.publish(subscribe_message, self.channel, True)
		other_channel = RedisPrefix.generate_user(user_id)
		# a room with oneself needs a single notification only
		if self.channel != other_channel:
			self.publish(subscribe_message, other_channel, True)

	def delete_channel(self, message):
		"""
		Leave a named room or disable an unnamed (direct) one, then publish the
		"deleteRoom" message to the room channel.

		:raises ValidationError: when the room is not among this connection's
		channels, is the ALL room, or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to exit this room')
		room = self.do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else: # if public -> leave the room, delete the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
			online = self.get_online_from_redis(room_id)
			# the redis entry may already be gone; list.remove would raise then
			if self.user_id in online:
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
		room.save()
		message = self.unsubscribe_direct_message(room_id)
		# NOTE(review): this prefixed message goes to the room channel, so every
		# handler subscribed to the room runs send_client_delete_channel() -
		# confirm that this is intended for named rooms.
		self.publish(message, room_id, True)

	def send_client_new_channel(self, message):
		"""Subscribe this connection to the room named in ``message`` and mark the user online there."""
		room_id = message[VarNames.ROOM_ID]
		self.add_channel(room_id)
		self.add_online_user(room_id)# TODO doesnt work if already subscribed

	def set_opponent_call_channel(self, message):
		"""Remember the personal channel of the user who sent the call message."""
		self.call_receiver_channel = RedisPrefix.generate_user(message[VarNames.USER_ID])

	def send_client_delete_channel(self, message):
		"""Unsubscribe this connection from the room named in ``message`` and drop its online entry."""
		room_id = message[VarNames.ROOM_ID]
		self.async_redis.unsubscribe((room_id,))
		self.async_redis_publisher.hdel(room_id, self.id)
		# tolerate a repeated delete for a room that was already removed
		if room_id in self.channels:
			self.channels.remove(room_id)

	def process_get_messages(self, data):
		"""
		Write up to ``count`` (default 10) messages of a room to the client,
		newest first; with a header id only messages with a smaller id are sent.

		:type data: dict
		:raises ValidationError: when ``count`` is not a non-negative integer
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		try:
			count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		except (TypeError, ValueError):
			raise ValidationError('Incorrect messages count')
		if count < 0:
			# a negative slice bound is rejected by the queryset, report it to the client instead
			raise ValidationError('Incorrect messages count')
		room_id = data[VarNames.CHANNEL]
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		if header_id is None:
			messages = Message.objects.filter(Q(room_id=room_id)).order_by('-pk')[:count]
		else:
			messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id)).order_by('-pk')[:count]
		response = self.do_db(self.get_messages, messages, room_id)
		self.safe_write(response)

	def get_offline_messages(self):
		"""
		Collect messages newer than the user's last read message.

		:return: dict mapping room id to a list of message dictionaries
		"""
		res = {}
		offline_messages = Message.objects.filter(
			id__gt=F('room__roomusers__last_read_message_id'),
			room__roomusers__user_id=self.user_id
		)
		for message in offline_messages:
			res.setdefault(message.room_id, []).append(self.create_message(message))
		return res

	def get_users_in_current_user_rooms(self):
		"""
		Describe every enabled room of the current user together with its users.

		:return: dict of the form
		{
			ROOM_ID: {
				"name": ROOM_NAME,
				"users": {
					USER_ID: {
						"user": USER_NAME,
						"sex": SEX
					}
				}
			}
		}
		"""
		user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
		res = {room['id']: {
				VarNames.ROOM_NAME: room['name'],
				VarNames.ROOM_USERS: {}
			} for room in user_rooms}
		room_ids = list(res)
		rooms_users = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id')
		for user in rooms_users:
			self.set_js_user_structure(res[user['rooms__id']][VarNames.ROOM_USERS], user['id'], user['username'], user['sex'])
		return res

	def set_js_user_structure(self, user_dict, user_id, name, sex):
		"""Store ``{"user": name, "sex": GENDERS[sex]}`` under ``user_id`` in ``user_dict``."""
		user_dict[user_id] = {
			VarNames.USER: name,
			VarNames.GENDER: GENDERS[sex]
		}

	def save_ip(self):
		"""Record that the current user connected from ``self.ip`` unless such a record exists."""
		if (self.do_db(UserJoinedInfo.objects.filter(
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
			return
		ip_address = self.get_or_create_ip()
		UserJoinedInfo.objects.create(
			ip=ip_address,
			user_id=self.user_id
		)

	def get_or_create_ip(self):
		"""
		Return the IpAddress row for ``self.ip``, creating it when missing.

		A new row is filled with the data returned by the service at
		``api_url``; if that fails for any reason a row holding only the ip is
		created instead.
		"""
		try:
			ip_address = IpAddress.objects.get(ip=self.ip)
		except IpAddress.DoesNotExist:
			try:
				if not api_url:
					raise Exception('api url is absent')
				self.logger.debug("Creating ip record %s", self.ip)
				# NOTE(review): no timeout is passed, so a stalled service blocks the caller.
				f = urlopen(api_url % self.ip)
				try:
					raw_response = f.read().decode("utf-8")
				finally:
					f.close()  # do not leak the HTTP connection
				response = json.loads(raw_response)
				if response['status'] != "success":
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
				ip_address = IpAddress.objects.create(
					ip=self.ip,
					isp=response['isp'],
					country=response['country'],
					region=response['regionName'],
					city=response['city'],
					country_code=response['countryCode']
				)
			except Exception as e:
				# deliberate best effort: fall back to a bare record
				self.logger.error("Error while creating ip with country info, because %s", e)
				ip_address = IpAddress.objects.create(ip=self.ip)
		return ip_address
608
609
610
class AntiSpam(object):
	"""Keeps track of message sizes and rejects messages that are too long."""

	def __init__(self):
		# timestamp key (hundredths of a second) -> length of the message checked then
		self.info = {}
		# number of oversized messages seen so far
		self.spammed = 0

	def check_spam(self, json_message):
		"""
		Record the length of ``json_message`` and validate it.

		:raises ValidationError: when the message is longer than MAX_MESSAGE_SIZE
		"""
		size = len(json_message)
		stamp = int(round(time.time() * 100))
		self.info[stamp] = size
		if size <= MAX_MESSAGE_SIZE:
			self.check_timed_spam()
			return
		self.spammed += 1
		raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)

	def check_timed_spam(self):
		"""Placeholder for a rate check; currently does nothing."""
		# TODO implement me
		# raise ValidationError("You're chatting too much, calm down a bit!")
		return None
629
630
631
class TornadoHandler(WebSocketHandler, MessagesHandler):
	"""Websocket endpoint that connects a browser to the chat logic of MessagesHandler."""

	def __init__(self, *args, **kwargs):
		super(TornadoHandler, self).__init__(*args, **kwargs)
		self.connected = False
		self.anti_spam = AntiSpam()
		if getattr(self, 'logger', None) is None:
			# open() installs the real adapter only for accepted sessions, yet
			# the rejection path and on_close() log too. Provide a placeholder
			# with the same keys so those calls never hit None.
			self.logger = logging.LoggerAdapter(logger, {
				'user_id': str(self.user_id).zfill(3),
				'id': self.log_id,
				'ip': self.ip
			})

	def data_received(self, chunk):
		"""Streamed request bodies are not used by this handler."""
		pass

	def on_message(self, json_message):
		"""
		Validate a message coming from the browser and dispatch it by its action.

		Every ValidationError is reported back to the client as a growl message.
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %s', json_message)
			try:
				message = json.loads(json_message)
			except ValueError:
				# malformed JSON is a client error, not a server crash
				raise ValidationError('Skipping malformed message')
			if not isinstance(message, dict):
				raise ValidationError('Skipping malformed message')
			event = message.get(VarNames.EVENT)
			if event not in self.pre_process_message:
				raise ValidationError("event {} is unknown".format(event))
			channel = message.get(VarNames.CHANNEL)
			if channel and channel not in self.channels:
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
			self.pre_process_message[event](message)
		except ValidationError as e:
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.safe_write(error_message)

	def on_close(self):
		"""
		Clean up after the websocket has been closed.

		Unsubscribes from redis and removes this connection from the online
		hash of every integer channel. For an initialized connection a logout
		is published to each channel where the user has no other entry left,
		and the last read message is updated once.
		"""
		if self.async_redis.subscribed:
			self.logger.info("Close event, unsubscribing from %s", self.channels)
			self.async_redis.unsubscribe(self.channels)
		else:
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
		log_data = {}
		update_last_read_message = True
		for channel in self.channels:
			if not isinstance(channel, int):
				continue  # e.g. the user's personal channel, which is a string
			self.sync_redis.hdel(channel, self.id)
			if self.connected:
				# seems like async solves problem with connection lost and wrong data status
				# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
				online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
				log_data[channel] = {'online': online, 'is_online': is_online}
				if not is_online:
					message = self.room_online(online, Actions.LOGOUT, channel)
					self.publish(message, channel)
					if update_last_read_message:
						update_last_read_message = False
						res = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
						self.logger.info("Updated %s last read message", res)

		self.logger.info("Close connection result: %s", json.dumps(log_data))
		self.async_redis.disconnect()

	def open(self):
		"""
		Accept the websocket when the session cookie names an existing session.

		On success: loads the user, sends the room list, subscribes to the
		user's own channel and to every room, marks the user online in each
		room and stores the ip in a separate thread. Otherwise the connection
		is closed with code 403.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if sessionStore.exists(session_key):
			self.ip = self.get_client_ip()
			session = SessionStore(session_key)
			self.user_id = int(session["_auth_user_id"])
			log_params = {
				'user_id': str(self.user_id).zfill(3),
				'id': self.log_id,
				'ip': self.ip
			}
			self.logger = logging.LoggerAdapter(logger, log_params)
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
			self.async_redis.connect()
			user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
			self.sender_name = user_db.username
			self.sex = user_db.sex_str
			user_rooms = self.get_users_in_current_user_rooms()
			self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
			# get all missed messages
			del self.channels[:]  # same as list.clear(), which Python 2 lacks
			self.channels.append(self.channel)
			for room_id in user_rooms:
				self.channels.append(room_id)
			self.listen(self.channels)
			off_messages = self.get_offline_messages()
			for room_id in user_rooms:
				self.add_online_user(room_id, off_messages.get(room_id))
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
			self.connected = True
			Thread(target=self.save_ip).start()
		else:
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			self.close(403, "Session key %s has been rejected" % session_key)

	def check_origin(self, origin):
		"""
		check whether browser set domain matches origin

		Only the host names are compared, case-insensitively and without ports.
		:return: False when they differ or the Host header is missing
		"""
		origin_domain = urlparse(origin).netloc.split(':')[0].lower()
		browser_set = self.request.headers.get("Host")
		if not browser_set:
			return False
		browser_domain = browser_set.split(':')[0].lower()
		return browser_domain == origin_domain

	def safe_write(self, message):
		"""
		Send ``message`` to the client; a dict is serialized to JSON first.

		A closed websocket is logged and otherwise ignored.
		:type self: MessagesHandler
		:raises ValueError: when ``message`` is neither a dict nor a string
		"""
		# self.logger.debug('<< THREAD %s >>', os.getppid())
		try:
			if isinstance(message, dict):
				message = json.dumps(message)
			# `unicode` exists on Python 2 only; the PY3 test keeps it from being evaluated on Python 3
			if not (isinstance(message, str) or (not PY3 and isinstance(message, unicode))):
				raise ValueError('Wrong message type : %s' % str(message))
			self.logger.debug(">> %s", message)
			self.write_message(message)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(message))

	def get_client_ip(self):
		"""Return the X-Real-IP header when present, otherwise the remote ip of the request."""
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip
752