Completed
Push — master ( ec7895...07bc46 )
by Andrew
28s
created

MessagesHandler.evaluate()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 4
rs 10
1
import json
2
import logging
3
4
from django.core.exceptions import ValidationError
5
from django.db.models import Q
6
from tornado.gen import engine, Task
7
from tornadoredis import Client
8
9
from chat.log_filters import id_generator
10
from chat.models import Message, Room, RoomUsers
11
from chat.py2_3 import str_type
12
from chat.settings import ALL_ROOM_ID, TORNADO_REDIS_PORT, WEBRTC_CONNECTION
13
from chat.tornado.constants import VarNames, HandlerNames, Actions, RedisPrefix, WebRtcRedisStates
14
from chat.tornado.image_utils import process_images, prepare_img, save_images, get_message_images
15
from chat.tornado.message_creator import WebRtcMessageCreator, MessagesCreator
16
from chat.utils import get_max_key, do_db, validate_edit_message, get_or_create_room, \
17
	create_room
18
19
parent_logger = logging.getLogger(__name__)
20
base_logger = logging.LoggerAdapter(parent_logger, {
21
	'id': 0,
22
	'ip': '000.000.000.000'
23
})
24
25
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
26
# CONNECTION_POOL = tornadoredis.ConnectionPool(
27
# max_connections=500,
28
# wait_for_available=True)
29
30
31
class MessagesHandler(MessagesCreator):
32
33
	def __init__(self, *args, **kwargs):
34
		self.closed_channels = None
35
		self.parsable_prefix = 'p'
36
		super(MessagesHandler, self).__init__()
37
		self.webrtc_ids = {}
38
		self.id = None  # child init
39
		self.sex = None
40
		self.sender_name = None
41
		self.user_id = 0  # anonymous by default
42
		self.ip = None
43
		from chat import global_redis
44
		self.async_redis_publisher = global_redis.async_redis_publisher
45
		self.sync_redis = global_redis.sync_redis
46
		self.channels = []
47
		self._logger = None
48
		self.async_redis = Client(port=TORNADO_REDIS_PORT)
49
		self.patch_tornadoredis()
50
		self.pre_process_message = {
51
			Actions.GET_MESSAGES: self.process_get_messages,
52
			Actions.SEND_MESSAGE: self.process_send_message,
53
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
54
			Actions.DELETE_ROOM: self.delete_channel,
55
			Actions.EDIT_MESSAGE: self.edit_message,
56
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
57
			Actions.INVITE_USER: self.invite_user
58
		}
59
		self.post_process_message = {
60
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
61
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
62
			Actions.DELETE_ROOM: self.send_client_delete_channel,
63
			Actions.INVITE_USER: self.send_client_new_channel
64
		}
65
66
	def patch_tornadoredis(self):  # TODO remove this
67
		fabric = type(self.async_redis.connection.readline)
68
		self.async_redis.connection.old_read = self.async_redis.connection.readline
69
70
		def new_read(new_self, callback=None):
71
			try:
72
				return new_self.old_read(callback=callback)
73
			except Exception as e:
74
				current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
75
				self.logger.error(e)
76
				self.logger.error(
77
					"Exception info: "
78
					"self.id: %s ;;; "
79
					"self.connected = '%s';;; "
80
					"Redis default channel online = '%s';;; "
81
					"self.channels = '%s';;; "
82
					"self.closed_channels  = '%s';;;",
83
					self.id, self.connected, current_online, self.channels, self.closed_channels
84
				)
85
				raise e
86
87
		self.async_redis.connection.readline = fabric(new_read, self.async_redis.connection)
88
89
	@property
90
	def connected(self):
91
		raise NotImplemented
92
93
	@connected.setter
94
	def connected(self, value):
95
		raise NotImplemented
96
97
	@engine
98
	def listen(self, channels):
99
		yield Task(
100
			self.async_redis.subscribe, channels)
101
		self.async_redis.listen(self.pub_sub_message)
102
103
	@property
104
	def logger(self):
105
		return self._logger if self._logger else base_logger
106
107
	@engine
108
	def add_channel(self, channel):
109
		self.channels.append(channel)
110
		yield Task(self.async_redis.subscribe, (channel,))
111
112
	def get_online_from_redis(self, channel):
113
		return self.get_online_and_status_from_redis(channel)[1]
114
115
	def get_online_and_status_from_redis(self, channel):
116
		"""
117
		:rtype : (bool, list)
118
		"""
119
		online = self.sync_redis.ssmembers(channel)
120
		self.logger.debug('!! channel %s redis online: %s', channel, online)
121
		return self.parse_redis_online(online) if online else (False, [])
122
123
	def parse_redis_online(self, online):
124
		"""
125
		:rtype : (bool, list)
126
		"""
127
		result = set()
128
		user_is_online = False
129
		for decoded in online:  # py2 iteritems
130
			# : char specified in cookies_middleware.py.create_id
131
			user_id = int(decoded.split(':')[0])
132
			if user_id == self.user_id and decoded != self.id:
133
				user_is_online = True
134
			result.add(user_id)
135
		return user_is_online, list(result)
136
137
	def add_online_user(self, room_id, offline_messages=None):
138
		"""
139
		adds to redis
140
		online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
141
		:return:
142
		"""
143
		self.async_redis_publisher.sadd(room_id, self.id)
144
		# since we add user to online first, latest trigger will always show correct online
145
		is_online, online = self.get_online_and_status_from_redis(room_id)
146
		if is_online:  # Send user names to self
147
			online_user_names_mes = self.room_online(online, Actions.REFRESH_USER, room_id)
148
			self.logger.info('!! Second tab, retrieving online for self')
149
			self.ws_write(online_user_names_mes)
150
		else:  # if a new tab has been opened
151
			online.append(self.user_id)
152
			online_user_names_mes = self.room_online(online, Actions.LOGIN, room_id)
153
			self.logger.info('!! First tab, sending refresh online for all')
154
			self.publish(online_user_names_mes, room_id)
155
			if offline_messages:
156
				self.ws_write(self.load_offline_message(offline_messages, room_id))
157
158
	def publish(self, message, channel, parsable=False):
159
		jsoned_mess = json.dumps(message)
160
		self.logger.debug('<%s> %s', channel, jsoned_mess)
161
		if parsable:
162
			jsoned_mess = self.encode(jsoned_mess)
163
		self.async_redis_publisher.publish(channel, jsoned_mess)
164
165
	def encode(self, message):
166
		"""
167
		Marks message with prefix to specify that
168
		it should be decoded and proccesed before sending to client
169
		@param message: message to mark
170
		@return: marked message
171
		"""
172
		return self.parsable_prefix + message
173
174
	def remove_parsable_prefix(self, message):
175
		if message.startswith(self.parsable_prefix):
176
			return message[1:]
177
178
	def pub_sub_message(self, message):
179
		data = message.body
180
		if isinstance(data, str_type):  # subscribe event
181
			prefixless_str = self.remove_parsable_prefix(data)
182
			if prefixless_str:
183
				dict_message = json.loads(prefixless_str)
184
				res = self.post_process_message[dict_message[VarNames.EVENT]](dict_message)
185
				if not res:
186
					self.ws_write(prefixless_str)
187
			else:
188
				self.ws_write(data)
189
190
	def ws_write(self, message):
191
		raise NotImplementedError('WebSocketHandler implements')
192
193
	def process_send_message(self, message):
194
		"""
195
		:type message: dict
196
		"""
197
		raw_imgs = message.get(VarNames.IMG)
198
		channel = message[VarNames.CHANNEL]
199
		message_db = Message(
200
			sender_id=self.user_id,
201
			content=message[VarNames.CONTENT],
202
			symbol=get_max_key(raw_imgs)
203
		)
204
		message_db.room_id = channel
205
		do_db(message_db.save)
206
		db_images = save_images(raw_imgs, message_db.id)
207
		prepared_message = self.create_send_message(
208
			message_db,
209
			Actions.PRINT_MESSAGE,
210
			prepare_img(db_images, message_db.id)
211
		)
212
		self.publish(prepared_message, channel)
213
214
	def create_new_room(self, message):
215
		room_name = message[VarNames.ROOM_NAME]
216
		if not room_name or len(room_name) > 16:
217
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
218
		room = Room(name=room_name)
219
		do_db(room.save)
220
		RoomUsers(room_id=room.id, user_id=self.user_id).save()
221
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
222
		self.publish(subscribe_message, self.channel, True)
223
224
	def invite_user(self, message):
225
		room_id = message[VarNames.ROOM_ID]
226
		user_id = message[VarNames.USER_ID]
227
		room = get_or_create_room(self.channels, room_id, user_id)
228
		users_in_room = {
229
			user.id: RedisPrefix.set_js_user_structure(user.username, user.sex)
230
			for user in room.users.all()
231
		}
232
		self.publish(self.add_user_to_room(room_id, user_id, users_in_room[user_id]), room_id)
233
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
234
		self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
235
236
237
	def create_user_channel(self, message):
238
		user_id = message[VarNames.USER_ID]
239
		room_id = create_room(self.user_id, user_id)
240
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
241
		self.publish(subscribe_message, self.channel, True)
242
		other_channel = RedisPrefix.generate_user(user_id)
243
		if self.channel != other_channel:
244
			self.publish(subscribe_message, other_channel, True)
245
246
	def delete_channel(self, message):
247
		room_id = message[VarNames.ROOM_ID]
248
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
249
			raise ValidationError('You are not allowed to exit this room')
250
		room = do_db(Room.objects.get, id=room_id)
251
		if room.disabled:
252
			raise ValidationError('Room is already deleted')
253
		if room.name is None:  # if private then disable
254
			room.disabled = True
255
		else:  # if public -> leave the room, delete the link
256
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
257
			online = self.get_online_from_redis(room_id)
258
			online.remove(self.user_id)
259
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
260
		room.save()
261
		message = self.unsubscribe_direct_message(room_id)
262
		self.publish(message, room_id, True)
263
264
	def edit_message(self, data):
265
		# ord(next (iter (message['images'])))
266
		message_id = data[VarNames.MESSAGE_ID]
267
		message = Message.objects.get(id=message_id)
268
		validate_edit_message(self.user_id, message)
269
		message.content = data[VarNames.CONTENT]
270
		selector = Message.objects.filter(id=message_id)
271
		if message.content is None:
272
			action = Actions.DELETE_MESSAGE
273
			prep_imgs = None
274
			selector.update(deleted=True)
275
		else:
276
			action = Actions.EDIT_MESSAGE
277
			prep_imgs = process_images(data.get(VarNames.IMG), message)
278
			selector.update(content=message.content, symbol=message.symbol)
279
		self.publish(self.create_send_message(message, action, prep_imgs), message.room_id)
280
281
	def send_client_new_channel(self, message):
282
		room_id = message[VarNames.ROOM_ID]
283
		self.add_channel(room_id)
284
		self.add_online_user(room_id)
285
286
	def send_client_delete_channel(self, message):
287
		room_id = message[VarNames.ROOM_ID]
288
		self.async_redis.unsubscribe((room_id,))
289
		self.async_redis_publisher.hdel(room_id, self.id)
290
		self.channels.remove(room_id)
291
292
	def process_get_messages(self, data):
293
		"""
294
		:type data: dict
295
		"""
296
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
297
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
298
		room_id = data[VarNames.CHANNEL]
299
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
300
		if header_id is None:
301
			messages = Message.objects.filter(Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
302
		else:
303
			messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count]
304
		images = do_db(get_message_images, messages)
305
		response = self.get_messages(messages, room_id, images)
306
		self.ws_write(response)
307
308
309
class WebRtcMessageHandler(MessagesHandler, WebRtcMessageCreator):
310
311
	def __init__(self, *args, **kwargs):
312
		super(WebRtcMessageHandler, self).__init__(*args, **kwargs)
313
		self.pre_process_message.update({
314
			Actions.WEBRTC: self.proxy_webrtc,
315
			Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
316
			Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
317
			Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
318
			Actions.ACCEPT_CALL: self.accept_call,
319
			Actions.ACCEPT_FILE: self.accept_file,
320
			Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection,
321
			Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection,
322
			Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
323
			Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
324
		})
325
		self.post_process_message.update({
326
			Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel,
327
			Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel
328
		})
329
330
	def set_opponent_call_channel(self, message):
331
		connection_id = message[VarNames.CONNECTION_ID]
332
		if message[VarNames.WEBRTC_OPPONENT_ID] == self.id:
333
			return True
334
		self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
335
336
	def offer_webrtc_connection(self, in_message):
337
		room_id = in_message[VarNames.CHANNEL]
338
		content = in_message.get(VarNames.CONTENT)
339
		qued_id = in_message[VarNames.WEBRTC_QUED_ID]
340
		connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
341
		# use list because sets dont have 1st element which is offerer
342
		self.async_redis_publisher.hset(WEBRTC_CONNECTION, connection_id, self.id)
343
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
344
		opponents_message = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT])
345
		self_message = self.set_connection_id(qued_id, connection_id)
346
		self.ws_write(self_message)
347
		self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
348
		self.publish(opponents_message, room_id, True)
349
350
	def reply_file_connection(self, in_message):
351
		connection_id = in_message[VarNames.CONNECTION_ID]
352
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
353
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
354
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
355
		if sender_ws_status == WebRtcRedisStates.READY and self_ws_status == WebRtcRedisStates.OFFERED:
356
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
357
			self.publish(self.reply_webrtc(
358
				Actions.REPLY_FILE_CONNECTION,
359
				connection_id,
360
				HandlerNames.WEBRTC_TRANSFER
361
			), sender_ws_id)
362
		else:
363
			raise ValidationError("Invalid channel status.")
364
365
	def reply_call_connection(self, in_message):
366
		self.send_call_answer(
367
			in_message,
368
			WebRtcRedisStates.RESPONDED,
369
			Actions.REPLY_CALL_CONNECTION,
370
			[WebRtcRedisStates.OFFERED],
371
			HandlerNames.WEBRTC_TRANSFER
372
		)
373
374
	def proxy_webrtc(self, in_message):
375
		"""
376
		:type in_message: dict
377
		"""
378
		connection_id = in_message[VarNames.CONNECTION_ID]
379
		channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
380
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
381
		opponent_channel_status = self.sync_redis.shget(connection_id, channel)
382
		if not (self_channel_status == WebRtcRedisStates.READY and opponent_channel_status == WebRtcRedisStates.READY):
383
			raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
384
				self_channel_status, opponent_channel_status
385
			))  # todo receiver should only accept proxy_webrtc from sender, sender can accept all
386
		# I mean somebody if there're 3 ppl in 1 channel and first is initing transfer to 2nd and 3rd,
387
		# 2nd guy can fraud 3rd guy webrtc traffic, which is allowed during the call, but not while transering file
388
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
389
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
390
		self.logger.debug(
391
			"Forwarding message to channel %s, self %s, other status %s",
392
			channel,
393
			self_channel_status,
394
			opponent_channel_status
395
		)
396
		self.publish(in_message, channel)
397
398
	def close_file_connection(self, in_message):
399
		connection_id = in_message[VarNames.CONNECTION_ID]
400
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
401
		if not self_channel_status:
402
			raise Exception("Access Denied")
403
		if self_channel_status != WebRtcRedisStates.CLOSED:
404
			sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
405
			if sender_id == self.id:
406
				self.close_file_sender(connection_id)
407
			else:
408
				self.close_file_receiver(connection_id, in_message, sender_id)
409
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
410
411
	def close_call_connection(self, in_message):
412
		self.send_call_answer(
413
			in_message,
414
			WebRtcRedisStates.CLOSED,
415
			Actions.CLOSE_CALL_CONNECTION,
416
			[WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED],
417
			HandlerNames.PEER_CONNECTION
418
		)
419
420
	def cancel_call_connection(self, in_message):
421
		self.send_call_answer(
422
			in_message,
423
			WebRtcRedisStates.CLOSED,
424
			Actions.CANCEL_CALL_CONNECTION,
425
			[WebRtcRedisStates.OFFERED],
426
			HandlerNames.WEBRTC_TRANSFER
427
		)
428
429
	def close_file_receiver(self, connection_id, in_message, sender_id):
430
		sender_status = self.sync_redis.shget(connection_id, sender_id)
431
		if not sender_status:
432
			raise Exception("Access denied")
433
		if sender_status != WebRtcRedisStates.CLOSED:
434
			in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
435
			in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
436
			self.publish(in_message, sender_id)
437
438
	def close_file_sender(self, connection_id):
439
		values = self.sync_redis.shgetall(connection_id)
440
		del values[self.id]
441
		message = self.get_close_file_sender_message(connection_id)
442
		for ws_id in values:
443
			if values[ws_id] == WebRtcRedisStates.CLOSED:
444
				continue
445
			self.publish(message, ws_id)
446
447
	def accept_file(self, in_message):
448
		connection_id = in_message[VarNames.CONNECTION_ID]
449
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
450
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
451
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
452
		if sender_ws_status == WebRtcRedisStates.READY and self_ws_status == WebRtcRedisStates.RESPONDED:
453
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
454
			self.publish(self.get_accept_file_message(connection_id), sender_ws_id)
455
		else:
456
			raise ValidationError("Invalid channel status")
457
458
	# todo
459
	# we can use channel_status = self.sync_redis.shgetall(connection_id)
460
	# and then self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
461
	# if we shgetall and only then do async hset
462
	# we can catch an issue when 2 concurrent users accepted the call
463
	# but we didn't  send them ACCEPT_CALL as they both were in status 'offered'
464
	def accept_call(self, in_message):
465
		connection_id = in_message[VarNames.CONNECTION_ID]
466
		self_status = self.sync_redis.shget(connection_id, self.id)
467
		if self_status == WebRtcRedisStates.RESPONDED:
468
			conn_users = self.sync_redis.shgetall(connection_id)
469
			self.publish_call_answer(
470
				conn_users,
471
				connection_id,
472
				HandlerNames.WEBRTC_TRANSFER,
473
				Actions.ACCEPT_CALL,
474
				WebRtcRedisStates.READY
475
			)
476
		else:
477
			raise ValidationError("Invalid channel status")
478
479
	def send_call_answer(self, in_message, status_set, reply_action, allowed_state, message_handler):
480
		connection_id = in_message[VarNames.CONNECTION_ID]
481
		conn_users = self.sync_redis.shgetall(connection_id)
482
		if conn_users[self.id] in allowed_state:
483
			self.publish_call_answer(conn_users, connection_id, message_handler, reply_action, status_set)
484
		else:
485
			raise ValidationError("Invalid channel status.")
486
487
	def publish_call_answer(self, conn_users, connection_id, message_handler, reply_action, status_set):
488
		self.async_redis_publisher.hset(connection_id, self.id, status_set)
489
		del conn_users[self.id]
490
		message = self.reply_webrtc(reply_action, connection_id, message_handler)
491
		for user in conn_users:
492
			if conn_users[user] != WebRtcRedisStates.CLOSED:
493
				self.publish(message, user)