MessagesHandler.edit_message_edit()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
c 0
b 0
f 0
dl 0
loc 14
rs 9.7
1
import json
2
import logging
3
import re
4
from django.conf import settings
5
from django.core.exceptions import ValidationError
6
from django.db.models import Q
7
from tornado.gen import engine, Task
8
from tornado.httpclient import HTTPRequest
9
from tornado.ioloop import IOLoop
10
from tornado.web import asynchronous
11
from tornadoredis import Client
12
13
from chat.global_redis import remove_parsable_prefix, encode_message
14
from chat.log_filters import id_generator
15
from chat.models import Message, Room, RoomUsers, Subscription, SubscriptionMessages, MessageHistory, \
16
	UploadedFile, Image
17
from chat.py2_3 import str_type, quote
18
from chat.settings import ALL_ROOM_ID, REDIS_PORT, WEBRTC_CONNECTION, GIPHY_URL, GIPHY_REGEX, FIREBASE_URL, REDIS_HOST
19
from chat.tornado.constants import VarNames, HandlerNames, Actions, RedisPrefix, WebRtcRedisStates
20
from chat.tornado.message_creator import WebRtcMessageCreator, MessagesCreator
21
from chat.utils import get_max_key, do_db, validate_edit_message, get_or_create_room, \
22
	create_room, get_message_images_videos, update_symbols, up_files_to_img, create_simple_room_users
23
24
parent_logger = logging.getLogger(__name__)
25
base_logger = logging.LoggerAdapter(parent_logger, {
26
	'id': 0,
27
	'ip': '000.000.000.000'
28
})
29
30
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
31
# CONNECTION_POOL = tornadoredis.ConnectionPool(
32
# max_connections=500,
33
# wait_for_available=True)
34
35
GIPHY_API_KEY = getattr(settings, "GIPHY_API_KEY", None)
36
FIREBASE_API_KEY = getattr(settings, "FIREBASE_API_KEY", None)
37
38
class MessagesHandler(MessagesCreator):
39
40
	def __init__(self, *args, **kwargs):
41
		self.closed_channels = None
42
		super(MessagesHandler, self).__init__()
43
		self.webrtc_ids = {}
44
		self.id = None  # child init
45
		self.sex = None
46
		self.last_client_ping = 0
47
		self.sender_name = None
48
		self.user_id = 0  # anonymous by default
49
		self.ip = None
50
		from chat import global_redis
51
		self.async_redis_publisher = global_redis.async_redis_publisher
52
		self.sync_redis = global_redis.sync_redis
53
		self.channels = []
54
		self._logger = None
55
		self.async_redis = Client(host=REDIS_HOST, port=REDIS_PORT)
56
		self.patch_tornadoredis()
57
		# input websocket messages handlers
58
		# The handler is determined by @VarNames.EVENT
59
		self.process_ws_message = {
60
			Actions.GET_MESSAGES: self.process_get_messages,
61
			Actions.SEND_MESSAGE: self.process_send_message,
62
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
63
			Actions.DELETE_ROOM: self.delete_channel,
64
			Actions.EDIT_MESSAGE: self.edit_message,
65
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
66
			Actions.INVITE_USER: self.invite_user,
67
			Actions.PING: self.respond_ping,
68
			Actions.PONG: self.process_pong_message,
69
		}
70
		# Handlers for redis messages, if handler returns true - message won't be sent to client
71
		# The handler is determined by @VarNames.EVENT
72
		self.process_pubsub_message = {
73
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
74
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
75
			Actions.DELETE_ROOM: self.send_client_delete_channel,
76
			Actions.INVITE_USER: self.send_client_new_channel,
77
			Actions.PING: self.process_ping_message,
78
		}
79
80
	def patch_tornadoredis(self):  # TODO remove this
81
		fabric = type(self.async_redis.connection.readline)
82
		self.async_redis.connection.old_read = self.async_redis.connection.readline
83
84
		def new_read(new_self, callback=None):
85
			try:
86
				return new_self.old_read(callback=callback)
87
			except Exception as e:
88
				return
89
				current_online = self.get_online_from_redis()
90
				self.logger.error(e)
91
				self.logger.error(
92
					"Exception info: "
93
					"self.id: %s ;;; "
94
					"self.connected = '%s';;; "
95
					"Redis default channel online = '%s';;; "
96
					"self.channels = '%s';;; "
97
					"self.closed_channels  = '%s';;;",
98
					self.id, self.connected, current_online, self.channels, self.closed_channels
99
				)
100
				raise e
101
102
		self.async_redis.connection.readline = fabric(new_read, self.async_redis.connection)
103
104
	@property
105
	def connected(self):
106
		raise NotImplemented
107
108
	@connected.setter
109
	def connected(self, value):
110
		raise NotImplemented
111
112
	@property
113
	def http_client(self):
114
		raise NotImplemented
115
116
	@engine
117
	def listen(self, channels):
118
		yield Task(
119
			self.async_redis.subscribe, channels)
120
		self.async_redis.listen(self.on_pub_sub_message)
121
122
	@property
123
	def logger(self):
124
		return self._logger if self._logger else base_logger
125
126
	@engine
127
	def add_channel(self, channel):
128
		self.channels.append(channel)
129
		yield Task(self.async_redis.subscribe, (channel,))
130
131
	def get_online_from_redis(self):
132
		return self.get_online_and_status_from_redis()[1]
133
134
	def get_online_and_status_from_redis(self):
135
		"""
136
		:rtype : (bool, list)
137
		"""
138
		online = self.sync_redis.ssmembers(RedisPrefix.ONLINE_VAR)
139
		self.logger.debug('!! redis online: %s', online)
140
		return self.parse_redis_online(online) if online else (False, [])
141
142
	def parse_redis_online(self, online):
143
		"""
144
		:rtype : (bool, list)
145
		"""
146
		result = set()
147
		user_is_online = False
148
		for decoded in online:  # py2 iteritems
149
			# : char specified in cookies_middleware.py.create_id
150
			user_id = int(decoded.split(':')[0])
151
			if user_id == self.user_id and decoded != self.id:
152
				user_is_online = True
153
			result.add(user_id)
154
		return user_is_online, list(result)
155
156
	def publish(self, message, channel, parsable=False):
157
		jsoned_mess = encode_message(message, parsable)
158
		self.logger.debug('<%s> %s', channel, jsoned_mess)
159
		self.async_redis_publisher.publish(channel, jsoned_mess)
160
161
	def on_pub_sub_message(self, message):
162
		"""
163
		All pubsub messages are automatically sent to client.
164
		:param message:
165
		:return:
166
		"""
167
		data = message.body
168
		if isinstance(data, str_type):  # not subscribe event
169
			prefixless_str = remove_parsable_prefix(data)
170
			if prefixless_str:
171
				dict_message = json.loads(prefixless_str)
172
				res = self.process_pubsub_message[dict_message[VarNames.EVENT]](dict_message)
173
				if not res:
174
					self.ws_write(prefixless_str)
175
			else:
176
				self.ws_write(data)
177
178
	def ws_write(self, message):
179
		raise NotImplementedError('WebSocketHandler implements')
180
181
	@asynchronous
182
	def search_giphy(self, message, query, cb):
183
		self.logger.debug("!! Asking giphy for: %s", query)
184
		def on_giphy_reply(response):
185
			try:
186
				self.logger.debug("!! Got giphy response: " + str(response.body))
187
				res =  json.loads(response.body)
188
				giphy = res['data'][0]['images']['downsized_medium']['url']
189
			except:
190
				giphy = None
191
			cb(message, giphy)
192
		url = GIPHY_URL.format(GIPHY_API_KEY, quote(query, safe=''))
193
		self.http_client.fetch(url, callback=on_giphy_reply)
194
195
	def notify_offline(self, channel, message_id):
196
		if FIREBASE_API_KEY is None:
197
			return
198
		online = self.get_online_from_redis()
199
		if channel == ALL_ROOM_ID:
200
			return
201
		offline_users = RoomUsers.objects.filter(room_id=channel, notifications=True).exclude(user_id__in=online).values_list('user_id')
202
		subscriptions = Subscription.objects.filter(user__in=offline_users, inactive=False)
203
		if len(subscriptions) == 0:
204
			return
205
		new_sub_mess =[SubscriptionMessages(message_id=message_id, subscription_id=r.id) for r in subscriptions]
206
		reg_ids =[r.registration_id for r in subscriptions]
207
		SubscriptionMessages.objects.bulk_create(new_sub_mess)
208
		self.post_firebase(list(reg_ids))
209
210
	@asynchronous
211
	def post_firebase(self, reg_ids):
212
		def on_reply(response):
213
			try:
214
				self.logger.debug("!! FireBase response: " + str(response.body))
215
				response_obj = json.loads(response.body)
216
				delete = []
217
				for index, elem in enumerate(response_obj['results']):
218
					if elem.get('error') in ['NotRegistered', 'InvalidRegistration']:
219
						delete.append(reg_ids[index])
220
				if len(delete) > 0:
221
					self.logger.info("Deactivating subscriptions: %s", delete)
222
					Subscription.objects.filter(registration_id__in=delete).update(inactive=True)
223
			except Exception as e:
224
				self.logger.error("Unable to parse response" + str(e))
225
				pass
226
227
		headers = {"Content-Type": "application/json", "Authorization": "key=%s" % FIREBASE_API_KEY}
228
		body = json.dumps({"registration_ids": reg_ids})
229
		self.logger.debug("!! post_fire_message %s", body)
230
		r = HTTPRequest(FIREBASE_URL, method="POST", headers=headers, body=body)
231
		self.http_client.fetch(r, callback=on_reply)
232
233
	def isGiphy(self, content):
234
		if GIPHY_API_KEY is not None and content is not None:
235
			giphy_match = re.search(GIPHY_REGEX, content)
236
			return giphy_match.group(1) if giphy_match is not None else None
237
238
	def process_send_message(self, message):
239
		"""
240
		:type message: dict
241
		"""
242
		content = message.get(VarNames.CONTENT)
243
		giphy_match = self.isGiphy(content)
244
245
		# @transaction.atomic mysql has gone away
246
		def send_message(message, giphy=None):
247
			files = UploadedFile.objects.filter(id__in=message.get(VarNames.FILES), user_id=self.user_id)
248
			symbol = get_max_key(files)
249
			channel = message[VarNames.ROOM_ID]
250
			js_id = message[VarNames.JS_MESSAGE_ID]
251
			message_db = Message(
252
				sender_id=self.user_id,
253
				content=message[VarNames.CONTENT],
254
				symbol=symbol,
255
				giphy=giphy,
256
				room_id=channel
257
			)
258
			res_files = []
259
			do_db(message_db.save)
260
			if files:
261
				images = up_files_to_img(files, message_db.id)
262
				res_files = MessagesCreator.prepare_img_video(images, message_db.id)
263
			prepared_message = self.create_send_message(
264
				message_db,
265
				Actions.PRINT_MESSAGE,
266
				res_files,
267
				js_id
268
			)
269
			self.publish(prepared_message, channel)
270
			self.notify_offline(channel, message_db.id)
271
		if giphy_match is not None:
272
			self.search_giphy(message, giphy_match, send_message)
273
		else:
274
			send_message(message)
275
276
	def create_new_room(self, message):
277
		room_name = message[VarNames.ROOM_NAME]
278
		if not room_name or len(room_name) > 16:
279
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
280
		room = Room(name=room_name)
281
		do_db(room.save)
282
		create_simple_room_users(self.user_id, room.id)
283
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
284
		self.publish(subscribe_message, self.channel, True)
285
286
	def invite_user(self, message):
287
		room_id = message[VarNames.ROOM_ID]
288
		user_id = message[VarNames.USER_ID]
289
		room = get_or_create_room(self.channels, room_id, user_id)
290
		users_in_room = list(RoomUsers.objects.filter(room_id=room_id).values_list('user_id', flat=True))
291
		notify_others = self.add_user_to_room(
292
			room_id,
293
			room.name,
294
			self.user_id,
295
			user_id,
296
			users_in_room
297
		)
298
		self.publish(notify_others, room_id)
299
		if user_id != self.user_id:
300
			self.publish(notify_others, RedisPrefix.generate_user(user_id), True)
301
302
	def respond_ping(self, message):
303
		self.ws_write(self.responde_pong(message[VarNames.JS_MESSAGE_ID]))
304
305
	def process_pong_message(self, message):
306
		self.last_client_ping = message[VarNames.TIME]
307
308
	def process_ping_message(self, message):
309
		def call_check():
310
			if message[VarNames.TIME] != self.last_client_ping:
311
				self.close(408, "Ping timeout")
312
		IOLoop.instance().call_later(settings.PING_CLOSE_SERVER_DELAY, call_check)
313
314
	def create_user_channel(self, message):
315
		user_id = message[VarNames.USER_ID]
316
		room_id = create_room(self.user_id, user_id)
317
		multiple_users = user_id != self.user_id
318
		subscribe_message = self.subscribe_direct_channel_message(
319
			room_id,
320
			[user_id, self.user_id] if multiple_users else [self.user_id],
321
			self.user_id != user_id
322
		)
323
		self.publish(subscribe_message, self.channel, True)
324
		if multiple_users:
325
			self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
326
327
	def delete_channel(self, message):
328
		room_id = message[VarNames.ROOM_ID]
329
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
330
			raise ValidationError('You are not allowed to exit this room')
331
		room = do_db(Room.objects.get, id=room_id)
332
		if room.disabled:
333
			raise ValidationError('Room is already deleted')
334
		if room.name is None:  # if private then disable
335
			room.disabled = True
336
			room.save()
337
		else:  # if public -> leave the room, delete the link
338
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
339
		ru = list(RoomUsers.objects.filter(room_id=room.id).values_list('user_id', flat=True))
340
		message = self.unsubscribe_direct_message(room_id, ru, room.name)
341
		self.publish(message, room_id, True)
342
343
	def edit_message(self, data):
344
		js_id = data[VarNames.JS_MESSAGE_ID]
345
		message = do_db(Message.objects.get, id=data[VarNames.MESSAGE_ID])
346
		validate_edit_message(self.user_id, message)
347
		message.content = data[VarNames.CONTENT]
348
		MessageHistory(message=message, content=message.content, giphy=message.giphy).save()
349
		message.edited_times += 1
350
		giphy_match = self.isGiphy(data[VarNames.CONTENT])
351
		if message.content is None:
352
			Message.objects.filter(id=data[VarNames.MESSAGE_ID]).update(
353
				deleted=True,
354
				edited_times=message.edited_times,
355
				content=None
356
			)
357
			self.publish(self.create_send_message(message, Actions.DELETE_MESSAGE, None, js_id), message.room_id)
358
		elif giphy_match is not None:
359
			self.edit_message_giphy(giphy_match, message, js_id)
360
		else:
361
			self.edit_message_edit(data, message, js_id)
362
363
	def edit_message_giphy(self, giphy_match, message, js_id):
364
		def edit_glyphy(message, giphy):
365
			do_db(Message.objects.filter(id=message.id).update, content=message.content, symbol=message.symbol, giphy=giphy,
366
					edited_times=message.edited_times)
367
			message.giphy = giphy
368
			self.publish(self.create_send_message(message, Actions.EDIT_MESSAGE, None, js_id), message.room_id)
369
370
		self.search_giphy(message, giphy_match, edit_glyphy)
371
372
	def edit_message_edit(self, data, message, js_id):
373
		action = Actions.EDIT_MESSAGE
374
		message.giphy = None
375
		files = UploadedFile.objects.filter(id__in=data.get(VarNames.FILES), user_id=self.user_id)
376
		if files:
377
			update_symbols(files, message)
378
			up_files_to_img(files, message.id)
379
		if message.symbol:  # fetch all, including that we just store
380
			db_images = Image.objects.filter(message_id=message.id)
381
			prep_files = MessagesCreator.prepare_img_video(db_images, message.id)
382
		else:
383
			prep_files = None
384
		Message.objects.filter(id=message.id).update(content=message.content, symbol=message.symbol, giphy=None, edited_times=message.edited_times)
385
		self.publish(self.create_send_message(message, action, prep_files, js_id), message.room_id)
386
387
	def send_client_new_channel(self, message):
388
		room_id = message[VarNames.ROOM_ID]
389
		self.add_channel(room_id)
390
391
	def send_client_delete_channel(self, message):
392
		room_id = message[VarNames.ROOM_ID]
393
		if message[VarNames.USER_ID] == self.user_id or message[VarNames.ROOM_NAME] is None:
394
			self.async_redis.unsubscribe((room_id,))
395
			self.channels.remove(room_id)
396
397
	def process_get_messages(self, data):
398
		"""
399
		:type data: dict
400
		"""
401
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
402
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
403
		room_id = data[VarNames.ROOM_ID]
404
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
405
		if header_id is None:
406
			messages = Message.objects.filter(room_id=room_id).order_by('-pk')[:count]
407
		else:
408
			messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id)).order_by('-pk')[:count]
409
		imv = do_db(get_message_images_videos, messages)
410
		response = self.get_messages(messages, room_id, imv, MessagesCreator.prepare_img_video)
411
		self.ws_write(response)
412
413
414
class WebRtcMessageHandler(MessagesHandler, WebRtcMessageCreator):
415
416
	def __init__(self, *args, **kwargs):
417
		super(WebRtcMessageHandler, self).__init__(*args, **kwargs)
418
		self.process_ws_message.update({
419
			Actions.WEBRTC: self.proxy_webrtc,
420
			Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
421
			Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
422
			Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
423
			Actions.ACCEPT_CALL: self.accept_call,
424
			Actions.ACCEPT_FILE: self.accept_file,
425
			Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection,
426
			Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection,
427
			Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
428
			Actions.RETRY_FILE_CONNECTION: self.retry_file_connection,
429
			Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
430
		})
431
		self.process_pubsub_message.update({
432
			Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel,
433
			Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel
434
		})
435
436
	def set_opponent_call_channel(self, message):
437
		connection_id = message[VarNames.CONNECTION_ID]
438
		if message[VarNames.WEBRTC_OPPONENT_ID] == self.id:
439
			return True
440
		self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
441
442
	def offer_webrtc_connection(self, in_message):
443
		room_id = in_message[VarNames.ROOM_ID]
444
		content = in_message.get(VarNames.CONTENT)
445
		qued_id = in_message[VarNames.WEBRTC_QUED_ID]
446
		connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
447
		# use list because sets dont have 1st element which is offerer
448
		self.async_redis_publisher.hset(WEBRTC_CONNECTION, connection_id, self.id)
449
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
450
		opponents_message = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT])
451
		self_message = self.set_connection_id(qued_id, connection_id)
452
		self.ws_write(self_message)
453
		self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
454
		self.publish(opponents_message, room_id, True)
455
456
	def retry_file_connection(self, in_message):
457
		connection_id = in_message[VarNames.CONNECTION_ID]
458
		opponent_ws_id = in_message[VarNames.WEBRTC_OPPONENT_ID]
459
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
460
		receiver_ws_status = self.sync_redis.shget(connection_id, opponent_ws_id)
461
		if receiver_ws_status == WebRtcRedisStates.READY and self.id == sender_ws_id:
462
			self.publish(self.retry_file(connection_id), opponent_ws_id)
463
		else:
464
			raise ValidationError("Invalid channel status.")
465
466 View Code Duplication
	def reply_file_connection(self, in_message):
467
		connection_id = in_message[VarNames.CONNECTION_ID]
468
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
469
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
470
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
471
		if sender_ws_status == WebRtcRedisStates.READY and self_ws_status == WebRtcRedisStates.OFFERED:
472
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
473
			self.publish(self.reply_webrtc(
474
				Actions.REPLY_FILE_CONNECTION,
475
				connection_id,
476
				HandlerNames.WEBRTC_TRANSFER,
477
				in_message[VarNames.CONTENT]
478
			), sender_ws_id)
479
		else:
480
			raise ValidationError("Invalid channel status.")
481
482
	def reply_call_connection(self, in_message):
483
		self.send_call_answer(
484
			in_message,
485
			WebRtcRedisStates.RESPONDED,
486
			Actions.REPLY_CALL_CONNECTION,
487
			[WebRtcRedisStates.OFFERED],
488
			HandlerNames.WEBRTC_TRANSFER
489
		)
490
491
	def proxy_webrtc(self, in_message):
492
		"""
493
		:type in_message: dict
494
		"""
495
		connection_id = in_message[VarNames.CONNECTION_ID]
496
		channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
497
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
498
		opponent_channel_status = self.sync_redis.shget(connection_id, channel)
499
		if not (self_channel_status == WebRtcRedisStates.READY and opponent_channel_status == WebRtcRedisStates.READY):
500
			raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
501
				self_channel_status, opponent_channel_status
502
			))  # todo receiver should only accept proxy_webrtc from sender, sender can accept all
503
		# I mean somebody if there're 3 ppl in 1 channel and first is initing transfer to 2nd and 3rd,
504
		# 2nd guy can fraud 3rd guy webrtc traffic, which is allowed during the call, but not while transering file
505
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
506
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
507
		self.logger.debug(
508
			"!! Forwarding message to channel %s, self %s, other status %s",
509
			channel,
510
			self_channel_status,
511
			opponent_channel_status
512
		)
513
		self.publish(in_message, channel)
514
515
	def close_file_connection(self, in_message):
516
		connection_id = in_message[VarNames.CONNECTION_ID]
517
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
518
		if not self_channel_status:
519
			raise Exception("Access Denied")
520
		if self_channel_status != WebRtcRedisStates.CLOSED:
521
			sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
522
			if sender_id == self.id:
523
				self.close_file_sender(connection_id)
524
			else:
525
				self.close_file_receiver(connection_id, in_message, sender_id)
526
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
527
528
	def close_call_connection(self, in_message):
529
		self.send_call_answer(
530
			in_message,
531
			WebRtcRedisStates.CLOSED,
532
			Actions.CLOSE_CALL_CONNECTION,
533
			[WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED],
534
			HandlerNames.PEER_CONNECTION
535
		)
536
537
	def cancel_call_connection(self, in_message):
538
		self.send_call_answer(
539
			in_message,
540
			WebRtcRedisStates.CLOSED,
541
			Actions.CANCEL_CALL_CONNECTION,
542
			[WebRtcRedisStates.OFFERED],
543
			HandlerNames.WEBRTC_TRANSFER
544
		)
545
546
	def close_file_receiver(self, connection_id, in_message, sender_id):
547
		sender_status = self.sync_redis.shget(connection_id, sender_id)
548
		if not sender_status:
549
			raise Exception("Access denied")
550
		if sender_status != WebRtcRedisStates.CLOSED:
551
			in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
552
			in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
553
			self.publish(in_message, sender_id)
554
555
	def close_file_sender(self, connection_id):
556
		values = self.sync_redis.shgetall(connection_id)
557
		del values[self.id]
558
		message = self.get_close_file_sender_message(connection_id)
559
		for ws_id in values:
560
			if values[ws_id] == WebRtcRedisStates.CLOSED:
561
				continue
562
			self.publish(message, ws_id)
563
564 View Code Duplication
	def accept_file(self, in_message):
565
		connection_id = in_message[VarNames.CONNECTION_ID]
566
		content = in_message[VarNames.CONTENT]
567
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
568
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
569
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
570
		if sender_ws_status == WebRtcRedisStates.READY \
571
				and self_ws_status in [WebRtcRedisStates.RESPONDED, WebRtcRedisStates.READY]:
572
			self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
573
			self.publish(self.get_accept_file_message(connection_id, content), sender_ws_id)
574
		else:
575
			raise ValidationError("Invalid channel status")
576
577
	# todo
578
	# we can use channel_status = self.sync_redis.shgetall(connection_id)
579
	# and then self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
580
	# if we shgetall and only then do async hset
581
	# we can catch an issue when 2 concurrent users accepted the call
582
	# but we didn't  send them ACCEPT_CALL as they both were in status 'offered'
583
	def accept_call(self, in_message):
584
		connection_id = in_message[VarNames.CONNECTION_ID]
585
		self_status = self.sync_redis.shget(connection_id, self.id)
586
		if self_status == WebRtcRedisStates.RESPONDED:
587
			conn_users = self.sync_redis.shgetall(connection_id)
588
			self.publish_call_answer(
589
				conn_users,
590
				connection_id,
591
				HandlerNames.WEBRTC_TRANSFER,
592
				Actions.ACCEPT_CALL,
593
				WebRtcRedisStates.READY,
594
				{}
595
			)
596
		else:
597
			raise ValidationError("Invalid channel status")
598
599
	def send_call_answer(self, in_message, status_set, reply_action, allowed_state, message_handler):
600
		connection_id = in_message[VarNames.CONNECTION_ID]
601
		content = in_message.get(VarNames.CONTENT)  # cancel call can skip browser
602
		conn_users = self.sync_redis.shgetall(connection_id)
603
		if conn_users[self.id] in allowed_state:
604
			self.publish_call_answer(conn_users, connection_id, message_handler, reply_action, status_set, content)
605
		else:
606
			raise ValidationError("Invalid channel status.")
607
608
	def publish_call_answer(self, conn_users, connection_id, message_handler, reply_action, status_set, content):
609
		self.async_redis_publisher.hset(connection_id, self.id, status_set)
610
		del conn_users[self.id]
611
		message = self.reply_webrtc(reply_action, connection_id, message_handler, content)
612
		for user in conn_users:
613
			if conn_users[user] != WebRtcRedisStates.CLOSED:
614
				self.publish(message, user)