Completed
Push — master ( b013d2...e9ec99 )
by Andrew
01:12
created

MessagesHandler.process_send_message()   B

Complexity

Conditions 4

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 4
c 3
b 0
f 0
dl 0
loc 25
rs 8.5806
1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import tornado.gen
9
import tornado.httpclient
10
import tornado.ioloop
11
import tornado.web
12
import tornado.websocket
13
import tornadoredis
14
from django.conf import settings
15
from django.core.exceptions import ValidationError
16
from django.db import connection, OperationalError, InterfaceError
17
from django.db.models import Q
18
from redis_sessions.session import SessionStore
19
from tornado.websocket import WebSocketHandler
20
21
from chat.utils import extract_photo
22
23
try:
24
	from urllib.parse import urlparse  # py3
25
except ImportError:
26
	from urlparse import urlparse  # py2
27
28
from chat.settings import MAX_MESSAGE_SIZE, ALL_REDIS_ROOM
29
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo
30
31
# True when running under python 3; safe_write uses it to decide whether to check for `unicode`
PY3 = sys.version > '3'
32
33
user_cookie_name = settings.USER_COOKIE_NAME
34
# optional format string, used as `api_url % ip` in get_or_create_ip; None disables the lookup
api_url = getattr(settings, "IP_API_URL", None)
35
36
SESSION_USER_VAR_KEY = 'user_name'
37
38
# keys of the message dicts that are serialized to json
MESSAGE_ID_VAR_NAME = 'id'
39
RECEIVER_USERNAME_VAR_NAME = 'receiverName'
40
RECEIVER_USERID_VAR_NAME = 'receiverId'
41
CALL_TYPE_VAR_NAME = 'type'
42
COUNT_VAR_NAME = 'count'
43
HEADER_ID_VAR_NAME = 'headerId'
44
USER_VAR_NAME = 'user'
45
USER_ID_VAR_NAME = 'userId'
46
TIME_VAR_NAME = 'time'
47
OLD_NAME_VAR_NAME = 'oldName'
48
CONTENT_VAR_NAME = 'content'
49
IMG_VAR_NAME = 'image'
50
EVENT_VAR_NAME = 'action'
51
GENDER_VAR_NAME = 'sex'
52
53
# values stored under EVENT_VAR_NAME ('action')
REFRESH_USER_EVENT = 'onlineUsers'
54
SYSTEM_MESSAGE_EVENT = 'system'
55
GROWL_MESSAGE_EVENT = 'growl'
56
GET_MESSAGES_EVENT = 'messages'
57
ROOMS_EVENT = 'rooms'  # thread ex "main" , channel ex. 'r:main', "i:3"
58
LOGIN_EVENT = 'joined'
59
LOGOUT_EVENT = 'left'
60
SEND_MESSAGE_EVENT = 'send'
61
WEBRTC_EVENT = 'webrtc'
62
CALL_EVENT = 'call'
63
64
# redis channel name templates (formatted with a user id / a room id) and the online users hash
REDIS_USERID_CHANNEL_PREFIX = 'i:%s'
65
REDIS_ROOM_CHANNEL_PREFIX = 'r:%d'
66
REDIS_ONLINE_USERS = "online_users"
67
68
# NOTE: runs a database query at import time; the room is created when it does not exist yet
(default_room, c)  = Room.objects.get_or_create(name=ALL_REDIS_ROOM)
69
70
# channel of the default room, used as the default target of publish()
ANONYMOUS_REDIS_CHANNEL = REDIS_ROOM_CHANNEL_PREFIX % default_room.id
71
ANONYMOUS_ROOM_NAMES = {default_room.id: default_room.name}
72
73
# key-less store, only used to check whether a session key exists
sessionStore = SessionStore()
74
75
logger = logging.getLogger(__name__)
76
77
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
# NOTE(review): the tornadoredis.Client() created in MessagesHandler.__init__ is not given this pool
78
CONNECTION_POOL = tornadoredis.ConnectionPool(
79
	max_connections=500,
80
	wait_for_available=True)
81
82
83
class MessagesCreator(object):
	"""
	Builds the plain dicts that are serialized to json and sent to chat clients.

	Keeps the identity of the current sender (``sender_name``, ``sex``, ``user_id``),
	which is stamped on the messages it creates.
	"""

	def __init__(self, *args, **kwargs):
		super(MessagesCreator, self).__init__(*args, **kwargs)
		self.sex = None
		self.sender_name = None
		self.user_id = 0  # anonymous by default

	def default(self, content, event=SYSTEM_MESSAGE_EVENT):
		"""
		Wrap content into a message stamped with the sender and the current time.

		:param content: payload stored under the "content" key
		:param event: value stored under the "action" key
		:return: {"action": event, "content": content, "userId": self.user_id,
		"user": self.sender_name, "time": get_milliseconds()}
		"""
		return {
			EVENT_VAR_NAME: event,
			CONTENT_VAR_NAME: content,
			USER_ID_VAR_NAME: self.user_id,
			USER_VAR_NAME: self.sender_name,
			TIME_VAR_NAME: get_milliseconds()
		}

	def offer_call(self, content, message_type):
		"""
		Build a "call" message.

		:param content: payload stored under the "content" key
		:param message_type: value stored under the "type" key
		:return: same dict as default() with "action": "call" and the extra "type" key
		"""
		message = self.default(content, CALL_EVENT)
		message[CALL_TYPE_VAR_NAME] = message_type
		return message

	@classmethod
	def create_send_message(cls, message):
		"""
		Convert a stored message into the dict of a "send" event.

		:param message: object exposing sender, receiver, content, time, id and img
		:return: {"action": "send", "user": ..., "userId": ..., "content": ..., "time": ..., "id": ...},
		plus "image" when a file is attached
		and "receiverId" / "receiverName" when the message has a receiver
		"""
		result = {
			USER_VAR_NAME: message.sender.username,
			USER_ID_VAR_NAME: message.sender.id,
			CONTENT_VAR_NAME: message.content,
			TIME_VAR_NAME: message.time,
			MESSAGE_ID_VAR_NAME: message.id,
			EVENT_VAR_NAME: SEND_MESSAGE_EVENT
		}
		# an empty name ('' as well as None) means no file is attached;
		# django raises ValueError when .url is read in that case
		if message.img.name:
			result[IMG_VAR_NAME] = message.img.url
		if message.receiver is not None:
			result[RECEIVER_USERID_VAR_NAME] = message.receiver.id
			result[RECEIVER_USERNAME_VAR_NAME] = message.receiver.username
		return result

	@classmethod
	def get_messages(cls, messages):
		"""
		Build the "messages" event out of several stored messages.

		:param messages: iterable of objects accepted by create_send_message
		:return: {"action": "messages", "content": [<send message dict>, ...]}
		"""
		return {
			CONTENT_VAR_NAME: [cls.create_send_message(message) for message in messages],
			EVENT_VAR_NAME: GET_MESSAGES_EVENT
		}

	@property
	def stored_redis_user(self):
		"""
		:return: "name:sex:user_id", the value kept in the redis hash of online users
		"""
		return '%s:%s:%d' % (self.sender_name, self.sex, self.user_id)

	@property
	def channel(self):
		"""
		:return: name of the redis channel of the current user, e.g. "i:3"
		"""
		return REDIS_USERID_CHANNEL_PREFIX % self.user_id

	@staticmethod
	def online_js_structure(name, sex, user_id):
		"""
		:return: {name: {"sex": sex, "userId": user_id}}
		"""
		return {
			name: {
				GENDER_VAR_NAME: sex,
				USER_ID_VAR_NAME: user_id
			}
		}

	@property
	def online_self_js_structure(self):
		"""
		:return: online_js_structure() of the current user
		"""
		return self.online_js_structure(self.sender_name, self.sex, self.user_id)
164
165
166
class MessagesHandler(MessagesCreator):
	"""
	Chat logic on top of MessagesCreator: online users bookkeeping in redis,
	storing messages in the database and handling of client events.

	The class reads ``self.sync_redis`` and ``self.async_redis_publisher`` but doesn't
	set them, and its ``safe_write`` is abstract; a subclass has to provide all three.
	"""

	def __init__(self, *args, **kwargs):
		super(MessagesHandler, self).__init__(*args, **kwargs)
		# short per-instance tag that is passed to every log record
		self.log_id = str(id(self) % 10000).rjust(4, '0')
		self.ip = None
		log_params = {
			'username': '00000000',
			'id': self.log_id,
			'ip': 'initializing'
		}
		self.logger = logging.LoggerAdapter(logger, log_params)
		self.async_redis = tornadoredis.Client()
		# maps "action" of an incoming message to the method that handles it
		self.process_message = {
			GET_MESSAGES_EVENT: self.process_get_messages,
			SEND_MESSAGE_EVENT: self.process_send_message,
			CALL_EVENT: self.process_call
		}

	def do_db(self, callback, *arg, **args):
		"""
		Call a database callback, retrying it once on a lost connection.

		:param callback: callable that is invoked with the rest of the arguments
		:return: whatever the callback returns
		"""
		try:
			return callback(*arg, **args)
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
			self.logger.warning('%s, reconnecting', e)  # TODO
			# closing the connection makes the second attempt open a fresh one
			connection.close()
			return callback(*arg, **args)

	def get_online_from_redis(self, check_name=None, check_id=None):
		"""
		Read the online users hash from redis.

		:param check_name: user name to look for among other connections
		:param check_id: hash key of the current connection, it is skipped during the lookup
		:rtype : dict
		returns (dict, bool) if check_id is present,
		bool tells whether check_name is online under a key other than check_id
		"""
		online = self.sync_redis.hgetall(REDIS_ONLINE_USERS)
		self.logger.debug('!! redis online: %s', online)
		result = {}
		user_is_online = False
		# redis stores stored_redis_user ("name:sex:user_id"), so parse them
		if online:
			for key, raw_user_sex in online.items():  # py2 iteritems
				# split from the right so a ':' inside the user name doesn't break unpacking
				(name, sex, user_id) = raw_user_sex.decode('utf-8').rsplit(':', 2)
				if name == check_name and check_id != int(key.decode('utf-8')):
					user_is_online = True
				# NOTE(review): user_id stays a string here, while online_self_js_structure gives an int
				result.update(self.online_js_structure(name, sex, user_id))
		return (result, user_is_online) if check_id is not None else result

	def add_online_user(self):
		"""
		adds to redis
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
		and then either announces the user to everybody (first connection of this name)
		or sends the online list to the current connection only
		:return:
		"""
		online = self.get_online_from_redis()
		self.async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user)
		if self.sender_name not in online:  # if a new tab has been opened
			online.update(self.online_self_js_structure)
			online_user_names_mes = self.default(online, LOGIN_EVENT)
			self.logger.info('!! First tab, sending refresh online for all')
			self.publish(online_user_names_mes)
		else:  # Send user names to self
			online_user_names_mes = self.default(online, REFRESH_USER_EVENT)
			self.logger.info('!! Second tab, retrieving online for self')
			self.safe_write(online_user_names_mes)

	def set_username(self, session_key):
		"""
		Case registered: Fetch userName and its channels from database. returns them
		Also sends the "rooms" message with {room id: room name} to the current connection
		:param session_key: key of the session that holds "_auth_user_id"
		:return: channels user should subscribe
		"""
		session = SessionStore(session_key)
		self.user_id = int(session["_auth_user_id"])
		user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
		self.sender_name = user_db.username
		self.sex = user_db.sex_str
		rooms = user_db.rooms.all()  # do_db is used already
		room_names = {}
		channels = [self.channel, ]
		for room in rooms:
			room_names[room.id] = room.name
			channels.append(REDIS_ROOM_CHANNEL_PREFIX % room.id)
		rooms_message = self.default(room_names, ROOMS_EVENT)
		self.logger.info("!! User %s subscribes for %s", self.sender_name, room_names)
		self.safe_write(rooms_message)
		return channels

	def publish(self, message, channel=ANONYMOUS_REDIS_CHANNEL):
		"""
		Serialize message to json and publish it to a redis channel.

		:param message: json serializable object
		:param channel: redis channel, channel of the default room if omitted
		"""
		jsoned_mess = json.dumps(message)
		self.logger.debug('<%s> %s', channel, jsoned_mess)
		self.async_redis_publisher.publish(channel, jsoned_mess)

	def new_message(self, message):
		"""
		Forward the body of a redis message to the current connection.

		:param message: object with a body attribute
		"""
		if not isinstance(message.body, int):  # subscribe event
			self.safe_write(message.body)

	def safe_write(self, message):
		raise NotImplementedError('WebSocketHandler implements')

	def process_send_message(self, message):
		"""
		Store a chat message and publish it to everybody or to sender and receiver.

		:type message: dict
		"""
		content = message[CONTENT_VAR_NAME]
		receiver_id = message.get(RECEIVER_USERID_VAR_NAME)  # if receiver_id is None then its a public message
		self.logger.info('!! Sending message %s to user with id %s', content, receiver_id)
		message_db = Message(
			sender_id=self.user_id,
			content=content,
			receiver_id=receiver_id,
		)
		if IMG_VAR_NAME in message:
			message_db.img = extract_photo(message[IMG_VAR_NAME])
		self.do_db(message_db.save)  # exit on hacked id with exception
		prepared_message = self.create_send_message(message_db)
		if receiver_id is None:
			self.logger.debug('!! Detected as public')
			self.publish(prepared_message)
		else:
			receiver_channel = REDIS_USERID_CHANNEL_PREFIX % receiver_id
			# the sender gets a copy on his own channel as well
			self.publish(prepared_message, self.channel)
			self.logger.debug('!! Detected as private, channel %s', receiver_channel)
			if receiver_channel != self.channel:  # don't deliver twice a message sent to self
				self.publish(prepared_message, receiver_channel)

	def process_call(self, message):
		"""
		Publish a call offer to the channel of the receiver.

		:type message: dict
		"""
		# NOTE(review): a missing receiver id is not rejected, the offer goes to channel "i:None"
		receiver_id = message.get(RECEIVER_USERID_VAR_NAME)
		self.logger.info('!! Offering a call to user with id %s', receiver_id)
		message = self.offer_call(message.get(CONTENT_VAR_NAME), message.get(CALL_TYPE_VAR_NAME))
		self.publish(message, REDIS_USERID_CHANNEL_PREFIX % receiver_id)

	def process_get_messages(self, data):
		"""
		Send to the current connection the latest messages the user is allowed to see.

		Reads "count" (10 if absent) and "headerId" from data,
		when "headerId" is present only messages with a lower id are returned

		:type data: dict
		"""
		header_id = data.get(HEADER_ID_VAR_NAME, None)
		count = int(data.get(COUNT_VAR_NAME, 10))
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		# Only public, or sent by the user, or addressed to the user
		filters = [Q(receiver=None) | Q(sender=self.user_id) | Q(receiver=self.user_id)]
		if header_id is not None:
			filters.insert(0, Q(id__lt=header_id))
		messages = Message.objects.filter(*filters).order_by('-pk')[:count]
		response = self.do_db(self.get_messages, messages)
		self.safe_write(response)

	def save_ip(self):
		"""
		Create a UserJoinedInfo for the current user and ip unless it already exists.
		"""
		if (self.do_db(UserJoinedInfo.objects.filter(
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
			return
		ip_address = self.get_or_create_ip()
		UserJoinedInfo.objects.create(
			ip=ip_address,
			user_id=self.user_id
		)

	def get_or_create_ip(self):
		"""
		Fetch the IpAddress of self.ip, creating it if it's absent.

		A new record is filled with the info returned by api_url;
		if the lookup fails for any reason a record with the ip only is created.

		:return: IpAddress instance
		"""
		try:
			ip_address = IpAddress.objects.get(ip=self.ip)
		except IpAddress.DoesNotExist:
			try:
				if not api_url:
					raise Exception('api url is absent')
				self.logger.debug("Creating ip record %s", self.ip)
				f = urlopen(api_url % self.ip)
				try:
					raw_response = f.read().decode("utf-8")
				finally:
					f.close()  # release the http connection even if reading fails
				response = json.loads(raw_response)
				if response['status'] != "success":
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
				ip_address = IpAddress.objects.create(
					ip=self.ip,
					isp=response['isp'],
					country=response['country'],
					region=response['regionName'],
					city=response['city'],
					country_code=response['countryCode']
				)
			except Exception as e:
				# deliberate best effort: missing country info must not prevent saving the ip
				self.logger.error("Error while creating ip with country info, because %s", e)
				ip_address = IpAddress.objects.create(ip=self.ip)
		return ip_address
352
353
354
class AntiSpam(object):
	"""
	Remembers the size of checked messages and rejects the ones that are too long.
	"""

	def __init__(self):
		self.info = {}
		self.spammed = 0

	def check_spam(self, json_message):
		"""
		Record the length of the message and validate it.

		:param json_message: raw message, only its length is used
		:raises ValidationError: if the message is longer than MAX_MESSAGE_SIZE
		"""
		size = len(json_message)
		# the key is the current time in hundredths of a second
		self.info[int(round(time.time() * 100))] = size
		if size <= MAX_MESSAGE_SIZE:
			self.check_timed_spam()
			return
		self.spammed += 1
		raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)

	def check_timed_spam(self):
		"""
		Placeholder for the frequency check, does nothing for now.
		"""
		# TODO implement me
		# raise ValidationError("You're chatting too much, calm down a bit!")
		return None
373
374
375
class TornadoHandler(WebSocketHandler, MessagesHandler):
	"""
	Websocket endpoint of the chat: authenticates a connection by the session cookie,
	subscribes it to redis channels and passes incoming messages to MessagesHandler.
	"""

	def __init__(self, *args, **kwargs):
		super(TornadoHandler, self).__init__(*args, **kwargs)
		self.connected = False
		self.anti_spam = AntiSpam()
		# channels passed to listen(), on_close() unsubscribes from them
		self.subscribed_channels = []
		# imported here rather than at module level, presumably to avoid a circular import
		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis

	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribe to redis channels and pass everything they deliver to new_message.

		:param channels: channel names to subscribe to
		"""
		self.subscribed_channels = list(channels)
		yield tornado.gen.Task(
			self.async_redis.subscribe, channels)
		self.async_redis.listen(self.new_message)

	def data_received(self, chunk):
		pass

	def on_message(self, json_message):
		"""
		Parse an incoming message and call the handler registered for its "action".

		Anything wrong with the message itself (websocket is not ready, empty or malformed
		json, unknown action) is reported back to the client as a growl message.

		:param json_message: raw message received from the websocket
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			# spam check is switched off for now
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %s', json_message)
			try:
				message = json.loads(json_message)
			except ValueError:  # client sent something that is not json
				raise ValidationError('Skipping malformed message')
			# valid json is not necessarily an object, e.g. a list has no action
			event = message.get(EVENT_VAR_NAME) if isinstance(message, dict) else None
			try:
				handler = self.process_message[event]
			except (KeyError, TypeError):  # TypeError: action is unhashable
				raise ValidationError("event {} is unknown".format(event))
			handler(message)
		except ValidationError as e:
			self.logger.warning("Message won't be send, because : %s", e.message)
			self.safe_write(self.default(str(e.message), event=GROWL_MESSAGE_EVENT))

	def on_close(self):
		"""
		Remove the connection from online users, announce the logout if it was
		the last connection of the user and release the redis subscription.
		"""
		try:
			self_id = id(self)
			self.async_redis_publisher.hdel(REDIS_ONLINE_USERS, self_id)
			if self.connected:
				# seems like async solves problem with connection lost and wrong data status
				# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
				online, is_online = self.get_online_from_redis(self.sender_name, self_id)
				self.logger.info('!! Closing connection, redis current online %s', online)
				if not is_online:
					message = self.default(online, LOGOUT_EVENT)
					self.logger.debug('!! User closed the last tab, refreshing online for all')
					self.publish(message)
				else:
					self.logger.debug('!! User is still online in other tabs')
			else:
				self.logger.warning('Dropping connection for not connected user')
		finally:
			if self.async_redis.subscribed:
				# unsubscribe of everything listen() subscribed to,
				# fall back to the default pair if nothing has been recorded
				self.async_redis.unsubscribe(self.subscribed_channels or [
					ANONYMOUS_REDIS_CHANNEL,
					self.channel
				])
			self.async_redis.disconnect()

	def open(self):
		"""
		Accept the connection if the session cookie refers to an existing session,
		otherwise close it.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		# no cookie means no session, don't bother the session store
		if session_key and sessionStore.exists(session_key):
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, id(self))
			self.async_redis.connect()
			channels = self.set_username(session_key)
			self.ip = self.get_client_ip()
			log_params = {
				'username': self.sender_name.rjust(8),
				'id': self.log_id,
				'ip': self.ip
			}
			self.logger = logging.LoggerAdapter(logger, log_params)
			self.listen(channels)
			self.add_online_user()
			self.connected = True
			# the ip lookup does blocking http and db calls, so it runs in its own thread
			Thread(target=self.save_ip).start()
		else:
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			# NOTE(review): 403 is outside of 1000-4999 that RFC 6455 allows for close codes,
			# confirm that clients don't rely on it before changing
			self.close(403, "Session key %s has been rejected" % session_key)

	def check_origin(self, origin):
		"""
		check whether browser set domain matches origin

		Ports are ignored and the comparison is case insensitive.

		:param origin: value of the Origin header
		:return: False if the domains differ or the Host header is missing
		"""
		origin_domain = urlparse(origin).netloc.split(':')[0].lower()
		browser_set = self.request.headers.get("Host")
		if not browser_set:
			return False
		return browser_set.split(':')[0].lower() == origin_domain

	def safe_write(self, message):
		"""
		Tries to send message, a closed websocket is logged instead of raising

		:param message: dict (dumped to json) or a string
		:raises ValueError: if message is neither a dict nor a string
		"""
		# self.logger.debug('<< THREAD %s >>', os.getppid())
		try:
			if isinstance(message, dict):
				message = json.dumps(message)
			if not (isinstance(message, str) or (not PY3 and isinstance(message, unicode))):
				raise ValueError('Wrong message type : %s' % str(message))
			self.logger.debug(">> %s", message)
			self.write_message(message)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(message))

	def get_client_ip(self):
		"""
		:return: X-Real-IP header if it's set, remote ip of the request otherwise
		"""
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip
489