Completed
Push — master ( 78c1f7...a4dd93 )
by Andrew
26s
created

MessagesHandler.respond_ping()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
dl 0
loc 2
rs 10
c 1
b 0
f 0
1
import json
2
import logging
3
4
from django.core.exceptions import ValidationError
5
from django.db.models import Q
6
from tornado.gen import engine, Task
7
from tornadoredis import Client
8
9
from chat.log_filters import id_generator
10
from chat.models import Message, Room, RoomUsers
11
from chat.py2_3 import str_type
12
from chat.settings import ALL_ROOM_ID, TORNADO_REDIS_PORT, WEBRTC_CONNECTION
13
from chat.tornado.constants import VarNames, HandlerNames, Actions, RedisPrefix, WebRtcRedisStates
14
from chat.tornado.image_utils import process_images, prepare_img, save_images, get_message_images
15
from chat.tornado.message_creator import WebRtcMessageCreator, MessagesCreator
16
from chat.utils import get_max_key, do_db, validate_edit_message, get_or_create_room, \
17
	create_room
18
19
parent_logger = logging.getLogger(__name__)
20
base_logger = logging.LoggerAdapter(parent_logger, {
21
	'id': 0,
22
	'ip': '000.000.000.000'
23
})
24
25
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
26
# CONNECTION_POOL = tornadoredis.ConnectionPool(
27
# max_connections=500,
28
# wait_for_available=True)
29
30
31
class MessagesHandler(MessagesCreator):
32
33
	def __init__(self, *args, **kwargs):
		"""
		Set up per-connection state, redis clients and the action dispatch tables.

		Positional and keyword arguments are accepted but not forwarded to the parent initializer.
		"""
		self.closed_channels = None
		# marker prepended to redis messages that must be post-processed before reaching the client
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__()
		self.webrtc_ids = {}
		# identity fields start empty; the original notes `id` is set by the child initializer
		self.id = None
		self.sex = None
		self.sender_name = None
		self.user_id = 0  # anonymous by default
		self.ip = None
		# imported here rather than at module level -- presumably to avoid an import cycle; TODO confirm
		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		self.channels = []
		self._logger = None
		# dedicated subscriber client for this connection
		self.async_redis = Client(port=TORNADO_REDIS_PORT)
		self.patch_tornadoredis()
		# handlers for actions coming from the websocket client
		self.pre_process_message = {
			Actions.GET_MESSAGES: self.process_get_messages,
			Actions.SEND_MESSAGE: self.process_send_message,
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
			Actions.DELETE_ROOM: self.delete_channel,
			Actions.EDIT_MESSAGE: self.edit_message,
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
			Actions.INVITE_USER: self.invite_user,
			Actions.PING: self.respond_ping,
		}
		# handlers for prefixed ("parsable") messages arriving from redis pub/sub
		on_new_channel = self.send_client_new_channel
		self.post_process_message = {
			Actions.CREATE_DIRECT_CHANNEL: on_new_channel,
			Actions.CREATE_ROOM_CHANNEL: on_new_channel,
			Actions.DELETE_ROOM: self.send_client_delete_channel,
			Actions.INVITE_USER: on_new_channel,
		}
66
67
	def patch_tornadoredis(self):  # TODO remove this
		"""
		Wrap the subscriber connection's ``readline`` so that any exception it raises
		is logged together with this handler's state before being re-raised.
		"""
		connection = self.async_redis.connection
		bound_method_type = type(connection.readline)
		# keep the original implementation reachable from the wrapper
		connection.old_read = connection.readline

		def new_read(conn, callback=None):
			try:
				return conn.old_read(callback=callback)
			except Exception as err:
				online_now = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
				self.logger.error(err)
				self.logger.error(
					"Exception info: "
					"self.id: %s ;;; "
					"self.connected = '%s';;; "
					"Redis default channel online = '%s';;; "
					"self.channels = '%s';;; "
					"self.closed_channels  = '%s';;;",
					self.id, self.connected, online_now, self.channels, self.closed_channels
				)
				raise err

		# bind the wrapper to the connection object so it is called like a regular method
		connection.readline = bound_method_type(new_read, connection)
89
90
	@property
	def connected(self):
		"""
		Connection state of this handler; must be provided by a subclass.

		:raises NotImplementedError: always, in this base class
		"""
		# NotImplemented is a comparison sentinel, not an exception; raising it is a TypeError on Python 3
		raise NotImplementedError('Subclass should implement "connected"')

	@connected.setter
	def connected(self, value):
		"""
		Update the connection state; must be provided by a subclass.

		:param value: new connection state
		:raises NotImplementedError: always, in this base class
		"""
		raise NotImplementedError('Subclass should implement "connected"')
97
98
	@engine
	def listen(self, channels):
		"""
		Subscribe the per-connection redis client to ``channels`` and, once the
		subscription task completes, start dispatching incoming messages to ``pub_sub_message``.

		:param channels: channels to subscribe to
		"""
		subscriber = self.async_redis
		yield Task(subscriber.subscribe, channels)
		subscriber.listen(self.pub_sub_message)
103
104
	@property
	def logger(self):
		"""
		:return: the connection-specific logger when one is set, otherwise the module-level ``base_logger``
		"""
		return self._logger or base_logger
107
108
	@engine
	def add_channel(self, channel):
		"""
		Remember ``channel`` in ``self.channels`` and subscribe the per-connection redis client to it.

		:param channel: channel to start listening on
		"""
		self.channels.append(channel)
		to_subscribe = (channel,)
		yield Task(self.async_redis.subscribe, to_subscribe)
112
113
	def get_online_from_redis(self, channel):
		"""
		:param channel: redis key holding the channel's online connections
		:return: list of user ids online in ``channel`` (the status flag is discarded)
		"""
		_, online_ids = self.get_online_and_status_from_redis(channel)
		return online_ids
115
116
	def get_online_and_status_from_redis(self, channel):
		"""
		Read the members stored under ``channel`` in redis and parse them.

		:param channel: redis key holding the channel's online connections
		:rtype : (bool, list)
		:return: result of ``parse_redis_online``, or ``(False, [])`` when nothing is stored
		"""
		members = self.sync_redis.ssmembers(channel)
		self.logger.debug('!! channel %s redis online: %s', channel, members)
		if not members:
			return False, []
		return self.parse_redis_online(members)
123
124
	def parse_redis_online(self, online):
		"""
		Turn raw redis connection ids into a list of distinct user ids.

		Each entry is expected to look like ``<user_id>:<rest>``.

		:param online: iterable of connection id strings
		:rtype : (bool, list)
		:return: (True when the current user has another connection besides this one, distinct user ids)
		"""
		distinct_ids = set()
		has_other_connection = False
		for connection_id in online:
			# the part before the first ':' is the numeric user id
			owner_id = int(connection_id.split(':', 1)[0])
			distinct_ids.add(owner_id)
			if owner_id == self.user_id and connection_id != self.id:
				has_other_connection = True
		return has_other_connection, list(distinct_ids)
137
138
	def add_online_user(self, room_id, offline_messages=None):
		"""
		Register this connection in the redis set of ``room_id`` and announce presence.

		If the user already has another connection in the room, the online list is
		written only to this socket; otherwise a LOGIN message is published to the
		whole room and pending offline messages (if any) are written to this socket.

		:param room_id: room whose online set is updated
		:param offline_messages: optional messages handed to ``load_offline_message``
		:return: None
		"""
		self.async_redis_publisher.sadd(room_id, self.id)
		# the connection is added first, so the latest trigger always sees the correct online list
		already_present, online_ids = self.get_online_and_status_from_redis(room_id)
		if already_present:
			refresh_message = self.room_online(online_ids, Actions.REFRESH_USER, room_id)
			self.logger.info('!! Second tab, retrieving online for self')
			self.ws_write(refresh_message)
			return
		online_ids.append(self.user_id)
		login_message = self.room_online(online_ids, Actions.LOGIN, room_id)
		self.logger.info('!! First tab, sending refresh online for all')
		self.publish(login_message, room_id)
		if offline_messages:
			self.ws_write(self.load_offline_message(offline_messages, room_id))
158
159
	def publish(self, message, channel, parsable=False):
		"""
		Serialize ``message`` to JSON and publish it to a redis channel.

		:param message: JSON-serializable payload
		:param channel: redis channel to publish to
		:param parsable: when true, the payload is marked with the parsable prefix (see ``encode``)
		"""
		payload = json.dumps(message)
		# logged before the prefix is applied
		self.logger.debug('<%s> %s', channel, payload)
		outgoing = self.encode(payload) if parsable else payload
		self.async_redis_publisher.publish(channel, outgoing)
165
166
	def encode(self, message):
		"""
		Mark a message as one that has to be decoded and processed
		by the receiving handler before it is sent to the client.

		@param message: serialized message to mark
		@return: ``parsable_prefix`` followed by the message
		"""
		return ''.join((self.parsable_prefix, message))
174
175
	def remove_parsable_prefix(self, message):
		"""
		Strip the parsable marker added by ``encode`` from a message.

		:param message: raw string received from redis
		:return: the message without its leading marker, or None when the message is not marked
		"""
		prefix = self.parsable_prefix
		if message.startswith(prefix):
			# slice by the actual prefix length instead of a hard-coded 1,
			# so a longer parsable_prefix does not leave stray characters in the payload
			return message[len(prefix):]
		return None
178
179
	def pub_sub_message(self, message):
		"""
		Callback for messages arriving from the redis subscription.

		Non-string bodies are ignored. Strings without the parsable prefix are written
		to the websocket unchanged. Prefixed strings are decoded and handed to the
		matching ``post_process_message`` handler; the decoded payload is forwarded
		to the websocket unless that handler returns a truthy value.

		:param message: redis message object exposing ``body``
		"""
		body = message.body
		if not isinstance(body, str_type):
			# presumably a subscribe/unsubscribe notification rather than a chat payload
			return
		stripped = self.remove_parsable_prefix(body)
		if not stripped:
			self.ws_write(body)
			return
		parsed = json.loads(stripped)
		post_processor = self.post_process_message[parsed[VarNames.EVENT]]
		handled = post_processor(parsed)
		if not handled:
			self.ws_write(stripped)
190
191
	def ws_write(self, message):
		"""
		Write ``message`` to the client websocket; provided by the websocket handler subclass.

		:param message: payload to send to the client
		:raises NotImplementedError: always, in this base class
		"""
		raise NotImplementedError('WebSocketHandler implements')
193
194
	def process_send_message(self, message):
		"""
		Persist a chat message with its images and publish it to the room channel.

		:type message: dict
		:param message: client payload; reads the channel, the content and optional images
		"""
		attached_images = message.get(VarNames.IMG)
		room_channel = message[VarNames.CHANNEL]
		stored = Message(
			sender_id=self.user_id,
			content=message[VarNames.CONTENT],
			symbol=get_max_key(attached_images)
		)
		stored.room_id = room_channel
		do_db(stored.save)
		# images are saved after the message so they can reference its id
		saved_images = save_images(attached_images, stored.id)
		client_images = prepare_img(saved_images, stored.id)
		outgoing = self.create_send_message(stored, Actions.PRINT_MESSAGE, client_images)
		self.publish(outgoing, room_channel)
214
215
	def create_new_room(self, message):
		"""
		Create a named room, add the current user to it and notify the user's own channel.

		:param message: client payload; reads the room name
		:raises ValidationError: when the name is empty or longer than 16 characters
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = bool(room_name) and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		new_room = Room(name=room_name)
		do_db(new_room.save)
		membership = RoomUsers(room_id=new_room.id, user_id=self.user_id)
		membership.save()
		notification = self.subscribe_room_channel_message(new_room.id, room_name)
		# published as parsable so that receiving handlers post-process it before writing to the client
		self.publish(notification, self.channel, True)
224
225
	def invite_user(self, message):
		"""
		Add a user to a room, announce it in the room and notify the invited user's channel.

		:param message: client payload; reads the room id and the invited user id
		"""
		room_id = message[VarNames.ROOM_ID]
		invited_id = message[VarNames.USER_ID]
		room = get_or_create_room(self.channels, room_id, invited_id)
		# client-side structures for every member of the room, keyed by user id
		members = {}
		for member in room.users.all():
			members[member.id] = RedisPrefix.set_js_user_structure(member.username, member.sex)
		room_announcement = self.add_user_to_room(room_id, invited_id, members[invited_id])
		self.publish(room_announcement, room_id)
		invitation = self.invite_room_channel_message(room_id, invited_id, room.name, members)
		self.publish(invitation, RedisPrefix.generate_user(invited_id), True)
236
237
	def respond_ping(self, message):
		"""
		Answer a client ping by writing a pong message to the websocket.

		:param message: incoming ping payload (unused)
		"""
		pong = self.responde_pong()
		self.ws_write(pong)
239
240
	def create_user_channel(self, message):
		"""
		Create a direct room with another user and notify both users' channels.

		:param message: client payload; reads the other user's id
		"""
		other_user_id = message[VarNames.USER_ID]
		room_id = create_room(self.user_id, other_user_id)
		notification = self.subscribe_direct_channel_message(room_id, other_user_id)
		own_channel = self.channel
		self.publish(notification, own_channel, True)
		target_channel = RedisPrefix.generate_user(other_user_id)
		# skip the second publish when both ends resolve to the same channel
		if target_channel != self.channel:
			self.publish(notification, target_channel, True)
248
249
	def delete_channel(self, message):
		"""
		Leave a public room or disable a private one, then tell subscribers to unsubscribe.

		:param message: client payload; reads the room id
		:raises ValidationError: when the room is not one of this connection's channels,
			is the common room, or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to exit this room')
		room = do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else:  # if public -> leave the room, delete the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
			online = self.get_online_from_redis(room_id)
			# the redis snapshot may not contain this user; an unguarded list.remove() would raise
			# ValueError after the membership row is already deleted and skip the steps below
			if self.user_id in online:
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
		room.save()
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, room_id, True)
266
267
	def edit_message(self, data):
		"""
		Edit or delete an existing message and publish the change to its room.

		A ``None`` content marks the message as deleted; any other content replaces
		the stored content and symbol.

		:param data: client payload; reads the message id, the new content and optional images
		"""
		message_id = data[VarNames.MESSAGE_ID]
		message = Message.objects.get(id=message_id)
		validate_edit_message(self.user_id, message)
		message.content = data[VarNames.CONTENT]
		matching_rows = Message.objects.filter(id=message_id)
		if message.content is not None:
			action = Actions.EDIT_MESSAGE
			# process_images receives the message instance; presumably it may adjust message.symbol -- TODO confirm
			prep_imgs = process_images(data.get(VarNames.IMG), message)
			matching_rows.update(content=message.content, symbol=message.symbol)
		else:
			action = Actions.DELETE_MESSAGE
			prep_imgs = None
			matching_rows.update(deleted=True)
		outgoing = self.create_send_message(message, action, prep_imgs)
		self.publish(outgoing, message.room_id)
283
284
	def send_client_new_channel(self, message):
		"""
		Post-processor for "new channel" events: subscribe to the room and mark this connection online in it.

		:param message: decoded redis payload; reads the room id
		"""
		new_room_id = message[VarNames.ROOM_ID]
		self.add_channel(new_room_id)
		self.add_online_user(new_room_id)
288
289
	def send_client_delete_channel(self, message):
		"""
		Post-processor for "room deleted" events: unsubscribe from the room and
		drop this connection from the room's online set.

		:param message: decoded redis payload; reads the room id
		"""
		room_id = message[VarNames.ROOM_ID]
		self.async_redis.unsubscribe((room_id,))
		# the online list is a redis SET filled with sadd() in add_online_user,
		# so the member has to be removed with srem(); hdel() targets hashes
		self.async_redis_publisher.srem(room_id, self.id)
		self.channels.remove(room_id)
294
295
	def process_get_messages(self, data):
		"""
		Load a page of non-deleted messages of a room (newest first) and write it to the websocket.

		:type data: dict
		:param data: client payload; reads the room, an optional header id
			(only messages with a smaller id are returned) and an optional count (default 10)
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		room_id = data[VarNames.CHANNEL]
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		conditions = [Q(room_id=room_id), Q(deleted=False)]
		if header_id is not None:
			# paging: only messages older than the one the client already has
			conditions.insert(0, Q(id__lt=header_id))
		messages = Message.objects.filter(*conditions).order_by('-pk')[:count]
		images = do_db(get_message_images, messages)
		self.ws_write(self.get_messages(messages, room_id, images))
310
311
312
class WebRtcMessageHandler(MessagesHandler, WebRtcMessageCreator):
313
314
	def __init__(self, *args, **kwargs):
		"""
		Extend the parent's dispatch tables with the WebRTC (file transfer and call) actions.
		"""
		super(WebRtcMessageHandler, self).__init__(*args, **kwargs)
		# actions coming from the websocket client
		client_actions = {
			Actions.WEBRTC: self.proxy_webrtc,
			Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
			Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
			Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
			Actions.ACCEPT_CALL: self.accept_call,
			Actions.ACCEPT_FILE: self.accept_file,
			Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection,
			Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection,
			Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
			Actions.RETRY_FILE_CONNECTION: self.retry_file_connection,
			Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
		}
		self.pre_process_message.update(client_actions)
		# prefixed messages coming from redis pub/sub
		redis_actions = {
			Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel,
			Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel,
		}
		self.post_process_message.update(redis_actions)
333
334
	def set_opponent_call_channel(self, message):
		"""
		Post-processor for WebRTC offers received through redis.

		:param message: decoded offer; reads the connection id and the opponent (offerer) id
		:return: True when the offer originates from this very connection, which stops
			``pub_sub_message`` from forwarding it to the client; otherwise None, after
			this connection is recorded as OFFERED for the connection id
		"""
		connection_id = message[VarNames.CONNECTION_ID]
		offered_by_self = message[VarNames.WEBRTC_OPPONENT_ID] == self.id
		if offered_by_self:
			return True
		self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
		return None
339
340
	def offer_webrtc_connection(self, in_message):
		"""
		Start a WebRTC connection: generate its id, register this socket as the offerer
		in READY state, return the id to the offering client and publish the offer to the room.

		:param in_message: client payload; reads the channel, optional content, the queued id and the event
		"""
		room_id = in_message[VarNames.CHANNEL]
		content = in_message.get(VarNames.CONTENT)
		qued_id = in_message[VarNames.WEBRTC_QUED_ID]
		connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
		publisher = self.async_redis_publisher
		# WEBRTC_CONNECTION maps connection id -> offerer socket id
		publisher.hset(WEBRTC_CONNECTION, connection_id, self.id)
		# the hash stored under the connection id maps socket id -> state
		publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
		offer_for_room = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT])
		id_for_self = self.set_connection_id(qued_id, connection_id)
		self.ws_write(id_for_self)
		self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
		self.publish(offer_for_room, room_id, True)
353
354
	def retry_file_connection(self, in_message):
		"""
		Ask a receiver to retry a file transfer.

		Allowed only when this socket is the registered offerer of the connection
		and the receiver's state is READY.

		:param in_message: client payload; reads the connection id and the opponent socket id
		:raises ValidationError: when the states do not allow a retry
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		opponent_ws_id = in_message[VarNames.WEBRTC_OPPONENT_ID]
		offerer_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		opponent_status = self.sync_redis.shget(connection_id, opponent_ws_id)
		retry_allowed = opponent_status == WebRtcRedisStates.READY and self.id == offerer_ws_id
		if not retry_allowed:
			raise ValidationError("Invalid channel status.")
		self.publish(self.retry_file(connection_id), opponent_ws_id)
363
364
	def reply_file_connection(self, in_message):
		"""
		Reply to a file offer: mark this socket as RESPONDED and notify the offerer.

		Allowed only while the offerer is READY and this socket is OFFERED.

		:param in_message: client payload; reads the connection id and the content
		:raises ValidationError: when the states do not allow a reply
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		redis = self.sync_redis
		offerer_ws_id = redis.shget(WEBRTC_CONNECTION, connection_id)
		offerer_status = redis.shget(connection_id, offerer_ws_id)
		own_status = redis.shget(connection_id, self.id)
		can_reply = (
			offerer_status == WebRtcRedisStates.READY
			and own_status == WebRtcRedisStates.OFFERED
		)
		if not can_reply:
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
		reply = self.reply_webrtc(
			Actions.REPLY_FILE_CONNECTION,
			connection_id,
			HandlerNames.WEBRTC_TRANSFER,
			in_message[VarNames.CONTENT]
		)
		self.publish(reply, offerer_ws_id)
379
380
	def reply_call_connection(self, in_message):
		"""
		Reply to a call offer: allowed from the OFFERED state, moves this socket to RESPONDED.

		:param in_message: client payload, forwarded to ``send_call_answer``
		"""
		self.send_call_answer(
			in_message,
			status_set=WebRtcRedisStates.RESPONDED,
			reply_action=Actions.REPLY_CALL_CONNECTION,
			allowed_state=[WebRtcRedisStates.OFFERED],
			message_handler=HandlerNames.WEBRTC_TRANSFER
		)
388
389
	def proxy_webrtc(self, in_message):
		"""
		Forward a WebRTC signalling message to the opponent socket.

		Both this socket and the opponent must be in READY state for the connection.
		Before forwarding, the opponent id in the message is replaced with this socket's id
		and the handler name is set to the peer-connection handler.

		:type in_message: dict
		:raises ValidationError: when either side is not READY
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		target_channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
		own_status = self.sync_redis.shget(connection_id, self.id)
		target_status = self.sync_redis.shget(connection_id, target_channel)
		ready = WebRtcRedisStates.READY
		if own_status != ready or target_status != ready:
			# todo receiver should only accept proxy_webrtc from sender, sender can accept all:
			# with 3 participants where the first sends a file to the 2nd and 3rd, the 2nd could
			# interfere with the 3rd's webrtc traffic -- fine during a call, not during a file transfer
			raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
				own_status, target_status
			))
		# the receiver has to see who the message came from
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
		self.logger.debug(
			"Forwarding message to channel %s, self %s, other status %s",
			target_channel,
			own_status,
			target_status
		)
		self.publish(in_message, target_channel)
412
413
	def close_file_connection(self, in_message):
		"""
		Close this socket's side of a file-transfer connection and notify the other side(s).

		Does nothing when this socket is already CLOSED for the connection.

		:param in_message: client payload; reads the connection id
		:raises Exception: when this socket has no state for the connection
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		own_status = self.sync_redis.shget(connection_id, self.id)
		if not own_status:
			raise Exception("Access Denied")
		if own_status == WebRtcRedisStates.CLOSED:
			return
		offerer_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		if offerer_id != self.id:
			self.close_file_receiver(connection_id, in_message, offerer_id)
		else:
			self.close_file_sender(connection_id)
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
425
426
	def close_call_connection(self, in_message):
		"""
		Hang up a call: allowed from the READY or RESPONDED state, moves this socket to CLOSED.

		:param in_message: client payload, forwarded to ``send_call_answer``
		"""
		self.send_call_answer(
			in_message,
			status_set=WebRtcRedisStates.CLOSED,
			reply_action=Actions.CLOSE_CALL_CONNECTION,
			allowed_state=[WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED],
			message_handler=HandlerNames.PEER_CONNECTION
		)
434
435
	def cancel_call_connection(self, in_message):
		"""
		Decline a call that was only offered: allowed from the OFFERED state, moves this socket to CLOSED.

		:param in_message: client payload, forwarded to ``send_call_answer``
		"""
		self.send_call_answer(
			in_message,
			status_set=WebRtcRedisStates.CLOSED,
			reply_action=Actions.CANCEL_CALL_CONNECTION,
			allowed_state=[WebRtcRedisStates.OFFERED],
			message_handler=HandlerNames.WEBRTC_TRANSFER
		)
443
444
	def close_file_receiver(self, connection_id, in_message, sender_id):
		"""
		Receiver-side close: forward the close message to the sender unless the sender is already CLOSED.

		:param connection_id: id of the webrtc connection
		:param in_message: close message; rewritten in place before being forwarded
		:param sender_id: socket id of the file sender
		:raises Exception: when the sender has no state for the connection
		"""
		sender_status = self.sync_redis.shget(connection_id, sender_id)
		if not sender_status:
			raise Exception("Access denied")
		if sender_status == WebRtcRedisStates.CLOSED:
			return
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
		self.publish(in_message, sender_id)
452
453
	def close_file_sender(self, connection_id):
		"""
		Sender-side close: notify every other socket of the connection that is not CLOSED yet.

		:param connection_id: id of the webrtc connection
		"""
		states = self.sync_redis.shgetall(connection_id)
		# the sender does not notify itself
		del states[self.id]
		close_message = self.get_close_file_sender_message(connection_id)
		for ws_id, state in states.items():
			if state != WebRtcRedisStates.CLOSED:
				self.publish(close_message, ws_id)
461
462
	def accept_file(self, in_message):
		"""
		Accept an offered file: mark this socket READY and notify the sender.

		Allowed while the sender is READY and this socket is RESPONDED or READY.

		:param in_message: client payload; reads the connection id and the content
		:raises ValidationError: when the states do not allow accepting
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		content = in_message[VarNames.CONTENT]
		redis = self.sync_redis
		sender_ws_id = redis.shget(WEBRTC_CONNECTION, connection_id)
		sender_status = redis.shget(connection_id, sender_ws_id)
		own_status = redis.shget(connection_id, self.id)
		acceptable_own_states = (WebRtcRedisStates.RESPONDED, WebRtcRedisStates.READY)
		if sender_status != WebRtcRedisStates.READY or own_status not in acceptable_own_states:
			raise ValidationError("Invalid channel status")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
		self.publish(self.get_accept_file_message(connection_id, content), sender_ws_id)
474
475
	# todo
476
	# we can use channel_status = self.sync_redis.shgetall(connection_id)
477
	# and then self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
478
	# if we shgetall and only then do async hset
479
	# we can catch an issue when 2 concurrent users accepted the call
480
	# but we didn't  send them ACCEPT_CALL as they both were in status 'offered'
481
	def accept_call(self, in_message):
		"""
		Accept a call: allowed only from the RESPONDED state; moves this socket to READY
		and notifies the other participants with an empty content.

		:param in_message: client payload; reads the connection id
		:raises ValidationError: when this socket is not in RESPONDED state
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		own_status = self.sync_redis.shget(connection_id, self.id)
		if own_status != WebRtcRedisStates.RESPONDED:
			raise ValidationError("Invalid channel status")
		participants = self.sync_redis.shgetall(connection_id)
		self.publish_call_answer(
			participants,
			connection_id,
			HandlerNames.WEBRTC_TRANSFER,
			Actions.ACCEPT_CALL,
			WebRtcRedisStates.READY,
			{}
		)
496
497
	def send_call_answer(self, in_message, status_set, reply_action, allowed_state, message_handler):
		"""
		Validate this socket's state for a call connection and broadcast an answer.

		:param in_message: client payload; reads the connection id and the content
		:param status_set: state this socket moves to
		:param reply_action: action name of the answer sent to the other participants
		:param allowed_state: collection of states from which the transition is permitted
		:param message_handler: handler name put into the answer
		:raises ValidationError: when this socket is not part of the connection
			or its current state is not in ``allowed_state``
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		content = in_message[VarNames.CONTENT]
		conn_users = self.sync_redis.shgetall(connection_id)
		# a socket that was never offered this connection has no entry; report it as an
		# invalid state (like every other rejected transition) instead of leaking a KeyError
		self_status = conn_users.get(self.id) if conn_users else None
		if self_status is None or self_status not in allowed_state:
			raise ValidationError("Invalid channel status.")
		self.publish_call_answer(conn_users, connection_id, message_handler, reply_action, status_set, content)
505
506
	def publish_call_answer(self, conn_users, connection_id, message_handler, reply_action, status_set, content):
		"""
		Store this socket's new state and send the answer to every other participant that is not CLOSED.

		:param conn_users: mapping of socket id -> state for the connection; this socket's entry is removed from it
		:param connection_id: id of the webrtc connection
		:param message_handler: handler name put into the answer
		:param reply_action: action name of the answer
		:param status_set: state stored for this socket
		:param content: payload of the answer
		"""
		self.async_redis_publisher.hset(connection_id, self.id, status_set)
		del conn_users[self.id]
		answer = self.reply_webrtc(reply_action, connection_id, message_handler, content)
		for ws_id, state in conn_users.items():
			if state == WebRtcRedisStates.CLOSED:
				continue
			self.publish(answer, ws_id)