Completed
Push — master ( acef0c...5d47b2 )
by Andrew
32s
created

MessagesHandler.get_is_online()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
import json
2
import logging
3
import re
4
import urllib
5
6
from django.core.exceptions import ValidationError
7
from django.db import transaction
8
from django.db.models import Q
9
from tornado import ioloop
10
from tornado.gen import engine, Task
11
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
12
from tornado.ioloop import IOLoop
13
from tornado.web import asynchronous
14
from tornadoredis import Client
15
from django.conf import settings
16
from chat.global_redis import remove_parsable_prefix, encode_message
17
from chat.log_filters import id_generator
18
from chat.models import Message, Room, RoomUsers, Subscription, User, UserProfile, SubscriptionMessages, MessageHistory, \
19
	UploadedFile, Image
20
from chat.py2_3 import str_type, quote
21
from chat.settings import ALL_ROOM_ID, REDIS_PORT, WEBRTC_CONNECTION, GIPHY_URL, GIPHY_REGEX, FIREBASE_URL, REDIS_HOST
22
from chat.tornado.constants import VarNames, HandlerNames, Actions, RedisPrefix, WebRtcRedisStates
23
from chat.tornado.message_creator import WebRtcMessageCreator, MessagesCreator
24
from chat.utils import get_max_key, do_db, validate_edit_message, get_or_create_room, \
25
	create_room, get_message_images_videos, update_symbols, up_files_to_img
26
27
# Module logger plus an adapter carrying placeholder 'id'/'ip' fields; handlers
# fall back to this adapter while they have no logger of their own.
parent_logger = logging.getLogger(__name__)
base_logger = logging.LoggerAdapter(
	parent_logger,
	dict(id=0, ip='000.000.000.000'),
)
32
33
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
34
# CONNECTION_POOL = tornadoredis.ConnectionPool(
35
# max_connections=500,
36
# wait_for_available=True)
37
38
# Optional API keys: each one stays None when the Django settings do not define it.
GIPHY_API_KEY, FIREBASE_API_KEY = (
	getattr(settings, key_name, None) for key_name in ("GIPHY_API_KEY", "FIREBASE_API_KEY")
)
40
41
class MessagesHandler(MessagesCreator):
42
43
	def __init__(self, *args, **kwargs):
		"""
		Set up per-connection defaults, the redis clients and both dispatch tables.

		``args`` and ``kwargs`` are accepted but are not forwarded to the parent initialiser.
		"""
		self.closed_channels = None
		super(MessagesHandler, self).__init__()
		from chat import global_redis

		# connection identity, placeholders until they are assigned elsewhere
		self.id = None  # child init
		self.user_id = 0  # anonymous by default
		self.sender_name = None
		self.sex = None
		self.ip = None
		self._logger = None

		# bookkeeping
		self.webrtc_ids = {}
		self.channels = []
		self.last_client_ping = 0

		# shared redis clients, plus the client this handler subscribes with
		self.sync_redis = global_redis.sync_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.async_redis = Client(host=REDIS_HOST, port=REDIS_PORT)
		self.patch_tornadoredis()

		# input websocket messages handlers
		# The handler is determined by @VarNames.EVENT
		ws_handlers = {
			Actions.GET_MESSAGES: self.process_get_messages,
			Actions.SEND_MESSAGE: self.process_send_message,
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
			Actions.DELETE_ROOM: self.delete_channel,
			Actions.EDIT_MESSAGE: self.edit_message,
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
			Actions.INVITE_USER: self.invite_user,
			Actions.PING: self.respond_ping,
			Actions.PONG: self.process_pong_message,
		}
		self.process_ws_message = ws_handlers

		# Handlers for redis messages, if handler returns true - message won't be sent to client
		# The handler is determined by @VarNames.EVENT
		pubsub_handlers = {
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
			Actions.DELETE_ROOM: self.send_client_delete_channel,
			Actions.INVITE_USER: self.send_client_new_channel,
			Actions.PING: self.process_ping_message,
		}
		self.process_pubsub_message = pubsub_handlers
82
83
	def patch_tornadoredis(self):  # TODO remove this
		"""
		Wrap the redis connection's ``readline`` so that read errors do not propagate.

		The original bound method is kept on the connection as ``old_read``. The
		replacement delegates to it and returns None when it raises.
		"""
		connection = self.async_redis.connection
		bound_method_type = type(connection.readline)
		connection.old_read = connection.readline

		def new_read(new_self, callback=None):
			try:
				return new_self.old_read(callback=callback)
			except Exception as e:
				# Still swallowed on purpose (best effort), but no longer silently:
				# leave a trace so a broken subscription can be diagnosed.
				self.logger.warning(
					"Redis readline failed: %s ;;; "
					"self.id: %s ;;; "
					"self.channels = '%s';;; "
					"self.closed_channels  = '%s';;;",
					e, self.id, self.channels, self.closed_channels
				)
				return None

		connection.readline = bound_method_type(new_read, connection)
106
107
	@property
	def connected(self):
		"""Abstract property: this base implementation always raises NotImplementedError."""
		# NotImplemented is a comparison sentinel, not an exception; raising it is a TypeError
		raise NotImplementedError('connected getter must be provided by a subclass')

	@connected.setter
	def connected(self, value):
		"""Abstract setter: this base implementation always raises NotImplementedError."""
		raise NotImplementedError('connected setter must be provided by a subclass')
114
115
	@property
	def http_client(self):
		"""Abstract property: this base implementation always raises NotImplementedError."""
		# NotImplemented is a comparison sentinel, not an exception; raising it is a TypeError
		raise NotImplementedError('http_client must be provided by a subclass')
118
119
	@engine
	def listen(self, channels):
		"""
		Subscribe the async redis client to ``channels``, then start passing
		incoming pubsub messages to ``on_pub_sub_message``.
		"""
		redis_client = self.async_redis
		yield Task(redis_client.subscribe, channels)
		redis_client.listen(self.on_pub_sub_message)
124
125
	@property
	def logger(self):
		"""Return this handler's own logger when it is set, otherwise the module-level ``base_logger``."""
		if self._logger:
			return self._logger
		return base_logger
128
129
	@engine
	def add_channel(self, channel):
		"""Record ``channel`` in ``self.channels`` and subscribe the async redis client to it."""
		self.channels.append(channel)
		subscription = Task(self.async_redis.subscribe, (channel,))
		yield subscription
133
134
	def get_online_from_redis(self, channel):
		"""Return only the list part of ``get_online_and_status_from_redis(channel)``."""
		_, online_ids = self.get_online_and_status_from_redis(channel)
		return online_ids
136
137
	def get_online_and_status_from_redis(self, channel):
		"""
		Read the members stored in redis under ``channel`` and parse them.

		An empty result yields ``(False, [])`` without parsing.

		:rtype : (bool, list)
		"""
		members = self.sync_redis.ssmembers(channel)
		self.logger.debug('!! channel %s redis online: %s', channel, members)
		if not members:
			return False, []
		return self.parse_redis_online(members)
144
145
	def parse_redis_online(self, online):
		"""
		Reduce raw connection ids to the distinct user ids they belong to.

		Every entry is expected to start with "<user_id>:". The returned flag is True
		when some entry belongs to this handler's user but is not this very connection.

		:rtype : (bool, list)
		"""
		# ':' char specified in cookies_middleware.py.create_id
		user_ids = set()
		other_connection_found = False
		for connection_id in online:
			owner_id = int(connection_id.split(':')[0])
			user_ids.add(owner_id)
			if owner_id == self.user_id and connection_id != self.id:
				other_connection_found = True
		return other_connection_found, list(user_ids)
158
159
	def add_online_user(self, room_id, is_online, online):
		"""
		Announce this connection's presence in ``room_id``.

		When ``is_online`` is true the current online list is written only to this
		socket. Otherwise this user's id is appended to ``online`` (in place) and a
		login message is published to the whole room.

		:return: ``is_online``, unchanged
		"""
		if not is_online:  # if a new tab has been opened
			online.append(self.user_id)
			login_message = self.room_online(online, Actions.LOGIN, room_id)
			self.logger.info('!! First tab, sending refresh online for all')
			self.publish(login_message, room_id)
			return is_online
		# Send user names to self
		refresh_message = self.room_online(online, Actions.REFRESH_USER, room_id)
		self.logger.info('!! Second tab, retrieving online for self')
		self.ws_write(refresh_message)
		return is_online
175
176
	def get_is_online(self, room_id):
		"""
		Add this connection id to the redis set stored under ``room_id`` and return
		the ``(is_online, online user ids)`` pair read back for that room.
		"""
		# since we add user to online first, latest trigger will always show correct online
		self.async_redis_publisher.sadd(room_id, self.id)
		status_and_online = self.get_online_and_status_from_redis(room_id)
		return status_and_online
180
181
	def publish(self, message, channel, parsable=False):
		"""
		Encode ``message`` with ``encode_message`` and publish the result to the redis ``channel``.

		:param parsable: passed through to ``encode_message``
		"""
		payload = encode_message(message, parsable)
		self.logger.debug('<%s> %s', channel, payload)
		self.async_redis_publisher.publish(channel, payload)
185
186
	def on_pub_sub_message(self, message):
		"""
		Callback for redis pubsub messages.

		String bodies are forwarded to the client. A body carrying the parsable prefix
		is first passed to the handler registered for its event, and is forwarded only
		when that handler returns a falsy value. Non-string bodies are ignored.

		:param message: object whose ``body`` attribute holds the payload
		:return: None
		"""
		data = message.body
		if not isinstance(data, str_type):  # subscribe event
			return
		prefixless_str = remove_parsable_prefix(data)
		if not prefixless_str:
			self.ws_write(data)
			return
		dict_message = json.loads(prefixless_str)
		handler = self.process_pubsub_message[dict_message[VarNames.EVENT]]
		suppress = handler(dict_message)
		if not suppress:
			self.ws_write(prefixless_str)
202
203
	def ws_write(self, message):
		"""Write ``message`` to the client; this base implementation only raises."""
		not_implemented = NotImplementedError('WebSocketHandler implements')
		raise not_implemented
205
206
	@asynchronous
	def search_giphy(self, message, query, cb):
		"""
		Ask giphy for ``query`` and call ``cb(message, url)`` once the reply arrives.

		``url`` is the downsized-medium url of the first result, or None when the
		response cannot be parsed or holds no result.
		"""
		self.logger.debug("!! Asking giphy for: %s", query)

		def on_giphy_reply(response):
			giphy = None
			try:
				self.logger.debug("!! Got giphy response: %s", response.body)
				res = json.loads(response.body)
				giphy = res['data'][0]['images']['downsized_medium']['url']
			except Exception as e:
				# Best effort: the callback still runs, just without a gif. Only
				# Exception is trapped so SystemExit/KeyboardInterrupt pass through.
				self.logger.warning("!! Unable to extract giphy url: %s", e)
			cb(message, giphy)

		url = GIPHY_URL.format(GIPHY_API_KEY, quote(query, safe=''))
		self.http_client.fetch(url, callback=on_giphy_reply)
219
220
	def notify_offline(self, channel, message_id):
		"""
		Send a firebase push for ``message_id`` to room members that are not online.

		Nothing happens when firebase is not configured, when ``channel`` is
		ALL_ROOM_ID, or when no active subscription belongs to an offline member
		that has notifications enabled for the room.
		"""
		# Both guards come first so the redis lookup is skipped when nothing can be sent
		if FIREBASE_API_KEY is None or channel == ALL_ROOM_ID:
			return
		online = self.get_online_from_redis(channel)
		offline_users = RoomUsers.objects.filter(
			room_id=channel, notifications=True
		).exclude(user_id__in=online).values_list('user_id')
		# evaluate once; the rows are needed for both lists below
		subscriptions = list(Subscription.objects.filter(user__in=offline_users, inactive=False))
		if not subscriptions:
			return
		new_sub_mess = [SubscriptionMessages(message_id=message_id, subscription_id=r.id) for r in subscriptions]
		reg_ids = [r.registration_id for r in subscriptions]
		SubscriptionMessages.objects.bulk_create(new_sub_mess)
		self.post_firebase(reg_ids)
234
235
	@asynchronous
	def post_firebase(self, reg_ids):
		"""
		POST ``reg_ids`` to FIREBASE_URL as a JSON body.

		When the reply arrives, every registration id whose result entry reports
		'NotRegistered' or 'InvalidRegistration' has its subscription marked inactive.
		Any error while handling the reply is logged and not re-raised.
		"""
		def on_reply(response):
			try:
				self.logger.debug("!! FireBase response: " + str(response.body))
				results = json.loads(response.body)['results']
				# results are matched to reg_ids by position
				stale_ids = [
					reg_ids[position]
					for position, result in enumerate(results)
					if result.get('error') in ('NotRegistered', 'InvalidRegistration')
				]
				if stale_ids:
					self.logger.info("Deactivating subscriptions: %s", stale_ids)
					Subscription.objects.filter(registration_id__in=stale_ids).update(inactive=True)
			except Exception as e:
				self.logger.error("Unable to parse response" + str(e))

		request_body = json.dumps({"registration_ids": reg_ids})
		request_headers = {
			"Content-Type": "application/json",
			"Authorization": "key=%s" % FIREBASE_API_KEY,
		}
		self.logger.debug("!! post_fire_message %s", request_body)
		request = HTTPRequest(FIREBASE_URL, method="POST", headers=request_headers, body=request_body)
		self.http_client.fetch(request, callback=on_reply)
257
258
	def isGiphy(self, content):
		"""
		Return group 1 of the first GIPHY_REGEX match in ``content``.

		None is returned when no giphy key is configured, when ``content`` is None,
		or when nothing matches.
		"""
		if GIPHY_API_KEY is None or content is None:
			return None
		found = re.search(GIPHY_REGEX, content)
		if found is None:
			return None
		return found.group(1)
262
263
	def process_send_message(self, message):
		"""
		Store a new chat message, publish it to its channel and notify offline users.

		When the content matches the giphy pattern, the message is stored only after
		the giphy lookup has called back with the (possibly None) gif url.

		:type message: dict
		"""
		# @transaction.atomic mysql has gone away
		def send_message(message, giphy=None):
			files = UploadedFile.objects.filter(id__in=message.get(VarNames.FILES), user_id=self.user_id)
			symbol = get_max_key(files)
			channel = message[VarNames.CHANNEL]
			js_id = message[VarNames.JS_MESSAGE_ID]
			message_db = Message(
				sender_id=self.user_id,
				content=message[VarNames.CONTENT],
				symbol=symbol,
				giphy=giphy,
				room_id=channel
			)
			do_db(message_db.save)
			res_files = []
			if files:
				# attach the uploaded files to the freshly saved message
				images = up_files_to_img(files, message_db.id)
				res_files = MessagesCreator.prepare_img_video(images, message_db.id)
			prepared_message = self.create_send_message(message_db, Actions.PRINT_MESSAGE, res_files, js_id)
			self.publish(prepared_message, channel)
			self.notify_offline(channel, message_db.id)

		giphy_match = self.isGiphy(message.get(VarNames.CONTENT))
		if giphy_match is None:
			send_message(message)
		else:
			self.search_giphy(message, giphy_match, send_message)
300
301
	def create_new_room(self, message):
		"""
		Create a named room, add the current user to it and publish the subscribe
		message to ``self.channel``.

		:raises ValidationError: when the room name is empty or longer than 16 characters
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = bool(room_name) and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		room = Room(name=room_name)
		do_db(room.save)
		membership = RoomUsers(room_id=room.id, user_id=self.user_id)
		membership.save()
		subscribe_message = self.subscribe_room_channel_message(room.id, room_name)
		self.publish(subscribe_message, self.channel, True)
310
311
	def invite_user(self, message):
		"""
		Invite a user into a room.

		Publishes an "add user" message to the room channel and a parsable invitation
		to the invited user's own channel.
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		room = get_or_create_room(self.channels, room_id, user_id)
		users_in_room = {}
		for member in room.users.all():
			users_in_room[member.id] = RedisPrefix.set_js_user_structure(member.username, member.sex)
		added_message = self.add_user_to_room(room_id, user_id, users_in_room[user_id])
		self.publish(added_message, room_id)
		subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
		invited_channel = RedisPrefix.generate_user(user_id)
		self.publish(subscribe_message, invited_channel, True)
322
323
	def respond_ping(self, message):
		"""Write a pong built from the ping's js message id back to the client."""
		pong = self.responde_pong(message[VarNames.JS_MESSAGE_ID])
		self.ws_write(pong)
325
326
	def process_pong_message(self, message):
		"""Store the time value carried by the client's pong in ``last_client_ping``."""
		pong_time = message[VarNames.TIME]
		self.last_client_ping = pong_time
328
329
	def process_ping_message(self, message):
		"""
		Schedule a check, ``settings.PING_CLOSE_SERVER_DELAY`` later, that closes the
		connection with code 408 unless ``last_client_ping`` equals this ping's time.
		"""
		def close_if_unanswered():
			if self.last_client_ping == message[VarNames.TIME]:
				return
			self.close(408, "Ping timeout")

		io_loop = IOLoop.instance()
		io_loop.call_later(settings.PING_CLOSE_SERVER_DELAY, close_if_unanswered)
334
335
	def create_user_channel(self, message):
		"""
		Create a direct room with another user and publish the subscribe message to
		this user's channel and, when it is a different channel, to the other user's.
		"""
		user_id = message[VarNames.USER_ID]
		room_id = create_room(self.user_id, user_id)
		is_other_user = self.user_id != user_id
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id, is_other_user)
		self.publish(subscribe_message, self.channel, True)
		other_channel = RedisPrefix.generate_user(user_id)
		if other_channel != self.channel:
			self.publish(subscribe_message, other_channel, True)
343
344
	def delete_channel(self, message):
		"""
		Leave or disable the room named in ``message``.

		A room without a name is treated as private and gets disabled. Otherwise only
		the current user's membership is deleted and a logout message is published to
		the room. In both cases an unsubscribe message is published afterwards.

		:raises ValidationError: when the room is not among this handler's channels,
			is ALL_ROOM_ID, or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to exit this room')
		room = do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else:  # if public -> leave the room, delete the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
			online = self.get_online_from_redis(room_id)
			# the redis snapshot may not contain this user; list.remove would raise ValueError
			if self.user_id in online:
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
		room.save()
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, room_id, True)
361 View Code Duplication
1 ignored issue
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
362
	def edit_message(self, data):
		"""
		Edit or delete an existing message.

		The message's previous content and giphy are written to MessageHistory before
		the new content is applied. A None content marks the message as deleted, a
		giphy match goes through the giphy lookup, and anything else is a plain edit.
		"""
		js_id = data[VarNames.JS_MESSAGE_ID]
		message = do_db(Message.objects.get, id=data[VarNames.MESSAGE_ID])
		validate_edit_message(self.user_id, message)
		# Snapshot BEFORE overwriting: saving after the assignment would store the new
		# content in the history row and lose the previous one.
		MessageHistory(message=message, content=message.content, giphy=message.giphy).save()
		message.content = data[VarNames.CONTENT]
		message.edited_times += 1
		giphy_match = self.isGiphy(data[VarNames.CONTENT])
		if message.content is None:
			Message.objects.filter(id=data[VarNames.MESSAGE_ID]).update(deleted=True, edited_times=message.edited_times)
			self.publish(self.create_send_message(message, Actions.DELETE_MESSAGE, None, js_id), message.room_id)
		elif giphy_match is not None:
			self.edit_message_giphy(giphy_match, message, js_id)
		else:
			self.edit_message_edit(data, message, js_id)
377
378
	def edit_message_giphy(self, giphy_match, message, js_id):
		"""
		Look up a gif for ``giphy_match``; once the lookup calls back, persist the
		edited message with that gif and publish the edit to the message's room.
		"""
		def store_and_publish(edited, giphy_url):
			changes = dict(
				content=edited.content,
				symbol=edited.symbol,
				giphy=giphy_url,
				edited_times=edited.edited_times,
			)
			do_db(Message.objects.filter(id=edited.id).update, **changes)
			edited.giphy = giphy_url
			edit_notification = self.create_send_message(edited, Actions.EDIT_MESSAGE, None, js_id)
			self.publish(edit_notification, edited.room_id)

		self.search_giphy(message, giphy_match, store_and_publish)
386
387
	def edit_message_edit(self, data, message, js_id):
		"""
		Apply a plain (non-giphy) edit: clear the gif, attach newly uploaded files,
		persist the message and publish the edit to the message's room.
		"""
		message.giphy = None
		files = UploadedFile.objects.filter(id__in=data.get(VarNames.FILES), user_id=self.user_id)
		if files:
			update_symbols(files, message)
			up_files_to_img(files, message.id)
		prep_files = None
		if message.symbol:  # fetch all, including that we just store
			db_images = Image.objects.filter(message_id=message.id)
			prep_files = MessagesCreator.prepare_img_video(db_images, message.id)
		Message.objects.filter(id=message.id).update(
			content=message.content,
			symbol=message.symbol,
			giphy=None,
			edited_times=message.edited_times
		)
		edit_notification = self.create_send_message(message, Actions.EDIT_MESSAGE, prep_files, js_id)
		self.publish(edit_notification, message.room_id)
401
402
	def send_client_new_channel(self, message):
		"""
		Pubsub handler: subscribe to the room named in ``message``, register this
		connection as online there and announce it. Returns None.
		"""
		room_id = message[VarNames.ROOM_ID]
		self.add_channel(room_id)
		online_state = self.get_is_online(room_id=room_id)
		is_online, online = online_state
		self.add_online_user(room_id, is_online, online)
407
408
	def send_client_delete_channel(self, message):
		"""
		Pubsub handler: unsubscribe from the room named in ``message`` and drop this
		connection from the room's online members. Returns None.
		"""
		room_id = message[VarNames.ROOM_ID]
		self.async_redis.unsubscribe((room_id,))
		# Online members are stored with sadd (a redis set), so they must be removed
		# with srem; hdel is a hash command and does not apply to a set key.
		self.async_redis_publisher.srem(room_id, self.id)
		# tolerate a room that is no longer tracked locally
		if room_id in self.channels:
			self.channels.remove(room_id)
413
414
	def process_get_messages(self, data):
		"""
		Write a page of non-deleted room messages, newest first, to the client.

		Without a header id the latest ``count`` messages are returned; otherwise only
		messages with an id lower than the header id. ``count`` defaults to 10.

		:type data: dict
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		room_id = data[VarNames.CHANNEL]
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		conditions = [Q(room_id=room_id), Q(deleted=False)]
		if header_id is not None:
			conditions.insert(0, Q(id__lt=header_id))
		messages = Message.objects.filter(*conditions).order_by('-pk')[:count]
		imv = do_db(get_message_images_videos, messages)
		response = self.get_messages(messages, room_id, imv, MessagesCreator.prepare_img_video)
		self.ws_write(response)
429
430
431
class WebRtcMessageHandler(MessagesHandler, WebRtcMessageCreator):
432
433
	def __init__(self, *args, **kwargs):
		"""Initialise the base handler, then register the webrtc websocket and pubsub handlers."""
		super(WebRtcMessageHandler, self).__init__(*args, **kwargs)
		webrtc_ws_handlers = {
			Actions.WEBRTC: self.proxy_webrtc,
			Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
			Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
			Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
			Actions.ACCEPT_CALL: self.accept_call,
			Actions.ACCEPT_FILE: self.accept_file,
			Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection,
			Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection,
			Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
			Actions.RETRY_FILE_CONNECTION: self.retry_file_connection,
			Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
		}
		webrtc_pubsub_handlers = {
			Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel,
			Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel
		}
		self.process_ws_message.update(webrtc_ws_handlers)
		self.process_pubsub_message.update(webrtc_pubsub_handlers)
452
453
	def set_opponent_call_channel(self, message):
		"""
		Pubsub handler for webrtc offers.

		Returns True (so the message is not forwarded to the client) when the
		message's opponent id is this connection's own id. Otherwise marks this
		connection as OFFERED for the connection id and returns None.
		"""
		connection_id = message[VarNames.CONNECTION_ID]
		is_own_offer = message[VarNames.WEBRTC_OPPONENT_ID] == self.id
		if is_own_offer:
			return True
		self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
458
459 View Code Duplication
	def offer_webrtc_connection(self, in_message):
		"""
		Start a webrtc connection: generate a connection id, record this connection
		as its offerer in READY state, tell the client the new id and publish the
		offer to the room.
		"""
		room_id = in_message[VarNames.CHANNEL]
		content = in_message.get(VarNames.CONTENT)
		qued_id = in_message[VarNames.WEBRTC_QUED_ID]
		connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
		publisher = self.async_redis_publisher
		# use list because sets dont have 1st element which is offerer
		publisher.hset(WEBRTC_CONNECTION, connection_id, self.id)
		publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
		offer_event = in_message[VarNames.EVENT]
		opponents_message = self.offer_webrtc(content, connection_id, room_id, offer_event)
		self_message = self.set_connection_id(qued_id, connection_id)
		self.ws_write(self_message)
		self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
		self.publish(opponents_message, room_id, True)
472
473
	def retry_file_connection(self, in_message):
		"""
		Publish a retry message to the opponent of a file connection.

		:raises ValidationError: unless this connection is the stored sender and the
			opponent's status is READY
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		opponent_ws_id = in_message[VarNames.WEBRTC_OPPONENT_ID]
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		receiver_ws_status = self.sync_redis.shget(connection_id, opponent_ws_id)
		if receiver_ws_status != WebRtcRedisStates.READY or self.id != sender_ws_id:
			raise ValidationError("Invalid channel status.")
		self.publish(self.retry_file(connection_id), opponent_ws_id)
482
483
	def reply_file_connection(self, in_message):
		"""
		Mark this connection as RESPONDED for a file offer and publish the reply to
		the sender.

		:raises ValidationError: unless the sender is READY and this connection is OFFERED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
		if sender_ws_status != WebRtcRedisStates.READY or self_ws_status != WebRtcRedisStates.OFFERED:
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
		reply = self.reply_webrtc(
			Actions.REPLY_FILE_CONNECTION,
			connection_id,
			HandlerNames.WEBRTC_TRANSFER,
			in_message[VarNames.CONTENT]
		)
		self.publish(reply, sender_ws_id)
498
499
	def reply_call_connection(self, in_message):
		"""Answer a call offer: allowed only from OFFERED, moves this connection to RESPONDED."""
		self.send_call_answer(
			in_message,
			status_set=WebRtcRedisStates.RESPONDED,
			reply_action=Actions.REPLY_CALL_CONNECTION,
			allowed_state=[WebRtcRedisStates.OFFERED],
			message_handler=HandlerNames.WEBRTC_TRANSFER
		)
507
508
	def proxy_webrtc(self, in_message):
		"""
		Forward a webrtc message to the opponent's channel.

		The message is rewritten in place before publishing: its opponent id becomes
		this connection's id and its handler name becomes PEER_CONNECTION.

		:type in_message: dict
		:raises ValidationError: unless both this connection and the opponent are READY
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
		opponent_channel_status = self.sync_redis.shget(connection_id, channel)
		ready = WebRtcRedisStates.READY
		if self_channel_status != ready or opponent_channel_status != ready:
			# todo receiver should only accept proxy_webrtc from sender, sender can accept all
			raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
				self_channel_status, opponent_channel_status
			))
		# I mean somebody if there're 3 ppl in 1 channel and first is initing transfer to 2nd and 3rd,
		# 2nd guy can fraud 3rd guy webrtc traffic, which is allowed during the call, but not while transering file
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
		self.logger.debug(
			"!! Forwarding message to channel %s, self %s, other status %s",
			channel, self_channel_status, opponent_channel_status
		)
		self.publish(in_message, channel)
531
532
	def close_file_connection(self, in_message):
		"""
		Close this connection's side of a file transfer and notify the other side.

		Nothing happens when this connection is already CLOSED.

		:raises Exception: "Access Denied" when this connection has no status for the connection id
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		self_channel_status = self.sync_redis.shget(connection_id, self.id)
		if not self_channel_status:
			raise Exception("Access Denied")
		if self_channel_status == WebRtcRedisStates.CLOSED:
			return
		sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		if sender_id != self.id:
			self.close_file_receiver(connection_id, in_message, sender_id)
		else:
			self.close_file_sender(connection_id)
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
544
545
	def close_call_connection(self, in_message):
		"""Close a call: allowed from READY or RESPONDED, moves this connection to CLOSED."""
		self.send_call_answer(
			in_message,
			status_set=WebRtcRedisStates.CLOSED,
			reply_action=Actions.CLOSE_CALL_CONNECTION,
			allowed_state=[WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED],
			message_handler=HandlerNames.PEER_CONNECTION
		)
553
554
	def cancel_call_connection(self, in_message):
		"""Cancel a call offer: allowed only from OFFERED, moves this connection to CLOSED."""
		self.send_call_answer(
			in_message,
			status_set=WebRtcRedisStates.CLOSED,
			reply_action=Actions.CANCEL_CALL_CONNECTION,
			allowed_state=[WebRtcRedisStates.OFFERED],
			message_handler=HandlerNames.WEBRTC_TRANSFER
		)
562
563
	def close_file_receiver(self, connection_id, in_message, sender_id):
		"""
		Forward a close message to the sender unless the sender is already CLOSED.

		The message is rewritten in place before publishing (opponent id, handler name).

		:raises Exception: "Access denied" when the sender has no status for the connection id
		"""
		sender_status = self.sync_redis.shget(connection_id, sender_id)
		if not sender_status:
			raise Exception("Access denied")
		if sender_status == WebRtcRedisStates.CLOSED:
			return
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
		self.publish(in_message, sender_id)
571
572
	def close_file_sender(self, connection_id):
		"""Publish the sender's close message to every other participant that is not CLOSED."""
		values = self.sync_redis.shgetall(connection_id)
		del values[self.id]
		message = self.get_close_file_sender_message(connection_id)
		for ws_id, ws_status in values.items():
			if ws_status != WebRtcRedisStates.CLOSED:
				self.publish(message, ws_id)
580
581
	def accept_file(self, in_message):
		"""
		Accept a file transfer: mark this connection READY and publish the accept
		message to the sender.

		:raises ValidationError: unless the sender is READY and this connection is
			RESPONDED or READY
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		content = in_message[VarNames.CONTENT]
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
		acceptable_self_states = [WebRtcRedisStates.RESPONDED, WebRtcRedisStates.READY]
		if sender_ws_status != WebRtcRedisStates.READY or self_ws_status not in acceptable_self_states:
			raise ValidationError("Invalid channel status")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
		self.publish(self.get_accept_file_message(connection_id, content), sender_ws_id)
593
594
	# todo
595
	# we can use channel_status = self.sync_redis.shgetall(connection_id)
596
	# and then self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
597
	# if we shgetall and only then do async hset
598
	# we can catch an issue when 2 concurrent users accepted the call
599
	# but we didn't  send them ACCEPT_CALL as they both were in status 'offered'
600
	def accept_call(self, in_message):
		"""
		Accept a call: allowed only from RESPONDED, moves this connection to READY and
		publishes ACCEPT_CALL with an empty content to the other participants.

		:raises ValidationError: when this connection's status is not RESPONDED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		self_status = self.sync_redis.shget(connection_id, self.id)
		if self_status != WebRtcRedisStates.RESPONDED:
			raise ValidationError("Invalid channel status")
		conn_users = self.sync_redis.shgetall(connection_id)
		self.publish_call_answer(
			conn_users,
			connection_id,
			HandlerNames.WEBRTC_TRANSFER,
			Actions.ACCEPT_CALL,
			WebRtcRedisStates.READY,
			{}
		)
615
616
	def send_call_answer(self, in_message, status_set, reply_action, allowed_state, message_handler):
		"""
		Validate this connection's state for a call and publish the answer.

		:param in_message: incoming message; must carry the connection id, content is optional
		:param status_set: state stored for this connection once the answer is published
		:param reply_action: action of the published reply
		:param allowed_state: collection of states this connection may currently be in
		:param message_handler: handler name of the published reply
		:raises ValidationError: when this connection is not part of the connection or
			its state is not in ``allowed_state``
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		content = in_message.get(VarNames.CONTENT)  # cancel call can skip browser
		conn_users = self.sync_redis.shgetall(connection_id)
		# .get(): an unknown connection id or a non-participant yields None, which is
		# rejected like any other bad state instead of escaping as a KeyError
		if conn_users.get(self.id) not in allowed_state:
			raise ValidationError("Invalid channel status.")
		self.publish_call_answer(conn_users, connection_id, message_handler, reply_action, status_set, content)
624
625
	def publish_call_answer(self, conn_users, connection_id, message_handler, reply_action, status_set, content):
		"""
		Store ``status_set`` for this connection and publish the reply to every other
		participant whose state is not CLOSED.

		``conn_users`` is modified in place: this connection's entry is removed.
		"""
		self.async_redis_publisher.hset(connection_id, self.id, status_set)
		del conn_users[self.id]
		message = self.reply_webrtc(reply_action, connection_id, message_handler, content)
		for participant, participant_state in conn_users.items():
			if participant_state == WebRtcRedisStates.CLOSED:
				continue
			self.publish(message, participant)