Completed
Push — master ( 1b2f28...25f475 )
by Andrew
01:10
created

MessagesHandler.delete_channel()   B

Complexity

Conditions 5

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 17
rs 8.5454
1
import json
2
import logging
3
import sys
4
import time
5
from threading import Thread
6
from urllib.request import urlopen
7
8
import datetime
9
import tornado.gen
10
import tornado.httpclient
11
import tornado.ioloop
12
import tornado.web
13
import tornado.websocket
14
import tornadoredis
15
from django.conf import settings
16
from django.core.exceptions import ValidationError
17
from django.db import connection, OperationalError, InterfaceError, IntegrityError
18
from django.db.models import Q, F
19
from redis_sessions.session import SessionStore
20
from tornado.websocket import WebSocketHandler
21
22
from chat.utils import extract_photo
23
24
try:
25
	from urllib.parse import urlparse  # py2
26
except ImportError:
27
	from urlparse import urlparse  # py3
28
29
from chat.settings import MAX_MESSAGE_SIZE, ALL_ROOM_ID, GENDERS, UPDATE_LAST_READ_MESSAGE
30
from chat.models import User, Message, Room, IpAddress, get_milliseconds, UserJoinedInfo, RoomUsers
31
32
PY3 = sys.version > '3'
33
34
api_url = getattr(settings, "IP_API_URL", None)
35
36
sessionStore = SessionStore()
37
38
logger = logging.getLogger(__name__)
39
40
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
41
#CONNECTION_POOL = tornadoredis.ConnectionPool(
42
#	max_connections=500,
43
#	wait_for_available=True)
44
45
46
class Actions:
47
	LOGIN = 'addOnlineUser'
48
	LOGOUT = 'removeOnlineUser'
49
	SEND_MESSAGE = 'sendMessage'
50
	PRINT_MESSAGE = 'printMessage'
51
	CALL = 'call'
52
	ROOMS = 'setRooms'
53
	REFRESH_USER = 'setOnlineUsers'
54
	GROWL_MESSAGE = 'growl'
55
	GET_MESSAGES = 'loadMessages'
56
	CREATE_DIRECT_CHANNEL = 'addDirectChannel'
57
	DELETE_ROOM = 'deleteRoom'
58
	EDIT_MESSAGE = 'editMessage'
59
	DELETE_MESSAGE = 'deleteMessage'
60
	CREATE_ROOM_CHANNEL = 'addRoom'
61
	INVITE_USER = 'inviteUser'
62
	ADD_USER = 'addUserToAll'
63
	OFFLINE_MESSAGES = 'loadOfflineMessages'
64
65
66
class VarNames:
67
	CALL_TYPE = 'type'
68
	USER = 'user'
69
	USER_ID = 'userId'
70
	TIME = 'time'
71
	CONTENT = 'content'
72
	IMG = 'image'
73
	EVENT = 'action'
74
	MESSAGE_ID = 'id'
75
	GENDER = 'sex'
76
	ROOM_NAME = 'name'
77
	ROOM_ID = 'roomId'
78
	ROOM_USERS = 'users'
79
	CHANNEL = 'channel'
80
	GET_MESSAGES_COUNT = 'count'
81
	GET_MESSAGES_HEADER_ID = 'headerId'
82
	CHANNEL_NAME = 'channel'
83
	IS_ROOM_PRIVATE = 'private'
84
	#ROOM_NAME = 'roomName'
85
	# ROOM_ID = 'roomId'
86
87
88
class CallType:
89
	OFFER = 'offer'
90
91
class HandlerNames:
92
	NAME = 'handler'
93
	CHANNELS = 'channels'
94
	CHAT = 'chat'
95
	GROWL = 'growl'
96
	WEBRTC = 'webrtc'
97
	FILE = 'file'
98
99
100
class RedisPrefix:
101
	USER_ID_CHANNEL_PREFIX = 'u'
102
	__ROOM_ONLINE__ = 'o:{}'
103
104
	@classmethod
105
	def generate_user(cls, key):
106
		return cls.USER_ID_CHANNEL_PREFIX + str(key)
107
108
RedisPrefix.DEFAULT_CHANNEL = ALL_ROOM_ID
109
110
111
class MessagesCreator(object):
112
113
	def __init__(self, *args, **kwargs):
114
		super(MessagesCreator, self).__init__(*args, **kwargs)
115
		self.sex = None
116
		self.sender_name = None
117
		self.user_id = 0  # anonymous by default
118
119
	def default(self, content, event, handler):
120
		"""
121
		:return: {"action": event, "content": content, "time": "20:48:57"}
122
		"""
123
		return {
124
			VarNames.EVENT: event,
125
			VarNames.CONTENT: content,
126
			VarNames.USER_ID: self.user_id,
127
			VarNames.TIME: get_milliseconds(),
128
			HandlerNames.NAME: handler
129
		}
130
131
	def room_online(self, online, event, channel):
132
		"""
133
		:return: {"action": event, "content": content, "time": "20:48:57"}
134
		"""
135
		room_less = self.default(online, event, HandlerNames.CHAT)
136
		room_less[VarNames.CHANNEL_NAME] = channel
137
		room_less[VarNames.USER] = self.sender_name
138
		room_less[VarNames.GENDER] = self.sex
139
		return room_less
140
141
	def offer_call(self, content, message_type):
142
		"""
143
		:return: {"action": "call", "content": content, "time": "20:48:57"}
144
		"""
145
		message = self.default(content, Actions.CALL, HandlerNames.WEBRTC)
146
		message[VarNames.CALL_TYPE] = message_type
147
		message[VarNames.USER] = self.sender_name
148
		return message
149
150
	@classmethod
151
	def create_message(cls, message):
152
		res = {
153
			VarNames.USER_ID: message.sender_id,
154
			VarNames.CONTENT: message.content,
155
			VarNames.TIME: message.time,
156
			VarNames.MESSAGE_ID: message.id,
157
		}
158
		if message.img.name:
159
			res[VarNames.IMG] = message.img.url
160
		return res
161
162
	@classmethod
163
	def create_send_message(cls, message, event=Actions.PRINT_MESSAGE):
164
		"""
165
		:param message:
166
		:return: "action": "joined", "content": {"v5bQwtWp": "alien", "tRD6emzs": "Alien"},
167
		"sex": "Alien", "user": "tRD6emzs", "time": "20:48:57"}
168
		"""
169
		res = cls.create_message(message)
170
		res[VarNames.EVENT] = event
171
		res[VarNames.CHANNEL] = message.room_id
172
		res[HandlerNames.NAME] = HandlerNames.CHAT
173
		return res
174
175
	@classmethod
176
	def get_messages(cls, messages, channel):
177
		"""
178
		:type messages: list[Messages]
179
		:type channel: str
180
		:type messages: QuerySet[Messages]
181
		"""
182
		return {
183
			VarNames.CONTENT: [cls.create_message(message) for message in messages],
184
			VarNames.EVENT: Actions.GET_MESSAGES,
185
			VarNames.CHANNEL: channel,
186
			HandlerNames.NAME: HandlerNames.CHAT
187
		}
188
189
	@property
190
	def stored_redis_user(self):
191
		return  self.user_id
192
193
	@property
194
	def channel(self):
195
		return RedisPrefix.generate_user(self.user_id)
196
197
	def subscribe_direct_channel_message(self, room_id, other_user_id):
198
		return {
199
			VarNames.EVENT: Actions.CREATE_DIRECT_CHANNEL,
200
			VarNames.ROOM_ID: room_id,
201
			VarNames.ROOM_USERS: [self.user_id, other_user_id],
202
			HandlerNames.NAME: HandlerNames.CHANNELS
203
		}
204
205
	def subscribe_room_channel_message(self, room_id, room_name):
206
		return {
207
			VarNames.EVENT: Actions.CREATE_ROOM_CHANNEL,
208
			VarNames.ROOM_ID: room_id,
209
			VarNames.ROOM_USERS: [self.user_id],
210
			HandlerNames.NAME: HandlerNames.CHANNELS,
211
			VarNames.ROOM_NAME: room_name
212
		}
213
214
	def invite_room_channel_message(self, room_id, user_id, room_name, users):
215
		return {
216
			VarNames.EVENT: Actions.INVITE_USER,
217
			VarNames.ROOM_ID: room_id,
218
			VarNames.USER_ID: user_id,
219
			HandlerNames.NAME: HandlerNames.CHANNELS,
220
			VarNames.ROOM_NAME: room_name,
221
			VarNames.CONTENT: users
222
		}
223
224
	def add_user_to_room(self, channel, user_id, content):
225
		return {
226
			VarNames.EVENT: Actions.ADD_USER,
227
			VarNames.CHANNEL: channel,
228
			VarNames.USER_ID: user_id,
229
			HandlerNames.NAME: HandlerNames.CHAT,
230
			VarNames.GENDER: content[VarNames.GENDER], # SEX: 'Alien', USER: 'Andrew'
231
			VarNames.USER: content[VarNames.USER] # SEX: 'Alien', USER: 'Andrew'
232
		}
233
234
	def unsubscribe_direct_message(self, room_id):
235
		return {
236
			VarNames.EVENT: Actions.DELETE_ROOM,
237
			VarNames.ROOM_ID: room_id,
238
			VarNames.USER_ID: self.user_id,
239
			HandlerNames.NAME: HandlerNames.CHANNELS,
240
			VarNames.TIME: get_milliseconds()
241
		}
242
243
	def load_offline_message(self, offline_messages, channel_key):
244
		res = self.default(offline_messages, Actions.OFFLINE_MESSAGES, HandlerNames.CHAT)
245
		res[VarNames.CHANNEL] = channel_key
246
		return res
247
248
249
class MessagesHandler(MessagesCreator):
250
251
	def __init__(self, *args, **kwargs):
252
		self.parsable_prefix = 'p'
253
		super(MessagesHandler, self).__init__(*args, **kwargs)
254
		self.id = id(self)
255
		self.log_id = str(self.id % 10000).rjust(4, '0')
256
		self.ip = None
257
		from chat import global_redis
258
		self.async_redis_publisher = global_redis.async_redis_publisher
259
		self.sync_redis = global_redis.sync_redis
260
		self.channels = []
261
		self.call_receiver_channel = None
262
		self.logger = None
263
		self.async_redis = tornadoredis.Client()
264
		self.pre_process_message = {
265
			Actions.GET_MESSAGES: self.process_get_messages,
266
			Actions.SEND_MESSAGE: self.process_send_message,
267
			Actions.CALL: self.process_call,
268
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
269
			Actions.DELETE_ROOM: self.delete_channel,
270
			Actions.EDIT_MESSAGE: self.edit_message,
271
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
272
			Actions.INVITE_USER: self.invite_user,
273
		}
274
		self.post_process_message = {
275
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
276
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
277
			Actions.DELETE_ROOM: self.send_client_delete_channel,
278
			Actions.INVITE_USER: self.send_client_new_channel,
279
			Actions.CALL: self.set_opponent_call_channel
280
		}
281
282
	@tornado.gen.engine
283
	def listen(self, channels):
284
		yield tornado.gen.Task(
285
			self.async_redis.subscribe, channels)
286
		self.async_redis.listen(self.new_message)
287
288
	@tornado.gen.engine
289
	def add_channel(self, channel):
290
		self.channels.append(channel)
291
		yield tornado.gen.Task(
292
			self.async_redis.subscribe, (channel,))
293
294
	def do_db(self, callback, *args, **kwargs):
295
		try:
296
			return callback(*args, **kwargs)
297
		except (OperationalError, InterfaceError) as e:  # Connection has gone away
298
			self.logger.warning('%s, reconnecting' % e)  # TODO
299
			connection.close()
300
			return callback(*args, **kwargs)
301
302
	def execute_query(self, query, *args, **kwargs):
303
		cursor = connection.cursor()
304
		cursor.execute(query, *args, **kwargs)
305
		return cursor.fetchall()
306
307
	def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
308
		"""
309
		:rtype : dict
310
		returns (dict, bool) if check_type is present
311
		"""
312
		online = self.sync_redis.hgetall(channel)
313
		self.logger.debug('!! channel %s redis online: %s', channel, online)
314
		result = set()
315
		user_is_online = False
316
		# redis stores REDIS_USER_FORMAT, so parse them
317
		if online:
318
			for key_hash, raw_user_id in online.items():  # py2 iteritems
319
				user_id = int(raw_user_id.decode('utf-8'))
320
				if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
321
					user_is_online = True
322
				result.add(user_id)
323
		result = list(result)
324
		return (result, user_is_online) if check_user_id else result
325
326
	def add_online_user(self, room_id, offline_messages=None):
327
		"""
328
		adds to redis
329
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
330
		:return:
331
		"""
332
		online = self.get_online_from_redis(room_id)
333
		self.async_redis_publisher.hset(room_id, self.id, self.stored_redis_user)
334
		if self.user_id not in online:  # if a new tab has been opened
335
			online.append(self.user_id)
336
			online_user_names_mes = self.room_online(
337
				online,
338
				Actions.LOGIN,
339
				room_id
340
			)
341
			self.logger.info('!! First tab, sending refresh online for all')
342
			self.publish(online_user_names_mes, room_id)
343
			if offline_messages:
344
				self.safe_write(self.load_offline_message(offline_messages, room_id))
345
		else:  # Send user names to self
346
			online_user_names_mes = self.room_online(
347
				online,
348
				Actions.REFRESH_USER,
349
				room_id
350
			)
351
			self.logger.info('!! Second tab, retrieving online for self')
352
			self.safe_write(online_user_names_mes)
353
354
	def publish(self, message, channel, parsable=False):
355
		jsoned_mess = json.dumps(message)
356
		self.logger.debug('<%s> %s', channel, jsoned_mess)
357
		if parsable:
358
			jsoned_mess = self.encode(jsoned_mess)
359
		self.async_redis_publisher.publish(channel, jsoned_mess)
360
361
	def encode(self, message):
362
		"""
363
		Marks message with prefix to specify that
364
		it should be decoded and proccesed before sending to client
365
		@param message: message to mark
366
		@return: marked message
367
		"""
368
		return self.parsable_prefix + message
369
370
	def decode(self, message):
371
		"""
372
		Check if message should be proccessed by server before writing to client
373
		@param message: message to check
374
		@return: Object structure of message if it should be processed, None if not
375
		"""
376
		if message.startswith(self.parsable_prefix):
377
			return json.loads(message[1:])
378
379
	def new_message(self, message):
380
		data = message.body
381
		if type(data) is not int:  # subscribe event
382
			decoded = self.decode(data)
383
			if decoded:
384
				data = decoded
385
			self.safe_write(data)
386
			if decoded:
387
				self.post_process_message[decoded[VarNames.EVENT]](decoded)
388
389
	def safe_write(self, message):
390
		raise NotImplementedError('WebSocketHandler implements')
391
392
	def process_send_message(self, message):
393
		"""
394
		:type message: dict
395
		"""
396
		channel = message[VarNames.CHANNEL]
397
		message_db = Message(
398
			sender_id=self.user_id,
399
			content=message[VarNames.CONTENT]
400
		)
401
		message_db.room_id = channel
402
		if VarNames.IMG in message:
403
			message_db.img = extract_photo(message[VarNames.IMG])
404
		self.do_db(message_db.save)  # exit on hacked id with exception
405
		prepared_message = self.create_send_message(message_db)
406
		self.publish(prepared_message, channel)
407
408
	def process_call(self, in_message):
409
		"""
410
		:type in_message: dict
411
		"""
412
		call_type = in_message.get(VarNames.CALL_TYPE)
413
		set_opponent_channel = False
414
		out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
415
		if call_type == CallType.OFFER:
416
			room_id = in_message[VarNames.CHANNEL]
417
			user = User.rooms.through.objects.get(~Q(user_id=self.user_id), Q(room_id=room_id), Q(room__name__isnull=True))
418
			self.call_receiver_channel = RedisPrefix.generate_user(user.user_id)
419
			set_opponent_channel = True
420
			out_message[VarNames.CHANNEL] = room_id
421
		# TODO
422
		self.logger.info('!! Offering a call to user with id %s',  self.call_receiver_channel)
423
		self.publish(out_message, self.call_receiver_channel, set_opponent_channel)
424
425
	def create_new_room(self, message):
426
		room_name = message[VarNames.ROOM_NAME]
427
		if not room_name or len(room_name) > 16:
428
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
429
		room = Room(name=room_name)
430
		self.do_db(room.save)
431
		RoomUsers(room_id=room.id, user_id=self.user_id).save()
432
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
433
		self.publish(subscribe_message, self.channel, True)
434
435
	def invite_user(self, message):
436
		room_id = message[VarNames.ROOM_ID]
437
		user_id = message[VarNames.USER_ID]
438
		if room_id not in self.channels:
439
			raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
440
		room = self.do_db(Room.objects.get, id=room_id)
441
		if room.is_private:
442
			raise ValidationError("You can't add users to direct room, create a new room instead")
443
		try:
444
			Room.users.through.objects.create(room_id=room_id, user_id=user_id)
445
		except IntegrityError:
446
			raise ValidationError("User is already in channel")
447
		users_in_room = {}
448
		for user in room.users.all():
449
			self.set_js_user_structure(users_in_room, user.id, user.username, user.sex)
450
		self.publish(self.add_user_to_room(room_id, user_id, users_in_room[user_id]), room_id)
451
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
452
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
453
454
	def create_user_channel(self, message):
455
		user_id = message[VarNames.USER_ID]
456
		# get all self private rooms ids
457
		user_rooms = Room.users.through.objects.filter(user_id=self.user_id, room__name__isnull=True).values('room_id')
458
		# get private room that contains another user from rooms above
459
		query_res = Room.users.through.objects.filter(user_id=user_id, room__in=user_rooms).values('room__id', 'room__disabled')
460
		if len(query_res) > 0:
461
			room = query_res[0]
462
			room_id = room['room__id']
463
			disabled = room['room__disabled']
464
			if not disabled:
465
				raise ValidationError('This room already exist')
466
			else:
467
				Room.objects.filter(id=room_id).update(disabled=False)
468
		else:
469
			room = Room()
470
			room.save()
471
			room_id = room.id
472
			RoomUsers.objects.bulk_create([
473
				RoomUsers(user_id=user_id, room_id=room_id),
474
				RoomUsers(user_id=self.user_id, room_id=room_id),
475
			])
476
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
477
		self.publish(subscribe_message, self.channel, True)
478
		other_channel = RedisPrefix.generate_user(user_id)
479
		if self.channel != other_channel:
480
			self.publish(subscribe_message, other_channel, True)
481
482
	def delete_channel(self, message):
483
		room_id = message[VarNames.ROOM_ID]
484
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
485
			raise ValidationError('You are not allowed to exit this room')
486
		room = self.do_db(Room.objects.get, id=room_id)
487
		if room.disabled:
488
			raise ValidationError('Room is already deleted')
489
		if room.name is None:  # if private then disable
490
			room.disabled = True
491
		else: # if public -> leave the room, delete the link
492
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
493
			online = self.get_online_from_redis(room_id)
494
			online.remove(self.user_id)
495
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
496
		room.save()
497
		message = self.unsubscribe_direct_message(room_id)
498
		self.publish(message, room_id, True)
499
500
	def edit_message(self, data):
501
		message_id = data[VarNames.MESSAGE_ID]
502
		message = Message.objects.get(id=message_id)
503
		if message.sender_id != self.user_id:
504
			raise ValidationError("You can only edit your messages")
505
		if message.time + 60000 < get_milliseconds():
506
			raise ValidationError("You can only edit messages that were send not more than 1 min ago")
507
		if message.deleted:
508
			raise ValidationError("Already deleted")
509
		message.content = data[VarNames.CONTENT]
510
		selector = Message.objects.filter(id=message_id)
511
		if message.content is None:
512
			selector.update(deleted=True)
513
			action = Actions.DELETE_MESSAGE
514
		else:
515
			action = Actions.EDIT_MESSAGE
516
			selector.update(content=message.content)
517
		self.publish(self.create_send_message(message, action), message.room_id)
518
519
	def send_client_new_channel(self, message):
520
		room_id = message[VarNames.ROOM_ID]
521
		self.add_channel(room_id)
522
		self.add_online_user(room_id)# TODO doesnt work if already subscribed
523
524
	def set_opponent_call_channel(self, message):
525
		self.call_receiver_channel = RedisPrefix.generate_user(message[VarNames.USER_ID])
526
527
	def send_client_delete_channel(self, message):
528
		room_id = message[VarNames.ROOM_ID]
529
		self.async_redis.unsubscribe((room_id,))
530
		self.async_redis_publisher.hdel(room_id, self.id)
531
		self.channels.remove(room_id)
532
533
	def process_get_messages(self, data):
534
		"""
535
		:type data: dict
536
		"""
537
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
538
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
539
		room_id = data[VarNames.CHANNEL]
540
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
541
		if header_id is None:
542
			messages = Message.objects.filter(Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
543
		else:
544
			messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
545
		response = self.do_db(self.get_messages, messages, room_id)
546
		self.safe_write(response)
547
548
	def get_offline_messages(self):
549
		res = {}
550
		offline_messages = Message.objects.filter(
551
			id__gt=F('room__roomusers__last_read_message_id'),
552
			deleted=False,
553
			room__roomusers__user_id=self.user_id
554
		)
555
		for message in offline_messages:
556
			res.setdefault(message.room_id, []).append(self.create_message(message))
557
		return res
558
559
	def get_users_in_current_user_rooms(self):
560
		"""
561
		{
562
			"ROOM_ID:1": {
563
				"name": "All",
564
				"users": {
565
					"USER_ID:admin": {
566
						"name": "USER_NAME:admin",
567
						"sex": "SEX:Secret"
568
					},
569
					"USER_ID_2": {
570
						"name": "USER_NAME:Mike",
571
						"sex": "Male"
572
					}
573
				},
574
				"isPrivate": true
575
			}
576
		}
577
		"""
578
		user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
579
		res = {room['id']: {
580
				VarNames.ROOM_NAME: room['name'],
581
				VarNames.ROOM_USERS: {}
582
			} for room in user_rooms}
583
		room_ids = (room_id for room_id in res)
584
		rooms_users = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id')
585
		for user in rooms_users:
586
			self.set_js_user_structure(res[user['rooms__id']][VarNames.ROOM_USERS], user['id'], user['username'], user['sex'])
587
		return res
588
589
	def set_js_user_structure(self, user_dict, user_id, name, sex):
590
		user_dict[user_id] = {
591
			VarNames.USER: name,
592
			VarNames.GENDER: GENDERS[sex]
593
		}
594
595
	def save_ip(self):
596
		if (self.do_db(UserJoinedInfo.objects.filter(
597
				Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)):
598
			return
599
		ip_address = self.get_or_create_ip()
600
		UserJoinedInfo.objects.create(
601
			ip=ip_address,
602
			user_id=self.user_id
603
		)
604
605
	def get_or_create_ip(self):
606
		try:
607
			ip_address = IpAddress.objects.get(ip=self.ip)
608
		except IpAddress.DoesNotExist:
609
			try:
610
				if not api_url:
611
					raise Exception('api url is absent')
612
				self.logger.debug("Creating ip record %s", self.ip)
613
				f = urlopen(api_url % self.ip)
614
				raw_response = f.read().decode("utf-8")
615
				response = json.loads(raw_response)
616
				if response['status'] != "success":
617
					raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
618
				ip_address = IpAddress.objects.create(
619
					ip=self.ip,
620
					isp=response['isp'],
621
					country=response['country'],
622
					region=response['regionName'],
623
					city=response['city'],
624
					country_code=response['countryCode']
625
				)
626
			except Exception as e:
627
				self.logger.error("Error while creating ip with country info, because %s", e)
628
				ip_address = IpAddress.objects.create(ip=self.ip)
629
		return ip_address
630
631
632
class AntiSpam(object):
633
634
	def __init__(self):
635
		self.spammed = 0
636
		self.info = {}
637
638
	def check_spam(self, json_message):
639
		message_length = len(json_message)
640
		info_key = int(round(time.time() * 100))
641
		self.info[info_key] = message_length
642
		if message_length > MAX_MESSAGE_SIZE:
643
			self.spammed += 1
644
			raise ValidationError("Message can't exceed %d symbols" % MAX_MESSAGE_SIZE)
645
		self.check_timed_spam()
646
647
	def check_timed_spam(self):
648
		# TODO implement me
649
		pass
650
		# raise ValidationError("You're chatting too much, calm down a bit!")
651
652
653
class TornadoHandler(WebSocketHandler, MessagesHandler):
654
655
	def __init__(self, *args, **kwargs):
656
		super(TornadoHandler, self).__init__(*args, **kwargs)
657
		self.connected = False
658
		self.anti_spam = AntiSpam()
659
660
	def data_received(self, chunk):
661
		pass
662
663
	def on_message(self, json_message):
664
		try:
665
			if not self.connected:
666
				raise ValidationError('Skipping message %s, as websocket is not initialized yet' % json_message)
667
			if not json_message:
668
				raise ValidationError('Skipping null message')
669
			# self.anti_spam.check_spam(json_message)
670
			self.logger.debug('<< %s', json_message)
671
			message = json.loads(json_message)
672
			if message[VarNames.EVENT] not in self.pre_process_message:
673
				raise ValidationError("event {} is unknown".format(message[VarNames.EVENT]))
674
			channel = message.get(VarNames.CHANNEL)
675
			if channel and channel not in self.channels:
676
				raise ValidationError('Access denied for channel {}. Allowed channels: {}'.format(channel, self.channels ))
677
			self.pre_process_message[message[VarNames.EVENT]](message)
678
		except ValidationError as e:
679
			error_message = self.default(str(e.message), Actions.GROWL_MESSAGE, HandlerNames.GROWL)
680
			self.safe_write(error_message)
681
682
	def on_close(self):
683
		if self.async_redis.subscribed:
684
			self.logger.info("Close event, unsubscribing from %s", self.channels)
685
			self.async_redis.unsubscribe(self.channels)
686
		else:
687
			self.logger.info("Close event, not subscribed, channels: %s", self.channels)
688
		log_data = {}
689
		gone_offline = False
690
		for channel in self.channels:
691
			if not isinstance(channel, int):
692
				continue
693
			self.sync_redis.hdel(channel, self.id)
694
			if self.connected:
695
				# seems like async solves problem with connection lost and wrong data status
696
				# http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
697
				online, is_online = self.get_online_from_redis(channel, self.user_id, self.id)
698
				log_data[channel] = {'online': online, 'is_online': is_online}
699
				if not is_online:
700
					message = self.room_online(online, Actions.LOGOUT, channel)
701
					self.publish(message, channel)
702
					gone_offline = True
703
		if gone_offline:
704
			res = self.do_db(self.execute_query, UPDATE_LAST_READ_MESSAGE, [self.user_id, ])
705
			logger.info("Updated %s last read message", res)
706
707
		self.logger.info("Close connection result: %s", json.dumps(log_data))
708
		self.async_redis.disconnect()
709
710
	def open(self):
711
		session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
712
		if sessionStore.exists(session_key):
713
			self.ip = self.get_client_ip()
714
			session = SessionStore(session_key)
715
			self.user_id = int(session["_auth_user_id"])
716
			log_params = {
717
				'user_id': str(self.user_id).zfill(3),
718
				'id': self.log_id,
719
				'ip': self.ip
720
			}
721
			self.logger = logging.LoggerAdapter(logger, log_params)
722
			self.logger.debug("!! Incoming connection, session %s, thread hash %s", session_key, self.id)
723
			self.async_redis.connect()
724
			user_db = self.do_db(User.objects.get, id=self.user_id)  # everything but 0 is a registered user
725
			self.sender_name = user_db.username
726
			self.sex = user_db.sex_str
727
			user_rooms = self.get_users_in_current_user_rooms()
728
			self.safe_write(self.default(user_rooms, Actions.ROOMS, HandlerNames.CHANNELS))
729
			# get all missed messages
730
			self.channels.clear()
731
			self.channels.append(self.channel)
732
			for room_id in user_rooms:
733
				self.channels.append(room_id)
734
			self.listen(self.channels)
735
			off_messages = self.get_offline_messages()
736
			for room_id in user_rooms:
737
				self.add_online_user(room_id, off_messages.get(room_id))
738
			self.logger.info("!! User %s subscribes for %s", self.sender_name, self.channels)
739
			self.connected = True
740
			Thread(target=self.save_ip).start()
741
		else:
742
			self.logger.warning('!! Session key %s has been rejected', str(session_key))
743
			self.close(403, "Session key %s has been rejected" % session_key)
744
745
	def check_origin(self, origin):
746
		"""
747
		check whether browser set domain matches origin
748
		"""
749
		parsed_origin = urlparse(origin)
750
		origin = parsed_origin.netloc
751
		origin_domain = origin.split(':')[0].lower()
752
		browser_set = self.request.headers.get("Host")
753
		browser_domain = browser_set.split(':')[0]
754
		return browser_domain == origin_domain
755
756
	def safe_write(self, message):
757
		"""
758
		Tries to send message, doesn't throw exception outside
759
		:type self: MessagesHandler
760
		"""
761
		# self.logger.debug('<< THREAD %s >>', os.getppid())
762
		try:
763
			if isinstance(message, dict):
764
				message = json.dumps(message)
765
			if not (isinstance(message, str) or (not PY3 and isinstance(message, unicode))):
766
				raise ValueError('Wrong message type : %s' % str(message))
767
			self.logger.debug(">> %s", message)
768
			self.write_message(message)
769
		except tornado.websocket.WebSocketClosedError as e:
770
			self.logger.error("%s. Can't send << %s >> message", e, str(message))
771
772
	def get_client_ip(self):
773
		return self.request.headers.get("X-Real-IP") or self.request.remote_ip
774