Completed
Push — master ( bc7fa4...abbc4f )
by Andrew
32s
created

WebRtcMessageHandler   B

Complexity

Total Complexity 36

Size/Duplication

Total Lines 201
Duplicated Lines 13.43 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
c 3
b 0
f 0
dl 27
loc 201
rs 8.8
wmc 36

16 Methods

Rating   Name   Duplication   Size   Complexity  
A publish_call_answer() 0 7 3
A close_file_sender() 0 8 3
A reply_file_connection() 15 15 3
A proxy_webrtc() 0 23 3
A close_file_connection() 0 12 4
A cancel_call_connection() 0 7 1
A set_opponent_call_channel() 0 5 2
A close_file_receiver() 0 8 3
A accept_file() 12 12 3
A __init__() 0 18 1
A close_call_connection() 0 7 1
A retry_file_connection() 0 9 3
A send_call_answer() 0 8 2
A accept_call() 0 15 2
A reply_call_connection() 0 7 1
A offer_webrtc_connection() 0 13 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
import json
2
import logging
3
4
from django.core.exceptions import ValidationError
5
from django.db.models import Q
6
from tornado.gen import engine, Task
7
from tornadoredis import Client
8
9
from chat.log_filters import id_generator
10
from chat.models import Message, Room, RoomUsers
11
from chat.py2_3 import str_type
12
from chat.settings import ALL_ROOM_ID, TORNADO_REDIS_PORT, WEBRTC_CONNECTION
13
from chat.tornado.constants import VarNames, HandlerNames, Actions, RedisPrefix, WebRtcRedisStates
14
from chat.tornado.image_utils import process_images, prepare_img, save_images, get_message_images
15
from chat.tornado.message_creator import WebRtcMessageCreator, MessagesCreator
16
from chat.utils import get_max_key, do_db, validate_edit_message, get_or_create_room, \
17
	create_room
18
19
# Module-level logger, wrapped in an adapter that carries placeholder
# connection metadata ('id' and 'ip') for log records.
parent_logger = logging.getLogger(__name__)
base_logger = logging.LoggerAdapter(
	parent_logger,
	dict(id=0, ip='000.000.000.000'),
)
24
25
# TODO https://github.com/leporo/tornado-redis#connection-pool-support
26
# CONNECTION_POOL = tornadoredis.ConnectionPool(
27
# max_connections=500,
28
# wait_for_available=True)
29
30
31
class MessagesHandler(MessagesCreator):
32
33
	def __init__(self, *args, **kwargs):
		"""
		Initialise per-connection state, the redis clients and the action dispatch tables.

		Positional and keyword arguments are accepted but are not forwarded
		to the parent initializer.
		"""
		# These two are assigned before the parent initializer runs.
		self.closed_channels = None
		self.parsable_prefix = 'p'
		super(MessagesHandler, self).__init__()

		# Identity of the connection and of the user behind it.
		self.webrtc_ids = {}
		self.id = None  # child init
		self.sex = None
		self.sender_name = None
		self.user_id = 0  # anonymous by default
		self.ip = None

		# Redis clients published by the ``chat.global_redis`` module (imported lazily here).
		from chat import global_redis
		self.async_redis_publisher = global_redis.async_redis_publisher
		self.sync_redis = global_redis.sync_redis
		self.channels = []
		self._logger = None
		# Separate tornadoredis client owned by this handler; used for subscriptions.
		self.async_redis = Client(port=TORNADO_REDIS_PORT)
		self.patch_tornadoredis()

		# Action -> handler table; presumably consulted by the websocket layer
		# for messages coming from the client (the dispatch is not in this class).
		client_action_handlers = {
			Actions.GET_MESSAGES: self.process_get_messages,
			Actions.SEND_MESSAGE: self.process_send_message,
			Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
			Actions.DELETE_ROOM: self.delete_channel,
			Actions.EDIT_MESSAGE: self.edit_message,
			Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
			Actions.INVITE_USER: self.invite_user,
		}
		self.pre_process_message = client_action_handlers

		# Action -> handler table used by pub_sub_message for prefixed ("parsable") redis messages.
		redis_event_handlers = {
			Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
			Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
			Actions.DELETE_ROOM: self.send_client_delete_channel,
			Actions.INVITE_USER: self.send_client_new_channel,
		}
		self.post_process_message = redis_event_handlers
65
66
	def patch_tornadoredis(self):  # TODO remove this
		"""
		Wrap the redis connection's ``readline`` so that a failing read is logged
		together with this handler's state before the exception propagates.

		The original bound method stays reachable as ``connection.old_read``.
		"""
		connection = self.async_redis.connection
		# Type of a bound method; used to bind ``new_read`` to the connection object.
		bound_method_type = type(connection.readline)
		connection.old_read = connection.readline

		def new_read(new_self, callback=None):
			try:
				return new_self.old_read(callback=callback)
			except Exception as e:
				current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
				self.logger.error(e)
				self.logger.error(
					"Exception info: "
					"self.id: %s ;;; "
					"self.connected = '%s';;; "
					"Redis default channel online = '%s';;; "
					"self.channels = '%s';;; "
					"self.closed_channels  = '%s';;;",
					self.id, self.connected, current_online, self.channels, self.closed_channels
				)
				# Bare ``raise`` re-raises the active exception with its original
				# traceback (``raise e`` drops it on Python 2).
				raise

		connection.readline = bound_method_type(new_read, connection)
88
89
	@property
90
	def connected(self):
91
		raise NotImplemented
92
93
	@connected.setter
94
	def connected(self, value):
95
		raise NotImplemented
96
97
	@engine
	def listen(self, channels):
		"""
		Subscribe this handler's redis client to ``channels`` and register
		``pub_sub_message`` as the listener callback.

		:param channels: channels passed to the client's ``subscribe``
		"""
		subscribe = self.async_redis.subscribe
		# The listener is attached only after the subscribe task has completed.
		yield Task(subscribe, channels)
		self.async_redis.listen(self.pub_sub_message)
102
103
	@property
104
	def logger(self):
105
		return self._logger if self._logger else base_logger
106
107
	@engine
	def add_channel(self, channel):
		"""
		Record ``channel`` in ``self.channels`` and subscribe the redis client to it.

		:param channel: channel to subscribe to
		"""
		self.channels.append(channel)
		single_channel = (channel,)
		yield Task(self.async_redis.subscribe, single_channel)
111
112
	def get_online_from_redis(self, channel):
		"""
		Return only the list of online user ids for ``channel``.

		:rtype : list
		"""
		_, online_user_ids = self.get_online_and_status_from_redis(channel)
		return online_user_ids
114
115
	def get_online_and_status_from_redis(self, channel):
		"""
		Fetch the members stored in redis under ``channel`` and parse them.

		:param channel: redis key to read
		:return: the result of ``parse_redis_online``, or ``(False, [])`` when redis returned nothing
		:rtype : (bool, list)
		"""
		members = self.sync_redis.ssmembers(channel)
		self.logger.debug('!! channel %s redis online: %s', channel, members)
		if not members:
			return False, []
		return self.parse_redis_online(members)
122
123
	def parse_redis_online(self, online):
		"""
		Convert connection ids into unique user ids.

		Each entry is expected to look like ``<user_id>:<rest>``; the part
		before the first ``:`` is parsed as an integer user id.

		:param online: iterable of connection id strings
		:return: a flag telling whether this user has another connection
			(same user id, different connection id) and the list of unique user ids
		:rtype : (bool, list)
		"""
		unique_user_ids = set()
		has_other_connection = False
		for connection_id in online:
			# : char specified in cookies_middleware.py.create_id
			owner_id = int(connection_id.partition(':')[0])
			unique_user_ids.add(owner_id)
			if connection_id != self.id and owner_id == self.user_id:
				has_other_connection = True
		return has_other_connection, list(unique_user_ids)
136
137
	def add_online_user(self, room_id, offline_messages=None):
		"""
		Add this connection id to the redis set stored under ``room_id`` and announce it.

		When the user already has another connection in the room, the online
		list is written to this socket only. Otherwise a login message is
		published to the whole room and, if given, offline messages are
		written to this socket.

		:param room_id: room whose online set is updated
		:param offline_messages: optional messages passed to ``load_offline_message``
		:return:
		"""
		self.async_redis_publisher.sadd(room_id, self.id)
		# since we add user to online first, latest trigger will always show correct online
		already_online, user_ids = self.get_online_and_status_from_redis(room_id)
		if already_online:  # Send user names to self
			refresh_message = self.room_online(user_ids, Actions.REFRESH_USER, room_id)
			self.logger.info('!! Second tab, retrieving online for self')
			self.ws_write(refresh_message)
			return
		# First connection of this user in the room
		user_ids.append(self.user_id)
		login_message = self.room_online(user_ids, Actions.LOGIN, room_id)
		self.logger.info('!! First tab, sending refresh online for all')
		self.publish(login_message, room_id)
		if offline_messages:
			self.ws_write(self.load_offline_message(offline_messages, room_id))
157
158
	def publish(self, message, channel, parsable=False):
		"""
		Serialize ``message`` to JSON and publish it to the redis ``channel``.

		:param message: JSON-serializable object
		:param channel: redis channel to publish to
		:param parsable: when true, the payload is marked with the parsable prefix
		"""
		payload = json.dumps(message)
		# The unmarked payload is what gets logged.
		self.logger.debug('<%s> %s', channel, payload)
		outgoing = self.encode(payload) if parsable else payload
		self.async_redis_publisher.publish(channel, outgoing)
164
165
	def encode(self, message):
		"""
		Mark ``message`` with the parsable prefix, signalling that it should be
		decoded and processed before being sent to the client.

		@param message: message to mark
		@return: marked message
		"""
		marker = self.parsable_prefix
		return marker + message
173
174
	def remove_parsable_prefix(self, message):
		"""
		Strip the parsable prefix from ``message``.

		@param message: string received from redis
		@return: the message without the prefix, or None when it is not prefixed
		"""
		prefix = self.parsable_prefix
		if not message.startswith(prefix):
			return None
		# Slice by the prefix length rather than a hard-coded 1, so a longer
		# prefix is removed completely.
		return message[len(prefix):]
177
178
	def pub_sub_message(self, message):
		"""
		Handle a message delivered by the redis subscription.

		Bodies that are not strings are ignored. A string carrying the parsable
		prefix is decoded and passed to the handler registered for its event
		in ``post_process_message``; it is forwarded to the client only when
		that handler returns a falsy value. Any other string is forwarded as is.

		:param message: redis message object exposing ``body``
		"""
		payload = message.body
		if not isinstance(payload, str_type):  # presumably a subscribe event
			return
		unmarked = self.remove_parsable_prefix(payload)
		if not unmarked:
			self.ws_write(payload)
			return
		parsed = json.loads(unmarked)
		handled = self.post_process_message[parsed[VarNames.EVENT]](parsed)
		if not handled:
			self.ws_write(unmarked)
189
190
	def ws_write(self, message):
		"""
		Write ``message`` to the client socket; no implementation is provided here.

		:raises NotImplementedError: always
		"""
		raise NotImplementedError('WebSocketHandler implements')
192
193
	def process_send_message(self, message):
		"""
		Store a new chat message with its images and publish it to the room channel.

		:type message: dict
		"""
		images = message.get(VarNames.IMG)
		room_id = message[VarNames.CHANNEL]
		stored = Message(
			sender_id=self.user_id,
			content=message[VarNames.CONTENT],
			symbol=get_max_key(images)
		)
		stored.room_id = room_id
		do_db(stored.save)
		# Images are saved after the message so they can reference its id.
		saved_images = save_images(images, stored.id)
		image_payload = prepare_img(saved_images, stored.id)
		outgoing = self.create_send_message(stored, Actions.PRINT_MESSAGE, image_payload)
		self.publish(outgoing, room_id)
213
214
	def create_new_room(self, message):
		"""
		Create a named room, add the current user to it and publish a subscribe
		message (marked parsable) to ``self.channel``.

		:param message: dict carrying the room name
		:raises ValidationError: when the name is empty or longer than 16 characters
		"""
		name = message[VarNames.ROOM_NAME]
		name_is_valid = bool(name) and len(name) <= 16
		if not name_is_valid:
			raise ValidationError('Incorrect room name "{}"'.format(name))
		new_room = Room(name=name)
		do_db(new_room.save)
		membership = RoomUsers(room_id=new_room.id, user_id=self.user_id)
		membership.save()
		notification = self.subscribe_room_channel_message(new_room.id, name)
		self.publish(notification, self.channel, True)
223
224
	def invite_user(self, message):
		"""
		Invite a user to a room: notify the room about the new member, then
		publish an invitation (marked parsable) to the invited user's channel.

		:param message: dict carrying the room id and the invited user's id
		"""
		room_id = message[VarNames.ROOM_ID]
		user_id = message[VarNames.USER_ID]
		room = get_or_create_room(self.channels, room_id, user_id)
		# user id -> structure built by RedisPrefix.set_js_user_structure
		members = {}
		for member in room.users.all():
			members[member.id] = RedisPrefix.set_js_user_structure(member.username, member.sex)
		room_notification = self.add_user_to_room(room_id, user_id, members[user_id])
		self.publish(room_notification, room_id)
		invitation = self.invite_room_channel_message(room_id, user_id, room.name, members)
		self.publish(invitation, RedisPrefix.generate_user(user_id), True)
235
236
237
	def create_user_channel(self, message):
		"""
		Create a direct room between the current user and another user and
		publish the subscribe message (marked parsable) to both users' channels.

		:param message: dict carrying the other user's id
		"""
		opponent_id = message[VarNames.USER_ID]
		room_id = create_room(self.user_id, opponent_id)
		notification = self.subscribe_direct_channel_message(room_id, opponent_id)
		self.publish(notification, self.channel, True)
		opponent_channel = RedisPrefix.generate_user(opponent_id)
		# Skip the second publish when both channels are the same one.
		if opponent_channel != self.channel:
			self.publish(notification, opponent_channel, True)
245
246
	def delete_channel(self, message):
		"""
		Leave or disable a room and publish the unsubscribe message to its channel.

		A room without a name is marked disabled. For a named room the current
		user's membership is deleted and a logout message is published.

		:param message: dict carrying the room id
		:raises ValidationError: when the room is not among this handler's
			channels, is the ALL_ROOM_ID room, or is already disabled
		"""
		room_id = message[VarNames.ROOM_ID]
		may_leave = room_id in self.channels and room_id != ALL_ROOM_ID
		if not may_leave:
			raise ValidationError('You are not allowed to exit this room')
		room = do_db(Room.objects.get, id=room_id)
		if room.disabled:
			raise ValidationError('Room is already deleted')
		is_private = room.name is None
		if is_private:  # if private then disable
			room.disabled = True
		else:  # if public -> leave the room, delete the link
			RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
			remaining_online = self.get_online_from_redis(room_id)
			remaining_online.remove(self.user_id)
			logout_message = self.room_online(remaining_online, Actions.LOGOUT, room_id)
			self.publish(logout_message, room_id)
		room.save()
		unsubscribe_message = self.unsubscribe_direct_message(room_id)
		self.publish(unsubscribe_message, room_id, True)
263
264
	def edit_message(self, data):
		"""
		Edit or delete a stored message and publish the change to its room.

		A ``None`` content marks the message as deleted; any other content
		replaces the stored content and re-processes the images.

		:param data: dict carrying the message id, the new content and optional images
		"""
		message_id = data[VarNames.MESSAGE_ID]
		message = Message.objects.get(id=message_id)
		validate_edit_message(self.user_id, message)
		message.content = data[VarNames.CONTENT]
		selector = Message.objects.filter(id=message_id)
		is_deletion = message.content is None
		if is_deletion:
			action, prep_imgs = Actions.DELETE_MESSAGE, None
			selector.update(deleted=True)
		else:
			action = Actions.EDIT_MESSAGE
			prep_imgs = process_images(data.get(VarNames.IMG), message)
			selector.update(content=message.content, symbol=message.symbol)
		outgoing = self.create_send_message(message, action, prep_imgs)
		self.publish(outgoing, message.room_id)
280
281
	def send_client_new_channel(self, message):
		"""
		Subscribe to the room named in ``message`` and register this connection as online there.

		Returns None, so ``pub_sub_message`` still forwards the message to the client.

		:param message: dict carrying the room id
		"""
		new_room_id = message[VarNames.ROOM_ID]
		self.add_channel(new_room_id)
		self.add_online_user(new_room_id)
285
286
	def send_client_delete_channel(self, message):
		"""
		Unsubscribe from the room named in ``message``, drop this connection
		from the room's online set and forget the channel locally.

		Returns None, so ``pub_sub_message`` still forwards the message to the client.

		:param message: dict carrying the room id
		"""
		room_id = message[VarNames.ROOM_ID]
		self.async_redis.unsubscribe((room_id,))
		# The online list is a redis set (written with sadd in add_online_user,
		# read with ssmembers), so it has to be cleaned with srem, not hdel.
		self.async_redis_publisher.srem(room_id, self.id)
		self.channels.remove(room_id)
291
292
	def process_get_messages(self, data):
		"""
		Load a page of non-deleted messages of a room, newest first, and write it to the client.

		When a header id is supplied, only messages with a smaller id are
		returned. The page size defaults to 10.

		:type data: dict
		"""
		header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
		count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
		room_id = data[VarNames.CHANNEL]
		self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
		conditions = [Q(room_id=room_id), Q(deleted=False)]
		if header_id is not None:
			conditions.insert(0, Q(id__lt=header_id))
		messages = Message.objects.filter(*conditions).order_by('-pk')[:count]
		images = do_db(get_message_images, messages)
		self.ws_write(self.get_messages(messages, room_id, images))
307
308
309
class WebRtcMessageHandler(MessagesHandler, WebRtcMessageCreator):
310
311
	def __init__(self, *args, **kwargs):
		"""
		Initialise the base handler and register the WebRTC actions in both dispatch tables.
		"""
		super(WebRtcMessageHandler, self).__init__(*args, **kwargs)
		webrtc_client_handlers = {
			Actions.WEBRTC: self.proxy_webrtc,
			Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
			Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
			Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
			Actions.ACCEPT_CALL: self.accept_call,
			Actions.ACCEPT_FILE: self.accept_file,
			Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection,
			Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection,
			Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
			Actions.RETRY_FILE_CONNECTION: self.retry_file_connection,
			Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
		}
		self.pre_process_message.update(webrtc_client_handlers)
		# Both offer kinds share one handler for prefixed redis messages.
		webrtc_redis_handlers = {
			Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel,
			Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel,
		}
		self.post_process_message.update(webrtc_redis_handlers)
330
331
	def set_opponent_call_channel(self, message):
		"""
		Mark this socket as OFFERED on the connection named in an incoming offer.

		:param message: decoded offer message
		:return: True when the offer came from this very socket, which makes
			``pub_sub_message`` skip forwarding it to the client; otherwise None
		"""
		connection_id = message[VarNames.CONNECTION_ID]
		offered_by_this_socket = message[VarNames.WEBRTC_OPPONENT_ID] == self.id
		if offered_by_this_socket:
			return True
		self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
336
337
	def offer_webrtc_connection(self, in_message):
		"""
		Open a new WebRTC connection: generate its id, register this socket as
		its offerer in redis, return the id to the client and publish the offer
		(marked parsable) to the room.

		:param in_message: dict carrying the room channel, optional content, the queued id and the event
		"""
		room_id = in_message[VarNames.CHANNEL]
		content = in_message.get(VarNames.CONTENT)
		qued_id = in_message[VarNames.WEBRTC_QUED_ID]
		connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
		publisher = self.async_redis_publisher
		# The offerer is kept in the WEBRTC_CONNECTION hash; per-socket states
		# live in a hash keyed by the connection id.
		publisher.hset(WEBRTC_CONNECTION, connection_id, self.id)
		publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
		offer_for_room = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT])
		self.ws_write(self.set_connection_id(qued_id, connection_id))
		self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
		self.publish(offer_for_room, room_id, True)
350
351
	def retry_file_connection(self, in_message):
		"""
		Ask a receiver to retry a file transfer.

		:param in_message: dict carrying the connection id and the receiver's socket id
		:raises ValidationError: unless the receiver is READY and this socket is the connection's offerer
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		receiver_id = in_message[VarNames.WEBRTC_OPPONENT_ID]
		offerer_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		receiver_status = self.sync_redis.shget(connection_id, receiver_id)
		allowed = receiver_status == WebRtcRedisStates.READY and self.id == offerer_id
		if not allowed:
			raise ValidationError("Invalid channel status.")
		self.publish(self.retry_file(connection_id), receiver_id)
360
361 View Code Duplication
	def reply_file_connection(self, in_message):
		"""
		Reply to a file offer: move this socket to RESPONDED and notify the offerer.

		:param in_message: dict carrying the connection id and the reply content
		:raises ValidationError: unless the offerer is READY and this socket is OFFERED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		offerer_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		offerer_status = self.sync_redis.shget(connection_id, offerer_id)
		own_status = self.sync_redis.shget(connection_id, self.id)
		allowed = offerer_status == WebRtcRedisStates.READY and own_status == WebRtcRedisStates.OFFERED
		if not allowed:
			raise ValidationError("Invalid channel status.")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
		reply = self.reply_webrtc(
			Actions.REPLY_FILE_CONNECTION,
			connection_id,
			HandlerNames.WEBRTC_TRANSFER,
			in_message[VarNames.CONTENT]
		)
		self.publish(reply, offerer_id)
376
377
	def reply_call_connection(self, in_message):
		"""
		Reply to a call offer: allowed from OFFERED, moves this socket to RESPONDED.

		:param in_message: client message passed on to ``send_call_answer``
		"""
		self.send_call_answer(
			in_message,
			status_set=WebRtcRedisStates.RESPONDED,
			reply_action=Actions.REPLY_CALL_CONNECTION,
			allowed_state=[WebRtcRedisStates.OFFERED],
			message_handler=HandlerNames.WEBRTC_TRANSFER
		)
385
386
	def proxy_webrtc(self, in_message):
		"""
		Forward a WebRTC signalling message to the opponent's channel.

		The message is re-addressed in place: its opponent id becomes this
		socket's id and its handler name becomes PEER_CONNECTION.

		:type in_message: dict
		:raises ValidationError: unless both this socket and the opponent are READY
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		opponent_channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
		own_status = self.sync_redis.shget(connection_id, self.id)
		opponent_status = self.sync_redis.shget(connection_id, opponent_channel)
		both_ready = own_status == WebRtcRedisStates.READY and opponent_status == WebRtcRedisStates.READY
		if not both_ready:
			# todo receiver should only accept proxy_webrtc from sender, sender can accept all
			raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
				own_status, opponent_status
			))
		# With 3 people in one channel, where the first one is transferring to
		# the 2nd and the 3rd, the 2nd can interfere with the 3rd one's webrtc
		# traffic. That is allowed during a call, but not while transferring a file.
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
		self.logger.debug(
			"Forwarding message to channel %s, self %s, other status %s",
			opponent_channel,
			own_status,
			opponent_status
		)
		self.publish(in_message, opponent_channel)
409
410
	def close_file_connection(self, in_message):
		"""
		Close this socket's side of a file connection.

		Nothing happens when the socket is already CLOSED. Otherwise the
		sender or receiver close routine runs, depending on whether this
		socket is the connection's offerer, and the socket is marked CLOSED.

		:param in_message: dict carrying the connection id
		:raises Exception: when this socket has no status on the connection
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		own_status = self.sync_redis.shget(connection_id, self.id)
		if not own_status:
			raise Exception("Access Denied")
		if own_status == WebRtcRedisStates.CLOSED:
			return
		offerer_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		if offerer_id == self.id:
			self.close_file_sender(connection_id)
		else:
			self.close_file_receiver(connection_id, in_message, offerer_id)
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
422
423
	def close_call_connection(self, in_message):
		"""
		Close a call: allowed from READY or RESPONDED, moves this socket to CLOSED.

		:param in_message: client message passed on to ``send_call_answer``
		"""
		self.send_call_answer(
			in_message,
			status_set=WebRtcRedisStates.CLOSED,
			reply_action=Actions.CLOSE_CALL_CONNECTION,
			allowed_state=[WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED],
			message_handler=HandlerNames.PEER_CONNECTION
		)
431
432
	def cancel_call_connection(self, in_message):
		"""
		Cancel an offered call: allowed from OFFERED, moves this socket to CLOSED.

		:param in_message: client message passed on to ``send_call_answer``
		"""
		self.send_call_answer(
			in_message,
			status_set=WebRtcRedisStates.CLOSED,
			reply_action=Actions.CANCEL_CALL_CONNECTION,
			allowed_state=[WebRtcRedisStates.OFFERED],
			message_handler=HandlerNames.WEBRTC_TRANSFER
		)
440
441
	def close_file_receiver(self, connection_id, in_message, sender_id):
		"""
		Receiver side of closing a file connection: forward the close message
		to the sender unless the sender is already CLOSED.

		:param connection_id: id of the connection being closed
		:param in_message: client message; re-addressed in place before it is forwarded
		:param sender_id: socket id of the connection's offerer
		:raises Exception: when the sender has no status on the connection
		"""
		sender_status = self.sync_redis.shget(connection_id, sender_id)
		if not sender_status:
			raise Exception("Access denied")
		if sender_status == WebRtcRedisStates.CLOSED:
			return
		in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
		in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
		self.publish(in_message, sender_id)
449
450
	def close_file_sender(self, connection_id):
		"""
		Sender side of closing a file connection: notify every other socket on
		the connection that is not CLOSED yet.

		:param connection_id: id of the connection being closed
		"""
		statuses = self.sync_redis.shgetall(connection_id)
		del statuses[self.id]  # the sender does not notify itself
		close_message = self.get_close_file_sender_message(connection_id)
		still_open = [ws_id for ws_id in statuses if statuses[ws_id] != WebRtcRedisStates.CLOSED]
		for ws_id in still_open:
			self.publish(close_message, ws_id)
458
459 View Code Duplication
	def accept_file(self, in_message):
		"""
		Accept a file transfer: move this socket to READY and notify the offerer.

		:param in_message: dict carrying the connection id and content
		:raises ValidationError: unless the offerer is READY and this socket is RESPONDED or READY
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		content = in_message[VarNames.CONTENT]
		offerer_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
		offerer_status = self.sync_redis.shget(connection_id, offerer_id)
		own_status = self.sync_redis.shget(connection_id, self.id)
		acceptable_own_states = [WebRtcRedisStates.RESPONDED, WebRtcRedisStates.READY]
		allowed = offerer_status == WebRtcRedisStates.READY and own_status in acceptable_own_states
		if not allowed:
			raise ValidationError("Invalid channel status")
		self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
		self.publish(self.get_accept_file_message(connection_id, content), offerer_id)
471
472
	# todo
473
	# we can use channel_status = self.sync_redis.shgetall(connection_id)
474
	# and then self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
475
	# if we shgetall and only then do async hset
476
	# we can catch an issue when 2 concurrent users accepted the call
477
	# but we didn't send them ACCEPT_CALL as they both were in status 'offered'
478
	def accept_call(self, in_message):
		"""
		Accept a call: move this socket to READY and send ACCEPT_CALL (with
		empty content) to the other non-closed sockets on the connection.

		:param in_message: dict carrying the connection id
		:raises ValidationError: unless this socket is RESPONDED
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		own_status = self.sync_redis.shget(connection_id, self.id)
		if own_status != WebRtcRedisStates.RESPONDED:
			raise ValidationError("Invalid channel status")
		participants = self.sync_redis.shgetall(connection_id)
		self.publish_call_answer(
			participants,
			connection_id,
			HandlerNames.WEBRTC_TRANSFER,
			Actions.ACCEPT_CALL,
			WebRtcRedisStates.READY,
			{}
		)
493
494
	def send_call_answer(self, in_message, status_set, reply_action, allowed_state, message_handler):
		"""
		Validate this socket's state on a call connection, then update it and notify the other participants.

		:param in_message: dict carrying the connection id and content
		:param status_set: state stored for this socket when the check passes
		:param reply_action: action of the message sent to the other participants
		:param allowed_state: collection of states this socket may currently be in
		:param message_handler: handler name put into the outgoing message
		:raises ValidationError: when this socket is not registered on the
			connection or its state is not in ``allowed_state``
		"""
		connection_id = in_message[VarNames.CONNECTION_ID]
		content = in_message[VarNames.CONTENT]
		conn_users = self.sync_redis.shgetall(connection_id)
		# A socket that is not part of the connection has no entry here; treat
		# that as an invalid state instead of letting a KeyError escape.
		own_status = conn_users.get(self.id) if conn_users else None
		if own_status is None or own_status not in allowed_state:
			raise ValidationError("Invalid channel status.")
		self.publish_call_answer(conn_users, connection_id, message_handler, reply_action, status_set, content)
502
503
	def publish_call_answer(self, conn_users, connection_id, message_handler, reply_action, status_set, content):
		"""
		Store this socket's new state and send the reply to every other participant that is not CLOSED.

		:param conn_users: socket id -> state mapping of the connection; this socket's entry is removed from it
		:param connection_id: id of the connection
		:param message_handler: handler name put into the outgoing message
		:param reply_action: action of the outgoing message
		:param status_set: state stored for this socket
		:param content: content of the outgoing message
		"""
		self.async_redis_publisher.hset(connection_id, self.id, status_set)
		del conn_users[self.id]
		answer = self.reply_webrtc(reply_action, connection_id, message_handler, content)
		for ws_id, ws_status in conn_users.items():
			if ws_status != WebRtcRedisStates.CLOSED:
				self.publish(answer, ws_id)