| Total Complexity | 36 |
| Total Lines | 201 |
| Duplicated Lines | 13.43 % |
| Changes | 3 | ||
| Bugs | 0 | Features | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
| 1 | import json |
||
| 309 | class WebRtcMessageHandler(MessagesHandler, WebRtcMessageCreator): |
||
| 310 | |||
| 311 | def __init__(self, *args, **kwargs): |
||
| 312 | super(WebRtcMessageHandler, self).__init__(*args, **kwargs) |
||
| 313 | self.pre_process_message.update({ |
||
| 314 | Actions.WEBRTC: self.proxy_webrtc, |
||
| 315 | Actions.CLOSE_FILE_CONNECTION: self.close_file_connection, |
||
| 316 | Actions.CLOSE_CALL_CONNECTION: self.close_call_connection, |
||
| 317 | Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection, |
||
| 318 | Actions.ACCEPT_CALL: self.accept_call, |
||
| 319 | Actions.ACCEPT_FILE: self.accept_file, |
||
| 320 | Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection, |
||
| 321 | Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection, |
||
| 322 | Actions.REPLY_FILE_CONNECTION: self.reply_file_connection, |
||
| 323 | Actions.RETRY_FILE_CONNECTION: self.retry_file_connection, |
||
| 324 | Actions.REPLY_CALL_CONNECTION: self.reply_call_connection, |
||
| 325 | }) |
||
| 326 | self.post_process_message.update({ |
||
| 327 | Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel, |
||
| 328 | Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel |
||
| 329 | }) |
||
| 330 | |||
| 331 | def set_opponent_call_channel(self, message): |
||
| 332 | connection_id = message[VarNames.CONNECTION_ID] |
||
| 333 | if message[VarNames.WEBRTC_OPPONENT_ID] == self.id: |
||
| 334 | return True |
||
| 335 | self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED) |
||
| 336 | |||
| 337 | def offer_webrtc_connection(self, in_message): |
||
| 338 | room_id = in_message[VarNames.CHANNEL] |
||
| 339 | content = in_message.get(VarNames.CONTENT) |
||
| 340 | qued_id = in_message[VarNames.WEBRTC_QUED_ID] |
||
| 341 | connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH) |
||
| 342 | # use list because sets dont have 1st element which is offerer |
||
| 343 | self.async_redis_publisher.hset(WEBRTC_CONNECTION, connection_id, self.id) |
||
| 344 | self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY) |
||
| 345 | opponents_message = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT]) |
||
| 346 | self_message = self.set_connection_id(qued_id, connection_id) |
||
| 347 | self.ws_write(self_message) |
||
| 348 | self.logger.info('!! Offering a webrtc, connection_id %s', connection_id) |
||
| 349 | self.publish(opponents_message, room_id, True) |
||
| 350 | |||
| 351 | def retry_file_connection(self, in_message): |
||
| 352 | connection_id = in_message[VarNames.CONNECTION_ID] |
||
| 353 | opponent_ws_id = in_message[VarNames.WEBRTC_OPPONENT_ID] |
||
| 354 | sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id) |
||
| 355 | receiver_ws_status = self.sync_redis.shget(connection_id, opponent_ws_id) |
||
| 356 | if receiver_ws_status == WebRtcRedisStates.READY and self.id == sender_ws_id: |
||
| 357 | self.publish(self.retry_file(connection_id), opponent_ws_id) |
||
| 358 | else: |
||
| 359 | raise ValidationError("Invalid channel status.") |
||
| 360 | |||
| 361 | View Code Duplication | def reply_file_connection(self, in_message): |
|
|
1 ignored issue
–
show
|
|||
| 362 | connection_id = in_message[VarNames.CONNECTION_ID] |
||
| 363 | sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id) |
||
| 364 | sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id) |
||
| 365 | self_ws_status = self.sync_redis.shget(connection_id, self.id) |
||
| 366 | if sender_ws_status == WebRtcRedisStates.READY and self_ws_status == WebRtcRedisStates.OFFERED: |
||
| 367 | self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED) |
||
| 368 | self.publish(self.reply_webrtc( |
||
| 369 | Actions.REPLY_FILE_CONNECTION, |
||
| 370 | connection_id, |
||
| 371 | HandlerNames.WEBRTC_TRANSFER, |
||
| 372 | in_message[VarNames.CONTENT] |
||
| 373 | ), sender_ws_id) |
||
| 374 | else: |
||
| 375 | raise ValidationError("Invalid channel status.") |
||
| 376 | |||
| 377 | def reply_call_connection(self, in_message): |
||
| 378 | self.send_call_answer( |
||
| 379 | in_message, |
||
| 380 | WebRtcRedisStates.RESPONDED, |
||
| 381 | Actions.REPLY_CALL_CONNECTION, |
||
| 382 | [WebRtcRedisStates.OFFERED], |
||
| 383 | HandlerNames.WEBRTC_TRANSFER |
||
| 384 | ) |
||
| 385 | |||
| 386 | def proxy_webrtc(self, in_message): |
||
| 387 | """ |
||
| 388 | :type in_message: dict |
||
| 389 | """ |
||
| 390 | connection_id = in_message[VarNames.CONNECTION_ID] |
||
| 391 | channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID) |
||
| 392 | self_channel_status = self.sync_redis.shget(connection_id, self.id) |
||
| 393 | opponent_channel_status = self.sync_redis.shget(connection_id, channel) |
||
| 394 | if not (self_channel_status == WebRtcRedisStates.READY and opponent_channel_status == WebRtcRedisStates.READY): |
||
| 395 | raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format( |
||
| 396 | self_channel_status, opponent_channel_status |
||
| 397 | )) # todo receiver should only accept proxy_webrtc from sender, sender can accept all |
||
| 398 | # I mean somebody if there're 3 ppl in 1 channel and first is initing transfer to 2nd and 3rd, |
||
| 399 | # 2nd guy can fraud 3rd guy webrtc traffic, which is allowed during the call, but not while transering file |
||
| 400 | in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id |
||
| 401 | in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION |
||
| 402 | self.logger.debug( |
||
| 403 | "Forwarding message to channel %s, self %s, other status %s", |
||
| 404 | channel, |
||
| 405 | self_channel_status, |
||
| 406 | opponent_channel_status |
||
| 407 | ) |
||
| 408 | self.publish(in_message, channel) |
||
| 409 | |||
| 410 | def close_file_connection(self, in_message): |
||
| 411 | connection_id = in_message[VarNames.CONNECTION_ID] |
||
| 412 | self_channel_status = self.sync_redis.shget(connection_id, self.id) |
||
| 413 | if not self_channel_status: |
||
| 414 | raise Exception("Access Denied") |
||
| 415 | if self_channel_status != WebRtcRedisStates.CLOSED: |
||
| 416 | sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id) |
||
| 417 | if sender_id == self.id: |
||
| 418 | self.close_file_sender(connection_id) |
||
| 419 | else: |
||
| 420 | self.close_file_receiver(connection_id, in_message, sender_id) |
||
| 421 | self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED) |
||
| 422 | |||
| 423 | def close_call_connection(self, in_message): |
||
| 424 | self.send_call_answer( |
||
| 425 | in_message, |
||
| 426 | WebRtcRedisStates.CLOSED, |
||
| 427 | Actions.CLOSE_CALL_CONNECTION, |
||
| 428 | [WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED], |
||
| 429 | HandlerNames.PEER_CONNECTION |
||
| 430 | ) |
||
| 431 | |||
| 432 | def cancel_call_connection(self, in_message): |
||
| 433 | self.send_call_answer( |
||
| 434 | in_message, |
||
| 435 | WebRtcRedisStates.CLOSED, |
||
| 436 | Actions.CANCEL_CALL_CONNECTION, |
||
| 437 | [WebRtcRedisStates.OFFERED], |
||
| 438 | HandlerNames.WEBRTC_TRANSFER |
||
| 439 | ) |
||
| 440 | |||
| 441 | def close_file_receiver(self, connection_id, in_message, sender_id): |
||
| 442 | sender_status = self.sync_redis.shget(connection_id, sender_id) |
||
| 443 | if not sender_status: |
||
| 444 | raise Exception("Access denied") |
||
| 445 | if sender_status != WebRtcRedisStates.CLOSED: |
||
| 446 | in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id |
||
| 447 | in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION |
||
| 448 | self.publish(in_message, sender_id) |
||
| 449 | |||
| 450 | def close_file_sender(self, connection_id): |
||
| 451 | values = self.sync_redis.shgetall(connection_id) |
||
| 452 | del values[self.id] |
||
| 453 | message = self.get_close_file_sender_message(connection_id) |
||
| 454 | for ws_id in values: |
||
| 455 | if values[ws_id] == WebRtcRedisStates.CLOSED: |
||
| 456 | continue |
||
| 457 | self.publish(message, ws_id) |
||
| 458 | |||
| 459 | View Code Duplication | def accept_file(self, in_message): |
|
|
1 ignored issue
–
show
|
|||
| 460 | connection_id = in_message[VarNames.CONNECTION_ID] |
||
| 461 | content = in_message[VarNames.CONTENT] |
||
| 462 | sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id) |
||
| 463 | sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id) |
||
| 464 | self_ws_status = self.sync_redis.shget(connection_id, self.id) |
||
| 465 | if sender_ws_status == WebRtcRedisStates.READY \ |
||
| 466 | and self_ws_status in [WebRtcRedisStates.RESPONDED, WebRtcRedisStates.READY]: |
||
| 467 | self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY) |
||
| 468 | self.publish(self.get_accept_file_message(connection_id, content), sender_ws_id) |
||
| 469 | else: |
||
| 470 | raise ValidationError("Invalid channel status") |
||
| 471 | |||
| 472 | # todo |
||
| 473 | # we can use channel_status = self.sync_redis.shgetall(connection_id) |
||
| 474 | # and then self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY) |
||
| 475 | # if we shgetall and only then do async hset |
||
| 476 | # we can catch an issue when 2 concurrent users accepted the call |
||
| 477 | # but we didn't send them ACCEPT_CALL as they both were in status 'offered' |
||
| 478 | def accept_call(self, in_message): |
||
| 479 | connection_id = in_message[VarNames.CONNECTION_ID] |
||
| 480 | self_status = self.sync_redis.shget(connection_id, self.id) |
||
| 481 | if self_status == WebRtcRedisStates.RESPONDED: |
||
| 482 | conn_users = self.sync_redis.shgetall(connection_id) |
||
| 483 | self.publish_call_answer( |
||
| 484 | conn_users, |
||
| 485 | connection_id, |
||
| 486 | HandlerNames.WEBRTC_TRANSFER, |
||
| 487 | Actions.ACCEPT_CALL, |
||
| 488 | WebRtcRedisStates.READY, |
||
| 489 | {} |
||
| 490 | ) |
||
| 491 | else: |
||
| 492 | raise ValidationError("Invalid channel status") |
||
| 493 | |||
| 494 | def send_call_answer(self, in_message, status_set, reply_action, allowed_state, message_handler): |
||
| 495 | connection_id = in_message[VarNames.CONNECTION_ID] |
||
| 496 | content = in_message[VarNames.CONTENT] |
||
| 497 | conn_users = self.sync_redis.shgetall(connection_id) |
||
| 498 | if conn_users[self.id] in allowed_state: |
||
| 499 | self.publish_call_answer(conn_users, connection_id, message_handler, reply_action, status_set, content) |
||
| 500 | else: |
||
| 501 | raise ValidationError("Invalid channel status.") |
||
| 502 | |||
| 503 | def publish_call_answer(self, conn_users, connection_id, message_handler, reply_action, status_set, content): |
||
| 504 | self.async_redis_publisher.hset(connection_id, self.id, status_set) |
||
| 505 | del conn_users[self.id] |
||
| 506 | message = self.reply_webrtc(reply_action, connection_id, message_handler, content) |
||
| 507 | for user in conn_users: |
||
| 508 | if conn_users[user] != WebRtcRedisStates.CLOSED: |
||
| 509 | self.publish(message, user) |