Completed
Push — master ( ec5943...6b5d6f )
by Andrew
30s
created

MessagesHandler.add_online_user()   A

Complexity

Conditions 2

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 15
rs 9.4285
1
import json
2
import logging
3
import re
4
import urllib
5
6
from django.core.exceptions import ValidationError
7
from django.db import transaction
8
from django.db.models import Q
9
from tornado import ioloop
10
from tornado.gen import engine, Task
11
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
12
from tornado.ioloop import IOLoop
13
from tornado.web import asynchronous
14
from tornadoredis import Client
15
from django.conf import settings
16
from chat.global_redis import remove_parsable_prefix, encode_message
17
from chat.log_filters import id_generator
18
from chat.models import Message, Room, RoomUsers, Subscription, User, UserProfile, SubscriptionMessages, MessageHistory, \
19
	UploadedFile, Image
20
from chat.py2_3 import str_type, quote
21
from chat.settings import ALL_ROOM_ID, REDIS_PORT, WEBRTC_CONNECTION, GIPHY_URL, GIPHY_REGEX, FIREBASE_URL, REDIS_HOST
22
from chat.tornado.constants import VarNames, HandlerNames, Actions, RedisPrefix, WebRtcRedisStates
23
from chat.tornado.message_creator import WebRtcMessageCreator, MessagesCreator
24
from chat.utils import get_max_key, do_db, validate_edit_message, get_or_create_room, \
25
	create_room, get_message_images_videos, update_symbols, up_files_to_img
26
27
# Module-level logger. The adapter carries placeholder 'id' and 'ip' values;
# MessagesHandler.logger falls back to it while no per-instance logger is set.
parent_logger = logging.getLogger(__name__)
base_logger = logging.LoggerAdapter(
	parent_logger,
	dict(id=0, ip='000.000.000.000'),
)

# TODO https://github.com/leporo/tornado-redis#connection-pool-support
# CONNECTION_POOL = tornadoredis.ConnectionPool(
# max_connections=500,
# wait_for_available=True)

# Optional API keys: None when the Django settings do not define them.
GIPHY_API_KEY = getattr(settings, "GIPHY_API_KEY", None)
FIREBASE_API_KEY = getattr(settings, "FIREBASE_API_KEY", None)
40
41
class MessagesHandler(MessagesCreator):
42
43
	def __init__(self, *args, **kwargs):
		"""
		Initialise connection state, redis clients and the dispatch tables.

		``args`` / ``kwargs`` are accepted but not forwarded to the parent
		initialiser.
		"""
		self.closed_channels = None
		super(MessagesHandler, self).__init__()

		# --- per-connection state ---
		self.webrtc_ids = {}
		self.id = None  # child init
		self.sex = None
		self.last_client_ping = 0
		self.sender_name = None
		self.user_id = 0  # anonymous by default
		self.ip = None

		# --- redis clients: shared publisher / sync client, own subscriber ---
		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		self.channels = []
		self._logger = None
		self.async_redis = Client(host=REDIS_HOST, port=REDIS_PORT)
		self.patch_tornadoredis()

		# input websocket messages handlers
		# The handler is determined by @VarNames.EVENT
		ws_handlers = {
			Actions.GET_MESSAGES: self.process_get_messages,
			Actions.SEND_MESSAGE: self.process_send_message,
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
			Actions.DELETE_ROOM: self.delete_channel,
			Actions.EDIT_MESSAGE: self.edit_message,
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
			Actions.INVITE_USER: self.invite_user,
			Actions.PING: self.respond_ping,
			Actions.PONG: self.process_pong_message,
		}
		self.process_ws_message = ws_handlers

		# Handlers for redis messages, if handler returns true - message won't be sent to client
		# The handler is determined by @VarNames.EVENT
		pubsub_handlers = {
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
			Actions.DELETE_ROOM: self.send_client_delete_channel,
			Actions.INVITE_USER: self.send_client_new_channel,
			Actions.PING: self.process_ping_message,
		}
		self.process_pubsub_message = pubsub_handlers
82
83
	def patch_tornadoredis(self):  # TODO remove this
		"""
		Wrap the tornado-redis connection's ``readline`` so that an exception
		raised while reading is swallowed and ``None`` is returned instead.

		The unpatched ``readline`` stays reachable on the connection as
		``old_read``.
		"""
		connection = self.async_redis.connection
		# type of a bound method; used to bind the wrapper to the connection
		bound_method = type(connection.readline)
		connection.old_read = connection.readline

		def new_read(new_self, callback=None):
			try:
				return new_self.old_read(callback=callback)
			except Exception:
				# Deliberately best-effort: a failed read yields None.
				# The diagnostic logging and re-raise that used to sit after
				# an unconditional ``return`` here could never run and was
				# removed as dead code.
				return None

		connection.readline = bound_method(new_read, connection)
106
107
	@property
	def connected(self):
		"""
		Connection state; this base class provides no implementation.

		:raises NotImplementedError: always
		"""
		# ``NotImplemented`` is a comparison sentinel, not an exception;
		# raising it produces a TypeError. NotImplementedError is the intended signal.
		raise NotImplementedError

	@connected.setter
	def connected(self, value):
		"""
		:raises NotImplementedError: always
		"""
		raise NotImplementedError
114
115
	@property
	def http_client(self):
		"""
		HTTP client used by search_giphy() and post_firebase(); this base
		class provides no implementation.

		:raises NotImplementedError: always
		"""
		# ``raise NotImplemented`` would fail with a TypeError instead.
		raise NotImplementedError
118
119
	@engine
	def listen(self, channels):
		"""
		Subscribe the async redis client to ``channels``, then start
		delivering incoming messages to on_pub_sub_message().
		"""
		subscription = Task(self.async_redis.subscribe, channels)
		yield subscription
		self.async_redis.listen(self.on_pub_sub_message)
124
125
	@property
	def logger(self):
		"""Return the instance logger, or the module-level base_logger when none is set."""
		return self._logger or base_logger
128
129
	@engine
	def add_channel(self, channel):
		"""Record ``channel`` in self.channels and subscribe the async redis client to it."""
		self.channels.append(channel)
		to_subscribe = (channel,)
		yield Task(self.async_redis.subscribe, to_subscribe)
133
134
	def get_online_from_redis(self):
		"""
		Return only the user-id list from get_online_and_status_from_redis().

		:rtype : list
		"""
		status_and_ids = self.get_online_and_status_from_redis()
		return status_and_ids[1]
136
137
	def get_online_and_status_from_redis(self):
		"""
		Read the online set from redis and parse it with parse_redis_online().

		An empty (falsy) set yields ``(False, [])``.

		:rtype : (bool, list)
		"""
		online = self.sync_redis.ssmembers(RedisPrefix.ONLINE_VAR)
		self.logger.debug('!! redis online: %s', online)
		if not online:
			return False, []
		return self.parse_redis_online(online)
144
145
	def parse_redis_online(self, online):
		"""
		Turn raw online entries ("<user_id>:<rest>") into unique user ids.

		The boolean is True when some entry belongs to this user but is not
		this connection's own id (self.id).

		:rtype : (bool, list)
		"""
		seen_ids = set()
		has_other_session = False
		for entry in online:  # py2 iteritems
			# : char specified in cookies_middleware.py.create_id
			entry_user_id = int(entry.split(':')[0])
			seen_ids.add(entry_user_id)
			if entry_user_id == self.user_id and entry != self.id:
				has_other_session = True
		return has_other_session, list(seen_ids)
158
159
	def publish(self, message, channel, parsable=False):
		"""
		Encode ``message`` with encode_message() and publish it to the redis
		``channel`` through the shared async publisher.

		:param parsable: passed through to encode_message()
		"""
		payload = encode_message(message, parsable)
		self.logger.debug('<%s> %s', channel, payload)
		self.async_redis_publisher.publish(channel, payload)
163
164
	def on_pub_sub_message(self, message):
		"""
		Handle a message received from a redis subscription.

		String bodies without the parsable prefix are written to the client
		as is. Prefixed bodies are decoded and dispatched through
		self.process_pubsub_message by their event; they are written to the
		client only if the handler returns a falsy value.
		:param message: object whose ``body`` holds the payload
		:return: None
		"""
		data = message.body
		if not isinstance(data, str_type):  # subscribe event
			return
		prefixless_str = remove_parsable_prefix(data)
		if not prefixless_str:
			self.ws_write(data)
			return
		dict_message = json.loads(prefixless_str)
		handler = self.process_pubsub_message[dict_message[VarNames.EVENT]]
		handled = handler(dict_message)
		if not handled:
			self.ws_write(prefixless_str)
180
181
	def ws_write(self, message):
		"""
		Write ``message`` to the client; not implemented on this class.

		:raises NotImplementedError: always
		"""
		reason = 'WebSocketHandler implements'
		raise NotImplementedError(reason)
183
184
	@asynchronous
	def search_giphy(self, message, query, cb):
		"""
		Ask giphy for ``query`` and call ``cb(message, giphy)`` with the reply.

		:param message: passed through to ``cb`` untouched
		:param query: search text, url-quoted before being sent
		:param cb: callback; ``giphy`` is the 'downsized_medium' url of the
			first result, or None when the response cannot be used
		"""
		self.logger.debug("!! Asking giphy for: %s", query)

		def on_giphy_reply(response):
			try:
				self.logger.debug("!! Got giphy response: %s", response.body)
				res = json.loads(response.body)
				giphy = res['data'][0]['images']['downsized_medium']['url']
			except Exception:
				# Best effort: any malformed/empty reply means "no gif".
				# ``Exception`` (not a bare except) so that SystemExit and
				# KeyboardInterrupt are not swallowed here.
				giphy = None
			cb(message, giphy)

		url = GIPHY_URL.format(GIPHY_API_KEY, quote(query, safe=''))
		self.http_client.fetch(url, callback=on_giphy_reply)
197
198
	def notify_offline(self, channel, message_id):
		"""
		Queue ``message_id`` for, and push a Firebase notification to, the
		active subscriptions of room users that are not currently online.

		Does nothing when no Firebase key is configured, when ``channel`` is
		ALL_ROOM_ID, or when there are no matching subscriptions.
		"""
		# Both cheap guards come first so that redis is not queried needlessly.
		if FIREBASE_API_KEY is None or channel == ALL_ROOM_ID:
			return
		online = self.get_online_from_redis()
		offline_users = RoomUsers.objects.filter(
			room_id=channel, notifications=True
		).exclude(user_id__in=online).values_list('user_id')
		subscriptions = Subscription.objects.filter(user__in=offline_users, inactive=False)
		if not subscriptions:
			return
		new_sub_mess = [
			SubscriptionMessages(message_id=message_id, subscription_id=r.id)
			for r in subscriptions
		]
		reg_ids = [r.registration_id for r in subscriptions]
		SubscriptionMessages.objects.bulk_create(new_sub_mess)
		self.post_firebase(reg_ids)
212
213
	@asynchronous
	def post_firebase(self, reg_ids):
		"""
		POST ``reg_ids`` to FIREBASE_URL and deactivate subscriptions that
		Firebase reports as 'NotRegistered' or 'InvalidRegistration'.

		:param reg_ids: list of registration ids; results in the reply are
			matched back to it by position
		"""
		def on_reply(response):
			try:
				self.logger.debug("!! FireBase response: %s", response.body)
				response_obj = json.loads(response.body)
				delete = [
					reg_ids[index]
					for index, elem in enumerate(response_obj['results'])
					if elem.get('error') in ('NotRegistered', 'InvalidRegistration')
				]
				if delete:
					self.logger.info("Deactivating subscriptions: %s", delete)
					Subscription.objects.filter(registration_id__in=delete).update(inactive=True)
			except Exception as e:
				# Best effort: a bad reply is logged, never raised.
				self.logger.error("Unable to parse response: %s", e)

		headers = {"Content-Type": "application/json", "Authorization": "key=%s" % FIREBASE_API_KEY}
		body = json.dumps({"registration_ids": reg_ids})
		self.logger.debug("!! post_fire_message %s", body)
		r = HTTPRequest(FIREBASE_URL, method="POST", headers=headers, body=body)
		self.http_client.fetch(r, callback=on_reply)
235
236
	def isGiphy(self, content):
		"""
		Return group 1 of GIPHY_REGEX searched in ``content``.

		Returns None when no giphy key is configured, ``content`` is None,
		or the pattern does not match.
		"""
		if GIPHY_API_KEY is None or content is None:
			return None
		found = re.search(GIPHY_REGEX, content)
		if found is None:
			return None
		return found.group(1)
240
241
	def process_send_message(self, message):
		"""
		Store a new chat message, publish it to its room and notify offline
		users. If the content matches the giphy pattern, the gif is looked
		up first and the message is sent from the lookup callback.

		:type message: dict
		"""
		# @transaction.atomic mysql has gone away
		def send_message(message, giphy=None):
			files = UploadedFile.objects.filter(id__in=message.get(VarNames.FILES), user_id=self.user_id)
			symbol = get_max_key(files)
			channel = message[VarNames.ROOM_ID]
			js_id = message[VarNames.JS_MESSAGE_ID]
			message_db = Message(
				sender_id=self.user_id,
				content=message[VarNames.CONTENT],
				symbol=symbol,
				giphy=giphy,
				room_id=channel
			)
			res_files = []
			do_db(message_db.save)
			if files:
				# attach uploaded files to the freshly saved message
				images = up_files_to_img(files, message_db.id)
				res_files = MessagesCreator.prepare_img_video(images, message_db.id)
			prepared_message = self.create_send_message(message_db, Actions.PRINT_MESSAGE, res_files, js_id)
			self.publish(prepared_message, channel)
			self.notify_offline(channel, message_db.id)

		giphy_query = self.isGiphy(message.get(VarNames.CONTENT))
		if giphy_query is None:
			send_message(message)
		else:
			self.search_giphy(message, giphy_query, send_message)
278
279
	def create_new_room(self, message):
		"""
		Create a named room, add the current user to it and publish the
		subscribe message on self.channel.

		:raises ValidationError: if the room name is empty or longer than 16 chars
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = bool(room_name) and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		room = Room(name=room_name)
		do_db(room.save)
		membership = RoomUsers(room_id=room.id, user_id=self.user_id)
		membership.save()
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
		self.publish(subscribe_message, self.channel, True)
288
289
	def invite_user(self, message):
		"""
		Add the user named in ``message`` to the room and publish the
		resulting notification to the room and, if it is another user,
		to that user's own channel.
		"""
		room_id = message[VarNames.ROOM_ID]
		invited_id = message[VarNames.USER_ID]
		room = get_or_create_room(self.channels, room_id, invited_id)
		members_query = RoomUsers.objects.filter(room_id=room_id).values_list('user_id', flat=True)
		users_in_room = list(members_query)
		notify_others = self.add_user_to_room(room_id, room.name, self.user_id, invited_id, users_in_room)
		self.publish(notify_others, room_id)
		if invited_id != self.user_id:
			invited_channel = RedisPrefix.generate_user(invited_id)
			self.publish(notify_others, invited_channel, True)
304
305
	def respond_ping(self, message):
		"""Answer a client ping by writing the pong built for its message id."""
		pong = self.responde_pong(message[VarNames.JS_MESSAGE_ID])
		self.ws_write(pong)
307
308
	def process_pong_message(self, message):
		"""Remember the time carried by the client's pong as last_client_ping."""
		pong_time = message[VarNames.TIME]
		self.last_client_ping = pong_time
310
311
	def process_ping_message(self, message):
		"""
		Schedule a check, settings.PING_CLOSE_SERVER_DELAY later, that closes
		the connection with code 408 unless last_client_ping equals the time
		carried by this ping.
		"""
		def close_if_unanswered():
			if message[VarNames.TIME] == self.last_client_ping:
				return
			self.close(408, "Ping timeout")

		io_loop = IOLoop.instance()
		io_loop.call_later(settings.PING_CLOSE_SERVER_DELAY, close_if_unanswered)
316
317
	def create_user_channel(self, message):
		"""
		Create a direct room between the current user and the user named in
		``message`` and publish the subscribe message to both of them
		(only once when the user opens a channel with themselves).
		"""
		user_id = message[VarNames.USER_ID]
		room_id = create_room(self.user_id, user_id)
		multiple_users = user_id != self.user_id
		if multiple_users:
			participants = [user_id, self.user_id]
		else:
			participants = [self.user_id]
		subscribe_message = self.subscribe_direct_channel_message(room_id, participants, multiple_users)
		self.publish(subscribe_message, self.channel, True)
		if multiple_users:
			self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
329
330
	def delete_channel(self, message):
		"""
		Leave or disable the room named in ``message`` and publish the
		unsubscribe message to that room.

		:raises ValidationError: if the room is not one of self.channels,
			is ALL_ROOM_ID, or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		may_exit = room_id in self.channels and room_id != ALL_ROOM_ID
		if not may_exit:
			raise ValidationError('You are not allowed to exit this room')
		room = do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		if room.name is not None:  # if public -> leave the room, delete the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
		else:  # if private then disable
			room.disabled = True
		room.save()
		unsubscribe_message = self.unsubscribe_direct_message(room_id)
		self.publish(unsubscribe_message, room_id, True)
344
345
	def edit_message(self, data):
		"""
		Edit or delete (content of None) an existing message.

		The state the message had before this edit is stored in
		MessageHistory; the edit itself is then applied as a delete, a giphy
		edit or a plain edit.
		"""
		js_id = data[VarNames.JS_MESSAGE_ID]
		message = do_db(Message.objects.get, id=data[VarNames.MESSAGE_ID])
		validate_edit_message(self.user_id, message)
		# Snapshot BEFORE overwriting the content. Previously the history row
		# was written after the assignment, so it paired the new content with
		# the old giphy and the previous content was never stored.
		# NOTE(review): confirm no consumer relies on history holding the new text.
		MessageHistory(message=message, content=message.content, giphy=message.giphy).save()
		message.content = data[VarNames.CONTENT]
		message.edited_times += 1
		giphy_match = self.isGiphy(data[VarNames.CONTENT])
		if message.content is None:
			Message.objects.filter(id=data[VarNames.MESSAGE_ID]).update(
				deleted=True,
				edited_times=message.edited_times,
				content=None
			)
			self.publish(self.create_send_message(message, Actions.DELETE_MESSAGE, None, js_id), message.room_id)
		elif giphy_match is not None:
			self.edit_message_giphy(giphy_match, message, js_id)
		else:
			self.edit_message_edit(data, message, js_id)
364
365
	def edit_message_giphy(self, giphy_match, message, js_id):
		"""
		Look up a gif for ``giphy_match`` and, from the lookup callback,
		persist the edited message with that gif and publish the edit.
		"""
		def store_and_publish(message, giphy):
			updater = Message.objects.filter(id=message.id).update
			do_db(
				updater,
				content=message.content,
				symbol=message.symbol,
				giphy=giphy,
				edited_times=message.edited_times
			)
			message.giphy = giphy
			outgoing = self.create_send_message(message, Actions.EDIT_MESSAGE, None, js_id)
			self.publish(outgoing, message.room_id)

		self.search_giphy(message, giphy_match, store_and_publish)
373
374
	def edit_message_edit(self, data, message, js_id):
		"""
		Apply a plain (non-giphy) edit: attach newly uploaded files, persist
		the message with giphy cleared and publish the edit to its room.
		"""
		message.giphy = None
		files = UploadedFile.objects.filter(id__in=data.get(VarNames.FILES), user_id=self.user_id)
		if files:
			update_symbols(files, message)
			up_files_to_img(files, message.id)
		prep_files = None
		if message.symbol:  # fetch all, including that we just store
			db_images = Image.objects.filter(message_id=message.id)
			prep_files = MessagesCreator.prepare_img_video(db_images, message.id)
		Message.objects.filter(id=message.id).update(
			content=message.content,
			symbol=message.symbol,
			giphy=None,
			edited_times=message.edited_times
		)
		outgoing = self.create_send_message(message, Actions.EDIT_MESSAGE, prep_files, js_id)
		self.publish(outgoing, message.room_id)
388
389
	def send_client_new_channel(self, message):
		"""Subscribe this connection to the room named in ``message``."""
		self.add_channel(message[VarNames.ROOM_ID])
392
393
	def send_client_delete_channel(self, message):
		"""Unsubscribe from the room named in ``message`` and drop it from self.channels."""
		room_id = message[VarNames.ROOM_ID]
		to_unsubscribe = (room_id,)
		self.async_redis.unsubscribe(to_unsubscribe)
		self.channels.remove(room_id)
397
398
	def process_get_messages(self, data):
		"""
		Write to the client up to ``count`` (default 10) newest messages of a
		room, optionally only those with an id below the given header id.

		:type data: dict
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		room_id = data[VarNames.ROOM_ID]
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		if header_id is not None:
			room_messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id))
		else:
			room_messages = Message.objects.filter(room_id=room_id)
		messages = room_messages.order_by('-pk')[:count]
		imv = do_db(get_message_images_videos, messages)
		response = self.get_messages(messages, room_id, imv, MessagesCreator.prepare_img_video)
		self.ws_write(response)
413
414
415
class WebRtcMessageHandler(MessagesHandler, WebRtcMessageCreator):
416
417
	def __init__(self, *args, **kwargs):
		"""Extend the parent's dispatch tables with the webrtc handlers."""
		super(WebRtcMessageHandler, self).__init__(*args, **kwargs)
		# websocket (client -> server) webrtc actions
		webrtc_ws_handlers = {
			Actions.WEBRTC: self.proxy_webrtc,
			Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
			Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
			Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
			Actions.ACCEPT_CALL: self.accept_call,
			Actions.ACCEPT_FILE: self.accept_file,
			Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection,
			Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection,
			Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
			Actions.RETRY_FILE_CONNECTION: self.retry_file_connection,
			Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
		}
		self.process_ws_message.update(webrtc_ws_handlers)
		# redis pubsub webrtc actions
		webrtc_pubsub_handlers = {
			Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel,
			Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel
		}
		self.process_pubsub_message.update(webrtc_pubsub_handlers)
436
437
	def set_opponent_call_channel(self, message):
		"""
		Pubsub handler for an incoming offer.

		Returns True when this connection is the one named as the offer's
		opponent id (so the message is not forwarded to the client);
		otherwise marks this connection as OFFERED for the connection id
		and returns None.
		"""
		connection_id = message[VarNames.CONNECTION_ID]
		offer_is_own = message[VarNames.WEBRTC_OPPONENT_ID] == self.id
		if offer_is_own:
			return True
		self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
442
443
	def offer_webrtc_connection(self, in_message):
		"""
		Start a webrtc connection: generate a connection id, register this
		connection as its offerer in READY state, tell the client the new id
		and publish the offer to the room.
		"""
		room_id = in_message[VarNames.ROOM_ID]
		content = in_message.get(VarNames.CONTENT)
		qued_id = in_message[VarNames.WEBRTC_QUED_ID]
		connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
		# use list because sets dont have 1st element which is offerer
		redis = self.async_redis_publisher
		redis.hset(WEBRTC_CONNECTION, connection_id, self.id)
		redis.hset(connection_id, self.id, WebRtcRedisStates.READY)
		event = in_message[VarNames.EVENT]
		opponents_message = self.offer_webrtc(content, connection_id, room_id, event)
		self_message = self.set_connection_id(qued_id, connection_id)
		self.ws_write(self_message)
		self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
		self.publish(opponents_message, room_id, True)
456
457
	def retry_file_connection(self, in_message):
		"""
		Publish a retry message to the opponent of a file connection.

		:raises ValidationError: unless this connection is the sender and
			the opponent is in READY state
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		opponent_ws_id = in_message[VarNames.WEBRTC_OPPONENT_ID]
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		receiver_ws_status = self.sync_redis.shget(connection_id, opponent_ws_id)
		if receiver_ws_status != WebRtcRedisStates.READY or self.id != sender_ws_id:
			raise ValidationError("Invalid channel status.")
		self.publish(self.retry_file(connection_id), opponent_ws_id)
466
467
	def reply_file_connection(self, in_message):
		"""
		Mark this connection as RESPONDED for a file offer and publish the
		reply to the sender.

		:raises ValidationError: unless the sender is READY and this
			connection is OFFERED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
		if sender_ws_status != WebRtcRedisStates.READY or self_ws_status != WebRtcRedisStates.OFFERED:
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
		reply = self.reply_webrtc(
			Actions.REPLY_FILE_CONNECTION,
			connection_id,
			HandlerNames.WEBRTC_TRANSFER,
			in_message[VarNames.CONTENT]
		)
		self.publish(reply, sender_ws_id)
482
483
	def reply_call_connection(self, in_message):
		"""Reply to a call offer: OFFERED -> RESPONDED, answered via send_call_answer()."""
		allowed_from = [WebRtcRedisStates.OFFERED]
		self.send_call_answer(
			in_message,
			WebRtcRedisStates.RESPONDED,
			Actions.REPLY_CALL_CONNECTION,
			allowed_from,
			HandlerNames.WEBRTC_TRANSFER
		)
491
492
	def proxy_webrtc(self, in_message):
		"""
		Forward a webrtc message to the opponent's channel, rewriting the
		opponent id to this connection's id.

		:type in_message: dict
		:raises ValidationError: unless both sides are in READY state
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
		opponent_channel_status = self.sync_redis.shget(connection_id, channel)
		both_ready = (
			self_channel_status == WebRtcRedisStates.READY
			and opponent_channel_status == WebRtcRedisStates.READY
		)
		if not both_ready:
			raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
				self_channel_status, opponent_channel_status
			))  # todo receiver should only accept proxy_webrtc from sender, sender can accept all
		# I mean somebody if there're 3 ppl in 1 channel and first is initing transfer to 2nd and 3rd,
		# 2nd guy can fraud 3rd guy webrtc traffic, which is allowed during the call, but not while transering file
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
		self.logger.debug(
			"!! Forwarding message to channel %s, self %s, other status %s",
			channel, self_channel_status, opponent_channel_status
		)
		self.publish(in_message, channel)
515
516
	def close_file_connection(self, in_message):
		"""
		Close this connection's side of a file transfer, notifying the other
		side(s), and mark it CLOSED. A side that is already CLOSED is left alone.

		:raises Exception: "Access Denied" when this connection has no
			status for the connection id
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
		if not self_channel_status:
			raise Exception("Access Denied")
		if self_channel_status == WebRtcRedisStates.CLOSED:
			return
		sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		if sender_id != self.id:
			self.close_file_receiver(connection_id, in_message, sender_id)
		else:
			self.close_file_sender(connection_id)
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
528
529
	def close_call_connection(self, in_message):
		"""Close a call: READY or RESPONDED -> CLOSED, answered via send_call_answer()."""
		allowed_from = [WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED]
		self.send_call_answer(
			in_message,
			WebRtcRedisStates.CLOSED,
			Actions.CLOSE_CALL_CONNECTION,
			allowed_from,
			HandlerNames.PEER_CONNECTION
		)
537
538
	def cancel_call_connection(self, in_message):
		"""Cancel an offered call: OFFERED -> CLOSED, answered via send_call_answer()."""
		allowed_from = [WebRtcRedisStates.OFFERED]
		self.send_call_answer(
			in_message,
			WebRtcRedisStates.CLOSED,
			Actions.CANCEL_CALL_CONNECTION,
			allowed_from,
			HandlerNames.WEBRTC_TRANSFER
		)
546
547
	def close_file_receiver(self, connection_id, in_message, sender_id):
		"""
		Receiver side of closing a file transfer: forward the close message
		to the sender unless the sender is already CLOSED.

		:raises Exception: "Access denied" when the sender has no status
			for the connection id
		"""
		sender_status = self.sync_redis.shget(connection_id, sender_id)
		if not sender_status:
			raise Exception("Access denied")
		if sender_status == WebRtcRedisStates.CLOSED:
			return
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
		self.publish(in_message, sender_id)
555
556
	def close_file_sender(self, connection_id):
		"""
		Sender side of closing a file transfer: publish the close message to
		every other participant that is not already CLOSED.
		"""
		participants = self.sync_redis.shgetall(connection_id)
		del participants[self.id]
		message = self.get_close_file_sender_message(connection_id)
		for ws_id, status in participants.items():
			if status != WebRtcRedisStates.CLOSED:
				self.publish(message, ws_id)
564
565
	def accept_file(self, in_message):
		"""
		Accept an offered file: mark this connection READY and publish the
		accept message to the sender.

		:raises ValidationError: unless the sender is READY and this
			connection is RESPONDED or READY
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		content = in_message[VarNames.CONTENT]
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
		acceptable_self_states = [WebRtcRedisStates.RESPONDED, WebRtcRedisStates.READY]
		can_accept = sender_ws_status == WebRtcRedisStates.READY and self_ws_status in acceptable_self_states
		if not can_accept:
			raise ValidationError("Invalid channel status")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
		self.publish(self.get_accept_file_message(connection_id, content), sender_ws_id)
577
578
	# todo
579
	# we can use channel_status = self.sync_redis.shgetall(connection_id)
580
	# and then self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
581
	# if we shgetall and only then do async hset
582
	# we can catch an issue when 2 concurrent users accepted the call
583
	# but we didn't  send them ACCEPT_CALL as they both were in status 'offered'
584
	def accept_call(self, in_message):
		"""
		Accept a call this connection has RESPONDED to: switch to READY and
		notify the other participants.

		:raises ValidationError: if this connection is not in RESPONDED state
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		self_status = self.sync_redis.shget(connection_id, self.id)
		if self_status != WebRtcRedisStates.RESPONDED:
			raise ValidationError("Invalid channel status")
		conn_users = self.sync_redis.shgetall(connection_id)
		self.publish_call_answer(
			conn_users,
			connection_id,
			HandlerNames.WEBRTC_TRANSFER,
			Actions.ACCEPT_CALL,
			WebRtcRedisStates.READY,
			{}
		)
599
600
	def send_call_answer(self, in_message, status_set, reply_action, allowed_state, message_handler):
		"""
		Validate this connection's current call state and, if allowed,
		switch it to ``status_set`` and notify the other participants.

		:param in_message: dict holding the connection id and optional content
		:param status_set: state to store for this connection
		:param reply_action: action of the message sent to the others
		:param allowed_state: states this connection may currently be in
		:param message_handler: handler name put into the outgoing message
		:raises ValidationError: if this connection is not a participant of
			the connection or its state is not in ``allowed_state``
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		content = in_message.get(VarNames.CONTENT)  # cancel call can skip browser
		conn_users = self.sync_redis.shgetall(connection_id)
		# .get(): an unknown connection id or a non-participant must be
		# rejected with ValidationError like any other bad state, not crash
		# with a KeyError.
		if conn_users.get(self.id) not in allowed_state:
			raise ValidationError("Invalid channel status.")
		self.publish_call_answer(conn_users, connection_id, message_handler, reply_action, status_set, content)
608
609
	def publish_call_answer(self, conn_users, connection_id, message_handler, reply_action, status_set, content):
		"""
		Store ``status_set`` for this connection and publish the reply to
		every other participant that is not CLOSED.

		Note: this connection's entry is removed from ``conn_users`` in place.
		"""
		self.async_redis_publisher.hset(connection_id, self.id, status_set)
		del conn_users[self.id]
		message = self.reply_webrtc(reply_action, connection_id, message_handler, content)
		for user, status in conn_users.items():
			if status == WebRtcRedisStates.CLOSED:
				continue
			self.publish(message, user)