Completed
Push — master ( 6ca66a...a573ac )
by Andrew
01:29
created

MessagesHandler.do_db()   A

Complexity

Conditions 2

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 1 Features 1
Metric Value
cc 2
c 3
b 1
f 1
dl 0
loc 7
rs 9.4285
1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import tornado.gen
9
import tornado.httpclient
10
import tornado.ioloop
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError
17
from django.db.models import Q
18
from redis_sessions.session import SessionStore
19
from tornado.websocket import WebSocketHandler
20
21
try:
22
	from urllib.parse import urlparse  # py3
23
except ImportError:
24
	from urlparse import urlparse  # py2
25
26
from chat.settings import MAX_MESSAGE_SIZE, ALL_REDIS_ROOM
27
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo
28
29
# True on Python 3 and newer; the numeric major version is compared because
# comparing the version string is lexicographic ('10.0' sorts before '3')
PY3 = sys.version_info[0] >= 3
30
31
user_cookie_name = settings.USER_COOKIE_NAME
32
api_url = getattr(settings, "IP_API_URL", None)
33
34
SESSION_USER_VAR_KEY = 'user_name'
35
36
MESSAGE_ID_VAR_NAME = 'id'
37
RECEIVER_USERNAME_VAR_NAME = 'receiverName'
38
RECEIVER_USERID_VAR_NAME = 'receiverId'
39
CALL_TYPE_VAR_NAME = 'type'
40
COUNT_VAR_NAME = 'count'
41
HEADER_ID_VAR_NAME = 'headerId'
42
USER_VAR_NAME = 'user'
43
USER_ID_VAR_NAME = 'userId'
44
TIME_VAR_NAME = 'time'
45
OLD_NAME_VAR_NAME = 'oldName'
46
CONTENT_VAR_NAME = 'content'
47
EVENT_VAR_NAME = 'action'
48
GENDER_VAR_NAME = 'sex'
49
50
REFRESH_USER_EVENT = 'onlineUsers'
51
SYSTEM_MESSAGE_EVENT = 'system'
52
GROWL_MESSAGE_EVENT = 'growl'
53
GET_MESSAGES_EVENT = 'messages'
54
ROOMS_EVENT = 'rooms'  # thread ex "main" , channel ex. 'r:main', "i:3"
55
LOGIN_EVENT = 'joined'
56
LOGOUT_EVENT = 'left'
57
SEND_MESSAGE_EVENT = 'send'
58
WEBRTC_EVENT = 'webrtc'
59
CALL_EVENT = 'call'
60
61
REDIS_USERID_CHANNEL_PREFIX = 'i:%s'
62
REDIS_ROOM_CHANNEL_PREFIX = 'r:%d'
63
REDIS_ONLINE_USERS = "online_users"
64
65
(default_room, c)  = Room.objects.get_or_create(name=ALL_REDIS_ROOM)
66
67
ANONYMOUS_REDIS_CHANNEL = REDIS_ROOM_CHANNEL_PREFIX % default_room.id
68
ANONYMOUS_ROOM_NAMES = {default_room.id: default_room.name}
69
70
sessionStore = SessionStore()
71
72
logger = logging.getLogger(__name__)
73
74
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
75
CONNECTION_POOL = tornadoredis.ConnectionPool(
76
	max_connections=500,
77
	wait_for_available=True)
78
79
80
class MessagesCreator(object):
	"""
	Builds the dictionaries that are serialized and delivered to chat clients
	"""

	def __init__(self, *args, **kwargs):
		super(MessagesCreator, self).__init__(*args, **kwargs)
		self.user_id = 0  # anonymous by default
		self.sender_name = None
		self.sex = None

	def default(self, content, event=SYSTEM_MESSAGE_EVENT):
		"""
		Wraps content into an event envelope stamped with the current user

		:param content: payload stored under the "content" key
		:param event: value stored under the "action" key
		:return: {"action": event, "content": content, "userId": self.user_id,
		"user": self.sender_name, "time": get_milliseconds()}
		"""
		# keys are inserted in the same order as before, so the dumped json stays the same
		envelope = {EVENT_VAR_NAME: event}
		envelope[CONTENT_VAR_NAME] = content
		envelope[USER_ID_VAR_NAME] = self.user_id
		envelope[USER_VAR_NAME] = self.sender_name
		envelope[TIME_VAR_NAME] = get_milliseconds()
		return envelope

	def offer_call(self, content, message_type):
		"""
		:return: the default() envelope for the "call" action with message_type
		added under the "type" key
		"""
		call_message = self.default(content, CALL_EVENT)
		call_message[CALL_TYPE_VAR_NAME] = message_type
		return call_message

	@classmethod
	def create_send_message(cls, message):
		"""
		:type message: Message
		:return: get_message() structure with "action" set to "send"
		"""
		prepared = cls.get_message(message)
		prepared[EVENT_VAR_NAME] = SEND_MESSAGE_EVENT
		return prepared

	@classmethod
	def get_message(cls, message):
		"""
		Converts a stored message to a plain dictionary

		:param message: object exposing sender, receiver, content, time and id
		:return: {"user": sender name, "userId": sender id, "content": ..., "time": ...,
		"id": ...} plus "receiverId" and "receiverName" when the message has a receiver
		"""
		sender = message.sender
		result = {
			USER_VAR_NAME: sender.username,
			USER_ID_VAR_NAME: sender.id,
			CONTENT_VAR_NAME: message.content,
			TIME_VAR_NAME: message.time,
			MESSAGE_ID_VAR_NAME: message.id,
		}
		receiver = message.receiver
		if receiver is None:
			return result
		result[RECEIVER_USERID_VAR_NAME] = receiver.id
		result[RECEIVER_USERNAME_VAR_NAME] = receiver.username
		return result

	@classmethod
	def get_messages(cls, messages):
		"""
		:param messages: iterable of messages, every item goes through create_send_message()
		:return: {"content": [prepared messages], "action": "messages"}
		"""
		prepared = []
		for message in messages:
			prepared.append(cls.create_send_message(message))
		return {
			CONTENT_VAR_NAME: prepared,
			EVENT_VAR_NAME: GET_MESSAGES_EVENT
		}

	@property
	def stored_redis_user(self):
		"""
		:return: "name:sex:user_id" string describing the current user
		"""
		user_parts = (self.sender_name, self.sex, self.user_id)
		return '%s:%s:%d' % user_parts

	@property
	def channel(self):
		"""
		:return: REDIS_USERID_CHANNEL_PREFIX formatted with the id of the current user
		"""
		return REDIS_USERID_CHANNEL_PREFIX % self.user_id

	@staticmethod
	def online_js_structure(name, sex, user_id):
		"""
		:return: {name: {"sex": sex, "userId": user_id}}
		"""
		user_info = {
			GENDER_VAR_NAME: sex,
			USER_ID_VAR_NAME: user_id
		}
		return {name: user_info}

	@property
	def online_self_js_structure(self):
		"""
		:return: online_js_structure() of the current user
		"""
		return self.online_js_structure(self.sender_name, self.sex, self.user_id)
167
168
169
class MessagesHandler(MessagesCreator):
170
171
	def __init__(self, *args, **kwargs):
		"""
		Prepares logging, the redis subscriber client and the dispatch table of incoming actions
		"""
		super(MessagesHandler, self).__init__(*args, **kwargs)
		self.ip = None
		# last four decimal digits of the object id, zero padded
		self.log_id = str(id(self) % 10000).zfill(4)
		# placeholders until the user and the ip are known
		self.logger = logging.LoggerAdapter(logger, {
			'username': '00000000',
			'id': self.log_id,
			'ip': 'initializing'
		})
		self.async_redis = tornadoredis.Client()
		# maps the "action" of an incoming message to the method that handles it
		self.process_message = {
			GET_MESSAGES_EVENT: self.process_get_messages,
			SEND_MESSAGE_EVENT: self.process_send_message,
			CALL_EVENT: self.process_call
		}
187
188
	def do_db(self, callback, *arg, **args):
		"""
		Runs a database callable and retries it once when the connection is broken

		:param callback: callable that performs the database work
		:param arg: positional arguments forwarded to callback
		:param args: keyword arguments forwarded to callback
		:return: the value returned by callback
		:raises: whatever the second attempt raises
		"""
		try:
			return callback(*arg, **args)
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
			# the exception is passed as a logger argument instead of being formatted in place
			self.logger.warning('%s, reconnecting', e)  # TODO
			# drop the broken connection, the retry below runs after it has been closed
			connection.close()
			return callback(*arg, **args)
195
196
	def get_online_from_redis(self, check_name=None, check_id=None):
		"""
		Reads the REDIS_ONLINE_USERS hash and converts it to the structure sent to clients

		:param check_name: user name to look for among the other connections
		:param check_id: hash key of the current connection, it is ignored while looking for check_name
		:rtype : dict
		:return: {name: {"sex": sex, "userId": user_id}}; when check_id is given the result is
		a (dict, bool) tuple where bool tells whether check_name is stored under a key other than check_id
		"""
		online = self.sync_redis.hgetall(REDIS_ONLINE_USERS)
		self.logger.debug('!! redis online: %s', online)
		result = {}
		user_is_online = False
		# every hash value has the stored_redis_user layout "name:sex:user_id"
		for key, raw_user_sex in (online or {}).items():  # py2 iteritems
			# split from the right, so that a colon inside the name doesn't break unpacking
			(name, sex, user_id) = raw_user_sex.decode('utf-8').rsplit(':', 2)
			if name == check_name and check_id != int(key.decode('utf-8')):
				user_is_online = True
			# NOTE(review): user_id stays a string here while online_self_js_structure uses an int - confirm clients accept both
			result.update(self.online_js_structure(name, sex, user_id))
		if check_id is None:
			return result
		return result, user_is_online
213
214
	def add_online_user(self):
		"""
		Registers this connection in the REDIS_ONLINE_USERS hash and sends the online list

		The hash has the layout { id(connection1): stored_redis_user1, id(connection2): stored_redis_user2 }.
		When the user name wasn't in the hash yet, the list is published with the "joined" action,
		otherwise it's written only to this socket with the "onlineUsers" action.
		:return:
		"""
		# the snapshot is taken before this connection is stored
		online = self.get_online_from_redis()
		self.async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user)
		if self.sender_name in online:
			# the name is already online through another connection
			online_user_names_mes = self.default(online, REFRESH_USER_EVENT)
			self.logger.info('!! Second tab, retrieving online for self')
			self.safe_write(online_user_names_mes)
			return
		online.update(self.online_self_js_structure)
		online_user_names_mes = self.default(online, LOGIN_EVENT)
		self.logger.info('!! First tab, sending refresh online for all')
		self.publish(online_user_names_mes)
231
232
	def set_username(self, session_key):
		"""
		Loads the user of the session, remembers its id, name and sex and writes its rooms to the socket

		:param session_key: key of the session that holds "_auth_user_id"
		:return: channels user should subscribe: the own channel followed by a channel per room
		"""
		session = SessionStore(session_key)
		self.user_id = int(session["_auth_user_id"])
		user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
		self.sender_name = user_db.username
		self.sex = user_db.sex_str
		# rooms are read once and reused for both structures below
		user_rooms = list(user_db.rooms.all())
		room_names = dict((room.id, room.name) for room in user_rooms)
		channels = [self.channel] + [REDIS_ROOM_CHANNEL_PREFIX % room.id for room in user_rooms]
		rooms_message = self.default(room_names, ROOMS_EVENT)
		self.logger.info("!! User %s subscribes for %s", self.sender_name, room_names)
		self.safe_write(rooms_message)
		return channels
252
253
	def publish(self, message, channel=ANONYMOUS_REDIS_CHANNEL):
		"""
		Serializes message to json and publishes it to a redis channel

		:param message: json serializable object
		:param channel: redis channel name, the channel of the default room if omitted
		"""
		payload = json.dumps(message)
		self.logger.debug('<%s> %s', channel, payload)
		self.async_redis_publisher.publish(channel, payload)
257
258
	def new_message(self, message):
		"""
		Passes the body of a redis message to the socket

		:param message: object with a body attribute; a body of type int is skipped
		"""
		body = message.body
		if type(body) is int:  # subscribe event
			return
		self.safe_write(body)
261
262
	def safe_write(self, message):
		"""
		Placeholder for sending a message to the client, subclasses have to override it

		:raises NotImplementedError: always
		"""
		raise NotImplementedError('WebSocketHandler implements')
264
265
	def process_send_message(self, message):
		"""
		Saves an incoming chat message and publishes it to those who should see it

		A message without a receiver goes to the default channel of publish(). A message with
		a receiver goes to the channel of the sender and, when it differs, to the channel of the receiver.
		:type message: dict
		"""
		content = message[CONTENT_VAR_NAME]
		receiver_id = message.get(RECEIVER_USERID_VAR_NAME)  # if receiver_id is None then its a public message
		self.logger.info('!! Sending message %s to user with id %s', content, receiver_id)
		message_db = Message(sender_id=self.user_id, content=content, receiver_id=receiver_id)
		self.do_db(message_db.save)  # exit on hacked id with exception
		prepared_message = self.create_send_message(message_db)
		if receiver_id is None:
			self.logger.debug('!! Detected as public')
			self.publish(prepared_message)
			return
		# the receiver channel is needed only for private messages
		receiver_channel = REDIS_USERID_CHANNEL_PREFIX % receiver_id
		self.publish(prepared_message, self.channel)
		self.logger.debug('!! Detected as private, channel %s', receiver_channel)
		if receiver_channel != self.channel:
			# a message to oneself has been published above already
			self.publish(prepared_message, receiver_channel)
284
285
	def process_call(self, message):
		"""
		Publishes a call offer to the channel of the receiving user

		:type message: dict
		:raises ValidationError: when the message names no receiver
		"""
		receiver_id = message.get(RECEIVER_USERID_VAR_NAME)
		if receiver_id is None:
			# without a receiver the offer would be published to the "i:None" channel
			raise ValidationError('Call receiver is not specified')
		self.logger.info('!! Offering a call to user with id %s',  receiver_id)
		call_offer = self.offer_call(message.get(CONTENT_VAR_NAME), message.get(CALL_TYPE_VAR_NAME))
		self.publish(call_offer, REDIS_USERID_CHANNEL_PREFIX % receiver_id)
293
294
	def process_get_messages(self, data):
		"""
		Writes to the socket the newest messages that the current user is allowed to see

		:param data: may hold "headerId" (only messages with a lower id are returned)
		and "count" (amount of messages, 10 if omitted)
		:type data: dict
		:raises ValidationError: when count is not a number or is negative
		"""
		header_id = data.get(HEADER_ID_VAR_NAME, None)
		try:
			count = int(data.get(COUNT_VAR_NAME, 10))
		except (TypeError, ValueError):
			raise ValidationError('Messages count should be a number')
		if count < 0:
			# a negative value can't be used as the upper bound of the slice below
			raise ValidationError("Messages count can't be negative")
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		# public messages and private messages that were sent by or to the current user
		conditions = [Q(receiver=None) | Q(sender=self.user_id) | Q(receiver=self.user_id)]
		if header_id is not None:
			conditions.insert(0, Q(id__lt=header_id))
		messages = Message.objects.filter(*conditions).order_by('-pk')[:count]
		response = self.do_db(self.get_messages, messages)
		self.safe_write(response)
313
314
	def save_ip(self):
		"""
		Creates a UserJoinedInfo record for the current user and ip unless such a record exists
		"""
		same_ip_and_user = Q(ip__ip=self.ip) & Q(user_id=self.user_id)
		already_saved = self.do_db(UserJoinedInfo.objects.filter(same_ip_and_user).exists)
		if not already_saved:
			ip_address = self.get_or_create_ip()
			UserJoinedInfo.objects.create(ip=ip_address, user_id=self.user_id)
323
324
	def get_or_create_ip(self):
		"""
		Returns the IpAddress record of self.ip, creating it when it's absent

		A new record is filled with the isp, country, region and city reported by api_url.
		When that lookup fails for any reason, a record holding only the ip is created instead.
		:return: IpAddress
		"""
		try:
			return IpAddress.objects.get(ip=self.ip)
		except IpAddress.DoesNotExist:
			pass
		try:
			if not api_url:
				raise Exception('api url is absent')
			self.logger.debug("Creating ip record %s", self.ip)
			# the wait is bounded, so that an unresponsive api can't block the caller forever
			f = urlopen(api_url % self.ip, timeout=10)
			try:
				raw_response = f.read().decode("utf-8")
			finally:
				f.close()  # the response is released even if reading fails
			response = json.loads(raw_response)
			if response['status'] != "success":
				raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
			return IpAddress.objects.create(
				ip=self.ip,
				isp=response['isp'],
				country=response['country'],
				region=response['regionName'],
				city=response['city'],
				country_code=response['countryCode']
			)
		except Exception as e:
			# best effort: the ip is stored without the location info
			self.logger.error("Error while creating ip with country info, because %s", e)
			return IpAddress.objects.create(ip=self.ip)
349
350
351
class AntiSpam(object):
	"""
	Keeps statistics about incoming messages and rejects the ones that are too long
	"""

	def __init__(self):
		# NOTE(review): nothing in this class removes entries from info - confirm it can't grow too much
		self.info = {}  # time in hundredths of a second -> length of the message
		self.spammed = 0  # amount of rejected messages

	def check_spam(self, json_message):
		"""
		Records the length of the message and validates it

		:param json_message: raw incoming message
		:raises ValidationError: when the message is longer than MAX_MESSAGE_SIZE
		"""
		message_length = len(json_message)
		received_at = int(round(time.time() * 100))
		self.info[received_at] = message_length
		if message_length <= MAX_MESSAGE_SIZE:
			self.check_timed_spam()
			return
		self.spammed += 1
		raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)

	def check_timed_spam(self):
		"""
		Does nothing yet
		"""
		# TODO implement me
		# raise ValidationError("You're chatting too much, calm down a bit!")
		pass
370
371
372
class TornadoHandler(WebSocketHandler, MessagesHandler):
373
374
	def __init__(self, *args, **kwargs):
		"""
		Initializes the parent classes and binds the redis clients of chat.global_redis
		"""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		# imported here rather than at the top of the module
		from chat import global_redis
		self.sync_redis = global_redis.sync_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.anti_spam = AntiSpam()
		self.connected = False  # open() sets it to True for an accepted session
381
382
	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribes the redis client to channels and routes their messages to new_message

		:param channels: channels that are passed to async_redis.subscribe
		"""
		subscription = tornado.gen.Task(self.async_redis.subscribe, channels)
		yield subscription
		# listening starts after the subscription task has finished
		self.async_redis.listen(self.new_message)
390
391
	def data_received(self, chunk):
		"""
		Ignores the chunk, nothing is done with it
		"""
393
394
	def on_message(self, json_message):
		"""
		Validates an incoming message and passes it to the handler of its action

		A rejected message is not processed, the reason is logged and written back
		to the socket as a "growl" event.
		:param json_message: raw message received from the client
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %s', json_message)
			try:
				message = json.loads(json_message)
			except ValueError:  # json decode errors are ValueErrors
				raise ValidationError('Skipping message that is not a valid json')
			try:
				handler = self.process_message[message[EVENT_VAR_NAME]]
			except (KeyError, TypeError):
				# the message is not an object, has no action or the action is unknown
				raise ValidationError('Skipping message with unknown action')
			# called outside of the try above, so errors of the handler itself are not masked
			handler(message)
		except ValidationError as e:
			self.logger.warning("Message won't be send. Reason: %s", e.message)
			self.safe_write(self.default(str(e.message), event=GROWL_MESSAGE_EVENT))
407
408
	def on_close(self):
		"""
		Removes this connection from the online hash, notifies others when the user
		has no other connection left and releases the redis subscriber client
		"""
		try:
			connection_key = id(self)
			self.async_redis_publisher.hdel(REDIS_ONLINE_USERS, connection_key)
			if not self.connected:
				self.logger.warning('Dropping connection for not connected user')
			else:
				# seems like async solves problem with connection lost and wrong data status
				# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
				online, is_online = self.get_online_from_redis(self.sender_name, connection_key)
				self.logger.info('!! Closing connection, redis current online %s', online)
				if is_online:
					self.logger.debug('!! User is still online in other tabs')
				else:
					message = self.default(online, LOGOUT_EVENT)
					self.logger.debug('!! User closed the last tab, refreshing online for all')
					self.publish(message)
		finally:
			# the subscriber client is released even if the code above fails
			if self.async_redis.subscribed:
				#  TODO unsubscribe of all subscribed                  !IMPORTANT
				subscribed_channels = [ANONYMOUS_REDIS_CHANNEL, self.channel]
				self.async_redis.unsubscribe(subscribed_channels)
			self.async_redis.disconnect()
433
434
	def open(self):
		"""
		Accepts the connection when the session cookie refers to an existing session,
		otherwise closes it

		An accepted connection is subscribed to the channels of the user, added to
		the online users, and its ip is saved in a separate thread.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if not sessionStore.exists(session_key):
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			# NOTE(review): 403 is outside of the 1000-4999 range of websocket close codes (RFC 6455) - confirm clients handle it
			self.close(403, "Session key %s has been rejected" % session_key)
			return
		self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, id(self))
		self.async_redis.connect()
		channels = self.set_username(session_key)
		self.ip = self.get_client_ip()
		# from now on log records carry the real user name and ip
		self.logger = logging.LoggerAdapter(logger, {
			'username': self.sender_name.rjust(8),
			'id': self.log_id,
			'ip': self.ip
		})
		self.listen(channels)
		self.add_online_user()
		self.connected = True
		ip_saver = Thread(target=self.save_ip)
		ip_saver.start()
454
455
	def check_origin(self, origin):
		"""
		check whether the host of the Origin header matches the host of the Host header

		Host names are compared case-insensitively and ports are ignored.
		:param origin: value of the Origin header
		:return: True if the hosts match, False if they differ or one of them is absent or malformed
		"""
		browser_set = self.request.headers.get("Host")
		if not browser_set:
			return False
		try:
			# hostname is lower-cased and has no port or ipv6 brackets
			origin_domain = urlparse(origin).hostname
			# the leading "//" makes urlparse treat the header as a network location
			browser_domain = urlparse('//%s' % browser_set).hostname
		except ValueError:  # raised by urlparse for a malformed ipv6 literal
			return False
		if not origin_domain or not browser_domain:
			return False
		return browser_domain == origin_domain
465
466
	def safe_write(self, message):
		"""
		Sends a message to the client, a closed socket is logged instead of raising

		:param message: a dict (it's dumped to json) or a string
		:raises ValueError: when message is neither a dict nor a string
		:type self: MessagesHandler
		"""
		try:
			if isinstance(message, dict):
				message = json.dumps(message)
			# unicode is checked only when not running on python 3
			is_text = isinstance(message, str) or (not PY3 and isinstance(message, unicode))
			if not is_text:
				raise ValueError('Wrong message type : %s' % str(message))
			self.logger.debug(">> %s", message)
			self.write_message(message)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
481
482
	def get_client_ip(self):
		"""
		:return: value of the X-Real-IP header, or remote_ip of the request when the header is absent or empty
		"""
		forwarded_ip = self.request.headers.get("X-Real-IP")
		if forwarded_ip:
			return forwarded_ip
		return self.request.remote_ip
484