Total Complexity | 36 |
Total Lines | 201 |
Duplicated Lines | 13.43 % |
Changes | 3 | ||
Bugs | 0 | Features | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
1 | import json |
||
309 | class WebRtcMessageHandler(MessagesHandler, WebRtcMessageCreator): |
||
310 | |||
311 | def __init__(self, *args, **kwargs): |
||
312 | super(WebRtcMessageHandler, self).__init__(*args, **kwargs) |
||
313 | self.pre_process_message.update({ |
||
314 | Actions.WEBRTC: self.proxy_webrtc, |
||
315 | Actions.CLOSE_FILE_CONNECTION: self.close_file_connection, |
||
316 | Actions.CLOSE_CALL_CONNECTION: self.close_call_connection, |
||
317 | Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection, |
||
318 | Actions.ACCEPT_CALL: self.accept_call, |
||
319 | Actions.ACCEPT_FILE: self.accept_file, |
||
320 | Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection, |
||
321 | Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection, |
||
322 | Actions.REPLY_FILE_CONNECTION: self.reply_file_connection, |
||
323 | Actions.RETRY_FILE_CONNECTION: self.retry_file_connection, |
||
324 | Actions.REPLY_CALL_CONNECTION: self.reply_call_connection, |
||
325 | }) |
||
326 | self.post_process_message.update({ |
||
327 | Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel, |
||
328 | Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel |
||
329 | }) |
||
330 | |||
331 | def set_opponent_call_channel(self, message): |
||
332 | connection_id = message[VarNames.CONNECTION_ID] |
||
333 | if message[VarNames.WEBRTC_OPPONENT_ID] == self.id: |
||
334 | return True |
||
335 | self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED) |
||
336 | |||
337 | def offer_webrtc_connection(self, in_message): |
||
338 | room_id = in_message[VarNames.CHANNEL] |
||
339 | content = in_message.get(VarNames.CONTENT) |
||
340 | qued_id = in_message[VarNames.WEBRTC_QUED_ID] |
||
341 | connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH) |
||
342 | # use list because sets dont have 1st element which is offerer |
||
343 | self.async_redis_publisher.hset(WEBRTC_CONNECTION, connection_id, self.id) |
||
344 | self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY) |
||
345 | opponents_message = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT]) |
||
346 | self_message = self.set_connection_id(qued_id, connection_id) |
||
347 | self.ws_write(self_message) |
||
348 | self.logger.info('!! Offering a webrtc, connection_id %s', connection_id) |
||
349 | self.publish(opponents_message, room_id, True) |
||
350 | |||
351 | def retry_file_connection(self, in_message): |
||
352 | connection_id = in_message[VarNames.CONNECTION_ID] |
||
353 | opponent_ws_id = in_message[VarNames.WEBRTC_OPPONENT_ID] |
||
354 | sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id) |
||
355 | receiver_ws_status = self.sync_redis.shget(connection_id, opponent_ws_id) |
||
356 | if receiver_ws_status == WebRtcRedisStates.READY and self.id == sender_ws_id: |
||
357 | self.publish(self.retry_file(connection_id), opponent_ws_id) |
||
358 | else: |
||
359 | raise ValidationError("Invalid channel status.") |
||
360 | |||
361 | View Code Duplication | def reply_file_connection(self, in_message): |
|
1 ignored issue
–
show
|
|||
362 | connection_id = in_message[VarNames.CONNECTION_ID] |
||
363 | sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id) |
||
364 | sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id) |
||
365 | self_ws_status = self.sync_redis.shget(connection_id, self.id) |
||
366 | if sender_ws_status == WebRtcRedisStates.READY and self_ws_status == WebRtcRedisStates.OFFERED: |
||
367 | self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED) |
||
368 | self.publish(self.reply_webrtc( |
||
369 | Actions.REPLY_FILE_CONNECTION, |
||
370 | connection_id, |
||
371 | HandlerNames.WEBRTC_TRANSFER, |
||
372 | in_message[VarNames.CONTENT] |
||
373 | ), sender_ws_id) |
||
374 | else: |
||
375 | raise ValidationError("Invalid channel status.") |
||
376 | |||
377 | def reply_call_connection(self, in_message): |
||
378 | self.send_call_answer( |
||
379 | in_message, |
||
380 | WebRtcRedisStates.RESPONDED, |
||
381 | Actions.REPLY_CALL_CONNECTION, |
||
382 | [WebRtcRedisStates.OFFERED], |
||
383 | HandlerNames.WEBRTC_TRANSFER |
||
384 | ) |
||
385 | |||
386 | def proxy_webrtc(self, in_message): |
||
387 | """ |
||
388 | :type in_message: dict |
||
389 | """ |
||
390 | connection_id = in_message[VarNames.CONNECTION_ID] |
||
391 | channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID) |
||
392 | self_channel_status = self.sync_redis.shget(connection_id, self.id) |
||
393 | opponent_channel_status = self.sync_redis.shget(connection_id, channel) |
||
394 | if not (self_channel_status == WebRtcRedisStates.READY and opponent_channel_status == WebRtcRedisStates.READY): |
||
395 | raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format( |
||
396 | self_channel_status, opponent_channel_status |
||
397 | )) # todo receiver should only accept proxy_webrtc from sender, sender can accept all |
||
398 | # I mean somebody if there're 3 ppl in 1 channel and first is initing transfer to 2nd and 3rd, |
||
399 | # 2nd guy can fraud 3rd guy webrtc traffic, which is allowed during the call, but not while transering file |
||
400 | in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id |
||
401 | in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION |
||
402 | self.logger.debug( |
||
403 | "Forwarding message to channel %s, self %s, other status %s", |
||
404 | channel, |
||
405 | self_channel_status, |
||
406 | opponent_channel_status |
||
407 | ) |
||
408 | self.publish(in_message, channel) |
||
409 | |||
410 | def close_file_connection(self, in_message): |
||
411 | connection_id = in_message[VarNames.CONNECTION_ID] |
||
412 | self_channel_status = self.sync_redis.shget(connection_id, self.id) |
||
413 | if not self_channel_status: |
||
414 | raise Exception("Access Denied") |
||
415 | if self_channel_status != WebRtcRedisStates.CLOSED: |
||
416 | sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id) |
||
417 | if sender_id == self.id: |
||
418 | self.close_file_sender(connection_id) |
||
419 | else: |
||
420 | self.close_file_receiver(connection_id, in_message, sender_id) |
||
421 | self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED) |
||
422 | |||
423 | def close_call_connection(self, in_message): |
||
424 | self.send_call_answer( |
||
425 | in_message, |
||
426 | WebRtcRedisStates.CLOSED, |
||
427 | Actions.CLOSE_CALL_CONNECTION, |
||
428 | [WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED], |
||
429 | HandlerNames.PEER_CONNECTION |
||
430 | ) |
||
431 | |||
432 | def cancel_call_connection(self, in_message): |
||
433 | self.send_call_answer( |
||
434 | in_message, |
||
435 | WebRtcRedisStates.CLOSED, |
||
436 | Actions.CANCEL_CALL_CONNECTION, |
||
437 | [WebRtcRedisStates.OFFERED], |
||
438 | HandlerNames.WEBRTC_TRANSFER |
||
439 | ) |
||
440 | |||
441 | def close_file_receiver(self, connection_id, in_message, sender_id): |
||
442 | sender_status = self.sync_redis.shget(connection_id, sender_id) |
||
443 | if not sender_status: |
||
444 | raise Exception("Access denied") |
||
445 | if sender_status != WebRtcRedisStates.CLOSED: |
||
446 | in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id |
||
447 | in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION |
||
448 | self.publish(in_message, sender_id) |
||
449 | |||
450 | def close_file_sender(self, connection_id): |
||
451 | values = self.sync_redis.shgetall(connection_id) |
||
452 | del values[self.id] |
||
453 | message = self.get_close_file_sender_message(connection_id) |
||
454 | for ws_id in values: |
||
455 | if values[ws_id] == WebRtcRedisStates.CLOSED: |
||
456 | continue |
||
457 | self.publish(message, ws_id) |
||
458 | |||
459 | View Code Duplication | def accept_file(self, in_message): |
|
1 ignored issue
–
show
|
|||
460 | connection_id = in_message[VarNames.CONNECTION_ID] |
||
461 | content = in_message[VarNames.CONTENT] |
||
462 | sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id) |
||
463 | sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id) |
||
464 | self_ws_status = self.sync_redis.shget(connection_id, self.id) |
||
465 | if sender_ws_status == WebRtcRedisStates.READY \ |
||
466 | and self_ws_status in [WebRtcRedisStates.RESPONDED, WebRtcRedisStates.READY]: |
||
467 | self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY) |
||
468 | self.publish(self.get_accept_file_message(connection_id, content), sender_ws_id) |
||
469 | else: |
||
470 | raise ValidationError("Invalid channel status") |
||
471 | |||
472 | # todo |
||
473 | # we can use channel_status = self.sync_redis.shgetall(connection_id) |
||
474 | # and then self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY) |
||
475 | # if we shgetall and only then do async hset |
||
476 | # we can catch an issue when 2 concurrent users accepted the call |
||
477 | # but we didn't send them ACCEPT_CALL as they both were in status 'offered' |
||
478 | def accept_call(self, in_message): |
||
479 | connection_id = in_message[VarNames.CONNECTION_ID] |
||
480 | self_status = self.sync_redis.shget(connection_id, self.id) |
||
481 | if self_status == WebRtcRedisStates.RESPONDED: |
||
482 | conn_users = self.sync_redis.shgetall(connection_id) |
||
483 | self.publish_call_answer( |
||
484 | conn_users, |
||
485 | connection_id, |
||
486 | HandlerNames.WEBRTC_TRANSFER, |
||
487 | Actions.ACCEPT_CALL, |
||
488 | WebRtcRedisStates.READY, |
||
489 | {} |
||
490 | ) |
||
491 | else: |
||
492 | raise ValidationError("Invalid channel status") |
||
493 | |||
494 | def send_call_answer(self, in_message, status_set, reply_action, allowed_state, message_handler): |
||
495 | connection_id = in_message[VarNames.CONNECTION_ID] |
||
496 | content = in_message[VarNames.CONTENT] |
||
497 | conn_users = self.sync_redis.shgetall(connection_id) |
||
498 | if conn_users[self.id] in allowed_state: |
||
499 | self.publish_call_answer(conn_users, connection_id, message_handler, reply_action, status_set, content) |
||
500 | else: |
||
501 | raise ValidationError("Invalid channel status.") |
||
502 | |||
503 | def publish_call_answer(self, conn_users, connection_id, message_handler, reply_action, status_set, content): |
||
504 | self.async_redis_publisher.hset(connection_id, self.id, status_set) |
||
505 | del conn_users[self.id] |
||
506 | message = self.reply_webrtc(reply_action, connection_id, message_handler, content) |
||
507 | for user in conn_users: |
||
508 | if conn_users[user] != WebRtcRedisStates.CLOSED: |
||
509 | self.publish(message, user) |