Completed
Push — master ( 7d7722...0882b1 )
by Andrew
01:07
created

MessagesCreator.create_message()   A

Complexity

Conditions 2

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 11
rs 9.4285
1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import tornado.gen
9
import tornado.httpclient
10
import tornado.ioloop
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q
18
from redis_sessions.session import SessionStore
19
from tornado.websocket import WebSocketHandler
20
21
from chat.utils import extract_photo
22
23
try:
24
	from urllib.parse import urlparse  # py3
25
except ImportError:
26
	from urlparse import urlparse  # py2
27
28
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, USER_ROOMS_QUERY, GENDERS, GET_DIRECT_ROOM_ID
29
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo
30
31
# True when running under Python 3 or newer. The numeric major version is
# compared because the version string sorts lexically ("10.0" < "3").
PY3 = sys.version_info[0] >= 3
32
33
# Module-wide logger; every handler wraps it in a LoggerAdapter of its own.
logger = logging.getLogger(__name__)

# Format string of the IP lookup service (used as ``api_url % ip``);
# None when the IP_API_URL setting is absent.
api_url = getattr(settings, "IP_API_URL", None)

# Shared session store used to check the session cookie of new connections.
sessionStore = SessionStore()
38
39
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
40
#CONNECTION_POOL = tornadoredis.ConnectionPool(
41
#	max_connections=500,
42
#	wait_for_available=True)
43
44
45
class Actions:
	"""Values of the "action" field that identify each websocket event."""

	# presence of users
	LOGIN = 'addOnlineUser'
	LOGOUT = 'removeOnlineUser'

	# chat messages and calls
	SEND_MESSAGE = 'sendMessage'
	PRINT_MESSAGE = 'printMessage'
	CALL = 'call'

	# initial state and notifications
	ROOMS = 'setRooms'
	REFRESH_USER = 'setOnlineUsers'
	SYSTEM_MESSAGE = 'system'
	GROWL_MESSAGE = 'growl'
	GET_MESSAGES = 'loadMessages'

	# room / channel management
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
	DELETE_ROOM = 'deleteRoom'
	CREATE_ROOM_CHANNEL = 'addRoom'
	INVITE_USER = 'inviteUser'
	ADD_USER = 'addUserToAll'
61
62
63
class VarNames:
	"""Key names used inside the JSON messages exchanged with clients."""

	# sender / receiver
	RECEIVER_ID = 'receiverId'
	RECEIVER_NAME = 'receiverName'
	CALL_TYPE = 'type'
	USER = 'user'
	USER_ID = 'userId'

	# message envelope
	TIME = 'time'
	CONTENT = 'content'
	IMG = 'image'
	EVENT = 'action'
	MESSAGE_ID = 'id'
	GENDER = 'sex'

	# rooms and channels
	ROOM_NAME = 'name'
	ROOM_ID = 'roomId'
	ROOM_USERS = 'users'
	CHANNEL = 'channel'

	# history requests
	GET_MESSAGES_COUNT = 'count'
	GET_MESSAGES_HEADER_ID = 'headerId'

	# same key as CHANNEL, kept under its own name
	CHANNEL_NAME = 'channel'
	IS_ROOM_PRIVATE = 'private'
85
86
class CallType:
	"""Values of the call "type" field; process_call treats OFFER as the start of a call."""

	OFFER = 'offer'
88
89
class HandlerNames:
	"""Name of the "handler" message key and the handler values placed under it."""

	# key under which the handler value is stored in a message
	NAME = 'handler'

	# handler values
	CHANNELS = 'channels'
	CHAT = 'chat'
	GROWL = 'growl'
	WEBRTC = 'webrtc'
95
96
97
class RedisPrefix:
	"""Builds and parses redis channel names of the form <one-letter prefix><id>."""

	USER_ID_CHANNEL_PREFIX = 'u'
	ROOM_CHANNEL_PREFIX = 'r'
	__ROOM_ONLINE__ = 'o:{}'

	@classmethod
	def generate_user(cls, key):
		"""Return the channel name of the user identified by ``key``."""
		return ''.join((cls.USER_ID_CHANNEL_PREFIX, str(key)))

	@classmethod
	def generate_room(cls, key):
		"""Return the channel name of the room identified by ``key``."""
		return ''.join((cls.ROOM_CHANNEL_PREFIX, str(key)))

	@classmethod
	def extract_id(cls, channel):
		"""Return the integer id that follows the first character of ``channel``."""
		numeric_part = channel[1:]
		return int(numeric_part)
113
114
# Channel name of the room whose id is ALL_ROOM_ID; attached here because
# generate_room can only be called once the class has been created.
RedisPrefix.DEFAULT_CHANNEL = RedisPrefix.generate_room(ALL_ROOM_ID)
115
116
117
class MessagesCreator(object):
118
119
	def __init__(self, *args, **kwargs):
120
		super(MessagesCreator, self).__init__(*args, **kwargs)
121
		self.sex = None
122
		self.sender_name = None
123
		self.user_id = 0  # anonymous by default
124
125
	def default(self, content, event, handler):
		"""
		Build the basic message envelope stamped with the sender id and current time.

		:param content: value stored under "content"
		:param event: value stored under "action"
		:param handler: value stored under "handler"
		:return: {"action": event, "content": content, "userId": self.user_id,
		"time": get_milliseconds(), "handler": handler}
		"""
		envelope = {}
		envelope[VarNames.EVENT] = event
		envelope[VarNames.CONTENT] = content
		envelope[VarNames.USER_ID] = self.user_id
		envelope[VarNames.TIME] = get_milliseconds()
		envelope[HandlerNames.NAME] = handler
		return envelope
136
137
	def room_online(self, online, event, channel):
		"""
		Build a chat-handler message about the users online in a channel.

		:param online: value stored under "content"
		:param event: value stored under "action"
		:param channel: value stored under "channel"
		:return: the default() envelope extended with "channel", "user"
		(self.sender_name) and "sex" (self.sex)
		"""
		message = self.default(online, event, HandlerNames.CHAT)
		message.update((
			(VarNames.CHANNEL_NAME, channel),
			(VarNames.USER, self.sender_name),
			(VarNames.GENDER, self.sex),
		))
		return message
146
147
	def offer_call(self, content, message_type):
		"""
		Build a webrtc-handler "call" message.

		:param content: value stored under "content"
		:param message_type: value stored under "type"
		:return: the default() envelope with "action": "call", extended with
		"type" and "user" (self.sender_name)
		"""
		call_message = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
		call_message.update((
			(VarNames.CALL_TYPE, message_type),
			(VarNames.USER, self.sender_name),
		))
		return call_message
155
156
	@classmethod
	def create_message(cls, message):
		"""
		Convert a stored message into the dict sent to clients.

		:param message: object exposing sender_id, content, time, id and img
		:return: dict with "userId", "content", "time" and "id"; "image" holds
		message.img.url and is present only when message.img.name is truthy
		"""
		fields = (
			(VarNames.USER_ID, message.sender_id),
			(VarNames.CONTENT, message.content),
			(VarNames.TIME, message.time),
			(VarNames.MESSAGE_ID, message.id),
		)
		payload = dict(fields)
		if message.img.name:
			payload[VarNames.IMG] = message.img.url
		return payload
167
168
	@classmethod
	def create_send_message(cls, message):
		"""
		Build the "printMessage" event that delivers a stored message to its room.

		:param message: stored message; besides the fields read by create_message
		it must expose room_id
		:return: the create_message() dict extended with "action": "printMessage",
		"channel": room channel of message.room_id, "handler": "chat"
		"""
		channel = RedisPrefix.generate_room(message.room_id)
		res = cls.create_message(message)
		# Must be a plain string: a trailing comma on this assignment would store
		# a one-element tuple, which json serialises as a list.
		res[VarNames.EVENT] = Actions.PRINT_MESSAGE
		res[VarNames.CHANNEL] = channel
		res[HandlerNames.NAME] = HandlerNames.CHAT
		return res
181
182
	@classmethod
	def get_messages(cls, messages, channel):
		"""
		Wrap a batch of stored messages into a "loadMessages" event.

		:param messages: iterable of stored messages, each converted by create_message
		:type channel: str
		:return: {"content": [converted messages], "action": "loadMessages",
		"channel": channel, "handler": "chat"}
		"""
		converted = []
		for stored_message in messages:
			converted.append(cls.create_message(stored_message))
		response = {}
		response[VarNames.CONTENT] = converted
		response[VarNames.EVENT] = Actions.GET_MESSAGES
		response[VarNames.CHANNEL] = channel
		response[HandlerNames.NAME] = HandlerNames.CHAT
		return response
195
196
	@property
197
	def stored_redis_user(self):
198
		return  self.user_id
199
200
	@property
	def channel(self):
		"""Redis channel name of the current user (user prefix followed by self.user_id)."""
		own_channel = RedisPrefix.generate_user(self.user_id)
		return own_channel
203
204
	def subscribe_direct_channel_message(self, room_id, other_user_id):
		"""
		Build the "addDirectChannel" message for a direct room.

		:param room_id: value stored under "roomId"
		:param other_user_id: listed under "users" after the current user id
		:return: dict with "action", "roomId", "users" and "handler": "channels"
		"""
		participants = [self.user_id, other_user_id]
		message = {}
		message[VarNames.EVENT] = Actions.CREATE_DIRECT_CHANNEL
		message[VarNames.ROOM_ID] = room_id
		message[VarNames.ROOM_USERS] = participants
		message[HandlerNames.NAME] = HandlerNames.CHANNELS
		return message
211
212
	def subscribe_room_channel_message(self, room_id, room_name):
		"""
		Build the "addRoom" message for a named room.

		:param room_id: value stored under "roomId"
		:param room_name: value stored under "name"
		:return: dict with "action", "roomId", "users" (only the current user id),
		"handler": "channels" and "name"
		"""
		message = {}
		message[VarNames.EVENT] = Actions.CREATE_ROOM_CHANNEL
		message[VarNames.ROOM_ID] = room_id
		message[VarNames.ROOM_USERS] = [self.user_id]
		message[HandlerNames.NAME] = HandlerNames.CHANNELS
		message[VarNames.ROOM_NAME] = room_name
		return message
220
221
	def invite_room_channel_message(self, room_id, user_id, room_name, users):
		"""
		Build the "inviteUser" message sent to a user added to a room.

		:param room_id: value stored under "roomId"
		:param user_id: value stored under "userId"
		:param room_name: value stored under "name"
		:param users: value stored under "content"
		:return: dict with "action", "roomId", "userId", "handler": "channels",
		"name" and "content"
		"""
		message = {}
		message[VarNames.EVENT] = Actions.INVITE_USER
		message[VarNames.ROOM_ID] = room_id
		message[VarNames.USER_ID] = user_id
		message[HandlerNames.NAME] = HandlerNames.CHANNELS
		message[VarNames.ROOM_NAME] = room_name
		message[VarNames.CONTENT] = users
		return message
230
231
	def add_user_to_room(self, channel, user_id, content):
		"""
		Build the "addUserToAll" message announcing a new room member.

		:param channel: value stored under "channel"
		:param user_id: value stored under "userId"
		:param content: dict with "sex" and "user" keys, e.g. SEX: 'Alien', USER: 'Andrew'
		:return: dict with "action", "channel", "userId", "handler": "chat",
		"sex" and "user"
		"""
		gender = content[VarNames.GENDER]
		user_name = content[VarNames.USER]
		message = {}
		message[VarNames.EVENT] = Actions.ADD_USER
		message[VarNames.CHANNEL] = channel
		message[VarNames.USER_ID] = user_id
		message[HandlerNames.NAME] = HandlerNames.CHAT
		message[VarNames.GENDER] = gender
		message[VarNames.USER] = user_name
		return message
240
241
	def unsubscribe_direct_message(self, room_id):
		"""
		Build the "deleteRoom" message for the current user.

		:param room_id: value stored under "roomId"
		:return: dict with "action", "roomId", "userId" (self.user_id),
		"handler": "channels" and "time" (get_milliseconds())
		"""
		message = {}
		message[VarNames.EVENT] = Actions.DELETE_ROOM
		message[VarNames.ROOM_ID] = room_id
		message[VarNames.USER_ID] = self.user_id
		message[HandlerNames.NAME] = HandlerNames.CHANNELS
		message[VarNames.TIME] = get_milliseconds()
		return message
249
250
251
class MessagesHandler(MessagesCreator):
252
253
	def __init__(self, *args, **kwargs):
		"""
		Set up per-connection state: log identity, redis clients, the list of
		subscribed channels and the tables that dispatch events to handlers.
		"""
		# assigned before the parent initializers run, as in the original ordering
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__(*args, **kwargs)
		self.id = id(self)
		# last four decimal digits of the object id, zero padded
		self.log_id = str(self.id % 10000).zfill(4)
		self.ip = None
		initial_log_params = {
			'user_id': '000',
			'id': self.log_id,
			'ip': 'initializing'
		}
		# imported locally rather than at module level (presumably to avoid an
		# import cycle with the chat package; confirm before moving it)
		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		self.channels = []
		self.call_receiver_channel = None
		self.logger = logging.LoggerAdapter(logger, initial_log_params)
		# dedicated client used for this connection's subscriptions
		self.async_redis = tornadoredis.Client()
		# event name -> handler of a message received from the client
		self.pre_process_message = dict((
			(Actions.GET_MESSAGES, self.process_get_messages),
			(Actions.SEND_MESSAGE, self.process_send_message),
			(Actions.CALL, self.process_call),
			(Actions.CREATE_DIRECT_CHANNEL, self.create_user_channel),
			(Actions.DELETE_ROOM, self.delete_channel),
			(Actions.CREATE_ROOM_CHANNEL, self.create_new_room),
			(Actions.INVITE_USER, self.invite_user),
		))
		# event name -> handler run after a prefixed redis message was written to the client
		self.post_process_message = dict((
			(Actions.CREATE_DIRECT_CHANNEL, self.send_client_new_channel),
			(Actions.CREATE_ROOM_CHANNEL, self.send_client_new_channel),
			(Actions.DELETE_ROOM, self.send_client_delete_channel),
			(Actions.INVITE_USER, self.send_client_new_channel),
			(Actions.CALL, self.set_opponent_call_channel),
		))
287
288
	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribe this connection's redis client to ``channels`` and, once the
		subscription task has completed, route incoming messages to new_message.
		"""
		subscription = tornado.gen.Task(self.async_redis.subscribe, channels)
		yield subscription
		self.async_redis.listen(self.new_message)
293
294
	@tornado.gen.engine
	def add_channel(self, channel):
		"""Remember ``channel`` in self.channels and subscribe the redis client to it."""
		self.channels.append(channel)
		subscription = tornado.gen.Task(self.async_redis.subscribe, channel)
		yield subscription
299
300
	def do_db(self, callback, *arg, **args):
		"""
		Call a database callable, retrying once if the connection was lost.

		:param callback: callable invoked with the remaining arguments
		:return: whatever ``callback`` returns
		:raises: any error of the first call other than OperationalError and
		InterfaceError, and any error of the retry
		"""
		try:
			outcome = callback(*arg, **args)
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
			self.logger.warning('%s, reconnecting' % e)  # TODO
			# close the connection before the single retry
			connection.close()
			outcome = callback(*arg, **args)
		return outcome
307
308
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
		"""
		Read the online hash of ``channel`` from redis and list the distinct user ids in it.

		Keys of the hash are connection ids and values are user ids (see
		add_online_user); both are decoded from utf-8 bytes.

		:param channel: redis key of the hash
		:param check_user_id: when not None, also report whether this user owns
		an entry whose connection id differs from ``check_hash``
		:param check_hash: connection id to ignore during that check
		:return: list of user ids, or (list of user ids, bool) when
		``check_user_id`` is not None
		"""
		online = self.sync_redis.hgetall(channel)
		self.logger.debug('!! redis online: %s', online)
		user_ids = set()
		user_is_online = False
		# redis stores REDIS_USER_FORMAT, so parse them
		for key_hash, raw_user_id in (online or {}).items():
			user_id = int(raw_user_id.decode('utf-8'))
			if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
				user_is_online = True
			user_ids.add(user_id)
		result = list(user_ids)
		# Compare with None explicitly: the anonymous user id is 0, which is
		# falsy, and must still yield the (list, bool) pair callers unpack.
		if check_user_id is None:
			return result
		return result, user_is_online
326
327
	def add_online_user(self, room_id):
		"""
		Register this connection in the room's online hash and announce the result.

		The redis hash of the room channel maps a connection id (self.id) to
		stored_redis_user. If the user was not online in the room yet, a LOGIN
		message is published to the room; otherwise the current online list is
		written to this connection only.

		:param room_id: id of the room the user appears in
		"""
		channel_key = RedisPrefix.generate_room(room_id)
		online = self.get_online_from_redis(channel_key)
		self.async_redis_publisher.hset(channel_key, self.id, self.stored_redis_user)
		already_online = self.user_id in online
		if already_online:
			# Send user names to self
			refresh_message = self.room_online(online, Actions.REFRESH_USER, channel_key)
			self.logger.info('!! Second tab, retrieving online for self')
			self.safe_write(refresh_message)
			return
		# the list was read before the hset above, so add the user to it by hand
		online.append(self.user_id)
		login_message = self.room_online(online, Actions.LOGIN, channel_key)
		self.logger.info('!! First tab, sending refresh online for all')
		self.publish(login_message, channel_key)
353
354
	def publish(self, message, channel=None, parsable=False):
		"""
		Serialise ``message`` to JSON and publish it to a redis channel.

		:param message: JSON-serialisable object
		:param channel: target redis channel; required
		:param parsable: when True the payload is marked with encode(), so the
		receiving handler decodes and post-processes it
		:raises ValidationError: if ``channel`` is None
		"""
		if channel is None:
			# The text is shown to the client when raised while handling a
			# client message, so it has to be meaningful.
			raise ValidationError('Unable to publish the message: target channel is not specified')
		jsoned_mess = json.dumps(message)
		self.logger.debug('<%s> %s', channel, jsoned_mess)
		if parsable:
			jsoned_mess = self.encode(jsoned_mess)
		self.async_redis_publisher.publish(channel, jsoned_mess)
362
363
	def encode(self, message):
		"""
		Mark a message as one that must be decoded and processed by the
		receiving handler before it is sent to the client.

		@param message: message to mark
		@return: ``message`` with self.parsable_prefix put in front of it
		"""
		return ''.join((self.parsable_prefix, message))
371
372
	def decode(self, message):
		"""
		Parse a message that was marked by encode().

		@param message: raw message to check
		@return: object parsed from the JSON that follows the first character
		when the message starts with self.parsable_prefix, None otherwise
		"""
		if not message.startswith(self.parsable_prefix):
			return None
		json_part = message[1:]
		return json.loads(json_part)
380
381
	def new_message(self, message):
		"""
		Forward a message received from redis to the client.

		Integer bodies (subscribe events) are ignored. A body marked by encode()
		is decoded, written to the client in decoded form and then handed to the
		post-processing handler registered for its "action"; any other body is
		written to the client unchanged.

		:param message: redis message object exposing ``body``
		"""
		body = message.body
		if type(body) is int:  # subscribe event
			return
		parsed = self.decode(body)
		if not parsed:
			self.safe_write(body)
			return
		self.safe_write(parsed)
		post_handler = self.post_process_message[parsed[VarNames.EVENT]]
		post_handler(parsed)
390
391
	def safe_write(self, message):
		"""
		Deliver ``message`` to the client; subclasses must override this.

		:raises NotImplementedError: always, in this base implementation
		"""
		raise NotImplementedError('WebSocketHandler implements')
393
394
	def process_send_message(self, message):
		"""
		Store an incoming chat message and publish it.

		:type message: dict
		:param message: must contain "content" and "channel"; "image" is optional
		"""
		content = message[VarNames.CONTENT]
		# NOTE(review): the receiver id taken from the message is only logged
		# here; it is never copied onto the stored message.
		receiver_id = message.get(VarNames.RECEIVER_ID)
		self.logger.info('!! Sending message %s to user with id %s', content, receiver_id)
		channel = message[VarNames.CHANNEL]
		message_db = Message(sender_id=self.user_id, content=content)
		message_db.room_id = RedisPrefix.extract_id(channel)
		if VarNames.IMG in message:
			message_db.img = extract_photo(message[VarNames.IMG])
		self.do_db(message_db.save)  # exit on hacked id with exception
		prepared_message = self.create_send_message(message_db)
		if message_db.receiver_id is not None:
			# private: the sender's own channel first, then the target channel if different
			self.publish(prepared_message, self.channel)
			self.logger.debug('!! Detected as private, channel %s', channel)
			if channel != self.channel:
				self.publish(prepared_message, channel)
			return
		self.logger.debug('!! Detected as public')
		self.publish(prepared_message, channel)
420
421
	def process_call(self, in_message):
		"""
		Relay a call message to the opponent's user channel.

		For an "offer" the opponent is looked up as the other member of the
		unnamed room behind the message's channel and remembered in
		self.call_receiver_channel; other call types reuse the remembered channel.

		:type in_message: dict
		"""
		call_type = in_message.get(VarNames.CALL_TYPE)
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
		is_offer = call_type == CallType.OFFER
		if is_offer:
			to_channel = in_message[VarNames.CHANNEL]
			room_id = RedisPrefix.extract_id(to_channel)
			opponent_link = User.rooms.through.objects.get(
				~Q(user_id=self.user_id),
				Q(room_id=room_id),
				Q(room__name__isnull=True)
			)
			self.call_receiver_channel = RedisPrefix.generate_user(opponent_link.user_id)
			out_message[VarNames.CHANNEL] = to_channel
		self.logger.info('!! Offering a call to user with id %s',  self.call_receiver_channel)
		# only offers are published as parsable
		self.publish(out_message, self.call_receiver_channel, is_offer)
437
438
	def create_new_room(self, message):
		"""
		Create a named room containing the current user and notify the user's channel.

		:param message: dict with the room name under "name"
		:raises ValidationError: if the name is empty or longer than 16 characters
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = bool(room_name) and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		new_room = Room(name=room_name)
		new_room.save()
		new_room.users.add(self.user_id)
		new_room.save()
		notification = self.subscribe_room_channel_message(new_room.id, room_name)
		self.publish(notification, self.channel, True)
448
449
	def invite_user(self, message):
		"""
		Add a user to a room the current connection is subscribed to.

		Publishes an "addUserToAll" message to the room channel and a parsable
		"inviteUser" message to the invited user's channel.

		:param message: dict with "roomId" and "userId"
		:raises ValidationError: if the room channel is not among self.channels,
		the room is private, or the user already belongs to the room
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		channel = RedisPrefix.generate_room(room_id)
		if channel not in self.channels:
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
		room = Room.objects.get(id=room_id)
		if room.is_private:
			raise ValidationError("You can't add users to direct room, create a new room instead")
		membership = Room.users.through.objects
		try:
			membership.create(room_id=room_id, user_id=user_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		users_in_room = {}
		for member in room.users.all():
			self.set_js_user_structure(users_in_room, member.id, member.username, member.sex)
		announcement = self.add_user_to_room(channel, user_id, users_in_room[user_id])
		self.publish(announcement, channel)
		invitation = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
		invited_channel = RedisPrefix.generate_user(user_id)
		self.publish(invitation, invited_channel, True)
468
469
	def create_user_channel(self, message):
		"""
		Create, or re-enable, the direct room between the current user and another user.

		A parsable "addDirectChannel" message is published to the current user's
		channel and, when it differs, to the other user's channel.

		:param message: dict with the other user's id under "userId"
		:raises ValidationError: if the room exists and its ``disabled`` value is None
		"""
		user_id = message[VarNames.USER_ID]
		cursor = connection.cursor()
		cursor.execute(GET_DIRECT_ROOM_ID, [self.user_id, user_id])
		rows = cursor.fetchall()
		if not rows:
			# no direct room yet: create one holding both users
			room = Room()
			room.save()
			room.users.add(self.user_id, user_id)
			room.save()
			room_id = room.id
		else:
			first_row = rows[0]
			room_id = first_row[0]
			disabled = first_row[1]
			if disabled is None:
				raise ValidationError('This room already exist')
			Room.objects.filter(id=room_id).update(disabled=None)
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
		self.publish(subscribe_message, self.channel, True)
		other_channel = RedisPrefix.generate_user(user_id)
		if other_channel != self.channel:
			self.publish(subscribe_message, other_channel, True)
493
494
	def delete_channel(self, message):
		"""
		Disable an unnamed room, or make the current user leave a named room.

		In both cases a parsable "deleteRoom" message is published to the room
		channel afterwards.

		:param message: dict with the room id under "roomId"
		:raises ValidationError: if the room channel is not among self.channels,
		the room is ALL_ROOM_ID, or the room's ``disabled`` value is not None
		"""
		room_id = message[VarNames.ROOM_ID]
		channel = RedisPrefix.generate_room(room_id)
		deletion_allowed = channel in self.channels and room_id != ALL_ROOM_ID
		if not deletion_allowed:
			raise ValidationError('You are not allowed to delete this room')
		room = Room.objects.get(id=room_id)
		if room.disabled is not None:
			raise ValidationError('Room is already deleted')
		if room.name is not None:
			# if public -> leave the room, delete the link
			room.users.remove(self.user_id)
			online = self.get_online_from_redis(channel)
			online.remove(self.user_id)
			logout_message = self.room_online(online, Actions.LOGOUT, channel)
			self.publish(logout_message, channel)
		else:
			# if private then disable
			room.disabled = True
		room.save()
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, channel, True)
512
513
	def send_client_new_channel(self, message):
		"""
		Subscribe this connection to the room named in ``message`` and mark the user online there.

		:param message: dict with the room id under "roomId"
		"""
		room_id = message[VarNames.ROOM_ID]
		room_channel = RedisPrefix.generate_room(room_id)
		self.add_channel(room_channel)
		# TODO doesnt work if already subscribed
		self.add_online_user(room_id)
518
519
	def set_opponent_call_channel(self, message):
		"""
		Remember the user channel of the call opponent.

		:param message: dict with the opponent's id under "userId"
		"""
		opponent_id = message[VarNames.USER_ID]
		self.call_receiver_channel = RedisPrefix.generate_user(opponent_id)
521
522
	def send_client_delete_channel(self, message):
		"""
		Drop this connection from the room named in ``message``.

		Unsubscribes from the room channel, removes this connection's entry from
		the room's online hash and forgets the channel locally.

		:param message: dict with the room id under "roomId"
		"""
		room_channel = RedisPrefix.generate_room(message[VarNames.ROOM_ID])
		self.async_redis.unsubscribe(room_channel)
		self.async_redis_publisher.hdel(room_channel, self.id)
		self.channels.remove(room_channel)
528
529
	def process_get_messages(self, data):
		"""
		Send the client a batch of stored messages of one room, newest first.

		:type data: dict
		:param data: "channel" is required; "count" (default 10) limits the batch;
		when "headerId" is given only messages with a smaller id are returned
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		channel = data[VarNames.CHANNEL]
		room_id = RedisPrefix.extract_id(channel)
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		conditions = [Q(room_id=room_id)]
		if header_id is not None:
			conditions.insert(0, Q(id__lt=header_id))
		messages = Message.objects.filter(*conditions).order_by('-pk')[:count]
		response = self.do_db(self.get_messages, messages, channel)
		self.safe_write(response)
544
545
	def get_users_in_current_user_rooms(self):
		"""
		Group the rows of USER_ROOMS_QUERY for the current user by room.

		The first five columns of each row are read as user id, user name,
		user sex, room id and room name.

		:return: dict shaped like
		{
			ROOM_ID: {
				"name": ROOM_NAME,
				"users": {
					USER_ID: {
						"user": USER_NAME,
						"sex": GENDERS[USER_SEX]
					}
				}
			}
		}
		"""
		cursor = connection.cursor()
		cursor.execute(USER_ROOMS_QUERY, [self.user_id])
		rooms = {}
		for row in cursor.fetchall():
			user_id, user_name, user_sex, room_id, room_name = row[0], row[1], row[2], row[3], row[4]
			if room_id not in rooms:
				room_entry = {}
				room_entry[VarNames.ROOM_NAME] = room_name
				room_entry[VarNames.ROOM_USERS] = {}
				rooms[room_id] = room_entry
			room_users = rooms[room_id][VarNames.ROOM_USERS]
			self.set_js_user_structure(room_users, user_id, user_name, user_sex)
		return rooms
581
582
	def set_js_user_structure(self, user_dict, user_id, name, sex):
		"""
		Store the client-side description of a user in ``user_dict``.

		:param user_dict: dict modified in place; receives an entry keyed by ``user_id``
		:param name: stored under "user"
		:param sex: key into GENDERS; the looked-up value is stored under "sex"
		"""
		description = {VarNames.USER: name}
		description[VarNames.GENDER] = GENDERS[sex]
		user_dict[user_id] = description
587
588
	def save_ip(self):
		"""Record that the current user joined from self.ip, unless such a record already exists."""
		known_pair = UserJoinedInfo.objects.filter(Q(ip__ip=self.ip) & Q(user_id=self.user_id))
		already_recorded = self.do_db(known_pair.exists)
		if not already_recorded:
			ip_address = self.get_or_create_ip()
			UserJoinedInfo.objects.create(ip=ip_address, user_id=self.user_id)
597
598
	def get_or_create_ip(self):
		"""
		Return the IpAddress record for self.ip, creating it when missing.

		A new record is filled with the provider and location data returned by
		the service at ``api_url``. If the url is not configured, the request
		fails or the service does not answer "success", the error is logged and
		a record holding only the ip is created instead.

		:return: IpAddress instance
		"""
		try:
			ip_address = IpAddress.objects.get(ip=self.ip)
		except IpAddress.DoesNotExist:
			try:
				if not api_url:
					raise Exception('api url is absent')
				self.logger.debug("Creating ip record %s", self.ip)
				f = urlopen(api_url % self.ip)
				try:
					raw_response = f.read().decode("utf-8")
				finally:
					# always release the HTTP response, even if reading fails
					f.close()
				response = json.loads(raw_response)
				if response['status'] != "success":
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
				ip_address = IpAddress.objects.create(
					ip=self.ip,
					isp=response['isp'],
					country=response['country'],
					region=response['regionName'],
					city=response['city'],
					country_code=response['countryCode']
				)
			except Exception as e:
				# deliberate best effort: fall back to a record without location data
				self.logger.error("Error while creating ip with country info, because %s", e)
				ip_address = IpAddress.objects.create(ip=self.ip)
		return ip_address
623
624
625
class AntiSpam(object):
	"""Keeps message-size statistics of one connection and rejects oversized messages."""

	def __init__(self):
		# maps the arrival time (hundredths of a second since the epoch) to the message length
		self.info = {}
		# number of messages rejected for exceeding MAX_MESSAGE_SIZE
		self.spammed = 0

	def check_spam(self, json_message):
		"""
		Record the size of ``json_message`` and reject it when it is too long.

		:raises ValidationError: if the message is longer than MAX_MESSAGE_SIZE
		"""
		size = len(json_message)
		arrival = int(round(time.time() * 100))
		self.info[arrival] = size
		if size <= MAX_MESSAGE_SIZE:
			self.check_timed_spam()
			return
		self.spammed += 1
		raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)

	def check_timed_spam(self):
		"""Placeholder for a rate check; currently does nothing."""
		# TODO implement me
		pass
		# raise ValidationError("You're chatting too much, calm down a bit!")
644
645
646
class TornadoHandler(WebSocketHandler, MessagesHandler):
647
648
	def __init__(self, *args, **kwargs):
		"""Run the parent initializers, then mark the socket as not yet connected."""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		self.anti_spam = AntiSpam()
		# set to True at the end of a successful open()
		self.connected = False
652
653
	def data_received(self, chunk):
		"""Ignore ``chunk``; this handler does nothing with it."""
		return None
655
656
	def on_message(self, json_message):
		"""
		Validate a raw client message and dispatch it to the handler of its "action".

		Every validation failure, including malformed JSON, a payload that is
		not a JSON object and a missing or unknown "action", is reported back
		to the client as a growl message instead of propagating.

		:param json_message: raw text received from the websocket
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %s', json_message)
			try:
				message = json.loads(json_message)
			except ValueError:
				# the payload comes from the client and can't be trusted to be JSON
				raise ValidationError('Message is not a valid JSON')
			if not isinstance(message, dict):
				raise ValidationError('Message should be a JSON object')
			# .get() so that a missing action is reported as an unknown event
			event = message.get(VarNames.EVENT)
			if event not in self.pre_process_message:
				raise ValidationError("event {} is unknown".format(event))
			channel = message.get(VarNames.CHANNEL)
			if channel and channel not in self.channels:
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
			self.pre_process_message[event](message)
		except ValidationError as e:
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.safe_write(error_message)
674
675
	def on_close(self):
		"""
		Clean up after a closed websocket.

		Unsubscribes from redis, removes this connection from the online hash
		of every room channel and, when the user has no other connection left
		in a room, publishes a LOGOUT message to that room.
		"""
		if self.async_redis.subscribed:
			self.async_redis.unsubscribe(self.channels)
		log_data = {}
		for channel in self.channels:
			if not channel.startswith(RedisPrefix.ROOM_CHANNEL_PREFIX):
				continue
			self.sync_redis.hdel(channel, self.id)
			if not self.connected:
				continue
			# seems like async solves problem with connection lost and wrong data status
			# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
			online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
			log_data[channel] = {'online': online, 'is_online': is_online}
			if is_online:
				continue
			logout_message = self.room_online(online, Actions.LOGOUT, channel)
			self.publish(logout_message, channel)
		self.logger.info("Close connection result: %s", json.dumps(log_data))
		self.async_redis.disconnect()
692
693
	def open(self):
		"""
		Authenticate a new websocket by its session cookie and initialise it.

		On success the user's rooms are sent to the client, the connection
		subscribes to the user channel and every room channel, the user is
		marked online in each room and the ip is saved in a background thread.
		A connection whose session key is unknown is closed with code 403.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if not sessionStore.exists(session_key):
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			self.close(403, "Session key %s has been rejected" % session_key)
			return
		self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
		self.ip = self.get_client_ip()
		session = SessionStore(session_key)
		self.user_id = int(session["_auth_user_id"])
		# from now on log records carry the real user id and ip
		self.logger = logging.LoggerAdapter(logger, {
			'user_id': str(self.user_id).zfill(3),
			'id': self.log_id,
			'ip': self.ip
		})
		self.async_redis.connect()
		user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
		self.sender_name = user_db.username
		self.sex = user_db.sex_str
		user_rooms = self.get_users_in_current_user_rooms()
		rooms_message = self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS)
		self.safe_write(rooms_message)
		# own user channel first, then one channel per room
		self.channels.clear()
		self.channels.append(self.channel)
		self.channels.extend(RedisPrefix.generate_room(room_id) for room_id in user_rooms)
		self.listen(self.channels)
		for room_id in user_rooms:
			self.add_online_user(room_id)
		self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
		self.connected = True
		ip_saver = Thread(target=self.save_ip)
		ip_saver.start()
725
726
	def check_origin(self, origin):
		"""
		Check that the browser-set Origin points at the host being served.

		The host name of ``origin`` is compared with the host name of the
		request's Host header, ignoring ports and letter case.

		:param origin: value of the Origin header
		:return: True when both host names match; False otherwise, including
		when the Host header is missing or empty
		"""
		parsed_origin = urlparse(origin)
		origin_domain = parsed_origin.netloc.split(':')[0].lower()
		browser_set = self.request.headers.get("Host")
		if not browser_set:
			# without a Host header there is nothing to compare against
			return False
		# host names are case-insensitive, so normalise both sides
		browser_domain = browser_set.split(':')[0].lower()
		return browser_domain == origin_domain
736
737
	def safe_write(self, message):
		"""
		Send ``message`` to the client, logging instead of raising when the socket is closed.

		Dicts are serialised to JSON first.

		:type self: MessagesHandler
		:param message: dict or text
		:raises ValueError: if the message is neither a dict nor text; this
		error is not caught here
		"""
		# self.logger.debug('<< THREAD %s >>', os.getppid())
		try:
			if isinstance(message, dict):
				message = json.dumps(message)
			is_text = isinstance(message, str) or (not PY3 and isinstance(message, unicode))
			if not is_text:
				raise ValueError('Wrong message type : %s' % str(message))
			self.logger.debug(">> %s", message)
			self.write_message(message)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
752
753
	def get_client_ip(self):
		"""Return the "X-Real-IP" request header when it is set and non-empty, else the request's remote ip."""
		forwarded_ip = self.request.headers.get("X-Real-IP")
		if forwarded_ip:
			return forwarded_ip
		return self.request.remote_ip
755