Completed
Push — master ( cd0cbf...dcf2fd )
by Andrew
53s
created

TornadoHandler.get_client_ip()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 3
rs 10
1
import json
2
import logging
3
import sys
4
import time
5
from time import mktime
6
7
import redis
8
import tornado.gen
9
import tornado.httpclient
10
import tornado.ioloop
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError
17
from django.db.models import Q
18
from redis_sessions.session import SessionStore
19
from tornado.websocket import WebSocketHandler
20
21
from chat.log_filters import id_generator
22
23
try:
24
	from urllib.parse import urlparse  # py2
25
except ImportError:
26
	from urlparse import urlparse  # py3
27
28
from chat.settings import MAX_MESSAGE_SIZE, ANONYMOUS_REDIS_ROOM
29
from chat.models import User, Message, Room
30
from chat.utils import check_user
31
32
PY3 = sys.version > '3'
33
34
user_cookie_name = settings.USER_COOKIE_NAME
35
36
ANONYMOUS_GENDER = 'Alien'
37
SESSION_USER_VAR_KEY = 'user_name'
38
39
MESSAGE_ID_VAR_NAME = 'id'
40
RECEIVER_USERNAME_VAR_NAME = 'receiverName'
41
RECEIVER_USERID_VAR_NAME = 'receiverId'
42
COUNT_VAR_NAME = 'count'
43
HEADER_ID_VAR_NAME = 'headerId'
44
USER_VAR_NAME = 'user'
45
USER_ID_VAR_NAME = 'userId'
46
TIME_VAR_NAME = 'time'
47
OLD_NAME_VAR_NAME = 'oldName'
48
IS_ANONYMOUS_VAR_NAME = 'anonymous'
49
CONTENT_VAR_NAME = 'content'
50
EVENT_VAR_NAME = 'action'
51
GENDER_VAR_NAME = 'sex'
52
53
REFRESH_USER_EVENT = 'onlineUsers'
54
SYSTEM_MESSAGE_EVENT = 'system'
55
GET_MESSAGES_EVENT = 'messages'
56
GET_MINE_USERNAME_EVENT = 'me'
57
ROOMS_EVENT = 'rooms'  # thread ex "main" , channel ex. 'r:main', "i:3"
58
LOGIN_EVENT = 'joined'
59
LOGOUT_EVENT = 'left'
60
SEND_MESSAGE_EVENT = 'send'
61
CHANGE_ANONYMOUS_NAME_EVENT = 'changed'
62
63
REDIS_USERNAME_CHANNEL_PREFIX = 'u:%s'
64
REDIS_USERID_CHANNEL_PREFIX = 'i:%s'
65
REDIS_ROOM_CHANNEL_PREFIX = 'r:%d'
66
REDIS_ONLINE_USERS = "online_users"
67
68
69
# global connection to read synchronously
70
sync_redis = redis.StrictRedis()
71
# Redis connection cannot be shared between publishers and subscribers.
72
async_redis_publisher = tornadoredis.Client()
73
async_redis_publisher.connect()
74
sync_redis.delete(REDIS_ONLINE_USERS)  # TODO move it somewhere else
75
76
try:
77
	anonymous_default_room = Room.objects.get(name=ANONYMOUS_REDIS_ROOM)
78
except Room.DoesNotExist:
79
	anonymous_default_room = Room()
80
	anonymous_default_room.name = ANONYMOUS_REDIS_ROOM
81
	anonymous_default_room.save()
82
83
ANONYMOUS_REDIS_CHANNEL = REDIS_ROOM_CHANNEL_PREFIX % anonymous_default_room.id
84
ANONYMOUS_ROOM_NAMES = {anonymous_default_room.id: anonymous_default_room.name}
85
86
sessionStore = SessionStore()
87
88
logger = logging.getLogger(__name__)
89
90
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
91
CONNECTION_POOL = tornadoredis.ConnectionPool(
92
	max_connections=500,
93
	wait_for_available=True)
94
95
96
class MessagesCreator(object):
97
98
	def __init__(self, *args, **kwargs):
99
		super(MessagesCreator, self).__init__(*args, **kwargs)
100
		self.sex = ANONYMOUS_GENDER
101
		self.sender_name = None
102
		self.user_id = 0  # anonymous by default
103
104
	def online_user_names(self, user_names_dict, action):
105
		"""
106
		:type user_names_dict: dict
107
		:return: { Nick: male, NewName: alien, Joana: female}
108
		"""
109
		default_message = self.default(user_names_dict, action)
110
		default_message.update({
111
			USER_VAR_NAME: self.sender_name,
112
			IS_ANONYMOUS_VAR_NAME: self.sex == ANONYMOUS_GENDER
113
		})
114
		return default_message
115
116
	def change_user_nickname(self, old_nickname, online):
117
		"""
118
		set self.sender_name to new nickname before call it
119
		:return: {action : changed, content: { Nick: male, NewName: alien}, oldName : OldName, user: NewName}
120
		:type old_nickname: str
121
		:type online: dict
122
		"""
123
		default_message = self.online_user_names(online, CHANGE_ANONYMOUS_NAME_EVENT)
124
		default_message[OLD_NAME_VAR_NAME] = old_nickname
125
		return default_message
126
127
	@classmethod
128
	def default(cls, content, event=SYSTEM_MESSAGE_EVENT):
129
		"""
130
		:return: {"action": event, "content": content, "time": "20:48:57"}
131
		"""
132
		return {
133
			EVENT_VAR_NAME: event,
134
			CONTENT_VAR_NAME: content,
135
			TIME_VAR_NAME: cls.get_miliseconds()
136
		}
137
138
	@classmethod
139
	def create_send_message(cls, message):
140
		"""
141
		:type message: Message
142
		"""
143
		result = cls.get_message(message)
144
		result[EVENT_VAR_NAME] = SEND_MESSAGE_EVENT
145
		return result
146
147
	@classmethod
148
	def get_message(cls, message):
149
		"""
150
		:param message:
151
		:return: "action": "joined", "content": {"v5bQwtWp": "alien", "tRD6emzs": "Alien"},
152
		"sex": "Alien", "user": "tRD6emzs", "time": "20:48:57"}
153
		"""
154
		result = {
155
			USER_VAR_NAME: message.sender.username,
156
			USER_ID_VAR_NAME: message.sender.id,
157
			CONTENT_VAR_NAME: message.content,
158
			TIME_VAR_NAME: cls.get_miliseconds(message),
159
			MESSAGE_ID_VAR_NAME: message.id,
160
		}
161
		if message.receiver is not None:
162
			result[RECEIVER_USERNAME_VAR_NAME] = message.receiver.username
163
		return result
164
165
	@classmethod
166
	def get_messages(cls, messages):
167
		"""
168
		:type messages: list[Messages]
169
		:type messages: QuerySet[Messages]
170
		"""
171
		return {
172
			CONTENT_VAR_NAME: [cls.create_send_message(message) for message in messages],
173
			EVENT_VAR_NAME: GET_MESSAGES_EVENT
174
		}
175
176
	def send_anonymous(self, content, receiver_anonymous, receiver_id):
177
		default_message = self.default(content, SEND_MESSAGE_EVENT)
178
		default_message[USER_VAR_NAME] = self.sender_name
179
		if receiver_anonymous is not None:
180
			default_message[RECEIVER_USERNAME_VAR_NAME] = receiver_anonymous
181
			default_message[RECEIVER_USERID_VAR_NAME] = receiver_id
182
		return default_message
183
184
	@property
185
	def stored_redis_user(self):
186
		return '%s:%s:%d' % (self.sender_name, self.sex, self.user_id)
187
188
	@property
189
	def channel(self):
190
		if self.user_id == 0:
191
			return REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name
192
		else:
193
			return REDIS_USERID_CHANNEL_PREFIX % self.user_id
194
195
	@staticmethod
196
	def online_js_structure(name, sex, user_id):
197
		return {
198
			name: {
199
				GENDER_VAR_NAME: sex,
200
				USER_ID_VAR_NAME: user_id
201
			}
202
		}
203
204
	@property
205
	def online_self_js_structure(self):
206
		return self.online_js_structure(self.sender_name, self.sex, self.user_id)
207
208
	@staticmethod
209
	def get_miliseconds(dt=None):
210
		if dt is None:
211
			return int(time.time()*1000)
212
		if dt.time.timestamp:
213
			return int(dt.time.timestamp()*1000)
214
		else:
215
			return mktime(dt.time.timetuple())*1000 + int(dt.time.microsecond/1000),
216
217
218
class MessagesHandler(MessagesCreator):
219
220
	def __init__(self, *args, **kwargs):
221
		super(MessagesHandler, self).__init__(*args, **kwargs)
222
		self.log_id = str(id(self) % 10000).rjust(4, '0')
223
		log_params = {
224
			'username': '00000000',
225
			'id': self.log_id,
226
			'ip': 'initializing'
227
		}
228
		self.logger = logging.LoggerAdapter(logger, log_params)
229
		self.async_redis = tornadoredis.Client()
230
		self.process_message = {
231
			GET_MINE_USERNAME_EVENT: self.process_change_username,
232
			GET_MESSAGES_EVENT: self.process_get_messages,
233
			SEND_MESSAGE_EVENT: self.process_send_message,
234
		}
235
236
	def do_db(self, callback, *arg, **args):
237
		try:
238
			return callback(*arg, **args)
239
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
240
			self.logger.warning('%s, reconnecting' % e)  # TODO
241
			connection.close()
242
			return callback( *arg, **args)
243
244
	def get_online_from_redis(self, check_name=None, check_id=None):
245
		"""
246
		:rtype : dict
247
		returns (dict, bool) if check_type is present
248
		"""
249
		online = sync_redis.hgetall(REDIS_ONLINE_USERS)
250
		self.logger.debug('!! redis online: %s', online)
251
		result = {}
252
		user_is_online = False
253
		# redis stores REDIS_USER_FORMAT, so parse them
254
		if online:
255
			for key, raw_user_sex in online.items():  # py2 iteritems
256
				(name, sex, user_id) = raw_user_sex.decode('utf-8').split(':')
257
				if name == check_name and check_id != int(key.decode('utf-8')):
258
					user_is_online = True
259
				result.update(self.online_js_structure(name, sex, user_id))
260
		if check_id:
261
			return result, user_is_online
262
		else:
263
			return result
264
265
	def add_online_user(self):
266
		"""
267
		adds to redis
268
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
269
		:return:
270
		"""
271
		online = self.get_online_from_redis()
272
		async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user)
273
		first_tab = False
274
		if self.sender_name not in online:  # if a new tab has been opened
275
			online.update(self.online_self_js_structure)
276
			first_tab = True
277
278
		if first_tab:  # Login event, sent user names to all
279
			online_user_names_mes = self.online_user_names(online, LOGIN_EVENT)
280
			self.logger.info('!! First tab, sending refresh online for all')
281
			self.publish(online_user_names_mes)
282
		else:  # Send user names to self
283
			online_user_names_mes = self.online_user_names(online, REFRESH_USER_EVENT)
284
			self.logger.info('!! Second tab, retrieving online for self')
285
			self.safe_write(online_user_names_mes)
286
		# send usernamechat
287
		username_message = self.default(self.sender_name, GET_MINE_USERNAME_EVENT)
288
		self.safe_write(username_message)
289
290
	def set_username(self, session_key):
291
		"""
292
		Case registered: Fetch userName and its channels from database. returns them
293
		Case anonymous: generates a new name and saves it to session. returns default channel
294
		:return: channels user should subscribe
295
		"""
296
		session = SessionStore(session_key)
297
		try:
298
			self.user_id = int(session["_auth_user_id"])
299
			user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
300
			self.sender_name = user_db.username
301
			self.sex = user_db.sex_str
302
			rooms = user_db.rooms.all()  # do_db is used already
303
			room_names = {}
304
			channels = [self.channel, ]
305
			for room in rooms:
306
				room_names[room.id] = room.name
307
				channels.append(REDIS_ROOM_CHANNEL_PREFIX % room.id)
308
			rooms_message = self.default(room_names, ROOMS_EVENT)
309
			self.logger.info("!! User %s subscribes for %s", self.sender_name, room_names)
310
		except (KeyError, User.DoesNotExist):
311
			# Anonymous
312
			self.sender_name = session.get(SESSION_USER_VAR_KEY)
313
			if self.sender_name is None:
314
				self.sender_name = id_generator(8)
315
				session[SESSION_USER_VAR_KEY] = self.sender_name
316
				session.save()
317
				self.logger.info("!! A new user log in, created username %s", self.sender_name)
318
			else:
319
				self.logger.info("!! Anonymous with name %s has logged", self.sender_name)
320
			channels = [ANONYMOUS_REDIS_CHANNEL, self.channel]
321
			rooms_message = self.default(ANONYMOUS_ROOM_NAMES, ROOMS_EVENT)
322
		finally:
323
			self.safe_write(rooms_message)
324
			return channels
325
326
	def publish(self, message, channel=ANONYMOUS_REDIS_CHANNEL):
327
		jsoned_mess = json.dumps(message)
328
		self.logger.debug('<%s> %s', channel, jsoned_mess)
329
		async_redis_publisher.publish(channel, jsoned_mess)
330
331
	# TODO really parse every single message for 1 action?
332
	def check_and_finish_change_name(self, message):
333
		if self.sex == ANONYMOUS_GENDER:
334
			parsed_message = json.loads(message)
335
			if parsed_message[EVENT_VAR_NAME] == GET_MINE_USERNAME_EVENT:
336
				self.async_redis.unsubscribe(REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name)  # TODO is it allowed?
337
				self.sender_name = parsed_message[CONTENT_VAR_NAME]
338
				self.async_redis.subscribe(REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name)
339
				async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user)
340
341
	def new_message(self, message):
342
		if type(message.body) is not int:  # subscribe event
343
			self.safe_write(message.body)
344
			self.check_and_finish_change_name(message.body)
345
346
	def safe_write(self, message):
347
		raise NotImplementedError('WebSocketHandler implements')
348
349
	def process_send_message(self, message):
350
		"""
351
		:type message: dict
352
		"""
353
		content = message[CONTENT_VAR_NAME]
354
		receiver_id = message.get(RECEIVER_USERID_VAR_NAME)  # if receiver_id is None then its a private message
355
		receiver_name = message.get(RECEIVER_USERNAME_VAR_NAME)
356
		self.logger.info('!! Sending message %s to username:%s, id:%s', content, receiver_name, receiver_id)
357
		save_to_db = True
358
		if receiver_id is not None and receiver_id != 0:
359
			receiver_channel = REDIS_USERID_CHANNEL_PREFIX % receiver_id
360
		elif receiver_name is not None:
361
			receiver_channel = REDIS_USERNAME_CHANNEL_PREFIX % receiver_name
362
			save_to_db = False
363
364
		if self.user_id != 0 and save_to_db:
365
			self.logger.debug('!! Saving it to db')
366
			message_db = Message(sender_id=self.user_id, content=content, receiver_id=receiver_id)
367
			self.do_db(message_db.save)  # exit on hacked id with exception
368
			prepared_message = self.create_send_message(message_db)
369
		else:
370
			self.logger.debug('!! NOT saving it')
371
			prepared_message = self.send_anonymous(content, receiver_name, receiver_id)
372
373
		if receiver_id is None:
374
			self.logger.debug('!! Detected as public')
375
			self.publish(prepared_message)
376
		else:
377
			self.publish(prepared_message, self.channel)
378
			self.logger.debug('!! Detected as private, channel %s', receiver_channel)
379
			if receiver_channel != self.channel:
380
				self.publish(prepared_message, receiver_channel)
381
382
	def process_change_username(self, message):
383
		"""
384
		:type message: dict
385
		"""
386
		self.logger.info('!! Changing username to %s', message[CONTENT_VAR_NAME])
387
		new_username = message[CONTENT_VAR_NAME]
388
		try:
389
			check_user(new_username)
390
			online = self.get_online_from_redis()
391
			if new_username in online:
392
				self.logger.info('!! This name is already used')
393
				raise ValidationError('Anonymous already has this name')
394
			session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
395
			session = SessionStore(session_key)
396
			session[SESSION_USER_VAR_KEY] = new_username
397
			session.save()
398
399
			del online[self.sender_name]
400
			old_name = self.sender_name
401
			old_channel = self.channel
402
			self.sender_name = new_username  # change_user_name required new_username in sender_name
403
			online.update(self.online_self_js_structure)
404
			message_all = self.change_user_nickname(old_name, online)
405
			message_me = self.default(new_username, GET_MINE_USERNAME_EVENT)
406
			# TODO perform ton of checks or emit twice ?
407
			self.publish(message_me, self.channel)  # to new user channel
408
			self.publish(message_me, old_channel)  # to old user channel
409
			self.publish(message_all)
410
		except ValidationError as e:
411
			self.safe_write(self.default(str(e.message)))
412
413
	def process_get_messages(self, data):
414
		"""
415
		:type data: dict
416
		"""
417
		header_id = data.get(HEADER_ID_VAR_NAME, None)
418
		count = int(data.get(COUNT_VAR_NAME, 10))
419
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
420
		if header_id is None:
421
			messages = Message.objects.filter(
422
				Q(receiver=None)  # Only public
423
				| Q(sender=self.user_id)  # private s
424
				| Q(receiver=self.user_id)  # and private
425
			).order_by('-pk')[:count]
426
		else:
427
			messages = Message.objects.filter(
428
				Q(id__lt=header_id),
429
				Q(receiver=None)
430
				| Q(sender=self.user_id)
431
				| Q(receiver=self.user_id)
432
			).order_by('-pk')[:count]
433
		response = self.do_db(self.get_messages, messages)
434
		self.safe_write(response)
435
436
437
class AntiSpam:
438
439
	def __init__(self):
440
		self.spammed = 0
441
		self.info = {}
442
443
	def check_spam(self, json_message):
444
		message_length = len(json_message)
445
		info_key = int(round(time.time() * 100))
446
		self.info[info_key] = message_length
447
		if message_length > MAX_MESSAGE_SIZE:
448
			self.spammed += 1
449
			raise ValidationError("Message can't exceed %s symbols" % MAX_MESSAGE_SIZE)
450
		self.check_timed_spam()
451
452
	def check_timed_spam(self):
453
		# TODO implement me
454
		pass
455
		# raise ValidationError("You're chatting too much, calm down a bit!")
456
457
458
class TornadoHandler(WebSocketHandler, MessagesHandler):
459
460
	def __init__(self, *args, **kwargs):
461
		super(TornadoHandler, self).__init__(*args, **kwargs)
462
		self.connected = False
463
		self.anti_spam = AntiSpam()
464
465
	@tornado.gen.engine
466
	def listen(self, channels):
467
		"""
468
		self.channel should been set before calling
469
		"""
470
		yield tornado.gen.Task(
471
			self.async_redis.subscribe, channels)
472
		self.async_redis.listen(self.new_message)
473
474
	def data_received(self, chunk):
475
		pass
476
477
	def on_message(self, json_message):
478
		try:
479
			if not self.connected:
480
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
481
			if not json_message:
482
				raise ValidationError('Skipping null message')
483
			self.anti_spam.check_spam(json_message)
484
			self.logger.debug('<< %s', json_message)
485
			message = json.loads(json_message)
486
			self.process_message[message[EVENT_VAR_NAME]](message)
487
		except ValidationError as e:
488
			logger.warning("Message won't be send. Reason: %s", e.message)
489
			self.safe_write(self.default(e.message))
490
491
	def on_close(self):
492
		try:
493
			self_id = id(self)
494
			async_redis_publisher.hdel(REDIS_ONLINE_USERS, self_id)
495
			if self.connected:
496
				# seems like async solves problem with connection lost and wrong data status
497
				# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
498
				online, is_online = self.get_online_from_redis(self.sender_name, self_id)
499
				self.logger.info('!! Closing connection, redis current online %s', online)
500
				if not is_online:
501
					message = self.online_user_names(online, LOGOUT_EVENT)
502
					self.logger.debug('!! User closed the last tab, refreshing online for all')
503
					self.publish(message)
504
				else:
505
					self.logger.debug('!! User is still online in other tabs')
506
			else:
507
				self.logger.warning('Dropping connection for not connected user')
508
		finally:
509
			if self.async_redis.subscribed:
510
				#  TODO unsubscribe of all subscribed                  !IMPORTANT
511
				self.async_redis.unsubscribe([
512
					ANONYMOUS_REDIS_CHANNEL,
513
					self.channel
514
				])
515
			self.async_redis.disconnect()
516
517
	def open(self, *args, **kargs):
518
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
519
		if sessionStore.exists(session_key):
520
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, id(self))
521
			self.async_redis.connect()
522
			channels = self.set_username(session_key)
523
			log_params = {
524
				'username': self.sender_name.rjust(8),
525
				'id': self.log_id,
526
				'ip': self.get_client_ip()
527
			}
528
			self.logger = logging.LoggerAdapter(logger, log_params)
529
			self.listen(channels)
530
			self.add_online_user()
531
			self.connected = True
532
		else:
533
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
534
			self.close(403, "Session key %s has been rejected" % session_key)
535
536
	def check_origin(self, origin):
537
		"""
538
		check whether browser set domain matches origin
539
		"""
540
		parsed_origin = urlparse(origin)
541
		origin = parsed_origin.netloc
542
		origin_domain = origin.split(':')[0].lower()
543
		browser_set = self.request.headers.get("Host")
544
		browser_domain = browser_set.split(':')[0]
545
		return browser_domain == origin_domain
546
547
	def safe_write(self, message):
548
		"""
549
		Tries to send message, doesn't throw exception outside
550
		:type self: MessagesHandler
551
		"""
552
		try:
553
			if isinstance(message, dict):
554
				message = json.dumps(message)
555
			if not (isinstance(message, str) or (not PY3 and isinstance(message, unicode))):
556
				raise ValueError('Wrong message type : %s' % str(message))
557
			self.logger.debug(">> %s", message)
558
			self.write_message(message)
559
		except tornado.websocket.WebSocketClosedError as e:
560
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
561
562
	def get_client_ip(self):
563
		x_real_ip = self.request.headers.get("X-Real-IP")
564
		return x_real_ip or self.request.remote_ip
565
566
application = tornado.web.Application([
567
	(r'.*', TornadoHandler),
568
])
569