Completed
Push — master ( 74d0a0...739783 )
by Andrew
53s
created

MessagesHandler   F

Complexity

Total Complexity 85

Size/Duplication

Total Lines 439
Duplicated Lines 0 %

Importance

Changes 22
Bugs 2 Features 0
Metric Value
c 22
b 2
f 0
dl 0
loc 439
rs 1.5789
wmc 85

35 Methods

Rating   Name   Duplication   Size   Complexity  
B __init__() 0 30 1
A publish() 0 6 2
B add_online_user() 0 28 3
A encode() 0 8 1
A listen() 0 5 1
A execute_query() 0 7 3
A do_db() 0 7 2
A patch_tornadoredis() 0 19 3
A add_channel() 0 5 1
A new_read() 0 15 2
A connected() 0 3 1
A logger() 0 3 2
B get_online_from_redis() 0 18 6
A set_js_user_structure() 0 4 1
A create_user_channel() 0 11 2
B edit_message() 0 18 5
A save_ip() 0 8 2
A set_opponent_call_channel() 0 2 1
A new_message() 0 10 3
A create_new_room() 0 9 3
A safe_write() 0 2 1
B create_room() 0 23 5
B get_or_create_ip() 0 25 5
A send_client_delete_channel() 0 5 1
A process_send_message() 0 15 2
B delete_channel() 0 17 5
B get_users_in_current_user_rooms() 0 29 4
A send_client_new_channel() 0 4 1
A update_room() 0 5 2
A publish_logout() 0 9 2
A remove_parsable_prefix() 0 3 2
A get_offline_messages() 0 10 2
B invite_user() 0 18 5
A process_get_messages() 0 14 2
A process_call() 0 16 2

How to fix   Complexity   

Complex Class

Complex classes like MessagesHandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import json
2
import logging
3
import sys
4
import time
5
from datetime import timedelta
6
from numbers import Number
7
from threading import Thread
8
9
import tornado.gen
10
import tornado.httpclient
11
import tornado.ioloop
12
import tornado.web
13
import tornado.websocket
14
import tornadoredis
15
from django.conf import settings
16
from django.core.exceptions import ValidationError
17
from django.db import connection, OperationalError, InterfaceError, IntegrityError
18
from django.db.models import Q, F
19
from redis_sessions.session import SessionStore
20
from tornado import ioloop
21
from tornado.websocket import WebSocketHandler
22
23
from chat.utils import extract_photo
24
25
try:  # py2
26
	from urlparse import urlparse
27
	from urllib import urlopen
28
except ImportError:  # py3
29
	from urllib.parse import urlparse
30
	from urllib.request import urlopen
31
32
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE, SELECT_SELF_ROOM
33
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo, RoomUsers
34
35
# True when running under Python 3. The numeric version tuple is compared
# instead of the human-readable ``sys.version`` string, whose lexicographic
# comparison is not a reliable version test.
PY3 = sys.version_info[0] >= 3
# Base type of text values: ``str`` on py3, ``basestring`` on py2.
str_type = str if PY3 else basestring
# Optional URL template of the IP lookup service; it is %-formatted with the
# client ip in get_or_create_ip(). None when the setting is absent.
api_url = getattr(settings, "IP_API_URL", None)

sessionStore = SessionStore()

base_logger = logging.getLogger(__name__)
42
43
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
44
#CONNECTION_POOL = tornadoredis.ConnectionPool(
45
#	max_connections=500,
46
#	wait_for_available=True)
47
48
49
class Actions(object):
	"""Values carried in the ``action`` field of the exchanged messages."""

	# online presence
	LOGIN = 'addOnlineUser'
	LOGOUT = 'removeOnlineUser'
	REFRESH_USER = 'setOnlineUsers'
	ADD_USER = 'addUserToAll'

	# chat messages
	SEND_MESSAGE = 'sendMessage'
	PRINT_MESSAGE = 'printMessage'
	EDIT_MESSAGE = 'editMessage'
	DELETE_MESSAGE = 'deleteMessage'
	GET_MESSAGES = 'loadMessages'
	OFFLINE_MESSAGES = 'loadOfflineMessages'
	GROWL_MESSAGE = 'growl'

	# rooms and channels
	ROOMS = 'setRooms'
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
	CREATE_ROOM_CHANNEL = 'addRoom'
	DELETE_ROOM = 'deleteRoom'
	INVITE_USER = 'inviteUser'

	# calls
	CALL = 'call'
67
68
69
class VarNames(object):
	"""Key names used in the message dicts built and read by this module."""

	# message envelope
	EVENT = 'action'
	CONTENT = 'content'
	TIME = 'time'
	MESSAGE_ID = 'id'
	IMG = 'image'
	FILE_NAME = 'filename'
	CALL_TYPE = 'type'

	# user description
	USER = 'user'
	USER_ID = 'userId'
	GENDER = 'sex'

	# rooms and channels; CHANNEL and CHANNEL_NAME share the same key
	CHANNEL = 'channel'
	CHANNEL_NAME = 'channel'
	ROOM_ID = 'roomId'
	ROOM_NAME = 'name'
	ROOM_USERS = 'users'
	IS_ROOM_PRIVATE = 'private'

	# message history paging
	GET_MESSAGES_COUNT = 'count'
	GET_MESSAGES_HEADER_ID = 'headerId'
90
91
92
class CallType(object):
	"""Values of the call ``type`` field that this module inspects."""

	# process_call() resolves the receiver channel only for this type
	OFFER = 'offer'
94
95
class HandlerNames:
	"""Name of the ``handler`` key and the handler identifiers sent with it."""

	# key under which the handler identifier is stored in a message
	NAME = 'handler'

	# handler identifiers
	CHAT = 'chat'
	CHANNELS = 'channels'
	GROWL = 'growl'
	WEBRTC = 'webrtc'
	FILE = 'file'
102
103
104
class RedisPrefix:
	"""Prefixes and templates of the redis keys/channels used by the chat."""

	USER_ID_CHANNEL_PREFIX = 'u'
	__ROOM_ONLINE__ = 'o:{}'

	@classmethod
	def generate_user(cls, key):
		"""Return the name of the personal channel for the given user key."""
		prefix = cls.USER_ID_CHANNEL_PREFIX
		suffix = str(key)
		return prefix + suffix
111
112
# Attached after the class body: the default channel is the ALL_ROOM_ID value
# imported from chat.settings.
RedisPrefix.DEFAULT_CHANNEL = ALL_ROOM_ID
113
114
115
class MessagesCreator(object):
	"""
	Builds the dict payloads that are later serialized to JSON.

	Keeps the identity of the current user (``user_id``, ``sender_name``,
	``sex``) which is stamped into several of the payloads.
	"""

	def __init__(self, *args, **kwargs):
		super(MessagesCreator, self).__init__(*args, **kwargs)
		self.user_id = 0  # anonymous by default
		self.sender_name = None
		self.sex = None

	def default(self, content, event, handler):
		"""
		Build the base payload shared by most messages.

		:return: dict holding the action, the content, the current user id,
		the value of get_milliseconds() and the handler name
		"""
		payload = {}
		payload[VarNames.EVENT] = event
		payload[VarNames.CONTENT] = content
		payload[VarNames.USER_ID] = self.user_id
		payload[VarNames.TIME] = get_milliseconds()
		payload[HandlerNames.NAME] = handler
		return payload

	def room_online(self, online, event, channel):
		"""
		Build a chat payload whose content is the online list of a channel.

		:return: base payload extended with the channel, the sender name
		and the sender gender
		"""
		payload = self.default(online, event, HandlerNames.CHAT)
		payload.update([
			(VarNames.CHANNEL_NAME, channel),
			(VarNames.USER, self.sender_name),
			(VarNames.GENDER, self.sex),
		])
		return payload

	def offer_call(self, content, message_type):
		"""
		Build a webrtc ``call`` payload.

		:return: base payload extended with the call type and the sender name
		"""
		payload = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
		payload.update([
			(VarNames.CALL_TYPE, message_type),
			(VarNames.USER, self.sender_name),
		])
		return payload

	@classmethod
	def create_message(cls, message):
		"""
		Convert a message object into a dict.

		:return: dict with sender id, content, time and message id; the
		image url is added only when the message has an image name
		"""
		payload = {}
		payload[VarNames.USER_ID] = message.sender_id
		payload[VarNames.CONTENT] = message.content
		payload[VarNames.TIME] = message.time
		payload[VarNames.MESSAGE_ID] = message.id
		if message.img.name:
			payload[VarNames.IMG] = message.img.url
		return payload

	@classmethod
	def create_send_message(cls, message, event=Actions.PRINT_MESSAGE):
		"""
		Convert a message object into a payload for the chat handler.

		:param message: message object, see create_message
		:param event: action name put into the payload
		:return: message dict extended with action, channel (the room id
		of the message) and the chat handler name
		"""
		payload = cls.create_message(message)
		payload.update([
			(VarNames.EVENT, event),
			(VarNames.CHANNEL, message.room_id),
			(HandlerNames.NAME, HandlerNames.CHAT),
		])
		return payload

	@classmethod
	def get_messages(cls, messages, channel):
		"""
		Build the payload answering a "load messages" request.

		:param messages: iterable of message objects
		:param channel: channel the messages belong to
		"""
		rendered = []
		for message in messages:
			rendered.append(cls.create_message(message))
		return {
			VarNames.CONTENT: rendered,
			VarNames.EVENT: Actions.GET_MESSAGES,
			VarNames.CHANNEL: channel,
			HandlerNames.NAME: HandlerNames.CHAT
		}

	@property
	def stored_redis_user(self):
		"""Value stored in redis for this connection: the user id."""
		return self.user_id

	@property
	def channel(self):
		"""Name of the personal redis channel of the current user."""
		return RedisPrefix.generate_user(self.user_id)

	def subscribe_direct_channel_message(self, room_id, other_user_id):
		"""Build the payload announcing a direct channel of two users."""
		participants = [self.user_id, other_user_id]
		return {
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: participants,
			HandlerNames.NAME: HandlerNames.CHANNELS
		}

	def subscribe_room_channel_message(self, room_id, room_name):
		"""Build the payload announcing a new named room of the current user."""
		participants = [self.user_id]
		return {
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
			VarNames.ROOM_ID: room_id,
			VarNames.ROOM_USERS: participants,
			HandlerNames.NAME: HandlerNames.CHANNELS,
			VarNames.ROOM_NAME: room_name
		}

	def invite_room_channel_message(self, room_id, user_id, room_name, users):
		"""Build the payload telling a user that he was invited into a room."""
		payload = {}
		payload[VarNames.EVENT] = Actions.INVITE_USER
		payload[VarNames.ROOM_ID] = room_id
		payload[VarNames.USER_ID] = user_id
		payload[HandlerNames.NAME] = HandlerNames.CHANNELS
		payload[VarNames.ROOM_NAME] = room_name
		payload[VarNames.CONTENT] = users
		return payload

	def add_user_to_room(self, channel, user_id, content):
		"""
		Build the payload announcing a new user in a channel.

		:param content: dict that holds the gender and the name of the
		user under the VarNames.GENDER and VarNames.USER keys
		"""
		payload = {}
		payload[VarNames.EVENT] = Actions.ADD_USER
		payload[VarNames.CHANNEL] = channel
		payload[VarNames.USER_ID] = user_id
		payload[HandlerNames.NAME] = HandlerNames.CHAT
		payload[VarNames.GENDER] = content[VarNames.GENDER]
		payload[VarNames.USER] = content[VarNames.USER]
		return payload

	def unsubscribe_direct_message(self, room_id):
		"""Build the payload telling that the current user left a room."""
		payload = {}
		payload[VarNames.EVENT] = Actions.DELETE_ROOM
		payload[VarNames.ROOM_ID] = room_id
		payload[VarNames.USER_ID] = self.user_id
		payload[HandlerNames.NAME] = HandlerNames.CHANNELS
		payload[VarNames.TIME] = get_milliseconds()
		return payload

	def load_offline_message(self, offline_messages, channel_key):
		"""Build the payload that delivers offline messages of a channel."""
		payload = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
		payload[VarNames.CHANNEL] = channel_key
		return payload
251
252
253
class MessagesHandler(MessagesCreator):
254
255
	def __init__(self, *args, **kwargs):
		"""
		Set up redis clients and the tables mapping actions to handlers.

		``pre_process_message`` maps actions of incoming client messages to
		methods; ``post_process_message`` maps actions of prefixed redis
		messages to methods (see new_message).
		"""
		# these two are assigned before the parent constructor runs
		self.closed_channels = None
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__(*args, **kwargs)

		self.id = id(self)
		self.ip = None
		self._logger = None
		self.channels = []
		self.call_receiver_channel = None

		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		self.async_redis = tornadoredis.Client()
		self.patch_tornadoredis()

		incoming_handlers = {}
		incoming_handlers[Actions.GET_MESSAGES] = self.process_get_messages
		incoming_handlers[Actions.SEND_MESSAGE] = self.process_send_message
		incoming_handlers[Actions.CALL] = self.process_call
		incoming_handlers[Actions.CREATE_DIRECT_CHANNEL] = self.create_user_channel
		incoming_handlers[Actions.DELETE_ROOM] = self.delete_channel
		incoming_handlers[Actions.EDIT_MESSAGE] = self.edit_message
		incoming_handlers[Actions.CREATE_ROOM_CHANNEL] = self.create_new_room
		incoming_handlers[Actions.INVITE_USER] = self.invite_user
		self.pre_process_message = incoming_handlers

		redis_handlers = {}
		redis_handlers[Actions.CREATE_DIRECT_CHANNEL] = self.send_client_new_channel
		redis_handlers[Actions.CREATE_ROOM_CHANNEL] = self.send_client_new_channel
		redis_handlers[Actions.DELETE_ROOM] = self.send_client_delete_channel
		redis_handlers[Actions.INVITE_USER] = self.send_client_new_channel
		redis_handlers[Actions.CALL] = self.set_opponent_call_channel
		self.post_process_message = redis_handlers
286
287
	def patch_tornadoredis(self):  # TODO remove this
		"""
		Wrap ``read`` of the async redis connection with diagnostic logging.

		The original ``read`` is kept as ``old_read`` on the connection. When
		it raises, the error and the current handler state are logged and
		the very same exception is re-raised.
		"""
		redis_connection = self.async_redis.connection
		redis_connection.old_read = redis_connection.read

		def new_read(instance, *args, **kwargs):
			try:
				return instance.old_read(*args, **kwargs)
			except Exception as e:
				current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
				self.logger.error(e)
				self.logger.error(
					"Exception info: "
					"self.connected = '%s';;; "
					"Redis default channel online = '%s';;; "
					"self.channels = '%s';;; "
					"self.closed_channels  = '%s';;;",
					self.connected, current_online, self.channels, self.closed_channels
				)
				# bare raise keeps the original traceback (``raise e`` resets it on py2)
				raise

		# type of a bound method; used to bind new_read to the connection
		fabric = type(redis_connection.read)
		redis_connection.read = fabric(new_read, redis_connection)
306
307
	@property
	def connected(self):
		"""
		Connection state; must be provided by a subclass.

		:raises NotImplementedError: always. ``NotImplemented`` is a
		comparison sentinel, not an exception, and raising it produces a
		TypeError on Python 3.
		"""
		raise NotImplementedError('Subclass must implement "connected"')

	@connected.setter
	def connected(self, value):
		raise NotImplementedError('Subclass must implement "connected"')
314
315
	@tornado.gen.engine
	def listen(self, channels):
		"""
		Subscribe the async redis client to ``channels`` and, once the
		subscription task has finished, route messages to new_message.
		"""
		subscription = tornado.gen.Task(self.async_redis.subscribe, channels)
		yield subscription
		self.async_redis.listen(self.new_message)
320
321
	@property
	def logger(self):
		"""Logger of this handler; the module logger until one is assigned."""
		return self._logger or base_logger
324
325
	@tornado.gen.engine
	def add_channel(self, channel):
		"""Remember ``channel`` and subscribe the async redis client to it."""
		self.channels.append(channel)
		single_channel = (channel,)
		yield tornado.gen.Task(self.async_redis.subscribe, single_channel)
330
331
	def do_db(self, callback, *args, **kwargs):
		"""
		Call ``callback(*args, **kwargs)`` and return its result.

		On OperationalError or InterfaceError the database connection is
		closed and the callback is invoked one more time; an error of the
		second attempt is not caught.
		"""
		try:
			result = callback(*args, **kwargs)
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
			self.logger.warning('%s, reconnecting' % e)  # TODO
			connection.close()
			result = callback(*args, **kwargs)
		return result
338
339
	def execute_query(self, query, *args, **kwargs):
		"""
		Run a raw SQL query and return its rows as dicts.

		:param query: SQL text; remaining arguments are passed to
		``cursor.execute`` unchanged
		:return: list of ``{column name: value}`` dicts, one per row; an
		empty list when the statement produced no result set
		"""
		cursor = connection.cursor()
		try:
			cursor.execute(query, *args, **kwargs)
			description = cursor.description
			if description is None:
				# DB-API: description is None for statements without rows
				# (e.g. UPDATE), so there is nothing to fetch
				return []
			columns = [col[0] for col in description]
			return [dict(zip(columns, row)) for row in cursor.fetchall()]
		finally:
			# always release the cursor, the original never closed it
			cursor.close()
347
348
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
		"""
		Read the online users of ``channel`` from the redis hash.

		The hash maps a connection hash to a user id; both are decoded as
		utf-8 and converted to int.

		:param check_user_id: when given (including 0, the anonymous id),
		also report whether this user has a connection other than
		``check_hash``
		:param check_hash: connection hash that is ignored by that check
		:return: list of unique user ids, or ``(list, bool)`` when
		``check_user_id`` is not None
		"""
		online = self.sync_redis.hgetall(channel)
		self.logger.debug('!! channel %s redis online: %s', channel, online)
		user_ids = set()
		user_is_online = False
		for key_hash, raw_user_id in (online or {}).items():  # py2 iteritems
			user_id = int(raw_user_id.decode('utf-8'))
			if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
				user_is_online = True
			user_ids.add(user_id)
		result = list(user_ids)
		# compare with None: a truthiness test returned a bare list for user
		# id 0 and broke callers that unpack a tuple
		if check_user_id is None:
			return result
		return result, user_is_online
366
367
	def add_online_user(self, room_id, offline_messages=None):
		"""
		Register this connection as online in a room and notify clients.

		Stores ``{connection hash: stored_redis_user}`` in the redis hash of
		the room. If the user has no other connection in the room, the new
		online list is published to the room and offline messages (if any)
		are written to this client; otherwise the online list is written to
		this client only.

		:param room_id: room whose online hash is updated
		:param offline_messages: messages for this room, sent on first tab
		"""
		self.async_redis_publisher.hset(room_id, self.id, self.stored_redis_user)
		# since we add user to online first, latest trigger will always show correct online
		online, is_online = self.get_online_from_redis(room_id, self.user_id, self.id)
		if not is_online:  # if a new tab has been opened
			# the hset above may already be visible in the list just read,
			# so append only when the id is missing to avoid a duplicate
			if self.user_id not in online:
				online.append(self.user_id)
			online_user_names_mes = self.room_online(online, Actions.LOGIN, room_id)
			self.logger.info('!! First tab, sending refresh online for all')
			self.publish(online_user_names_mes, room_id)
			if offline_messages:
				self.safe_write(self.load_offline_message(offline_messages, room_id))
		else:  # Send user names to self
			online_user_names_mes = self.room_online(online, Actions.REFRESH_USER, room_id)
			self.logger.info('!! Second tab, retrieving online for self')
			self.safe_write(online_user_names_mes)
395
396
	def publish(self, message, channel, parsable=False):
		"""
		Serialize ``message`` to JSON and publish it to a redis channel.

		:param parsable: when true the JSON is marked with the parsable
		prefix (see encode) before publishing
		"""
		serialized = json.dumps(message)
		self.logger.debug('<%s> %s', channel, serialized)
		outgoing = self.encode(serialized) if parsable else serialized
		self.async_redis_publisher.publish(channel, outgoing)
402
403
	def encode(self, message):
		"""
		Mark a message with the parsable prefix, which tells the receiving
		handler to decode and process it before sending it to the client.

		@param message: message to mark
		@return: marked message
		"""
		marker = self.parsable_prefix
		return marker + message
411
412
	def remove_parsable_prefix(self, message):
		"""
		Strip the parsable prefix from a message.

		@param message: text received from redis
		@return: the message without the prefix, or None when the message
		does not start with the prefix
		"""
		prefix = self.parsable_prefix
		if message.startswith(prefix):
			# cut exactly the prefix instead of a hard-coded single character
			return message[len(prefix):]
		return None
415
416
	def new_message(self, message):
		"""
		Handle a message coming from a subscribed redis channel.

		Non-text bodies are ignored. A body carrying the parsable prefix is
		written to the client without the prefix and then passed to the
		matching ``post_process_message`` handler; any other text is written
		to the client as is.
		"""
		data = message.body
		if not isinstance(data, str_type):  # subscribe event
			return
		prefixless_str = self.remove_parsable_prefix(data)
		if not prefixless_str:
			self.safe_write(data)
			return
		self.safe_write(prefixless_str)
		dict_message = json.loads(prefixless_str)
		post_handler = self.post_process_message[dict_message[VarNames.EVENT]]
		post_handler(dict_message)
426
427
	def safe_write(self, message):
		"""Send ``message`` to the client; the websocket subclass provides it."""
		reason = 'WebSocketHandler implements'
		raise NotImplementedError(reason)
429
430
	def process_send_message(self, message):
		"""
		Store a message sent by the current user and publish it to its channel.

		:type message: dict
		"""
		channel = message[VarNames.CHANNEL]
		content = message[VarNames.CONTENT]
		stored = Message(sender_id=self.user_id, content=content)
		stored.room_id = channel
		if VarNames.IMG in message:
			raw_image = message[VarNames.IMG]
			file_name = message.get(VarNames.FILE_NAME)
			stored.img = extract_photo(raw_image, file_name)
		self.do_db(stored.save)  # exit on hacked id with exception
		self.publish(self.create_send_message(stored), channel)
445
446
	def process_call(self, in_message):
		"""
		Forward a call message to the channel of the call receiver.

		For an offer the receiver is looked up as the other user of the
		unnamed room given in the message, and the outgoing message is
		published with the parsable prefix.

		:type in_message: dict
		"""
		call_type = in_message.get(VarNames.CALL_TYPE)
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
		is_offer = call_type == CallType.OFFER
		if is_offer:
			room_id = in_message[VarNames.CHANNEL]
			opponent = User.rooms.through.objects.get(
				~Q(user_id=self.user_id),
				Q(room_id=room_id),
				Q(room__name__isnull=True)
			)
			self.call_receiver_channel = RedisPrefix.generate_user(opponent.user_id)
			out_message[VarNames.CHANNEL] = room_id
		# TODO
		self.logger.info('!! Offering a call to user with id %s',  self.call_receiver_channel)
		self.publish(out_message, self.call_receiver_channel, is_offer)
462
463
	def create_new_room(self, message):
		"""
		Create a named room with the current user as its only member and
		announce it on the user's own channel.

		:raises ValidationError: if the name is empty or longer than 16
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = bool(room_name) and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		room = Room(name=room_name)
		self.do_db(room.save)
		membership = RoomUsers(room_id=room.id, user_id=self.user_id)
		membership.save()
		announcement = self.subscribe_room_channel_message(room.id, room_name)
		self.publish(announcement, self.channel, True)
472
473
	def invite_user(self, message):
		"""
		Add a user to a room of the current user and notify both sides.

		The room gets an "add user" message; the invited user gets an
		invitation on his personal channel.

		:raises ValidationError: if the room is not among this handler's
		channels, is private, or already contains the user
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		if room_id not in self.channels:
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
		room = self.do_db(Room.objects.get, id=room_id)
		if room.is_private:
			raise ValidationError("You can't add users to direct room, create a new room instead")
		try:
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
		except IntegrityError:
			raise ValidationError("User is already in channel")
		users_in_room = {}
		for member in room.users.all():
			self.set_js_user_structure(users_in_room, member.id, member.username, member.sex)
		room_notice = self.add_user_to_room(room_id, user_id, users_in_room[user_id])
		self.publish(room_notice, room_id)
		invitation = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
		invited_channel = RedisPrefix.generate_user(user_id)
		self.publish(invitation, invited_channel, True)
491
492
	def create_room(self, user_rooms, user_id):
		"""
		Find or create the unnamed room shared with ``user_id``.

		An existing room is passed to update_room (which re-enables it or
		raises); otherwise a new room with the needed members is created.

		:param user_rooms: rows with the ``room_id`` of the current user's
		unnamed rooms
		:return: id of the found or created room
		"""
		with_self = self.user_id == user_id
		if with_self:
			room_ids = [room['room_id'] for room in user_rooms]
			query_res = self.execute_query(SELECT_SELF_ROOM, [room_ids, ])
		else:
			memberships = Room.users.through.objects.filter(user_id=user_id, room__in=user_rooms)
			query_res = memberships.values('room__id', 'room__disabled')
		if len(query_res) > 0:
			found = query_res[0]
			room_id = found['room__id']
			self.update_room(room_id, found['room__disabled'])
			return room_id
		room = Room()
		room.save()
		room_id = room.id
		if with_self:
			RoomUsers(user_id=self.user_id, room_id=room_id).save()
		else:
			RoomUsers.objects.bulk_create([
				RoomUsers(user_id=user_id, room_id=room_id),
				RoomUsers(user_id=self.user_id, room_id=room_id),
			])
		return room_id
515
516
	def update_room(self, room_id, disabled):
		"""
		Re-enable a disabled room.

		:raises ValidationError: if the room is not disabled
		"""
		if disabled:
			Room.objects.filter(id=room_id).update(disabled=False)
			return
		raise ValidationError('This room already exist')
521
522
	def create_user_channel(self, message):
		"""
		Open a direct channel with the user given in ``message`` and
		announce it on the personal channels of both users (once, when
		both are the same channel).
		"""
		user_id = message[VarNames.USER_ID]
		# ids of the unnamed rooms the current user takes part in
		own_rooms = Room.users.through.objects.filter(
			user_id=self.user_id,
			room__name__isnull=True
		).values('room_id')
		# reuse the room shared with the other user or create one
		room_id = self.create_room(own_rooms, user_id)
		announcement = self.subscribe_direct_channel_message(room_id, user_id)
		self.publish(announcement, self.channel, True)
		other_channel = RedisPrefix.generate_user(user_id)
		if other_channel != self.channel:
			self.publish(announcement, other_channel, True)
533
534
	def delete_channel(self, message):
		"""
		Leave a room on behalf of the current user.

		A room without a name is marked disabled; for a named room the
		membership row is deleted and the remaining online list is
		published. In both cases a "delete room" message is published to
		the room with the parsable prefix.

		:raises ValidationError: if the room is not among this handler's
		channels, is the ALL_ROOM_ID room, or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to exit this room')
		room = self.do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else:  # if public -> leave the room, delete the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
			online = self.get_online_from_redis(room_id)
			# the id may be missing from the redis hash; list.remove would
			# raise ValueError and abort the whole operation
			if self.user_id in online:
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
		room.save()
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, room_id, True)
551
552
	def edit_message(self, data):
		"""
		Edit or delete a message of the current user and publish the change.

		A ``None`` content marks the message as deleted, any other content
		replaces the stored one.

		:raises ValidationError: if the message belongs to another user,
		is older than 60000 ms or is already deleted
		"""
		message_id = data[VarNames.MESSAGE_ID]
		message = Message.objects.get(id=message_id)
		if message.sender_id != self.user_id:
			raise ValidationError("You can only edit your messages")
		if message.time + 60000 < get_milliseconds():
			raise ValidationError("You can only edit messages that were send not more than 1 min ago")
		if message.deleted:
			raise ValidationError("Already deleted")
		new_content = data[VarNames.CONTENT]
		message.content = new_content
		stored_rows = Message.objects.filter(id=message_id)
		if new_content is None:
			stored_rows.update(deleted=True)
			action = Actions.DELETE_MESSAGE
		else:
			action = Actions.EDIT_MESSAGE
			stored_rows.update(content=new_content)
		outgoing = self.create_send_message(message, action)
		self.publish(outgoing, message.room_id)
570
571
	def send_client_new_channel(self, message):
		"""Subscribe to the room named in ``message`` and go online in it."""
		new_room_id = message[VarNames.ROOM_ID]
		self.add_channel(new_room_id)
		self.add_online_user(new_room_id)
575
576
	def set_opponent_call_channel(self, message):
		"""Remember the personal channel of the user id carried by ``message``."""
		opponent_id = message[VarNames.USER_ID]
		self.call_receiver_channel = RedisPrefix.generate_user(opponent_id)
578
579
	def send_client_delete_channel(self, message):
		"""
		Unsubscribe from the room named in ``message``, drop this connection
		from the room's online hash and forget the channel.
		"""
		removed_room_id = message[VarNames.ROOM_ID]
		self.async_redis.unsubscribe((removed_room_id,))
		self.async_redis_publisher.hdel(removed_room_id, self.id)
		self.channels.remove(removed_room_id)
584
585
	def process_get_messages(self, data):
		"""
		Write to the client a page of not deleted messages of a room,
		newest first.

		``count`` (default 10) limits the page size; when ``headerId`` is
		given only messages with a smaller id are returned.

		:type data: dict
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		room_id = data[VarNames.CHANNEL]
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		conditions = [Q(room_id=room_id), Q(deleted=False)]
		if header_id is not None:
			conditions.insert(0, Q(id__lt=header_id))
		messages = Message.objects.filter(*conditions).order_by('-pk')[:count]
		response = self.do_db(self.get_messages, messages, room_id)
		self.safe_write(response)
599
600
	def get_offline_messages(self):
		"""
		Collect not deleted messages whose id is greater than the current
		user's last read message id of the room.

		:return: dict mapping a room id to a list of message dicts
		"""
		unread = Message.objects.filter(
			id__gt=F('room__roomusers__last_read_message_id'),
			deleted=False,
			room__roomusers__user_id=self.user_id
		)
		by_room = {}
		for message in unread:
			if message.room_id not in by_room:
				by_room[message.room_id] = []
			by_room[message.room_id].append(self.create_message(message))
		return by_room
610
611
	def get_users_in_current_user_rooms(self):
		"""
		Describe the enabled rooms of the current user and their members.

		:return: dict of the form
		{
			ROOM_ID: {
				"name": ROOM_NAME,
				"users": {
					USER_ID: {"user": USER_NAME, "sex": GENDER},
					...
				}
			},
			...
		}
		"""
		rooms = {}
		enabled_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
		for room in enabled_rooms:
			rooms[room['id']] = {
				VarNames.ROOM_NAME: room['name'],
				VarNames.ROOM_USERS: {}
			}
		members = User.objects.filter(rooms__in=list(rooms)).values('id', 'username', 'sex', 'rooms__id')
		for member in members:
			room_users = rooms[member['rooms__id']][VarNames.ROOM_USERS]
			self.set_js_user_structure(room_users, member['id'], member['username'], member['sex'])
		return rooms
640
641
	def set_js_user_structure(self, user_dict, user_id, name, sex):
		"""
		Store the description of a user in ``user_dict`` under ``user_id``.

		The gender is looked up in GENDERS by ``sex``.
		"""
		description = {}
		description[VarNames.USER] = name
		description[VarNames.GENDER] = GENDERS[sex]
		user_dict[user_id] = description
646
647
	def save_ip(self):
		"""
		Record that the current user joined from the current ip, unless
		such a record already exists.
		"""
		same_ip_and_user = Q(ip__ip=self.ip) & Q(user_id=self.user_id)
		already_saved = self.do_db(UserJoinedInfo.objects.filter(same_ip_and_user).exists)
		if already_saved:
			return
		ip_address = self.get_or_create_ip()
		UserJoinedInfo.objects.create(ip=ip_address, user_id=self.user_id)
656
657
	def get_or_create_ip(self):
		"""
		Return the IpAddress record of the current ip, creating it if needed.

		A new record is filled from the JSON answer of the service at
		``api_url``. If the url is not configured, or the lookup fails for
		any reason, the error is logged and a record holding only the ip
		is created instead (best effort).
		"""
		try:
			ip_address = IpAddress.objects.get(ip=self.ip)
		except IpAddress.DoesNotExist:
			try:
				if not api_url:
					raise Exception('api url is absent')
				self.logger.debug("Creating ip record %s", self.ip)
				f = urlopen(api_url % self.ip)
				try:
					raw_response = f.read().decode("utf-8")
				finally:
					# release the HTTP response, it was never closed before
					f.close()
				response = json.loads(raw_response)
				if response['status'] != "success":
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
				ip_address = IpAddress.objects.create(
					ip=self.ip,
					isp=response['isp'],
					country=response['country'],
					region=response['regionName'],
					city=response['city'],
					country_code=response['countryCode']
				)
			except Exception as e:
				# deliberate catch-all: any failure falls back to a bare record
				self.logger.error("Error while creating ip with country info, because %s", e)
				ip_address = IpAddress.objects.create(ip=self.ip)
		return ip_address
682
683
	def publish_logout(self, channel, log_data):
		"""
		Publish a logout message to ``channel`` if the user has no other
		connection in it.

		:param log_data: dict that receives the online state of the channel
		:return: True if the logout was published, otherwise None
		"""
		# seems like async solves problem with connection lost and wrong data status
		# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
		online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
		log_data[channel] = {'online': online, 'is_online': is_online}
		if is_online:
			return None
		self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
		return True
692
693
694
class AntiSpam(object):
	"""Tracks sizes of incoming messages and rejects oversized ones."""

	def __init__(self):
		self.info = {}
		self.spammed = 0

	def check_spam(self, json_message):
		"""
		Record the length of a message and validate it.

		:raises ValidationError: if the message is longer than
		MAX_MESSAGE_SIZE; the ``spammed`` counter is increased first
		"""
		size = len(json_message)
		# key is the current time in hundredths of a second
		timestamp_key = int(round(time.time() * 100))
		self.info[timestamp_key] = size
		if size > MAX_MESSAGE_SIZE:
			self.spammed += 1
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
		self.check_timed_spam()

	def check_timed_spam(self):
		"""Placeholder for a rate check; does nothing yet."""
		# TODO implement me
		# raise ValidationError("You're chatting too much, calm down a bit!")
		return None
713
714
715
class TornadoHandler(WebSocketHandler, MessagesHandler):
716
717
	def __init__(self, *args, **kwargs):
		"""Initialize parent handlers, then the connection flag and anti-spam."""
		super(TornadoHandler, self).__init__(*args, **kwargs)
		self.anti_spam = AntiSpam()
		self.__connected__ = False
721
722
	@property
	def connected(self):
		"""Flag stored in ``__connected__``; open() sets it, disconnect() clears it."""
		return self.__connected__

	@connected.setter
	def connected(self, value):
		self.__connected__ = value
729
730
	def data_received(self, chunk):
		"""Ignore the chunk; this handler does nothing with it."""
		return None
732
733
	def on_message(self, json_message):
		"""
		Dispatch a JSON message received from the client.

		The message is rejected when the socket is not initialized, the
		message is empty, its action is unknown, or it names a channel
		that is not among this handler's channels. A ValidationError is
		reported back to the client as a growl message.
		"""
		try:
			if not self.connected:
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
			if not json_message:
				raise ValidationError('Skipping null message')
			# self.anti_spam.check_spam(json_message)
			self.logger.debug('<< %.1000s', json_message)
			message = json.loads(json_message)
			event = message[VarNames.EVENT]
			if event not in self.pre_process_message:
				raise ValidationError("event {} is unknown".format(event))
			channel = message.get(VarNames.CHANNEL)
			if channel and channel not in self.channels:
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
			handler = self.pre_process_message[event]
			handler(message)
		except ValidationError as e:
			growl = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
			self.safe_write(growl)
751
752
	def on_close(self):
		"""
		Clean up when the websocket closes.

		Unsubscribes from redis, removes this connection from the online
		hash of every numeric channel and, while still connected, publishes
		logouts. If any logout was published, the last read message of the
		user is updated. Finally the redis connection is closed.
		"""
		if self.async_redis.subscribed:
			self.logger.info("Close event, unsubscribing from %s", self.channels)
			self.async_redis.unsubscribe(self.channels)
		else:
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
		log_data = {}
		gone_offline = False
		for channel in self.channels:
			# only numeric channels are processed here
			if isinstance(channel, Number):
				self.sync_redis.hdel(channel, self.id)
				if self.connected and self.publish_logout(channel, log_data):
					gone_offline = True
		if gone_offline:
			res = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
			self.logger.info("Updated %s last read message", res)
		self.disconnect(json.dumps(log_data))
770
771
	def disconnect(self, log_data, tries=0):
		"""
		Close the redis connection if no operation is in progress, otherwise
		postpone closing with a timeout and try again.
		https://github.com/evilkost/brukva/issues/25#issuecomment-9468227

		:param log_data: text logged when the connection is finally closed
		:param tries: number of postponed attempts made so far
		"""
		self.connected = False
		if tries == 0:
			# remember the channels only on the first attempt; on a retry
			# self.channels is already empty and would overwrite them
			self.closed_channels = self.channels
		self.channels = []
		if self.async_redis.connection.in_progress and tries < 1000:  # failsafe eternal loop
			self.logger.debug('Closing a connection timeouts')
			# NOTE(review): the first positional argument of timedelta is days,
			# so this delay is 0.00001 days (~0.86 s) - confirm it is intended
			ioloop.IOLoop.instance().add_timeout(timedelta(0.00001), self.disconnect, log_data, tries+1)
		else:
			self.logger.info("Close connection result: %s", log_data)
			self.async_redis.disconnect()
785
786
	def open(self):
		"""
		Initialize the handler when the websocket is opened.

		A connection whose session key is unknown to the session store is
		closed with code 403. Otherwise the user is loaded, the list of
		his rooms is sent, redis channels are subscribed, the user is
		marked online in every room and the ip is saved in a thread.
		"""
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
		if not sessionStore.exists(session_key):
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
			self.close(403, "Session key %s has been rejected" % session_key)
			return

		self.ip = self.get_client_ip()
		session = SessionStore(session_key)
		self.user_id = int(session["_auth_user_id"])
		log_params = {
			'user_id': str(self.user_id).zfill(3),
			'id': self.id,
			'ip': self.ip
		}
		self._logger = logging.LoggerAdapter(base_logger, log_params)
		self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
		self.async_redis.connect()

		user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
		self.sender_name = user_db.username
		self.sex = user_db.sex_str

		user_rooms = self.get_users_in_current_user_rooms()
		self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))

		# personal channel first, then one channel per room
		self.channels = [self.channel]
		self.channels.extend(user_rooms)
		self.listen(self.channels)

		# get all missed messages
		off_messages = self.get_offline_messages()
		for room_id in user_rooms:
			self.add_online_user(room_id, off_messages.get(room_id))
		self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
		self.connected = True
		Thread(target=self.save_ip).start()
820
821
	def check_origin(self, origin):
		"""
		Check whether the domain of ``origin`` matches the domain of the
		request's Host header.

		Ports are ignored and the comparison is case-insensitive on both
		sides. A request without a Host header is rejected.

		:param origin: value of the Origin header
		:return: True if the domains match
		"""
		origin_domain = urlparse(origin).netloc.split(':')[0].lower()
		host_header = self.request.headers.get("Host")
		if not host_header:
			# no Host header to compare with; the original failed here
			# with AttributeError
			return False
		host_domain = host_header.split(':')[0].lower()
		return host_domain == origin_domain
831
832
	def safe_write(self, message):
		"""
		Send a message to the client; a closed websocket is logged instead
		of being raised.

		:param message: dict (serialized to JSON first) or text
		:raises ValueError: if the message is neither a dict nor text
		:type self: MessagesHandler
		"""
		# self.logger.debug('<< THREAD %s >>', os.getppid())
		try:
			payload = json.dumps(message) if isinstance(message, dict) else message
			message = payload
			if not isinstance(payload, str_type):
				raise ValueError('Wrong message type : %s' % str(payload))
			self.logger.debug(">> %.1000s", payload)
			self.write_message(payload)
		except tornado.websocket.WebSocketClosedError as e:
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
847
848
	def get_client_ip(self):
		"""Return the ``X-Real-IP`` header if set, else the request's remote ip."""
		forwarded_ip = self.request.headers.get("X-Real-IP")
		if forwarded_ip:
			return forwarded_ip
		return self.request.remote_ip
850