Completed
Push — master ( 8cdef5...b0b3ad )
by Andrew
01:12
created

MessagesHandler.safe_write()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 2
rs 10
1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import tornado.gen
9
import tornado.httpclient
10
import tornado.ioloop
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError, IntegrityError
17
from django.db.models import Q, F
18
from redis_sessions.session import SessionStore
19
from tornado.websocket import WebSocketHandler
20
21
from chat.utils import extract_photo
22
23
try:
24
	from urllib.parse import urlparse  # py2
25
except ImportError:
26
	from urlparse import urlparse  # py3
27
28
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE
29
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo, RoomUsers
30
31
PY3 = sys.version > '3'
32
33
api_url = getattr(settings, "IP_API_URL", None)
34
35
sessionStore = SessionStore()
36
37
logger = logging.getLogger(__name__)
38
39
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
40
#CONNECTION_POOL = tornadoredis.ConnectionPool(
41
#	max_connections=500,
42
#	wait_for_available=True)
43
44
45
class Actions:
46
	LOGIN = 'addOnlineUser'
47
	LOGOUT = 'removeOnlineUser'
48
	SEND_MESSAGE = 'sendMessage'
49
	PRINT_MESSAGE = 'printMessage'
50
	CALL = 'call'
51
	ROOMS = 'setRooms'
52
	REFRESH_USER = 'setOnlineUsers'
53
	GROWL_MESSAGE = 'growl'
54
	GET_MESSAGES = 'loadMessages'
55
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
56
	DELETE_ROOM = 'deleteRoom'
57
	CREATE_ROOM_CHANNEL = 'addRoom'
58
	INVITE_USER = 'inviteUser'
59
	ADD_USER = 'addUserToAll'
60
	OFFLINE_MESSAGES = 'loadOfflineMessages'
61
62
63
class VarNames:
64
	CALL_TYPE = 'type'
65
	USER = 'user'
66
	USER_ID = 'userId'
67
	TIME = 'time'
68
	CONTENT = 'content'
69
	IMG = 'image'
70
	EVENT = 'action'
71
	MESSAGE_ID = 'id'
72
	GENDER = 'sex'
73
	ROOM_NAME = 'name'
74
	ROOM_ID = 'roomId'
75
	ROOM_USERS = 'users'
76
	CHANNEL = 'channel'
77
	GET_MESSAGES_COUNT = 'count'
78
	GET_MESSAGES_HEADER_ID = 'headerId'
79
	CHANNEL_NAME = 'channel'
80
	IS_ROOM_PRIVATE = 'private'
81
	#ROOM_NAME = 'roomName'
82
	# ROOM_ID = 'roomId'
83
84
85
class CallType:
86
	OFFER = 'offer'
87
88
class HandlerNames:
89
	NAME = 'handler'
90
	CHANNELS = 'channels'
91
	CHAT = 'chat'
92
	GROWL = 'growl'
93
	WEBRTC = 'webrtc'
94
	FILE = 'file'
95
96
97
class RedisPrefix:
98
	USER_ID_CHANNEL_PREFIX = 'u'
99
	ROOM_CHANNEL_PREFIX = 'r'
100
	__ROOM_ONLINE__ = 'o:{}'
101
102
	@classmethod
103
	def generate_user(cls, key):
104
		return cls.USER_ID_CHANNEL_PREFIX + str(key)
105
106
	@classmethod
107
	def generate_room(cls, key):
108
		return cls.ROOM_CHANNEL_PREFIX + str(key)
109
110
	@classmethod
111
	def extract_id(cls, channel):
112
		return int(channel[1:])
113
114
RedisPrefix.DEFAULT_CHANNEL = RedisPrefix.generate_room(ALL_ROOM_ID)
115
116
117
class MessagesCreator(object):
118
119
	def __init__(self, *args, **kwargs):
120
		super(MessagesCreator, self).__init__(*args, **kwargs)
121
		self.sex = None
122
		self.sender_name = None
123
		self.user_id = 0  # anonymous by default
124
125
	def default(self, content, event, handler):
126
		"""
127
		:return: {"action": event, "content": content, "time": "20:48:57"}
128
		"""
129
		return {
130
			VarNames.EVENT: event,
131
			VarNames.CONTENT: content,
132
			VarNames.USER_ID: self.user_id,
133
			VarNames.TIME: get_milliseconds(),
134
			HandlerNames.NAME: handler
135
		}
136
137
	def room_online(self, online, event, channel):
138
		"""
139
		:return: {"action": event, "content": content, "time": "20:48:57"}
140
		"""
141
		room_less = self.default(online, event, HandlerNames.CHAT)
142
		room_less[VarNames.CHANNEL_NAME] = channel
143
		room_less[VarNames.USER] = self.sender_name
144
		room_less[VarNames.GENDER] = self.sex
145
		return room_less
146
147
	def offer_call(self, content, message_type):
148
		"""
149
		:return: {"action": "call", "content": content, "time": "20:48:57"}
150
		"""
151
		message = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
152
		message[VarNames.CALL_TYPE] = message_type
153
		message[VarNames.USER] = self.sender_name
154
		return message
155
156
	@classmethod
157
	def create_message(cls, message):
158
		res = {
159
			VarNames.USER_ID: message.sender_id,
160
			VarNames.CONTENT: message.content,
161
			VarNames.TIME: message.time,
162
			VarNames.MESSAGE_ID: message.id,
163
		}
164
		if message.img.name:
165
			res[VarNames.IMG] = message.img.url
166
		return res
167
168
	@classmethod
169
	def create_send_message(cls, message):
170
		"""
171
		:param message:
172
		:return: "action": "joined", "content": {"v5bQwtWp": "alien", "tRD6emzs": "Alien"},
173
		"sex": "Alien", "user": "tRD6emzs", "time": "20:48:57"}
174
		"""
175
		channel = RedisPrefix.generate_room(message.room_id)
176
		res = cls.create_message(message)
177
		res[VarNames.EVENT] = Actions.PRINT_MESSAGE
178
		res[VarNames.CHANNEL] = channel
179
		res[HandlerNames.NAME] = HandlerNames.CHAT
180
		return res
181
182
	@classmethod
183
	def get_messages(cls, messages, channel):
184
		"""
185
		:type messages: list[Messages]
186
		:type channel: str
187
		:type messages: QuerySet[Messages]
188
		"""
189
		return {
190
			VarNames.CONTENT: [cls.create_message(message) for message in messages],
191
			VarNames.EVENT: Actions.GET_MESSAGES,
192
			VarNames.CHANNEL: channel,
193
			HandlerNames.NAME: HandlerNames.CHAT
194
		}
195
196
	@property
197
	def stored_redis_user(self):
198
		return  self.user_id
199
200
	@property
201
	def channel(self):
202
		return RedisPrefix.generate_user(self.user_id)
203
204
	def subscribe_direct_channel_message(self, room_id, other_user_id):
205
		return {
206
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
207
			VarNames.ROOM_ID: room_id,
208
			VarNames.ROOM_USERS: [self.user_id, other_user_id],
209
			HandlerNames.NAME: HandlerNames.CHANNELS
210
		}
211
212
	def subscribe_room_channel_message(self, room_id, room_name):
213
		return {
214
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
215
			VarNames.ROOM_ID: room_id,
216
			VarNames.ROOM_USERS: [self.user_id],
217
			HandlerNames.NAME: HandlerNames.CHANNELS,
218
			VarNames.ROOM_NAME: room_name
219
		}
220
221
	def invite_room_channel_message(self, room_id, user_id, room_name, users):
222
		return {
223
			VarNames.EVENT: Actions.INVITE_USER,
224
			VarNames.ROOM_ID: room_id,
225
			VarNames.USER_ID: user_id,
226
			HandlerNames.NAME: HandlerNames.CHANNELS,
227
			VarNames.ROOM_NAME: room_name,
228
			VarNames.CONTENT: users
229
		}
230
231
	def add_user_to_room(self, channel, user_id, content):
232
		return {
233
			VarNames.EVENT: Actions.ADD_USER,
234
			VarNames.CHANNEL: channel,
235
			VarNames.USER_ID: user_id,
236
			HandlerNames.NAME: HandlerNames.CHAT,
237
			VarNames.GENDER: content[VarNames.GENDER], # SEX: 'Alien', USER: 'Andrew'
238
			VarNames.USER: content[VarNames.USER] # SEX: 'Alien', USER: 'Andrew'
239
		}
240
241
	def unsubscribe_direct_message(self, room_id):
242
		return {
243
			VarNames.EVENT: Actions.DELETE_ROOM,
244
			VarNames.ROOM_ID: room_id,
245
			VarNames.USER_ID: self.user_id,
246
			HandlerNames.NAME: HandlerNames.CHANNELS,
247
			VarNames.TIME: get_milliseconds()
248
		}
249
250
	def load_offline_message(self, offline_messages, channel_key):
251
		res = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
252
		res[VarNames.CHANNEL] = channel_key
253
		return res
254
255
256
class MessagesHandler(MessagesCreator):
257
258
	def __init__(self, *args, **kwargs):
259
		self.parsable_prefix = 'p'
260
		super(MessagesHandler, self).__init__(*args, **kwargs)
261
		self.id = id(self)
262
		self.log_id = str(self.id % 10000).rjust(4, '0')
263
		self.ip = None
264
		from chat import global_redis
265
		self.async_redis_publisher = global_redis.async_redis_publisher
266
		self.sync_redis = global_redis.sync_redis
267
		self.channels = []
268
		self.call_receiver_channel = None
269
		self.logger = None
270
		self.async_redis = tornadoredis.Client()
271
		self.pre_process_message = {
272
			Actions.GET_MESSAGES: self.process_get_messages,
273
			Actions.SEND_MESSAGE: self.process_send_message,
274
			Actions.CALL: self.process_call,
275
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
276
			Actions.DELETE_ROOM: self.delete_channel,
277
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
278
			Actions.INVITE_USER: self.invite_user,
279
		}
280
		self.post_process_message = {
281
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
282
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
283
			Actions.DELETE_ROOM: self.send_client_delete_channel,
284
			Actions.INVITE_USER: self.send_client_new_channel,
285
			Actions.CALL: self.set_opponent_call_channel
286
		}
287
288
	@tornado.gen.engine
289
	def listen(self, channels):
290
		yield tornado.gen.Task(
291
			self.async_redis.subscribe, channels)
292
		self.async_redis.listen(self.new_message)
293
294
	@tornado.gen.engine
295
	def add_channel(self, channel):
296
		self.channels.append(channel)
297
		yield tornado.gen.Task(
298
			self.async_redis.subscribe, channel)
299
300
	def do_db(self, callback, *args, **kwargs):
301
		try:
302
			return callback(*args, **kwargs)
303
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
304
			self.logger.warning('%s, reconnecting' % e)  # TODO
305
			connection.close()
306
			return callback(*args, **kwargs)
307
308
	def execute_query(self, query, *args, **kwargs):
309
		cursor = connection.cursor()
310
		cursor.execute(query, *args, **kwargs)
311
		return cursor.fetchall()
312
313
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
314
		"""
315
		:rtype : dict
316
		returns (dict, bool) if check_type is present
317
		"""
318
		online = self.sync_redis.hgetall(channel)
319
		self.logger.debug('!! redis online: %s', online)
320
		result = set()
321
		user_is_online = False
322
		# redis stores REDIS_USER_FORMAT, so parse them
323
		if online:
324
			for key_hash, raw_user_id in online.items():  # py2 iteritems
325
				user_id = int(raw_user_id.decode('utf-8'))
326
				if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
327
					user_is_online = True
328
				result.add(user_id)
329
		result = list(result)
330
		return (result, user_is_online) if check_user_id else result
331
332
	def add_online_user(self, room_id, offline_messages=None):
333
		"""
334
		adds to redis
335
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
336
		:return:
337
		"""
338
		channel_key = RedisPrefix.generate_room(room_id)
339
		online = self.get_online_from_redis(channel_key)
340
		self.async_redis_publisher.hset(channel_key, self.id, self.stored_redis_user)
341
		if self.user_id not in online:  # if a new tab has been opened
342
			online.append(self.user_id)
343
			online_user_names_mes = self.room_online(
344
				online,
345
				Actions.LOGIN,
346
				channel_key
347
			)
348
			self.logger.info('!! First tab, sending refresh online for all')
349
			self.publish(online_user_names_mes, channel_key)
350
			if offline_messages:
351
				self.safe_write(self.load_offline_message(offline_messages, channel_key))
352
		else:  # Send user names to self
353
			online_user_names_mes = self.room_online(
354
				online,
355
				Actions.REFRESH_USER,
356
				channel_key
357
			)
358
			self.logger.info('!! Second tab, retrieving online for self')
359
			self.safe_write(online_user_names_mes)
360
361
	def publish(self, message, channel=None, parsable=False):
362
		if channel is None:
363
			raise ValidationError('lolol')
364
		jsoned_mess = json.dumps(message)
365
		self.logger.debug('<%s> %s', channel, jsoned_mess)
366
		if parsable:
367
			jsoned_mess = self.encode(jsoned_mess)
368
		self.async_redis_publisher.publish(channel, jsoned_mess)
369
370
	def encode(self, message):
371
		"""
372
		Marks message with prefix to specify that
373
		it should be decoded and proccesed before sending to client
374
		@param message: message to mark
375
		@return: marked message
376
		"""
377
		return self.parsable_prefix + message
378
379
	def decode(self, message):
380
		"""
381
		Check if message should be proccessed by server before writing to client
382
		@param message: message to check
383
		@return: Object structure of message if it should be processed, None if not
384
		"""
385
		if message.startswith(self.parsable_prefix):
386
			return json.loads(message[1:])
387
388
	def new_message(self, message):
389
		data = message.body
390
		if type(data) is not int:  # subscribe event
391
			decoded = self.decode(data)
392
			if decoded:
393
				data = decoded
394
			self.safe_write(data)
395
			if decoded:
396
				self.post_process_message[decoded[VarNames.EVENT]](decoded)
397
398
	def safe_write(self, message):
399
		raise NotImplementedError('WebSocketHandler implements')
400
401
	def process_send_message(self, message):
402
		"""
403
		:type message: dict
404
		"""
405
		channel = message[VarNames.CHANNEL]
406
		message_db = Message(
407
			sender_id=self.user_id,
408
			content=message[VarNames.CONTENT]
409
		)
410
		channel_id = RedisPrefix.extract_id(channel)
411
		message_db.room_id = channel_id
412
		if VarNames.IMG in message:
413
			message_db.img = extract_photo(message[VarNames.IMG])
414
		self.do_db(message_db.save)  # exit on hacked id with exception
415
		prepared_message = self.create_send_message(message_db)
416
		self.publish(prepared_message, channel)
417
418
	def process_call(self, in_message):
419
		"""
420
		:type in_message: dict
421
		"""
422
		call_type = in_message.get(VarNames.CALL_TYPE)
423
		set_opponent_channel = False
424
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
425
		if call_type == CallType.OFFER:
426
			to_channel = in_message[VarNames.CHANNEL]
427
			room_id = RedisPrefix.extract_id(to_channel)
428
			user = User.rooms.through.objects.get(~Q(user_id=self.user_id), Q(room_id=room_id), Q(room__name__isnull=True))
429
			self.call_receiver_channel = RedisPrefix.generate_user(user.user_id)
430
			set_opponent_channel = True
431
			out_message[VarNames.CHANNEL] = to_channel
432
		# TODO
433
		self.logger.info('!! Offering a call to user with id %s',  self.call_receiver_channel)
434
		self.publish(out_message, self.call_receiver_channel, set_opponent_channel)
435
436
	def create_new_room(self, message):
437
		room_name = message[VarNames.ROOM_NAME]
438
		if not room_name or len(room_name) > 16:
439
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
440
		room = Room(name=room_name)
441
		self.do_db(room.save)
442
		RoomUsers(room_id=room.id, user_id=self.user_id).save()
443
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
444
		self.publish(subscribe_message, self.channel, True)
445
446
	def invite_user(self, message):
447
		room_id = message[VarNames.ROOM_ID]
448
		user_id = message[VarNames.USER_ID]
449
		channel = RedisPrefix.generate_room(room_id)
450
		if channel not in self.channels:
451
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
452
		room = self.do_db(Room.objects.get, id=room_id)
453
		if room.is_private:
454
			raise ValidationError("You can't add users to direct room, create a new room instead")
455
		try:
456
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
457
		except IntegrityError:
458
			raise ValidationError("User is already in channel")
459
		users_in_room = {}
460
		for user in room.users.all():
461
			self.set_js_user_structure(users_in_room, user.id, user.username, user.sex)
462
		self.publish(self.add_user_to_room(channel, user_id, users_in_room[user_id]), channel)
463
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
464
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
465
466
	def create_user_channel(self, message):
467
		user_id = message[VarNames.USER_ID]
468
		# get all self private rooms ids
469
		user_rooms = Room.users.through.objects.filter(user_id=self.user_id, room__name__isnull=True).values('room_id')
470
		# get private room that contains another user from rooms above
471
		query_res = Room.users.through.objects.filter(user_id=user_id, room__in=user_rooms).values('room__id', 'room__disabled')
472
		if len(query_res) > 0:
473
			room = query_res[0]
474
			room_id = room['room__id']
475
			disabled = room['room__disabled']
476
			if not disabled:
477
				raise ValidationError('This room already exist')
478
			else:
479
				Room.objects.filter(id=room_id).update(disabled=False)
480
		else:
481
			room = Room()
482
			room.save()
483
			room_id = room.id
484
			RoomUsers.objects.bulk_create([
485
				RoomUsers(user_id=user_id, room_id=room_id),
486
				RoomUsers(user_id=self.user_id, room_id=room_id),
487
			])
488
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
489
		self.publish(subscribe_message, self.channel, True)
490
		other_channel = RedisPrefix.generate_user(user_id)
491
		if self.channel != other_channel:
492
			self.publish(subscribe_message, other_channel, True)
493
494
	def delete_channel(self, message):
495
		room_id = message[VarNames.ROOM_ID]
496
		channel = RedisPrefix.generate_room(room_id)
497
		if channel not in self.channels or room_id == ALL_ROOM_ID:
498
			raise ValidationError('You are not allowed to exit this room')
499
		room = self.do_db(Room.objects.get, id=room_id)
500
		if room.disabled:
501
			raise ValidationError('Room is already deleted')
502
		if room.name is None:  # if private then disable
503
			room.disabled = True
504
		else: # if public -> leave the room, delete the link
505
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
506
			online = self.get_online_from_redis(channel)
507
			online.remove(self.user_id)
508
			self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
509
		room.save()
510
		message = self.unsubscribe_direct_message(room_id)
511
		self.publish(message, channel, True)
512
513
	def send_client_new_channel(self, message):
514
		room_id = message[VarNames.ROOM_ID]
515
		channel = RedisPrefix.generate_room(room_id)
516
		self.add_channel(channel)
517
		self.add_online_user(room_id)# TODO doesnt work if already subscribed
518
519
	def set_opponent_call_channel(self, message):
520
		self.call_receiver_channel = RedisPrefix.generate_user(message[VarNames.USER_ID])
521
522
	def send_client_delete_channel(self, message):
523
		room_id = message[VarNames.ROOM_ID]
524
		channel = RedisPrefix.generate_room(room_id)
525
		self.async_redis.unsubscribe(channel)
526
		self.async_redis_publisher.hdel(channel, self.id)
527
		self.channels.remove(channel)
528
529
	def process_get_messages(self, data):
530
		"""
531
		:type data: dict
532
		"""
533
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
534
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
535
		channel = data[VarNames.CHANNEL]
536
		room_id = RedisPrefix.extract_id(channel)
537
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
538
		if header_id is None:
539
			messages = Message.objects.filter(Q(room_id=room_id)).order_by('-pk')[:count]
540
		else:
541
			messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id)).order_by('-pk')[:count]
542
		response = self.do_db(self.get_messages, messages, channel)
543
		self.safe_write(response)
544
545
	def get_offline_messages(self):
546
		res = {}
547
		offline_messages = Message.objects.filter(
548
			id__gt=F('room__roomusers__last_read_message_id'),
549
			room__roomusers__user_id=self.user_id
550
		)
551
		for message in offline_messages:
552
			res.setdefault(message.room_id, []).append(self.create_message(message))
553
		return res
554
555
	def get_users_in_current_user_rooms(self):
556
		"""
557
		{
558
			"ROOM_ID:1": {
559
				"name": "All",
560
				"users": {
561
					"USER_ID:admin": {
562
						"name": "USER_NAME:admin",
563
						"sex": "SEX:Secret"
564
					},
565
					"USER_ID_2": {
566
						"name": "USER_NAME:Mike",
567
						"sex": "Male"
568
					}
569
				},
570
				"isPrivate": true
571
			}
572
		}
573
		"""
574
		user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
575
		res = {room['id']: {
576
				VarNames.ROOM_NAME: room['name'],
577
				VarNames.ROOM_USERS: {}
578
			} for room in user_rooms}
579
		room_ids = (room_id for room_id in res)
580
		rooms_users = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id')
581
		for user in rooms_users:
582
			self.set_js_user_structure(res[user['rooms__id']][VarNames.ROOM_USERS], user['id'], user['username'], user['sex'])
583
		return res
584
585
	def set_js_user_structure(self, user_dict, user_id, name, sex):
586
		user_dict[user_id] = {
587
			VarNames.USER: name,
588
			VarNames.GENDER: GENDERS[sex]
589
		}
590
591
	def save_ip(self):
592
		if (self.do_db(UserJoinedInfo.objects.filter(
593
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
594
			return
595
		ip_address = self.get_or_create_ip()
596
		UserJoinedInfo.objects.create(
597
			ip=ip_address,
598
			user_id=self.user_id
599
		)
600
601
	def get_or_create_ip(self):
602
		try:
603
			ip_address = IpAddress.objects.get(ip=self.ip)
604
		except IpAddress.DoesNotExist:
605
			try:
606
				if not api_url:
607
					raise Exception('api url is absent')
608
				self.logger.debug("Creating ip record %s", self.ip)
609
				f = urlopen(api_url % self.ip)
610
				raw_response = f.read().decode("utf-8")
611
				response = json.loads(raw_response)
612
				if response['status'] != "success":
613
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
614
				ip_address = IpAddress.objects.create(
615
					ip=self.ip,
616
					isp=response['isp'],
617
					country=response['country'],
618
					region=response['regionName'],
619
					city=response['city'],
620
					country_code=response['countryCode']
621
				)
622
			except Exception as e:
623
				self.logger.error("Error while creating ip with country info, because %s", e)
624
				ip_address = IpAddress.objects.create(ip=self.ip)
625
		return ip_address
626
627
628
class AntiSpam(object):
629
630
	def __init__(self):
631
		self.spammed = 0
632
		self.info = {}
633
634
	def check_spam(self, json_message):
635
		message_length = len(json_message)
636
		info_key = int(round(time.time() * 100))
637
		self.info[info_key] = message_length
638
		if message_length > MAX_MESSAGE_SIZE:
639
			self.spammed += 1
640
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
641
		self.check_timed_spam()
642
643
	def check_timed_spam(self):
644
		# TODO implement me
645
		pass
646
		# raise ValidationError("You're chatting too much, calm down a bit!")
647
648
649
class TornadoHandler(WebSocketHandler, MessagesHandler):
650
651
	def __init__(self, *args, **kwargs):
652
		super(TornadoHandler, self).__init__(*args, **kwargs)
653
		self.connected = False
654
		self.anti_spam = AntiSpam()
655
656
	def data_received(self, chunk):
657
		pass
658
659
	def on_message(self, json_message):
660
		try:
661
			if not self.connected:
662
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
663
			if not json_message:
664
				raise ValidationError('Skipping null message')
665
			# self.anti_spam.check_spam(json_message)
666
			self.logger.debug('<< %s', json_message)
667
			message = json.loads(json_message)
668
			if message[VarNames.EVENT] not in self.pre_process_message:
669
				raise ValidationError("event {} is unknown".format(message[VarNames.EVENT]))
670
			channel = message.get(VarNames.CHANNEL)
671
			if channel and channel not in self.channels:
672
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
673
			self.pre_process_message[message[VarNames.EVENT]](message)
674
		except ValidationError as e:
675
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
676
			self.safe_write(error_message)
677
678
	def on_close(self):
679
		if self.async_redis.subscribed:
680
			self.async_redis.unsubscribe(self.channels)
681
		log_data = {}
682
		for channel in self.channels:
683
			if channel.startswith(RedisPrefix.ROOM_CHANNEL_PREFIX):
684
				self.sync_redis.hdel(channel, self.id)
685
				if self.connected:
686
					# seems like async solves problem with connection lost and wrong data status
687
					# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
688
					online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
689
					log_data[channel] = {'online': online, 'is_online': is_online}
690
					if not is_online:
691
						message = self.room_online(online, Actions.LOGOUT, channel)
692
						self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
693
						self.publish(message, channel)
694
695
		self.logger.info("Close connection result: %s", json.dumps(log_data))
696
		self.async_redis.disconnect()
697
698
	def open(self):
699
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
700
		if sessionStore.exists(session_key):
701
			self.ip = self.get_client_ip()
702
			session = SessionStore(session_key)
703
			self.user_id = int(session["_auth_user_id"])
704
			log_params = {
705
				'user_id': str(self.user_id).zfill(3),
706
				'id': self.log_id,
707
				'ip': self.ip
708
			}
709
			self.logger = logging.LoggerAdapter(logger, log_params)
710
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
711
			self.async_redis.connect()
712
			user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
713
			self.sender_name = user_db.username
714
			self.sex = user_db.sex_str
715
			user_rooms = self.get_users_in_current_user_rooms()
716
			self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
717
			# get all missed messages
718
			self.channels.clear()
719
			self.channels.append(self.channel)
720
			for room_id in user_rooms:
721
				self.channels.append(RedisPrefix.generate_room(room_id))
722
			self.listen(self.channels)
723
			off_messages = self.get_offline_messages()
724
			for room_id in user_rooms:
725
				self.add_online_user(room_id, off_messages.get(room_id))
726
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
727
			self.connected = True
728
			Thread(target=self.save_ip).start()
729
		else:
730
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
731
			self.close(403, "Session key %s has been rejected" % session_key)
732
733
	def check_origin(self, origin):
734
		"""
735
		check whether browser set domain matches origin
736
		"""
737
		parsed_origin = urlparse(origin)
738
		origin = parsed_origin.netloc
739
		origin_domain = origin.split(':')[0].lower()
740
		browser_set = self.request.headers.get("Host")
741
		browser_domain = browser_set.split(':')[0]
742
		return browser_domain == origin_domain
743
744
	def safe_write(self, message):
745
		"""
746
		Tries to send message, doesn't throw exception outside
747
		:type self: MessagesHandler
748
		"""
749
		# self.logger.debug('<< THREAD %s >>', os.getppid())
750
		try:
751
			if isinstance(message, dict):
752
				message = json.dumps(message)
753
			if not (isinstance(message, str) or (not PY3 and isinstance(message, unicode))):
754
				raise ValueError('Wrong message type : %s' % str(message))
755
			self.logger.debug(">> %s", message)
756
			self.write_message(message)
757
		except tornado.websocket.WebSocketClosedError as e:
758
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
759
760
	def get_client_ip(self):
761
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip
762