Completed
Push — master ( 633e29...acef0c )
by Andrew
36s
created

chat/tornado/message_handler.py (2 issues)

1
import json
2
import logging
3
import re
4
import urllib
5
6
from django.core.exceptions import ValidationError
7
from django.db import transaction
8
from django.db.models import Q
9
from tornado import ioloop
10
from tornado.gen import engine, Task
11
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
12
from tornado.ioloop import IOLoop
13
from tornado.web import asynchronous
14
from tornadoredis import Client
15
from django.conf import settings
16
from chat.global_redis import remove_parsable_prefix, encode_message
17
from chat.log_filters import id_generator
18
from chat.models import Message, Room, RoomUsers, Subscription, User, UserProfile, SubscriptionMessages, MessageHistory, \
19
	UploadedFile, Image
20
from chat.py2_3 import str_type, quote
21
from chat.settings import ALL_ROOM_ID, REDIS_PORT, WEBRTC_CONNECTION, GIPHY_URL, GIPHY_REGEX, FIREBASE_URL, REDIS_HOST
22
from chat.tornado.constants import VarNames, HandlerNames, Actions, RedisPrefix, WebRtcRedisStates
23
from chat.tornado.message_creator import WebRtcMessageCreator, MessagesCreator
24
from chat.utils import get_max_key, do_db, validate_edit_message, get_or_create_room, \
25
	create_room, get_message_images_videos, update_symbols, up_files_to_img
26
27
# Module logger and the adapter used as a fallback by MessagesHandler.logger;
# the adapter supplies placeholder 'id' and 'ip' values for the log records.
parent_logger = logging.getLogger(__name__)
base_logger = logging.LoggerAdapter(parent_logger, dict(id=0, ip='000.000.000.000'))
32
33
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
34
# CONNECTION_POOL = tornadoredis.ConnectionPool(
35
# max_connections=500,
36
# wait_for_available=True)
37
38
# Optional API keys: each one is None when the Django settings do not define it.
GIPHY_API_KEY, FIREBASE_API_KEY = (
	getattr(settings, key_name, None) for key_name in ("GIPHY_API_KEY", "FIREBASE_API_KEY")
)
40
41
class MessagesHandler(MessagesCreator):
42
43
	def __init__(self, *args, **kwargs):
		"""
		Initialises the per-connection state, the redis clients and the two
		dispatch tables (websocket events and redis pubsub events).

		The positional and keyword arguments are accepted but are not passed
		to the parent initialiser.
		"""
		self.closed_channels = None
		super(MessagesHandler, self).__init__()
		self.webrtc_ids = {}
		self.id = None  # child init
		self.sex = None
		self.last_client_ping = 0
		self.sender_name = None
		self.user_id = 0  # anonymous by default
		self.ip = None
		# imported inside the method, not at module level
		# (presumably to avoid a circular import — TODO confirm)
		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		self.channels = []
		self._logger = None
		self.async_redis = Client(host=REDIS_HOST, port=REDIS_PORT)
		self.patch_tornadoredis()
		# input websocket messages handlers
		# The handler is determined by @VarNames.EVENT
		self.process_ws_message = dict([
			(Actions.GET_MESSAGES, self.process_get_messages),
			(Actions.SEND_MESSAGE, self.process_send_message),
			(Actions.CREATE_DIRECT_CHANNEL, self.create_user_channel),
			(Actions.DELETE_ROOM, self.delete_channel),
			(Actions.EDIT_MESSAGE, self.edit_message),
			(Actions.CREATE_ROOM_CHANNEL, self.create_new_room),
			(Actions.INVITE_USER, self.invite_user),
			(Actions.PING, self.respond_ping),
			(Actions.PONG, self.process_pong_message),
		])
		# Handlers for redis messages, if handler returns true - message won't be sent to client
		# The handler is determined by @VarNames.EVENT
		self.process_pubsub_message = dict([
			(Actions.CREATE_DIRECT_CHANNEL, self.send_client_new_channel),
			(Actions.CREATE_ROOM_CHANNEL, self.send_client_new_channel),
			(Actions.DELETE_ROOM, self.send_client_delete_channel),
			(Actions.INVITE_USER, self.send_client_new_channel),
			(Actions.PING, self.process_ping_message),
		])
82
83
	def patch_tornadoredis(self):  # TODO remove this
		"""
		Replaces ``readline`` of the async redis connection with a wrapper that
		logs an exception raised while reading and returns None instead of
		letting it propagate.

		The original bound method stays available on the connection as ``old_read``.
		"""
		connection = self.async_redis.connection
		bound_method_type = type(connection.readline)
		connection.old_read = connection.readline
		handler = self

		def new_read(new_self, callback=None):
			try:
				return new_self.old_read(callback=callback)
			except Exception as e:
				# Best effort, as before: the error is not re-raised. It used to be
				# dropped silently because the diagnostics sat behind a bare
				# `return`; only cheap attributes are logged here, no redis lookup.
				handler.logger.error(
					"Exception info: %s ;;; "
					"self.id: %s ;;; "
					"self.channels = '%s';;; "
					"self.closed_channels  = '%s';;;",
					e, handler.id, handler.channels, handler.closed_channels
				)
				return None

		connection.readline = bound_method_type(new_read, connection)
106
107
	@property
108
	def connected(self):
109
		raise NotImplemented
110
111
	@connected.setter
112
	def connected(self, value):
113
		raise NotImplemented
114
115
	@property
116
	def http_client(self):
117
		raise NotImplemented
118
119
	@engine
	def listen(self, channels):
		"""
		Subscribes the async redis client to the given channels and afterwards
		registers on_pub_sub_message as the listener callback.

		:param channels: passed as-is to the redis ``subscribe`` call
		"""
		subscription = Task(self.async_redis.subscribe, channels)
		yield subscription
		self.async_redis.listen(self.on_pub_sub_message)
124
125
	@property
126
	def logger(self):
127
		return self._logger if self._logger else base_logger
128
129
	@engine
	def add_channel(self, channel):
		"""
		Records the channel in self.channels, then subscribes the async redis client to it.
		"""
		self.channels.append(channel)
		single_channel = (channel,)
		yield Task(self.async_redis.subscribe, single_channel)
133
134
	def get_online_from_redis(self, channel):
		"""
		:return: the second element (the list) of what get_online_and_status_from_redis returns
		"""
		status_and_users = self.get_online_and_status_from_redis(channel)
		return status_and_users[1]
136
137
	def get_online_and_status_from_redis(self, channel):
		"""
		Reads the members redis stores under the channel key and parses them.

		:return: (False, []) when nothing is stored, otherwise the result of parse_redis_online
		:rtype : (bool, list)
		"""
		members = self.sync_redis.ssmembers(channel)
		self.logger.debug('!! channel %s redis online: %s', channel, members)
		if not members:
			return False, []
		return self.parse_redis_online(members)
144
145
	def parse_redis_online(self, online):
		"""
		Extracts the distinct user ids from the stored connection ids.

		:param online: iterable of connection ids formatted as "<user_id>:<rest>"
		:return: a flag telling whether this user has a connection other than
		the current one in the list, and the distinct user ids
		:rtype : (bool, list)
		"""
		user_ids = set()
		has_other_connection = False
		for connection_id in online:  # py2 iteritems
			# : char specified in cookies_middleware.py.create_id
			owner_id = int(connection_id.split(':')[0])
			user_ids.add(owner_id)
			if connection_id != self.id and owner_id == self.user_id:
				has_other_connection = True
		return has_other_connection, list(user_ids)
158
159
	def add_online_user(self, room_id):
		"""
		Adds this connection id to the redis set of the room, then announces who is online:
		when the user already has another connection in the room the list is written
		to this socket only, otherwise it is published to the whole room.

		:return: if user is online
		"""
		self.async_redis_publisher.sadd(room_id, self.id)
		# since we add user to online first, latest trigger will always show correct online
		is_online, online = self.get_online_and_status_from_redis(room_id)
		if not is_online:  # if a new tab has been opened
			online.append(self.user_id)
			login_message = self.room_online(online, Actions.LOGIN, room_id)
			self.logger.info('!! First tab, sending refresh online for all')
			self.publish(login_message, room_id)
		else:  # Send user names to self
			refresh_message = self.room_online(online, Actions.REFRESH_USER, room_id)
			self.logger.info('!! Second tab, retrieving online for self')
			self.ws_write(refresh_message)
		return is_online
178
179
	def publish(self, message, channel, parsable=False):
		"""
		Encodes the message and publishes it to the redis channel.

		:param parsable: forwarded to encode_message
		"""
		payload = encode_message(message, parsable)
		self.logger.debug('<%s> %s', channel, payload)
		self.async_redis_publisher.publish(channel, payload)
183
184
	def on_pub_sub_message(self, message):
		"""
		All pubsub messages are automatically sent to client.
		A message carrying the parsable prefix is first given to the handler
		registered for its event; it is forwarded only if the handler returns a falsy value.

		:param message: redis message, only its ``body`` is read
		:return:
		"""
		data = message.body
		if not isinstance(data, str_type):  # subscribe event
			return
		prefixless_str = remove_parsable_prefix(data)
		if not prefixless_str:
			self.ws_write(data)
			return
		dict_message = json.loads(prefixless_str)
		handler = self.process_pubsub_message[dict_message[VarNames.EVENT]]
		suppressed = handler(dict_message)
		if not suppressed:
			self.ws_write(prefixless_str)
200
201
	def ws_write(self, message):
		"""
		Writes a message to the client socket; not implemented in this class.

		:raises NotImplementedError: always
		"""
		not_implemented = NotImplementedError('WebSocketHandler implements')
		raise not_implemented
203
204
	@asynchronous
	def search_giphy(self, message, query, cb):
		"""
		Requests GIPHY_URL for the query and passes the outcome to the callback.

		:param message: handed to cb unchanged
		:param query: search text, url-quoted before it is put into GIPHY_URL
		:param cb: called as cb(message, giphy) where giphy is the url read from
		the reply, or None when the reply could not be used
		"""
		self.logger.debug("!! Asking giphy for: %s", query)

		def on_giphy_reply(response):
			try:
				self.logger.debug("!! Got giphy response: %s", response.body)
				res = json.loads(response.body)
				giphy = res['data'][0]['images']['downsized_medium']['url']
			except Exception as e:
				# Still best effort (the message goes out without a gif), but no longer
				# a bare except: interpreter-exit exceptions pass through and the reason is logged.
				self.logger.debug("!! Unable to use giphy response: %s", e)
				giphy = None
			cb(message, giphy)

		url = GIPHY_URL.format(GIPHY_API_KEY, quote(query, safe=''))
		self.http_client.fetch(url, callback=on_giphy_reply)
217
218
	def notify_offline(self, channel, message_id):
		"""
		Creates a SubscriptionMessages row for every active subscription of room
		users that have notifications enabled and are not online in the channel,
		then posts their registration ids to firebase.

		Does nothing when FIREBASE_API_KEY is not configured, when the channel is
		ALL_ROOM_ID or when no such subscription exists.
		"""
		# both exits are checked before touching redis, so no lookup is wasted
		if FIREBASE_API_KEY is None or channel == ALL_ROOM_ID:
			return
		online = self.get_online_from_redis(channel)
		offline_users = RoomUsers.objects.filter(
			room_id=channel, notifications=True
		).exclude(user_id__in=online).values_list('user_id')
		subscriptions = Subscription.objects.filter(user__in=offline_users, inactive=False)
		if not subscriptions:
			return
		new_sub_mess = [SubscriptionMessages(message_id=message_id, subscription_id=r.id) for r in subscriptions]
		reg_ids = [r.registration_id for r in subscriptions]
		SubscriptionMessages.objects.bulk_create(new_sub_mess)
		self.post_firebase(reg_ids)
232
233
	@asynchronous
	def post_firebase(self, reg_ids):
		"""
		Posts the registration ids to FIREBASE_URL. In the reply callback the
		subscriptions whose result carries a 'NotRegistered' or
		'InvalidRegistration' error are marked inactive.

		:param reg_ids: list of registration ids; results are matched to it by index
		"""
		def handle_reply(response):
			try:
				self.logger.debug("!! FireBase response: " + str(response.body))
				results = json.loads(response.body)['results']
				delete = [
					reg_ids[position]
					for position, result in enumerate(results)
					if result.get('error') in ['NotRegistered', 'InvalidRegistration']
				]
				if delete:
					self.logger.info("Deactivating subscriptions: %s", delete)
					Subscription.objects.filter(registration_id__in=delete).update(inactive=True)
			except Exception as e:
				self.logger.error("Unable to parse response" + str(e))

		body = json.dumps({"registration_ids": reg_ids})
		headers = {
			"Content-Type": "application/json",
			"Authorization": "key=%s" % FIREBASE_API_KEY,
		}
		self.logger.debug("!! post_fire_message %s", body)
		request = HTTPRequest(FIREBASE_URL, method="POST", headers=headers, body=body)
		self.http_client.fetch(request, callback=handle_reply)
255
256
	def isGiphy(self, content):
		"""
		:param content: message text, may be None
		:return: group 1 of the GIPHY_REGEX match in content; None when GIPHY_API_KEY
		is not set, content is None or the pattern is not found
		"""
		if GIPHY_API_KEY is None or content is None:
			return None
		found = re.search(GIPHY_REGEX, content)
		if found is None:
			return None
		return found.group(1)
260
261
	def process_send_message(self, message):
		"""
		Saves a new message, publishes it to its channel and notifies offline users.
		When the content matches the giphy pattern the giphy lookup runs first
		and the message is saved from its callback.

		:type message: dict
		"""
		giphy_query = self.isGiphy(message.get(VarNames.CONTENT))

		# @transaction.atomic mysql has gone away
		def store_and_publish(message, giphy=None):
			files = UploadedFile.objects.filter(id__in=message.get(VarNames.FILES), user_id=self.user_id)
			symbol = get_max_key(files)
			channel = message[VarNames.CHANNEL]
			js_id = message[VarNames.JS_MESSAGE_ID]
			message_db = Message(
				sender_id=self.user_id,
				content=message[VarNames.CONTENT],
				symbol=symbol,
				giphy=giphy,
				room_id=channel
			)
			do_db(message_db.save)
			if files:
				images = up_files_to_img(files, message_db.id)
				res_files = MessagesCreator.prepare_img_video(images, message_db.id)
			else:
				res_files = []
			prepared_message = self.create_send_message(message_db, Actions.PRINT_MESSAGE, res_files, js_id)
			self.publish(prepared_message, channel)
			self.notify_offline(channel, message_db.id)

		if giphy_query is None:
			store_and_publish(message)
		else:
			self.search_giphy(message, giphy_query, store_and_publish)
298
299
	def create_new_room(self, message):
		"""
		Creates a room with the requested name, links the current user to it and
		publishes the subscribe message to self.channel.

		:raises ValidationError: when the name is empty or longer than 16 characters
		"""
		room_name = message[VarNames.ROOM_NAME]
		name_is_valid = bool(room_name) and len(room_name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(room_name))
		room = Room(name=room_name)
		do_db(room.save)
		membership = RoomUsers(room_id=room.id, user_id=self.user_id)
		membership.save()
		self.publish(self.subscribe_room_channel_message(room.id, room_name), self.channel, True)
308
309
	def invite_user(self, message):
		"""
		Announces the invited user to the room channel and publishes the
		invitation to the channel of the invited user.
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		room = get_or_create_room(self.channels, room_id, user_id)
		users_in_room = {}
		for member in room.users.all():
			users_in_room[member.id] = RedisPrefix.set_js_user_structure(member.username, member.sex)
		room_notice = self.add_user_to_room(room_id, user_id, users_in_room[user_id])
		self.publish(room_notice, room_id)
		invitation = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
		self.publish(invitation, RedisPrefix.generate_user(user_id), True)
320
321
	def respond_ping(self, message):
		"""
		Writes the pong built from the message's js message id to the client socket.
		"""
		pong = self.responde_pong(message[VarNames.JS_MESSAGE_ID])
		self.ws_write(pong)
323
324
	def process_pong_message(self, message):
		"""
		Stores the time carried by the pong in last_client_ping
		(process_ping_message compares against it).
		"""
		pong_time = message[VarNames.TIME]
		self.last_client_ping = pong_time
326
327
	def process_ping_message(self, message):
		"""
		Schedules a check after settings.PING_CLOSE_SERVER_DELAY: when the time of
		this ping still differs from last_client_ping at that moment, the
		connection is closed with code 408.
		"""
		def close_if_unanswered():
			if message[VarNames.TIME] != self.last_client_ping:
				self.close(408, "Ping timeout")

		io_loop = IOLoop.instance()
		io_loop.call_later(settings.PING_CLOSE_SERVER_DELAY, close_if_unanswered)
332
333
	def create_user_channel(self, message):
		"""
		Creates a room between the current user and the requested user and publishes
		the subscribe message to self.channel and, if it is a different channel,
		to the channel of the other user.
		"""
		user_id = message[VarNames.USER_ID]
		room_id = create_room(self.user_id, user_id)
		with_other_user = self.user_id != user_id
		subscribe_message = self.subscribe_direct_channel_message(room_id, user_id, with_other_user)
		self.publish(subscribe_message, self.channel, True)
		other_channel = RedisPrefix.generate_user(user_id)
		if other_channel != self.channel:
			self.publish(subscribe_message, other_channel, True)
341
342
	def delete_channel(self, message):
		"""
		Leaves a room: a room without a name is disabled, for a named room the
		user's RoomUsers link is deleted and a logout is published to the room.
		In both cases the unsubscribe message is published to the room afterwards.

		:raises ValidationError: when the room is not in self.channels, is
		ALL_ROOM_ID or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		if room_id not in self.channels or room_id == ALL_ROOM_ID:
			raise ValidationError('You are not allowed to exit this room')
		room = do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		if room.name is None:  # if private then disable
			room.disabled = True
		else:  # if public -> leave the room, delete the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
			online = self.get_online_from_redis(room_id)
			# the redis set may not contain this user; an unguarded remove() would
			# raise ValueError after the link is already deleted and skip the
			# save and the unsubscribe below
			if self.user_id in online:
				online.remove(self.user_id)
			self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
		room.save()
		message = self.unsubscribe_direct_message(room_id)
		self.publish(message, room_id, True)
359
360
	def edit_message(self, data):
		"""
		Edits or deletes a message of the current user. Content None marks the
		message deleted; content matching the giphy pattern goes through
		edit_message_giphy; anything else through edit_message_edit.

		:type data: dict
		"""
		js_id = data[VarNames.JS_MESSAGE_ID]
		message_id = data[VarNames.MESSAGE_ID]
		message = do_db(Message.objects.get, id=message_id)
		validate_edit_message(self.user_id, message)
		message.content = data[VarNames.CONTENT]
		# NOTE(review): the history row is written after message.content was replaced,
		# so it stores the new content together with the previous giphy — confirm this is intended
		history = MessageHistory(message=message, content=message.content, giphy=message.giphy)
		history.save()
		message.edited_times += 1
		giphy_match = self.isGiphy(data[VarNames.CONTENT])
		if message.content is None:
			Message.objects.filter(id=data[VarNames.MESSAGE_ID]).update(deleted=True, edited_times=message.edited_times)
			delete_notice = self.create_send_message(message, Actions.DELETE_MESSAGE, None, js_id)
			self.publish(delete_notice, message.room_id)
			return
		if giphy_match is None:
			self.edit_message_edit(data, message, js_id)
		else:
			self.edit_message_giphy(giphy_match, message, js_id)
375
376
	def edit_message_giphy(self, giphy_match, message, js_id):
		"""
		Runs the giphy lookup; its callback stores content, symbol, giphy and
		edited_times of the message and publishes the edit to the message's room.
		"""
		def store_with_giphy(message, giphy):
			pending_update = Message.objects.filter(id=message.id).update
			do_db(
				pending_update,
				content=message.content,
				symbol=message.symbol,
				giphy=giphy,
				edited_times=message.edited_times
			)
			message.giphy = giphy
			edit_notice = self.create_send_message(message, Actions.EDIT_MESSAGE, None, js_id)
			self.publish(edit_notice, message.room_id)

		self.search_giphy(message, giphy_match, store_with_giphy)
384
385
	def edit_message_edit(self, data, message, js_id):
		"""
		Stores a plain (non-giphy) edit: attaches the uploaded files of the
		current user, resets giphy, updates the message row and publishes the edit.
		"""
		message.giphy = None
		files = UploadedFile.objects.filter(id__in=data.get(VarNames.FILES), user_id=self.user_id)
		if files:
			update_symbols(files, message)
			up_files_to_img(files, message.id)
		prep_files = None
		if message.symbol:  # fetch all, including that we just store
			db_images = Image.objects.filter(message_id=message.id)
			prep_files = MessagesCreator.prepare_img_video(db_images, message.id)
		Message.objects.filter(id=message.id).update(
			content=message.content,
			symbol=message.symbol,
			giphy=None,
			edited_times=message.edited_times
		)
		edit_notice = self.create_send_message(message, Actions.EDIT_MESSAGE, prep_files, js_id)
		self.publish(edit_notice, message.room_id)
399
400
	def send_client_new_channel(self, message):
		"""
		Pubsub handler: subscribes to the room named in the message and registers
		this connection as online there. Returns nothing, so the message is still
		forwarded to the client.
		"""
		new_room_id = message[VarNames.ROOM_ID]
		self.add_channel(new_room_id)
		self.add_online_user(new_room_id)
404
405
	def send_client_delete_channel(self, message):
		"""
		Pubsub handler: unsubscribes from the room named in the message, removes
		this connection from the room's online set in redis and forgets the channel.
		Returns nothing, so the message is still forwarded to the client.
		"""
		room_id = message[VarNames.ROOM_ID]
		self.async_redis.unsubscribe((room_id,))
		# add_online_user stores the connection with sadd and it is read back with
		# ssmembers, i.e. the key is a set: srem is the matching removal (hdel is a hash command)
		self.async_redis_publisher.srem(room_id, self.id)
		self.channels.remove(room_id)
410
411
	def process_get_messages(self, data):
		"""
		Writes to the client the latest non-deleted messages of a room, newest first.
		When a header id is given only messages with a smaller id are returned;
		the amount defaults to 10.

		:type data: dict
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		room_id = data[VarNames.CHANNEL]
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		conditions = [Q(room_id=room_id), Q(deleted=False)]
		if header_id is not None:
			conditions.insert(0, Q(id__lt=header_id))
		messages = Message.objects.filter(*conditions).order_by('-pk')[:count]
		imv = do_db(get_message_images_videos, messages)
		self.ws_write(self.get_messages(messages, room_id, imv, MessagesCreator.prepare_img_video))
426
427
428
class WebRtcMessageHandler(MessagesHandler, WebRtcMessageCreator):
429
430
	def __init__(self, *args, **kwargs):
		"""
		Extends the dispatch tables of MessagesHandler with the webrtc events.
		"""
		super(WebRtcMessageHandler, self).__init__(*args, **kwargs)
		webrtc_ws_handlers = {
			Actions.WEBRTC: self.proxy_webrtc,
			Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
			Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
			Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
			Actions.ACCEPT_CALL: self.accept_call,
			Actions.ACCEPT_FILE: self.accept_file,
			Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection,
			Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection,
			Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
			Actions.RETRY_FILE_CONNECTION: self.retry_file_connection,
			Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
		}
		self.process_ws_message.update(webrtc_ws_handlers)
		# both offer events coming from redis are handled by the same method
		webrtc_pubsub_handlers = {
			Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel,
			Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel,
		}
		self.process_pubsub_message.update(webrtc_pubsub_handlers)
449
450
	def set_opponent_call_channel(self, message):
		"""
		Pubsub handler for webrtc offers. When the opponent id of the offer is this
		connection, returns True and stores nothing; otherwise marks this
		connection as OFFERED for the connection id and returns None.
		"""
		connection_id = message[VarNames.CONNECTION_ID]
		offered_by_self = message[VarNames.WEBRTC_OPPONENT_ID] == self.id
		if offered_by_self:
			return True
		self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
455
456
	def offer_webrtc_connection(self, in_message):
		"""
		Starts a webrtc connection: generates a connection id, records this socket
		as its offerer with status READY, writes the id back to this socket and
		publishes the offer to the room.
		"""
		room_id = in_message[VarNames.CHANNEL]
		content = in_message.get(VarNames.CONTENT)
		qued_id = in_message[VarNames.WEBRTC_QUED_ID]
		connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
		# use list because sets dont have 1st element which is offerer
		redis = self.async_redis_publisher
		redis.hset(WEBRTC_CONNECTION, connection_id, self.id)
		redis.hset(connection_id, self.id, WebRtcRedisStates.READY)
		opponents_message = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT])
		self.ws_write(self.set_connection_id(qued_id, connection_id))
		self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
		self.publish(opponents_message, room_id, True)
469
470
	def retry_file_connection(self, in_message):
		"""
		Publishes a retry message to the opponent socket.

		:raises ValidationError: unless the opponent status is READY and this
		socket is the one stored as sender of the connection
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		opponent_ws_id = in_message[VarNames.WEBRTC_OPPONENT_ID]
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		receiver_ws_status = self.sync_redis.shget(connection_id, opponent_ws_id)
		allowed = receiver_ws_status == WebRtcRedisStates.READY and self.id == sender_ws_id
		if not allowed:
			raise ValidationError("Invalid channel status.")
		self.publish(self.retry_file(connection_id), opponent_ws_id)
479
480
	def reply_file_connection(self, in_message):
		"""
		Marks this socket as RESPONDED for the connection and publishes the reply
		to the sender socket.

		:raises ValidationError: unless the sender is READY and this socket is OFFERED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
		allowed = sender_ws_status == WebRtcRedisStates.READY and self_ws_status == WebRtcRedisStates.OFFERED
		if not allowed:
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
		reply = self.reply_webrtc(
			Actions.REPLY_FILE_CONNECTION,
			connection_id,
			HandlerNames.WEBRTC_TRANSFER,
			in_message[VarNames.CONTENT]
		)
		self.publish(reply, sender_ws_id)
495
496
	def reply_call_connection(self, in_message):
		"""
		Answers a call offer: allowed from status OFFERED, sets status RESPONDED.
		"""
		self.send_call_answer(
			in_message,
			status_set=WebRtcRedisStates.RESPONDED,
			reply_action=Actions.REPLY_CALL_CONNECTION,
			allowed_state=[WebRtcRedisStates.OFFERED],
			message_handler=HandlerNames.WEBRTC_TRANSFER
		)
504
505
	def proxy_webrtc(self, in_message):
		"""
		Forwards a webrtc message to the opponent socket after replacing the
		opponent id with this socket's id and setting the peer connection handler.

		:type in_message: dict
		:raises ValidationError: unless both this socket and the opponent are READY
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
		own_status = self.sync_redis.shget(connection_id, self.id)
		opponent_status = self.sync_redis.shget(connection_id, channel)
		both_ready = own_status == WebRtcRedisStates.READY and opponent_status == WebRtcRedisStates.READY
		if not both_ready:
			# todo receiver should only accept proxy_webrtc from sender, sender can accept all
			raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
				own_status, opponent_status
			))
		# I mean if there are 3 people in 1 channel and the first is initiating a transfer to the 2nd and 3rd,
		# the 2nd can tamper with the 3rd's webrtc traffic, which is allowed during a call, but not while transferring a file
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
		self.logger.debug(
			"!! Forwarding message to channel %s, self %s, other status %s",
			channel, own_status, opponent_status
		)
		self.publish(in_message, channel)
528
529
	def close_file_connection(self, in_message):
		"""
		Closes this socket's side of a file connection. Nothing happens when it
		is already CLOSED; otherwise the other side is notified (as sender or as
		receiver) and this socket's status becomes CLOSED.

		:raises Exception: when this socket has no status for the connection
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		own_status = self.sync_redis.shget(connection_id, self.id)
		if not own_status:
			raise Exception("Access Denied")
		if own_status == WebRtcRedisStates.CLOSED:
			return
		sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		if sender_id != self.id:
			self.close_file_receiver(connection_id, in_message, sender_id)
		else:
			self.close_file_sender(connection_id)
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
541
542
	def close_call_connection(self, in_message):
		"""
		Closes a call: allowed from status READY or RESPONDED, sets status CLOSED.
		"""
		self.send_call_answer(
			in_message,
			status_set=WebRtcRedisStates.CLOSED,
			reply_action=Actions.CLOSE_CALL_CONNECTION,
			allowed_state=[WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED],
			message_handler=HandlerNames.PEER_CONNECTION
		)
550
551
	def cancel_call_connection(self, in_message):
		"""
		Cancels an offered call: allowed from status OFFERED, sets status CLOSED.
		"""
		self.send_call_answer(
			in_message,
			status_set=WebRtcRedisStates.CLOSED,
			reply_action=Actions.CANCEL_CALL_CONNECTION,
			allowed_state=[WebRtcRedisStates.OFFERED],
			message_handler=HandlerNames.WEBRTC_TRANSFER
		)
559
560
	def close_file_receiver(self, connection_id, in_message, sender_id):
		"""
		Forwards the close message to the sender socket unless the sender is already CLOSED.

		:raises Exception: when the sender has no status for the connection
		"""
		sender_status = self.sync_redis.shget(connection_id, sender_id)
		if not sender_status:
			raise Exception("Access denied")
		if sender_status == WebRtcRedisStates.CLOSED:
			return
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
		self.publish(in_message, sender_id)
568
569
	def close_file_sender(self, connection_id):
		"""
		Publishes the sender's close message to every other socket of the
		connection whose status is not CLOSED.
		"""
		statuses = self.sync_redis.shgetall(connection_id)
		del statuses[self.id]
		close_message = self.get_close_file_sender_message(connection_id)
		for ws_id in statuses:
			if statuses[ws_id] != WebRtcRedisStates.CLOSED:
				self.publish(close_message, ws_id)
577
578
	def accept_file(self, in_message):
		"""
		Marks this socket as READY for the connection and publishes the accept
		message to the sender socket.

		:raises ValidationError: unless the sender is READY and this socket is RESPONDED or READY
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		content = in_message[VarNames.CONTENT]
		sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
		self_ws_status = self.sync_redis.shget(connection_id, self.id)
		acceptable_own_states = [WebRtcRedisStates.RESPONDED, WebRtcRedisStates.READY]
		allowed = sender_ws_status == WebRtcRedisStates.READY and self_ws_status in acceptable_own_states
		if not allowed:
			raise ValidationError("Invalid channel status")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
		self.publish(self.get_accept_file_message(connection_id, content), sender_ws_id)
590
591
	# todo
592
	# we can use channel_status = self.sync_redis.shgetall(connection_id)
593
	# and then self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
594
	# if we shgetall and only then do async hset
595
	# we can catch an issue when 2 concurrent users accepted the call
596
	# but we didn't  send them ACCEPT_CALL as they both were in status 'offered'
597
	def accept_call(self, in_message):
		"""
		Accepts a call: sets this socket to READY and publishes ACCEPT_CALL with
		empty content to the other sockets of the connection.

		:raises ValidationError: unless this socket's status is RESPONDED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		self_status = self.sync_redis.shget(connection_id, self.id)
		if self_status != WebRtcRedisStates.RESPONDED:
			raise ValidationError("Invalid channel status")
		conn_users = self.sync_redis.shgetall(connection_id)
		self.publish_call_answer(
			conn_users,
			connection_id,
			HandlerNames.WEBRTC_TRANSFER,
			Actions.ACCEPT_CALL,
			WebRtcRedisStates.READY,
			{}
		)
612
613
	def send_call_answer(self, in_message, status_set, reply_action, allowed_state, message_handler):
		"""
		Checks this socket's status for the connection and, when allowed, sets the
		new status and publishes the answer to the other sockets of the connection.

		:param in_message: incoming dict, must contain the connection id; content is optional
		:param status_set: status stored for this socket
		:param reply_action: action of the published answer
		:param allowed_state: statuses this socket may currently be in
		:param message_handler: handler name put into the published answer
		:raises ValidationError: when this socket is not part of the connection
		or its status is not in allowed_state
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		content = in_message.get(VarNames.CONTENT)  # cancel call can skip browser
		conn_users = self.sync_redis.shgetall(connection_id)
		# the connection id comes from the client: an id this socket is not part of
		# must be reported as an invalid status rather than surface as a KeyError
		current_status = (conn_users or {}).get(self.id)
		if current_status not in allowed_state:
			raise ValidationError("Invalid channel status.")
		self.publish_call_answer(conn_users, connection_id, message_handler, reply_action, status_set, content)
621
622
	def publish_call_answer(self, conn_users, connection_id, message_handler, reply_action, status_set, content):
		"""
		Stores status_set for this socket and publishes the webrtc reply to every
		other socket of the connection whose status is not CLOSED.

		:param conn_users: socket id -> status mapping; this socket's entry is removed from it
		"""
		self.async_redis_publisher.hset(connection_id, self.id, status_set)
		del conn_users[self.id]
		reply = self.reply_webrtc(reply_action, connection_id, message_handler, content)
		for ws_id, ws_status in conn_users.items():
			if ws_status == WebRtcRedisStates.CLOSED:
				continue
			self.publish(reply, ws_id)