Completed
Push — master ( 76383b...7d7722 )
by Andrew
01:17
created

RedisPrefix   A

Complexity

Total Complexity 3

Size/Duplication

Total Lines 16
Duplicated Lines 0 %

Importance

Changes 1
Bugs 1 Features 0
Metric Value
c 1
b 1
f 0
dl 0
loc 16
rs 10
wmc 3

3 Methods

Rating   Name   Duplication   Size   Complexity  
A generate_user() 0 3 1
A generate_room() 0 3 1
A extract_id() 0 3 1
1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import tornado.gen
9
import tornado.httpclient
10
import tornado.ioloop
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q
18
from redis_sessions.session import SessionStore
19
from tornado.websocket import WebSocketHandler
20
21
from chat.utils import extract_photo
22
23
# Import urlparse from its Python 3 location first and fall back to the
# Python 2 module name when that is not available.
try:
	from urllib.parse import urlparse  # py3
except ImportError:
	from urlparse import urlparse  # py2
27
28
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, USER_ROOMS_QUERY, GENDERS, GET_DIRECT_ROOM_ID
29
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo
30
31
# True when running under Python 3 or newer. The major version number is compared
# instead of the version string, because string comparison is lexicographic
# (e.g. '10.0' > '3' is False).
PY3 = sys.version_info[0] >= 3
32
33
# URL template of the IP information service (formatted with the client ip via `%`);
# None when the IP_API_URL setting is not defined.
api_url = getattr(settings, "IP_API_URL", None)
34
35
# Shared session store, used to check whether the session key of an incoming connection exists.
sessionStore = SessionStore()
36
37
# Module-level logger; handlers wrap it in a LoggerAdapter carrying user id, connection id and ip.
logger = logging.getLogger(__name__)
38
39
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
40
#CONNECTION_POOL = tornadoredis.ConnectionPool(
41
#	max_connections=500,
42
#	wait_for_available=True)
43
44
45
class Actions:
	"""Values of the "action" field (see VarNames.EVENT) carried by websocket messages."""
	LOGIN = 'addOnlineUser'
	LOGOUT = 'removeOnlineUser'
	SEND_MESSAGE = 'sendMessage'
	PRINT_MESSAGE = 'printMessage'
	CALL = 'call'
	ROOMS = 'setRooms'
	REFRESH_USER = 'setOnlineUsers'
	SYSTEM_MESSAGE = 'system'
	GROWL_MESSAGE = 'growl'
	GET_MESSAGES = 'messages'
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
	DELETE_ROOM = 'deleteRoom'
	CREATE_ROOM_CHANNEL = 'addRoom'
	INVITE_USER = 'inviteUser'
	ADD_USER = 'addUserToAll'
61
62
63
class VarNames:
	"""Keys of the JSON messages exchanged with the client."""
	RECEIVER_ID = 'receiverId'
	RECEIVER_NAME = 'receiverName'
	CALL_TYPE = 'type'
	USER = 'user'
	USER_ID = 'userId'
	TIME = 'time'
	CONTENT = 'content'
	IMG = 'image'
	EVENT = 'action'
	MESSAGE_ID = 'id'
	GENDER = 'sex'
	ROOM_NAME = 'name'
	ROOM_ID = 'roomId'
	ROOM_USERS = 'users'
	# CHANNEL and CHANNEL_NAME intentionally map to the same key.
	CHANNEL = 'channel'
	CHANNEL_NAME = 'channel'
	IS_ROOM_PRIVATE = 'private'
83
84
class CallType:
	"""Values of the VarNames.CALL_TYPE field of call messages."""
	OFFER = 'offer'
86
87
class HandlerNames:
	"""NAME is the message key that stores the handler; the other constants are its values."""
	NAME = 'handler'
	CHANNELS = 'channels'
	CHAT = 'chat'
	GROWL = 'growl'
	WEBRTC = 'webrtc'
93
94
95
class RedisPrefix:
	"""
	Builds and parses redis channel names.

	A channel name is a one-character prefix ('u' for a user, 'r' for a room)
	followed by the numeric id.
	"""
	USER_ID_CHANNEL_PREFIX = 'u'
	ROOM_CHANNEL_PREFIX = 'r'
	__ROOM_ONLINE__ = 'o:{}'

	@classmethod
	def generate_user(cls, key):
		"""
		:param key: user id (anything convertible with str())
		:return: channel name of the user, e.g. 'u5'
		"""
		return cls.USER_ID_CHANNEL_PREFIX + str(key)

	@classmethod
	def generate_room(cls, key):
		"""
		:param key: room id (anything convertible with str())
		:return: channel name of the room, e.g. 'r7'
		"""
		return cls.ROOM_CHANNEL_PREFIX + str(key)

	@classmethod
	def extract_id(cls, channel):
		"""
		:param channel: channel name built by generate_user or generate_room
		:return: the numeric id that follows the one-character prefix
		:raises ValueError: when the remainder of the name is not an integer
		"""
		# Both prefixes are exactly one character long, so the id starts at index 1.
		return int(channel[1:])
111
112
# Channel name of the room with id ALL_ROOM_ID; attached to the class after its definition.
RedisPrefix.DEFAULT_CHANNEL = RedisPrefix.generate_room(ALL_ROOM_ID)
113
114
115
class MessagesCreator(object):
	"""
	Builds the dict payloads that are later serialized to JSON.

	Holds the identity of the current user (user_id, sender_name, sex),
	which is embedded into most of the generated messages.
	"""

	def __init__(self, *args, **kwargs):
		super(MessagesCreator, self).__init__(*args, **kwargs)
		self.sex = None
		self.sender_name = None
		self.user_id = 0  # anonymous by default

	def default(self, content, event, handler):
		"""
		Build the base message shared by most events.

		:param content: payload stored under the "content" key
		:param event: value of the "action" key
		:param handler: value of the "handler" key
		:return: {"action": event, "content": content, "userId": current user id,
			"time": result of get_milliseconds(), "handler": handler}
		"""
		return {
			VarNames.EVENT: event,
			VarNames.CONTENT: content,
			VarNames.USER_ID: self.user_id,
			VarNames.TIME: get_milliseconds(),
			HandlerNames.NAME: handler
		}

	def room_online(self, online, event, channel):
		"""
		Build a chat-handler message describing who is online in a channel.

		:param online: content of the message (callers pass the list of online user ids)
		:param event: value of the "action" key
		:param channel: channel name stored under the "channel" key
		:return: the default() message extended with "channel", "user" and "sex"
		"""
		room_less = self.default(online, event, HandlerNames.CHAT)
		room_less[VarNames.CHANNEL_NAME] = channel
		room_less[VarNames.USER] = self.sender_name
		room_less[VarNames.GENDER] = self.sex
		return room_less

	def offer_call(self, content, message_type):
		"""
		Build a webrtc-handler call message.

		:param content: payload stored under the "content" key
		:param message_type: value stored under the "type" key
		:return: the default() message with action "call", extended with "type" and "user"
		"""
		message = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
		message[VarNames.CALL_TYPE] = message_type
		message[VarNames.USER] = self.sender_name
		return message

	@classmethod
	def create_send_message(cls, message):
		"""
		Convert a stored message into the "printMessage" payload.

		:param message: object exposing sender_id, content, time, id, img, room_id,
			receiver_id and receiver
		:return: dict with "userId", "content", "time", "id", "action", "channel" and
			"handler"; "image" is added when the message has an image, "receiverId" and
			"receiverName" when it has a receiver
		:raises ValidationError: when the message has neither receiver_id nor room_id
		"""
		# A receiver takes precedence over a room when choosing the channel.
		if message.receiver_id:
			channel = RedisPrefix.generate_user(message.receiver_id)
		elif message.room_id:
			channel = RedisPrefix.generate_room(message.room_id)
		else:
			raise ValidationError('Channel is none')
		result = {
			VarNames.USER_ID: message.sender_id,
			VarNames.CONTENT: message.content,
			VarNames.TIME: message.time,
			VarNames.MESSAGE_ID: message.id,
			VarNames.EVENT: Actions.PRINT_MESSAGE,
			VarNames.CHANNEL: channel,
			HandlerNames.NAME: HandlerNames.CHAT
		}
		if message.img.name:
			result[VarNames.IMG] = message.img.url
		if message.receiver_id:
			receiver = message.receiver
			result[VarNames.RECEIVER_ID] = receiver.id
			result[VarNames.RECEIVER_NAME] = receiver.username
		return result

	@classmethod
	def get_messages(cls, messages):
		"""
		Wrap several stored messages into a single "messages" payload.

		:param messages: iterable of message objects accepted by create_send_message
		:return: {"content": [payload of every message], "action": "messages"}
		"""
		return {
			VarNames.CONTENT: [cls.create_send_message(message) for message in messages],
			VarNames.EVENT: Actions.GET_MESSAGES
		}

	@property
	def stored_redis_user(self):
		"""Value stored in redis for this connection: the current user id."""
		return self.user_id

	@property
	def channel(self):
		"""Name of the personal redis channel of the current user."""
		return RedisPrefix.generate_user(self.user_id)

	def subscribe_direct_channel_message(self, room_id, other_user_id):
		"""
		:return: "addDirectChannel" message for a room shared by the current user
			and other_user_id
		"""
		return {
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: [self.user_id, other_user_id],
			HandlerNames.NAME: HandlerNames.CHANNELS
		}

	def subscribe_room_channel_message(self, room_id, room_name):
		"""
		:return: "addRoom" message for a named room whose only listed user is the current one
		"""
		return {
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: [self.user_id],
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.ROOM_NAME: room_name
		}

	def invite_room_channel_message(self, room_id, user_id, room_name, users):
		"""
		:param users: stored under the "content" key
		:return: "inviteUser" message telling user_id about the room it was added to
		"""
		return {
			VarNames.EVENT: Actions.INVITE_USER,
			VarNames.ROOM_ID: room_id,
			VarNames.USER_ID: user_id,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.ROOM_NAME: room_name,
			VarNames.CONTENT: users
		}

	def add_user_to_room(self, channel, user_id, content):
		"""
		:param content: dict holding the "sex" and "user" values of the added user
		:return: "addUserToAll" message announcing user_id in the channel
		"""
		return {
			VarNames.EVENT: Actions.ADD_USER,
			VarNames.CHANNEL: channel,
			VarNames.USER_ID: user_id,
			HandlerNames.NAME: HandlerNames.CHAT,
			VarNames.GENDER: content[VarNames.GENDER],  # SEX: 'Alien', USER: 'Andrew'
			VarNames.USER: content[VarNames.USER]  # SEX: 'Alien', USER: 'Andrew'
		}

	def unsubscribe_direct_message(self, room_id):
		"""
		:return: "deleteRoom" message for room_id issued by the current user
		"""
		return {
			VarNames.EVENT: Actions.DELETE_ROOM,
			VarNames.ROOM_ID: room_id,
			VarNames.USER_ID: self.user_id,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.TIME: get_milliseconds()
		}
247
248
249
class MessagesHandler(MessagesCreator):
	"""
	Processes client events: redis subscriptions and publishing, message persistence
	and room management.

	Writing to the client is delegated to safe_write, which a subclass must implement.
	"""

	def __init__(self, *args, **kwargs):
		# Prefix marking published messages that the receiving handler must decode
		# and post-process before forwarding them to its client.
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__(*args, **kwargs)
		self.id = id(self)
		self.log_id = str(self.id % 10000).rjust(4, '0')
		self.ip = None
		log_params = {
			'user_id': '000',
			'id': self.log_id,
			'ip': 'initializing'
		}
		# NOTE(review): imported inside the method, presumably to avoid a circular import.
		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		self.channels = []
		self.call_receiver_channel = None
		self.logger = logging.LoggerAdapter(logger, log_params)
		self.async_redis = tornadoredis.Client()
		# Handlers of events received from the client, keyed by action.
		self.pre_process_message = {
			Actions.GET_MESSAGES: self.process_get_messages,
			Actions.SEND_MESSAGE: self.process_send_message,
			Actions.CALL: self.process_call,
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
			Actions.DELETE_ROOM: self.delete_channel,
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
			Actions.INVITE_USER: self.invite_user,
		}
		# Handlers run after a parsable redis message was forwarded to the client.
		self.post_process_message = {
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
			Actions.DELETE_ROOM: self.send_client_delete_channel,
			Actions.INVITE_USER: self.send_client_new_channel,
			Actions.CALL: self.set_opponent_call_channel
		}

	@tornado.gen.engine
	def listen(self, channels):
		"""Subscribe to the channels and route every incoming redis message to new_message."""
		yield tornado.gen.Task(
			self.async_redis.subscribe, channels)
		self.async_redis.listen(self.new_message)

	@tornado.gen.engine
	def add_channel(self, channel):
		"""Remember the channel and subscribe the redis client to it."""
		self.channels.append(channel)
		yield tornado.gen.Task(
			self.async_redis.subscribe, channel)

	def do_db(self, callback, *arg, **args):
		"""
		Call callback(*arg, **args); on a lost database connection close it and retry once.

		:return: whatever the callback returns
		"""
		try:
			return callback(*arg, **args)
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
			self.logger.warning('%s, reconnecting', e)  # TODO
			connection.close()
			return callback(*arg, **args)

	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
		"""
		Read the ids of users online in a channel from the redis hash
		{connection hash: user id}.

		:param check_user_id: when given, also report whether this user has a connection
			other than check_hash
		:param check_hash: connection hash to ignore during that check
		:return: list of unique user ids; (list, bool) when check_user_id is given
		"""
		online = self.sync_redis.hgetall(channel)
		self.logger.debug('!! redis online: %s', online)
		result = set()
		user_is_online = False
		# redis returns bytes for both keys and values, so decode them
		if online:
			for key_hash, raw_user_id in online.items():  # py2 iteritems
				user_id = int(raw_user_id.decode('utf-8'))
				if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
					user_is_online = True
				result.add(user_id)
		result = list(result)
		# Compare with None explicitly: user id 0 is a valid value to check.
		return (result, user_is_online) if check_user_id is not None else result

	def add_online_user(self, room_id):
		"""
		Register this connection in the redis hash of the room
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
		and notify either the whole room (first connection of the user) or only this client.
		"""
		channel_key = RedisPrefix.generate_room(room_id)
		online = self.get_online_from_redis(channel_key)
		self.async_redis_publisher.hset(channel_key, self.id, self.stored_redis_user)
		if self.user_id not in online:  # if a new tab has been opened
			online.append(self.user_id)
			online_user_names_mes = self.room_online(
				online,
				Actions.LOGIN,
				channel_key
			)
			self.logger.info('!! First tab, sending refresh online for all')
			self.publish(online_user_names_mes, channel_key)
		else:  # Send user names to self
			online_user_names_mes = self.room_online(
				online,
				Actions.REFRESH_USER,
				channel_key
			)
			self.logger.info('!! Second tab, retrieving online for self')
			self.safe_write(online_user_names_mes)

	def publish(self, message, channel=None, parsable=False):
		"""
		Serialize the message to JSON and publish it to the redis channel.

		:param parsable: when True the message is marked with the parsable prefix
		:raises ValidationError: when channel is None
		"""
		if channel is None:
			raise ValidationError('Channel is not specified')
		jsoned_mess = json.dumps(message)
		self.logger.debug('<%s> %s', channel, jsoned_mess)
		if parsable:
			jsoned_mess = self.encode(jsoned_mess)
		self.async_redis_publisher.publish(channel, jsoned_mess)

	def encode(self, message):
		"""
		Marks message with prefix to specify that
		it should be decoded and processed before sending to client
		@param message: message to mark
		@return: marked message
		"""
		return self.parsable_prefix + message

	def decode(self, message):
		"""
		Check if message should be processed by server before writing to client
		@param message: message to check
		@return: Object structure of message if it should be processed, None if not
		"""
		if message.startswith(self.parsable_prefix):
			return json.loads(message[len(self.parsable_prefix):])
		return None

	def new_message(self, message):
		"""Forward a redis message to the client and run its post-processing handler if any."""
		data = message.body
		if isinstance(data, int):  # subscribe event
			return
		decoded = self.decode(data)
		if decoded:
			data = decoded
		self.safe_write(data)
		if decoded:
			self.post_process_message[decoded[VarNames.EVENT]](decoded)

	def safe_write(self, message):
		"""Send the message to the client; implemented by the websocket subclass."""
		raise NotImplementedError('WebSocketHandler implements')

	def process_send_message(self, message):
		"""
		Save a chat message and publish it.

		:type message: dict
		"""
		content = message[VarNames.CONTENT]
		# receiver_id is only logged here; it is not stored on the saved message
		receiver_id = message.get(VarNames.RECEIVER_ID)
		self.logger.info('!! Sending message %s to user with id %s', content, receiver_id)
		channel = message[VarNames.CHANNEL]
		message_db = Message(
			sender_id=self.user_id,
			content=content
		)
		channel_id = RedisPrefix.extract_id(channel)
		message_db.room_id = channel_id
		if VarNames.IMG in message:
			message_db.img = extract_photo(message[VarNames.IMG])
		self.do_db(message_db.save)  # exit on hacked id with exception
		prepared_message = self.create_send_message(message_db)
		if message_db.receiver_id is None:
			self.logger.debug('!! Detected as public')
			self.publish(prepared_message, channel)
		else:
			self.publish(prepared_message, self.channel)
			self.logger.debug('!! Detected as private, channel %s', channel)
			if channel != self.channel:
				self.publish(prepared_message, channel)

	def process_call(self, in_message):
		"""
		Publish a call message to the call receiver.

		For an offer the receiver is the other user of the direct room behind the
		message channel; for any other type the previously stored receiver channel is used.

		:type in_message: dict
		"""
		call_type = in_message.get(VarNames.CALL_TYPE)
		set_opponent_channel = False
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
		if call_type == CallType.OFFER:
			to_channel = in_message[VarNames.CHANNEL]
			room_id = RedisPrefix.extract_id(to_channel)
			user = User.rooms.through.objects.get(~Q(user_id=self.user_id), Q(room_id=room_id), Q(room__name__isnull=True))
			self.call_receiver_channel = RedisPrefix.generate_user(user.user_id)
			set_opponent_channel = True
			out_message[VarNames.CHANNEL] = to_channel
		self.logger.info('!! Offering a call to user with id %s', self.call_receiver_channel)
		self.publish(out_message, self.call_receiver_channel, set_opponent_channel)

	def create_new_room(self, message):
		"""
		Create a named room containing the current user and announce it on the user's channel.

		:raises ValidationError: when the room name is empty or longer than 16 characters
		"""
		room_name = message[VarNames.ROOM_NAME]
		if not room_name or len(room_name) > 16:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		room = Room(name=room_name)
		room.save()
		room.users.add(self.user_id)
		room.save()
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
		self.publish(subscribe_message, self.channel, True)

	def invite_user(self, message):
		"""
		Add a user to a room the current connection is subscribed to.

		:raises ValidationError: when the room is not among this connection's channels,
			is private, or already contains the user
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		channel = RedisPrefix.generate_room(room_id)
		if channel not in self.channels:
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
		room = Room.objects.get(id=room_id)
		if room.is_private:
			raise ValidationError("You can't add users to direct room, create a new room instead")
		try:
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		users_in_room = {}
		for user in room.users.all():
			self.set_js_user_structure(users_in_room, user.id, user.username, user.sex)
		# tell the room about the new user, then tell the new user about the room
		self.publish(self.add_user_to_room(channel, user_id, users_in_room[user_id]), channel)
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)

	def create_user_channel(self, message):
		"""
		Create (or re-enable) the direct room between the current user and another user
		and announce it on the channels of both users.

		:raises ValidationError: when an enabled direct room already exists
		"""
		user_id = message[VarNames.USER_ID]
		# the context manager closes the cursor once the rows are fetched
		with connection.cursor() as cursor:
			cursor.execute(GET_DIRECT_ROOM_ID, [self.user_id, user_id])
			query_res = cursor.fetchall()
		if len(query_res) > 0:
			result = query_res[0]
			room_id = result[0]
			disabled = result[1]
			if disabled is None:
				raise ValidationError('This room already exist')
			else:
				Room.objects.filter(id=room_id).update(disabled=None)
		else:
			room = Room()
			room.save()
			room.users.add(self.user_id, user_id)
			room.save()
			room_id = room.id
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
		self.publish(subscribe_message, self.channel, True)
		other_channel = RedisPrefix.generate_user(user_id)
		if self.channel != other_channel:
			self.publish(subscribe_message, other_channel, True)

	def delete_channel(self, message):
		"""
		Disable a room without a name, or remove the current user from a named room,
		then publish the "deleteRoom" message to the room channel.

		:raises ValidationError: when the room is not among this connection's channels,
			is the ALL_ROOM_ID room, or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		channel = RedisPrefix.generate_room(room_id)
		if channel not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to delete this room')
		room = Room.objects.get(id=room_id)
		if room.disabled is not None:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else:  # if public -> leave the room, delete the link
			room.users.remove(self.user_id)
			online = self.get_online_from_redis(channel)
			# redis may hold no entry for this user, so do not assume it is listed
			if self.user_id in online:
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
		room.save()
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, channel, True)

	def send_client_new_channel(self, message):
		"""Subscribe to the channel of the room named in the message and mark the user online there."""
		room_id = message[VarNames.ROOM_ID]
		channel = RedisPrefix.generate_room(room_id)
		self.add_channel(channel)
		self.add_online_user(room_id)  # TODO doesnt work if already subscribed

	def set_opponent_call_channel(self, message):
		"""Remember the channel of the user who sent the call message."""
		self.call_receiver_channel = RedisPrefix.generate_user(message[VarNames.USER_ID])

	def send_client_delete_channel(self, message):
		"""Unsubscribe from the channel of the room named in the message and forget this connection there."""
		room_id = message[VarNames.ROOM_ID]
		channel = RedisPrefix.generate_room(room_id)
		self.async_redis.unsubscribe(channel)
		self.async_redis_publisher.hdel(channel, self.id)
		# the channel may already be gone from the list; removing it twice must not fail
		if channel in self.channels:
			self.channels.remove(channel)

	def process_get_messages(self, data):
		"""
		Send the client up to "count" (default 10) newest messages visible to the current
		user; when "headerId" is given only messages with a smaller id are returned.

		:type data: dict
		:raises ValidationError: when "count" is not an integer
		"""
		header_id = data.get('headerId', None)
		try:
			count = int(data.get('count', 10))
		except (TypeError, ValueError):
			raise ValidationError('Incorrect messages count "{}"'.format(data.get('count')))
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		# messages without a receiver, or sent by / addressed to the current user
		visible = Q(receiver=None) | Q(sender=self.user_id) | Q(receiver=self.user_id)
		if header_id is None:
			messages = Message.objects.filter(visible).order_by('-pk')[:count]
		else:
			messages = Message.objects.filter(Q(id__lt=header_id), visible).order_by('-pk')[:count]
		response = self.do_db(self.get_messages, messages)
		self.safe_write(response)

	def get_users_in_current_user_rooms(self):
		"""
		Run USER_ROOMS_QUERY for the current user and group the rows by room:
		{
			ROOM_ID: {
				"name": ROOM_NAME,
				"users": {
					USER_ID: {
						"user": USER_NAME,
						"sex": GENDERS[USER_SEX]
					}
				}
			}
		}
		"""
		# the context manager closes the cursor once the rows are fetched
		with connection.cursor() as cursor:
			cursor.execute(USER_ROOMS_QUERY, [self.user_id])
			query_res = cursor.fetchall()
		res = {}
		for user in query_res:
			# row layout: user id, user name, user sex, room id, room name
			user_id, user_name, user_sex, room_id, room_name = user[:5]
			if room_id not in res:
				res[room_id] = {
					VarNames.ROOM_NAME: room_name,
					VarNames.ROOM_USERS: {}
				}
			self.set_js_user_structure(res[room_id][VarNames.ROOM_USERS], user_id, user_name, user_sex)
		return res

	def set_js_user_structure(self, user_dict, user_id, name, sex):
		"""Store {"user": name, "sex": GENDERS[sex]} in user_dict under user_id."""
		user_dict[user_id] = {
			VarNames.USER: name,
			VarNames.GENDER: GENDERS[sex]
		}

	def save_ip(self):
		"""Record that the current user joined from the current ip, unless already recorded."""
		if (self.do_db(UserJoinedInfo.objects.filter(
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
			return
		ip_address = self.get_or_create_ip()
		UserJoinedInfo.objects.create(
			ip=ip_address,
			user_id=self.user_id
		)

	def get_or_create_ip(self):
		"""
		Return the IpAddress record of the current ip, creating it when missing.

		A new record is filled with the data returned by the api_url service; when the
		lookup fails for any reason the record is created with the ip only.
		"""
		try:
			ip_address = IpAddress.objects.get(ip=self.ip)
		except IpAddress.DoesNotExist:
			try:
				if not api_url:
					raise Exception('api url is absent')
				self.logger.debug("Creating ip record %s", self.ip)
				# close the http response as soon as the body has been read
				with urlopen(api_url % self.ip) as f:
					raw_response = f.read().decode("utf-8")
				response = json.loads(raw_response)
				if response['status'] != "success":
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
				ip_address = IpAddress.objects.create(
					ip=self.ip,
					isp=response['isp'],
					country=response['country'],
					region=response['regionName'],
					city=response['city'],
					country_code=response['countryCode']
				)
			except Exception as e:
				# deliberate best effort: fall back to a record without location info
				self.logger.error("Error while creating ip with country info, because %s", e)
				ip_address = IpAddress.objects.create(ip=self.ip)
		return ip_address
625
626
627
class AntiSpam(object):
	"""Tracks the size of incoming messages and rejects the ones that are too long."""

	def __init__(self):
		self.spammed = 0  # number of rejected messages
		self.info = {}  # {time in hundredths of a second: message length}

	def check_spam(self, json_message):
		"""
		Record the length of the message and validate it.

		:raises ValidationError: when the message is longer than MAX_MESSAGE_SIZE
		"""
		message_length = len(json_message)
		info_key = int(round(time.time() * 100))
		# NOTE(review): entries are never removed, so this dict grows with every call.
		self.info[info_key] = message_length
		if message_length > MAX_MESSAGE_SIZE:
			self.spammed += 1
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
		self.check_timed_spam()

	def check_timed_spam(self):
		"""Placeholder for rate limiting; currently does nothing."""
		# TODO implement me
		pass
		# raise ValidationError("You're chatting too much, calm down a bit!")
646
647
648
class TornadoHandler(WebSocketHandler, MessagesHandler):
	"""Websocket endpoint: authenticates the session, dispatches client events and writes responses."""

	def __init__(self, *args, **kwargs):
		super(TornadoHandler, self).__init__(*args, **kwargs)
		self.connected = False  # set to True at the end of a successful open()
		self.anti_spam = AntiSpam()

	def data_received(self, chunk):
		pass

	def on_message(self, json_message):
		"""
		Parse a client message and dispatch it to the handler registered for its action.

		Every ValidationError is reported back to the client as a growl message.
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %s', json_message)
			try:
				message = json.loads(json_message)
			except ValueError:
				raise ValidationError('Message is not a valid json')
			if not isinstance(message, dict):
				raise ValidationError('Message should be a json object')
			# .get() so that a missing action is reported as unknown instead of raising KeyError;
			# the str check keeps an unhashable json value (list/object) from raising TypeError
			event = message.get(VarNames.EVENT)
			if not isinstance(event, str) or event not in self.pre_process_message:
				raise ValidationError("event {} is unknown".format(event))
			channel = message.get(VarNames.CHANNEL)
			# same guard for channel: a non-string value can never be an allowed channel
			if channel and (not isinstance(channel, str) or channel not in self.channels):
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
			self.pre_process_message[event](message)
		except ValidationError as e:
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.safe_write(error_message)

	def on_close(self):
		"""
		Unsubscribe from redis, remove this connection from the online hash of every room
		and publish a logout message for rooms where the user has no other connection.
		"""
		if self.async_redis.subscribed:
			self.async_redis.unsubscribe(self.channels)
		log_data = {}
		for channel in self.channels:
			if channel.startswith(RedisPrefix.ROOM_CHANNEL_PREFIX):
				self.sync_redis.hdel(channel, self.id)
				if self.connected:
					# seems like async solves problem with connection lost and wrong data status
					# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
					online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
					log_data[channel] = {'online': online, 'is_online': is_online}
					if not is_online:
						message = self.room_online(online, Actions.LOGOUT, channel)
						self.publish(message, channel)
		self.logger.info("Close connection result: %s", json.dumps(log_data))
		self.async_redis.disconnect()

	def open(self):
		"""
		Accept the connection when the session cookie refers to an existing session:
		load the user, send the rooms, subscribe to the user and room channels and mark
		the user online. Otherwise close the connection.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if sessionStore.exists(session_key):
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
			self.ip = self.get_client_ip()
			session = SessionStore(session_key)
			self.user_id = int(session["_auth_user_id"])
			log_params = {
				'user_id': str(self.user_id).zfill(3),
				'id': self.log_id,
				'ip': self.ip
			}
			self.logger = logging.LoggerAdapter(logger, log_params)
			self.async_redis.connect()
			user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
			self.sender_name = user_db.username
			self.sex = user_db.sex_str
			user_rooms = self.get_users_in_current_user_rooms()
			self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
			self.channels.clear()
			self.channels.append(self.channel)
			for room_id in user_rooms:
				self.channels.append(RedisPrefix.generate_room(room_id))
			self.listen(self.channels)
			for room_id in user_rooms:
				self.add_online_user(room_id)
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
			self.connected = True
			# the ip record is saved in a separate thread
			Thread(target=self.save_ip).start()
		else:
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			# NOTE(review): 403 looks like an http status rather than a websocket close code - confirm
			self.close(403, "Session key %s has been rejected" % session_key)

	def check_origin(self, origin):
		"""
		check whether browser set domain matches origin

		Host names are compared case-insensitively and without the port;
		a request without a Host header is rejected.
		"""
		origin_domain = urlparse(origin).netloc.split(':')[0].lower()
		browser_set = self.request.headers.get("Host")
		if not browser_set:
			return False
		browser_domain = browser_set.split(':')[0].lower()
		return browser_domain == origin_domain

	def safe_write(self, message):
		"""
		Tries to send message, doesn't throw exception outside

		A dict is serialized to JSON first; a closed websocket is only logged.
		:type self: MessagesHandler
		"""
		# self.logger.debug('<< THREAD %s >>', os.getppid())
		try:
			if isinstance(message, dict):
				message = json.dumps(message)
			# `unicode` exists only on Python 2 and is evaluated only when PY3 is False
			if not (isinstance(message, str) or (not PY3 and isinstance(message, unicode))):  # noqa: F821
				raise ValueError('Wrong message type : %s' % str(message))
			self.logger.debug(">> %s", message)
			self.write_message(message)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(message))

	def get_client_ip(self):
		"""Return the X-Real-IP header when present, otherwise the remote ip of the request."""
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip
757