| Total Complexity | 131 |
| Total Lines | 643 |
| Duplicated Lines | 10.58 % |
| Changes | 38 | ||
| Bugs | 4 | Features | 0 |
Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like MessagesHandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | import json |
||
| 323 | class MessagesHandler(MessagesCreator): |
||
| 324 | |||
def __init__(self, *args, **kwargs):
    """
    Set up per-connection state, redis clients and the action dispatch tables.

    ``closed_channels`` and ``parsable_prefix`` are assigned before the parent
    initializer runs; every other attribute is assigned after it.
    """
    self.closed_channels = None
    # prefix that marks a published message as "post-process before sending to client"
    self.parsable_prefix = 'p'
    super(MessagesHandler, self).__init__(*args, **kwargs)
    self.webrtc_ids = {}
    self.ip = None
    # imported here rather than at module level -- presumably to avoid an import cycle; TODO confirm
    from chat import global_redis
    self.async_redis_publisher = global_redis.async_redis_publisher
    self.sync_redis = global_redis.sync_redis
    self.channels = []
    self._logger = None
    # dedicated client for this handler; used for subscribe/listen/unsubscribe
    self.async_redis = tornadoredis.Client(port=TORNADO_REDIS_PORT)
    self.patch_tornadoredis()
    # action -> handler for the action
    self.pre_process_message = {
        Actions.GET_MESSAGES: self.process_get_messages,
        Actions.SEND_MESSAGE: self.process_send_message,
        Actions.WEBRTC: self.proxy_webrtc,
        Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
        Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
        Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
        Actions.ACCEPT_CALL: self.accept_call,
        Actions.ACCEPT_FILE: self.accept_file,
        Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
        Actions.DELETE_ROOM: self.delete_channel,
        Actions.EDIT_MESSAGE: self.edit_message,
        Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
        Actions.INVITE_USER: self.invite_user,
        Actions.OFFER_FILE_CONNECTION: self.offer_webrtc_connection,
        Actions.OFFER_CALL_CONNECTION: self.offer_webrtc_connection,
        Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
        Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
    }
    # event -> hook that pub_sub_message runs on prefixed ("parsable") messages
    self.post_process_message = {
        Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
        Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
        Actions.DELETE_ROOM: self.send_client_delete_channel,
        Actions.INVITE_USER: self.send_client_new_channel,
        Actions.OFFER_FILE_CONNECTION: self.set_opponent_call_channel,
        Actions.OFFER_CALL_CONNECTION: self.set_opponent_call_channel
    }
||
| 365 | |||
def patch_tornadoredis(self): # TODO remove this
    """
    Wrap ``self.async_redis.connection.readline`` so that any exception it raises
    is logged together with this handler's state before being re-raised.
    """
    # type of the bound method; used below to bind new_read to the connection
    fabric = type(self.async_redis.connection.readline)
    # keep the original implementation reachable from the wrapper
    self.async_redis.connection.old_read = self.async_redis.connection.readline
    def new_read(new_self, callback=None):
        try:
            return new_self.old_read(callback=callback)
        except Exception as e:
            current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
            self.logger.error(e)
            self.logger.error(
                "Exception info: "
                "self.id: %s ;;; "
                "self.connected = '%s';;; "
                "Redis default channel online = '%s';;; "
                "self.channels = '%s';;; "
                "self.closed_channels = '%s';;;",
                self.id, self.connected, current_online, self.channels, self.closed_channels
            )
            raise e

    self.async_redis.connection.readline = fabric(new_read, self.async_redis.connection)
||
| 387 | |||
@property
def connected(self):
    """
    Connection state of the handler; this base implementation is a placeholder.

    :raises NotImplementedError: always
    """
    # NotImplemented is a comparison sentinel, not an exception class;
    # raising it produces a confusing TypeError on Python 3.
    raise NotImplementedError

@connected.setter
def connected(self, value):
    """
    Setter counterpart of :attr:`connected`; placeholder as well.

    :raises NotImplementedError: always
    """
    raise NotImplementedError
||
| 395 | |||
@tornado.gen.engine
def listen(self, channels):
    """
    Subscribe the handler's own redis client to ``channels`` and, once the
    subscription task has completed, start delivering messages to
    :meth:`pub_sub_message`.
    """
    subscription = tornado.gen.Task(self.async_redis.subscribe, channels)
    yield subscription
    self.async_redis.listen(self.pub_sub_message)
||
| 401 | |||
@property
def logger(self):
    """Return ``self._logger`` when it is set (truthy), otherwise ``base_logger``."""
    return self._logger or base_logger
||
| 405 | |||
@tornado.gen.engine
def add_channel(self, channel):
    """
    Record ``channel`` in ``self.channels`` and subscribe the handler's redis
    client to it.
    """
    self.channels.append(channel)
    subscription = tornado.gen.Task(self.async_redis.subscribe, (channel,))
    yield subscription
||
| 411 | |||
def evaluate(self, query_set):
    """
    Run ``len(query_set)`` through :meth:`do_db` and return ``query_set`` itself.

    The length is discarded; presumably the call exists to force the query
    set to execute under ``do_db``'s handling -- verify against callers.
    """
    force = len
    self.do_db(force, query_set)
    return query_set
||
| 415 | |||
def do_db(self, callback, *args, **kwargs):
    """
    Call ``callback(*args, **kwargs)`` and return its result.

    If the call fails with ``OperationalError``/``InterfaceError`` whose text
    contains 'MySQL server has gone away', the connection is closed and the
    callback is invoked once more; any other such error is re-raised.
    """
    try:
        result = callback(*args, **kwargs)
    except (OperationalError, InterfaceError) as e:
        if 'MySQL server has gone away' not in str(e):
            raise e
        self.logger.warning('%s, reconnecting' % e)
        connection.close()
        result = callback(*args, **kwargs)
    return result
||
| 426 | |||
def execute_query(self, query, *args, **kwargs):
    """
    Execute raw SQL and return every row as a dict keyed by column name.

    :param query: SQL text handed to ``cursor.execute``
    :param args: extra positional arguments forwarded to ``cursor.execute``
    :param kwargs: extra keyword arguments forwarded to ``cursor.execute``
    :return: list of ``{column_name: value}`` dicts, one per fetched row
    """
    cursor = connection.cursor()
    try:
        cursor.execute(query, *args, **kwargs)
        # column names are the same for every row, so build the list once
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        # release the cursor even when execute/fetchall raises
        cursor.close()
||
| 435 | |||
def get_online_from_redis(self, channel, check_self_online=False):
    """
    Read the members of redis set ``channel`` and return the distinct user ids.

    Every member is decoded as utf-8 and the part before the first ':' is
    parsed as the integer user id.

    :param channel: redis key whose set members are read
    :param check_self_online: when true, additionally report whether the set
        holds a member with this handler's ``user_id`` but a different ``id``
    :return: list of user ids, or ``(list, bool)`` when ``check_self_online``
    """
    members = self.sync_redis.smembers(channel)
    self.logger.debug('!! channel %s redis online: %s', channel, members)
    user_ids = set()
    same_user_elsewhere = False
    for member in members or ():
        connection_key = member.decode('utf-8')
        # ':' separator is specified in cookies_middleware.py.create_id
        uid = int(connection_key.split(':')[0])
        user_ids.add(uid)
        if uid == self.user_id and connection_key != self.id:
            same_user_elsewhere = True
    online_ids = list(user_ids)
    if check_self_online:
        return online_ids, same_user_elsewhere
    return online_ids
||
| 456 | |||
def add_online_user(self, room_id, offline_messages=None):
    """
    Add this connection's id to the redis set ``room_id`` and announce the
    online list.

    If no other connection of the same user is found in the set, the user id
    is appended to the online list, a LOGIN message is published to the room
    and, when given, offline messages are written to this socket. Otherwise
    only this socket receives a REFRESH_USER message.
    """
    self.async_redis_publisher.sadd(room_id, self.id)
    # since we add user to online first, latest trigger will always show correct online
    online, is_online = self.get_online_from_redis(room_id, True)
    if is_online:
        # Send user names to self
        refresh_message = self.room_online(online, Actions.REFRESH_USER, room_id)
        self.logger.info('!! Second tab, retrieving online for self')
        self.ws_write(refresh_message)
        return
    online.append(self.user_id)
    login_message = self.room_online(online, Actions.LOGIN, room_id)
    self.logger.info('!! First tab, sending refresh online for all')
    self.publish(login_message, room_id)
    if offline_messages:
        self.ws_write(self.load_offline_message(offline_messages, room_id))
||
| 485 | |||
def publish(self, message, channel, parsable=False):
    """
    Serialize ``message`` to JSON and publish it to redis ``channel``.

    :param parsable: when true, the JSON text is passed through :meth:`encode`
        before publishing
    """
    payload = json.dumps(message)
    self.logger.debug('<%s> %s', channel, payload)
    outgoing = self.encode(payload) if parsable else payload
    self.async_redis_publisher.publish(channel, outgoing)
||
| 492 | |||
def encode(self, message):
    """
    Prepend ``self.parsable_prefix`` to ``message``.

    @param message: message to mark
    @return: marked message
    """
    prefix = self.parsable_prefix
    return prefix + message
||
| 501 | |||
def remove_parsable_prefix(self, message):
    """
    Strip ``self.parsable_prefix`` from the front of ``message``.

    :return: the message without the prefix, or ``None`` when the message
        does not start with the prefix
    """
    prefix = self.parsable_prefix
    if message.startswith(prefix):
        # cut exactly the prefix, whatever its length, instead of assuming 1 char
        return message[len(prefix):]
    return None
||
| 505 | |||
def pub_sub_message(self, message):
    """
    Handle a message delivered by the redis subscription.

    Bodies that are not strings are ignored. A body carrying the parsable
    prefix is decoded and passed to the matching ``post_process_message``
    hook, and forwarded to the socket only when the hook returns a falsy
    value. Any other string body is forwarded as is.
    """
    data = message.body
    if not isinstance(data, str_type):
        return
    prefixless_str = self.remove_parsable_prefix(data)
    if not prefixless_str:
        self.ws_write(data)
        return
    dict_message = json.loads(prefixless_str)
    hook = self.post_process_message[dict_message[VarNames.EVENT]]
    if not hook(dict_message):
        self.ws_write(prefixless_str)
||
| 517 | |||
def ws_write(self, message):
    """Write ``message`` to the client; this base version always raises NotImplementedError."""
    reason = 'WebSocketHandler implements'
    raise NotImplementedError(reason)
||
| 520 | |||
def process_send_message(self, message):
    """
    Store an incoming chat message with its images and publish it to the room.

    :type message: dict
    """
    room_id = message[VarNames.CHANNEL]
    raw_imgs = message.get(VarNames.IMG)
    stored = Message(
        sender_id=self.user_id,
        content=message[VarNames.CONTENT],
        symbol=get_max_key(raw_imgs)
    )
    stored.room_id = room_id
    self.do_db(stored.save)
    db_images = self.save_images(raw_imgs, stored.id)
    outgoing = self.create_send_message(
        stored,
        Actions.PRINT_MESSAGE,
        self.prepare_img(db_images, stored.id)
    )
    self.publish(outgoing, room_id)
||
| 541 | |||
def close_file_connection(self, in_message):
    """
    Close this socket's side of a file-transfer connection.

    :raises Exception: when this socket has no status stored for the connection
    """
    connection_id = in_message[VarNames.CONNECTION_ID]
    own_status = self.sync_redis.shget(connection_id, self.id)
    if not own_status:
        raise Exception("Access Denied")
    if own_status == WebRtcRedisStates.CLOSED:
        return
    sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
    if sender_id != self.id:
        self.close_file_receiver(connection_id, in_message, sender_id)
    else:
        self.close_file_sender(connection_id)
    self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
||
| 554 | |||
def close_call_connection(self, in_message):
    """
    Mark this socket as CLOSED for a call and notify every participant whose
    status is not CLOSED.

    :raises ValidationError: when this socket's status is neither READY nor RESPONDED
    """
    connection_id = in_message[VarNames.CONNECTION_ID]
    conn_users = self.sync_redis.shgetall(connection_id)
    if conn_users[self.id] not in [WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED]:
        raise ValidationError("Invalid channel status.")
    self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
    del conn_users[self.id]
    notification = {
        VarNames.EVENT: Actions.CLOSE_CALL_CONNECTION,
        VarNames.CONNECTION_ID: connection_id,
        VarNames.USER_ID: self.user_id,
        VarNames.WEBRTC_OPPONENT_ID: self.id,
        VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
    }
    for participant in conn_users:
        if conn_users[participant] != WebRtcRedisStates.CLOSED:
            self.publish(notification, participant)
||
| 573 | |||
def cancel_call_connection(self, in_message, reply_action=None):
    """
    Decline an offered call: set this socket's status to CLOSED and send
    CANCEL_CALL_CONNECTION to the other participants via :meth:`send_call_answer`.

    :param in_message: incoming message carrying the connection id
    :param reply_action: unused; kept (now optional) for backward compatibility.
        The handler is registered in ``pre_process_message`` next to handlers
        that all take a single message argument, so it must be callable with
        one argument.
    """
    self.send_call_answer(in_message, WebRtcRedisStates.CLOSED, Actions.CANCEL_CALL_CONNECTION)
||
| 576 | |||
def close_file_receiver(self, connection_id, in_message, sender_id):
    """
    Forward the close message to the file sender unless the sender is already CLOSED.

    :raises Exception: when no status is stored for ``sender_id`` on this connection
    """
    sender_status = self.sync_redis.shget(connection_id, sender_id)
    if not sender_status:
        raise Exception("Access denied")
    if sender_status == WebRtcRedisStates.CLOSED:
        return
    in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
    in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
    self.publish(in_message, sender_id)
||
| 585 | |||
def close_file_sender(self, connection_id):
    """
    Publish CLOSE_FILE_CONNECTION to every other socket of the connection
    whose status is not CLOSED.
    """
    statuses = self.sync_redis.shgetall(connection_id)
    del statuses[self.id]
    for ws_id in statuses:
        if statuses[ws_id] != WebRtcRedisStates.CLOSED:
            self.publish({
                VarNames.EVENT: Actions.CLOSE_FILE_CONNECTION,
                VarNames.CONNECTION_ID: connection_id,
                VarNames.WEBRTC_OPPONENT_ID: self.id,
                VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
            }, ws_id)
||
| 598 | |||
def accept_file(self, in_message):
    """
    Accept an offered file: move this socket from RESPONDED to READY and
    notify the sender with ACCEPT_FILE.

    :raises ValidationError: unless the sender is READY and this socket is RESPONDED
    """
    connection_id = in_message[VarNames.CONNECTION_ID] # TODO accept all if call
    sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
    sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
    self_ws_status = self.sync_redis.shget(connection_id, self.id)
    if sender_ws_status != WebRtcRedisStates.READY or self_ws_status != WebRtcRedisStates.RESPONDED:
        raise ValidationError("Invalid channel status")
    self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
    acceptance = {
        VarNames.EVENT: Actions.ACCEPT_FILE,
        VarNames.CONNECTION_ID: connection_id,
        VarNames.WEBRTC_OPPONENT_ID: self.id,
        VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
    }
    self.publish(acceptance, sender_ws_id)
||
| 614 | |||
| 615 | # todo |
||
| 616 | # we can use channel_status = self.sync_redis.shgetall(connection_id) |
||
| 617 | # and then self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY) |
||
| 618 | # if we shgetall and only then do async hset |
||
| 619 | # we can catch an issue when 2 concurrent users accepted the call |
||
| 620 | # but we didn't send them ACCEPT_CALL as they both were in status 'offered' |
||
def accept_call(self, in_message):
    """
    Accept a call: set this socket to READY (synchronously) and publish
    ACCEPT_CALL to every other participant that is not CLOSED.

    :raises ValidationError: when this socket's status is not RESPONDED
    """
    connection_id = in_message[VarNames.CONNECTION_ID]
    if self.sync_redis.shget(connection_id, self.id) != WebRtcRedisStates.RESPONDED:
        raise ValidationError("Invalid channel status")
    # status is written before the participants are read back
    self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.READY)
    participants = self.sync_redis.shgetall(connection_id)
    del participants[self.id]
    notification = {
        VarNames.EVENT: Actions.ACCEPT_CALL,
        VarNames.USER_ID: self.user_id,
        VarNames.CONNECTION_ID: connection_id,
        VarNames.WEBRTC_OPPONENT_ID: self.id,
        VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
    }
    for ws_id in participants:
        if participants[ws_id] != WebRtcRedisStates.CLOSED:
            self.publish(notification, ws_id)
||
| 640 | |||
def offer_webrtc_connection(self, in_message):
    """
    Start a webrtc (file or call) offer: create a connection id, register this
    socket as its offerer with status READY, send the id back to this socket
    and publish the offer to the room as a parsable message.
    """
    room_id = in_message[VarNames.CHANNEL]
    content = in_message.get(VarNames.CONTENT)
    qued_id = in_message[VarNames.WEBRTC_QUED_ID]
    connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
    publisher = self.async_redis_publisher
    # WEBRTC_CONNECTION maps connection id -> offerer socket id
    publisher.hset(WEBRTC_CONNECTION, connection_id, self.id)
    publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
    opponents_message = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT])
    self.ws_write(self.set_connection_id(qued_id, connection_id))
    self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
    self.publish(opponents_message, room_id, True)
||
| 654 | |||
def reply_call_connection(self, in_message):
    """Answer a call offer: delegate to :meth:`send_call_answer` with status RESPONDED."""
    new_status = WebRtcRedisStates.RESPONDED
    self.send_call_answer(in_message, new_status, Actions.REPLY_CALL_CONNECTION)
||
| 657 | |||
def send_call_answer(self, in_message, status_set, reply_action):
    """
    Move this socket from OFFERED to ``status_set`` and publish a
    ``reply_action`` message to every other participant that is not CLOSED.

    :raises ValidationError: when this socket's status is not OFFERED
    """
    connection_id = in_message[VarNames.CONNECTION_ID]
    conn_users = self.sync_redis.shgetall(connection_id)
    if conn_users[self.id] != WebRtcRedisStates.OFFERED:
        raise ValidationError("Invalid channel status.")
    self.async_redis_publisher.hset(connection_id, self.id, status_set)
    del conn_users[self.id]
    reply = self.reply_webrtc(reply_action, connection_id)
    for participant in conn_users:
        if conn_users[participant] != WebRtcRedisStates.CLOSED:
            self.publish(reply, participant)
||
| 670 | |||
def reply_file_connection(self, in_message):
    """
    Answer a file offer: move this socket from OFFERED to RESPONDED and send
    REPLY_FILE_CONNECTION to the sender.

    :raises ValidationError: unless the sender is READY and this socket is OFFERED
    """
    connection_id = in_message[VarNames.CONNECTION_ID]
    sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
    sender_ws_status = self.sync_redis.shget(connection_id, sender_ws_id)
    self_ws_status = self.sync_redis.shget(connection_id, self.id)
    if sender_ws_status != WebRtcRedisStates.READY or self_ws_status != WebRtcRedisStates.OFFERED:
        raise ValidationError("Invalid channel status.")
    self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
    reply = self.reply_webrtc(Actions.REPLY_FILE_CONNECTION, connection_id)
    self.publish(reply, sender_ws_id)
||
| 681 | |||
def proxy_webrtc(self, in_message):
    """
    Forward a webrtc signalling message to the opponent socket named in it.

    Both this socket and the opponent must be READY for the connection.

    :type in_message: dict
    :raises ValidationError: when either side is not READY
    """
    connection_id = in_message[VarNames.CONNECTION_ID]
    channel = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
    self_channel_status = self.sync_redis.shget(connection_id, self.id)
    opponent_channel_status = self.sync_redis.shget(connection_id, channel)
    ready = WebRtcRedisStates.READY
    if self_channel_status != ready or opponent_channel_status != ready:
        # todo receiver should only accept proxy_webrtc from sender, sender can accept all
        # I mean somebody if there're 3 ppl in 1 channel and first is initing transfer to 2nd and 3rd,
        # 2nd guy can fraud 3rd guy webrtc traffic, which is allowed during the call, but not while transering file
        raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
            self_channel_status, opponent_channel_status
        ))
    # the receiver sees the forwarding socket as its opponent
    in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
    in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
    self.logger.debug(
        "Forwarding message to channel %s, self %s, other status %s",
        channel, self_channel_status, opponent_channel_status
    )
    self.publish(in_message, channel)
||
| 702 | |||
def create_new_room(self, message):
    """
    Create a named room, add the current user to it and publish a parsable
    subscribe message to ``self.channel``.

    :raises ValidationError: when the room name is empty or longer than 16 characters
    """
    room_name = message[VarNames.ROOM_NAME]
    name_is_valid = room_name and len(room_name) <= 16
    if not name_is_valid:
        raise ValidationError('Incorrect room name "{}"'.format(room_name))
    new_room = Room(name=room_name)
    self.do_db(new_room.save)
    membership = RoomUsers(room_id=new_room.id, user_id=self.user_id)
    membership.save()
    subscribe_message = self.subscribe_room_channel_message(new_room.id, room_name)
    self.publish(subscribe_message, self.channel, True)
||
| 712 | |||
def invite_user(self, message):
    """
    Add a user to a room this handler is subscribed to, announce it to the
    room and send a parsable invite message to the invited user's channel.

    :raises ValidationError: when the room is not in ``self.channels``, is
        private, or already contains the user
    """
    room_id = message[VarNames.ROOM_ID]
    user_id = message[VarNames.USER_ID]
    if room_id not in self.channels:
        raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
    room = self.do_db(Room.objects.get, id=room_id)
    if room.is_private:
        raise ValidationError("You can't add users to direct room, create a new room instead")
    memberships = Room.users.through.objects
    try:
        memberships.create(room_id=room_id, user_id=user_id)
    except IntegrityError:
        raise ValidationError("User is already in channel")
    users_in_room = {}
    for member in room.users.all():
        self.set_js_user_structure(users_in_room, member.id, member.username, member.sex)
    joined_message = self.add_user_to_room(room_id, user_id, users_in_room[user_id])
    self.publish(joined_message, room_id)
    subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room)
    self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True)
||
| 731 | |||
def create_room(self, user_rooms, user_id):
    """
    Find the existing room among ``user_rooms`` shared with ``user_id`` and
    re-enable it, or create a new room with the needed memberships.

    :param user_rooms: query of the current user's room ids (passed in by the caller)
    :param user_id: id of the other user; may equal ``self.user_id``
    :return: id of the found or newly created room
    :raises ValidationError: via :meth:`update_room` when the found room is not disabled
    """
    if self.user_id == user_id:
        room_ids = list([room['room_id'] for room in self.evaluate(user_rooms)])
        # NOTE(review): execute_query returns a plain list of dicts, so the
        # ``query_res.get`` lookup below looks like it would raise
        # AttributeError on this branch -- confirm.
        query_res = self.execute_query(SELECT_SELF_ROOM, [room_ids, ])
    else:
        rooms_query = RoomUsers.objects.filter(user_id=user_id, room__in=user_rooms)
        query_res = rooms_query.values('room__id', 'room__disabled')
    try:
        room = self.do_db(query_res.get)
        room_id = room['room__id']
        self.update_room(room_id, room['room__disabled'])
    except RoomUsers.DoesNotExist:
        # no existing room: create one and attach the user(s)
        room = Room()
        room.save()
        room_id = room.id
        if self.user_id == user_id:
            RoomUsers(user_id=self.user_id, room_id=room_id).save()
        else:
            RoomUsers.objects.bulk_create([
                RoomUsers(user_id=user_id, room_id=room_id),
                RoomUsers(user_id=self.user_id, room_id=room_id),
            ])
    return room_id
||
| 755 | |||
def update_room(self, room_id, disabled):
    """
    Re-enable a disabled room.

    :raises ValidationError: when ``disabled`` is falsy, i.e. the room is already active
    """
    if disabled:
        Room.objects.filter(id=room_id).update(disabled=False)
        return
    raise ValidationError('This room already exist')
||
| 761 | |||
def create_user_channel(self, message):
    """
    Open a direct room with the user named in ``message`` and publish a
    parsable subscribe message to this user's channel and, when it differs,
    to the other user's channel.
    """
    user_id = message[VarNames.USER_ID]
    # rooms of the current user that have no name
    nameless_memberships = Room.users.through.objects.filter(
        user_id=self.user_id, room__name__isnull=True
    )
    user_rooms = nameless_memberships.values('room_id')
    room_id = self.create_room(user_rooms, user_id)
    subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
    self.publish(subscribe_message, self.channel, True)
    other_channel = RedisPrefix.generate_user(user_id)
    if other_channel != self.channel:
        self.publish(subscribe_message, other_channel, True)
||
| 773 | |||
def delete_channel(self, message):
    """
    Leave (named room) or disable (room without a name) the room given in ``message``.

    :raises ValidationError: when the room is not in ``self.channels``, is
        ``ALL_ROOM_ID``, or is already disabled
    """
    room_id = message[VarNames.ROOM_ID]
    if room_id not in self.channels or room_id == ALL_ROOM_ID:
        raise ValidationError('You are not allowed to exit this room')
    room = self.do_db(Room.objects.get, id=room_id)
    if room.disabled:
        raise ValidationError('Room is already deleted')
    if room.name is None: # if private then disable
        room.disabled = True
    else: # if public -> leave the room, delete the link
        RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
        online = self.get_online_from_redis(room_id)
        # NOTE(review): list.remove raises ValueError if the user id is absent -- confirm it is always present here
        online.remove(self.user_id)
        self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
    room.save()
    message = self.unsubscribe_direct_message(room_id)
    # parsable: subscribers handle it in pub_sub_message before it reaches the client
    self.publish(message, room_id, True)
||
| 791 | |||
def edit_message(self, data):
    """
    Edit or delete one of the current user's messages and publish the result.

    Content of ``None`` marks the message as deleted (DELETE_MESSAGE);
    otherwise content, symbol and any new images are stored (EDIT_MESSAGE).

    :raises ValidationError: when the message belongs to another user, is
        older than 600000 ms, or is already deleted
    """
    # ord(next (iter (message['images'])))
    message_id = data[VarNames.MESSAGE_ID]
    message = Message.objects.get(id=message_id)
    if message.sender_id != self.user_id:
        raise ValidationError("You can only edit your messages")
    # 600000 ms == 10 minutes
    if message.time + 600000 < get_milliseconds():
        raise ValidationError("You can only edit messages that were send not more than 10 min ago")
    if message.deleted:
        raise ValidationError("Already deleted")
    message.content = data[VarNames.CONTENT]
    selector = Message.objects.filter(id=message_id)
    if message.content is None:
        prep_imgs = None
        selector.update(deleted=True)
        action = Actions.DELETE_MESSAGE
    else:
        images = data.get(VarNames.IMG)
        if images:
            if message.symbol:
                self.replace_symbols_if_needed(images, message)
            new_symbol = get_max_key(images)
            if message.symbol is None or new_symbol > message.symbol:
                message.symbol = new_symbol
        db_images = self.save_images(images, message.id)
        if message.symbol: # fetch all, including that we just store
            db_images = Image.objects.filter(message_id=message.id)
        prep_imgs = self.prepare_img(db_images, message_id)
        action = Actions.EDIT_MESSAGE
        selector.update(content=message.content, symbol=message.symbol)
    self.publish(self.create_send_message(message, action, prep_imgs), message.room_id)
||
| 823 | |||
def save_images(self, images, message_id):
    """
    Create one ``Image`` row per entry of ``images`` for ``message_id``.

    :param images: mapping symbol -> dict with the base64 payload and file name, or a falsy value
    :return: list of the created ``Image`` objects; empty when ``images`` is falsy
    """
    if not images:
        return []
    db_images = []
    for symbol in images:
        entry = images[symbol]
        photo = extract_photo(entry[VarNames.IMG_B64], entry[VarNames.IMG_FILE_NAME])
        db_images.append(Image(message_id=message_id, img=photo, symbol=symbol))
    Image.objects.bulk_create(db_images)
    return db_images
||
| 836 | |||
def replace_symbols_if_needed(self, images, message):
    """
    Re-key incoming images whose symbol is not greater than ``message.symbol``.

    Each such image gets the next character after the running maximum, the
    old symbol is replaced in ``message.content``, and ``images`` is re-keyed
    in place. Both ``images`` and ``message`` are mutated.
    """
    # if message was edited user wasn't notified about that and he edits message again
    # his symbol can go out of sync
    next_code = ord(message.symbol)
    renames = []
    for old_symbol in images:
        if old_symbol > message.symbol:
            continue
        next_code += 1
        fresh_symbol = chr(next_code)
        renames.append((old_symbol, fresh_symbol, images[old_symbol]))
        message.content = message.content.replace(old_symbol, fresh_symbol)
    # applied afterwards: a dict must not change size while being iterated
    for old_symbol, fresh_symbol, payload in renames:
        del images[old_symbol]
        images[fresh_symbol] = payload
||
| 855 | |||
def send_client_new_channel(self, message):
    """Subscribe this handler to the room named in ``message`` and add it to that room's online users."""
    new_room_id = message[VarNames.ROOM_ID]
    self.add_channel(new_room_id)
    self.add_online_user(new_room_id)
||
| 860 | |||
def set_opponent_call_channel(self, message):
    """
    Post-process hook for webrtc offers.

    :return: ``True`` when the offer came from this very socket (the caller
        then skips forwarding it); otherwise stores status OFFERED for this
        socket and returns ``None``
    """
    connection_id = message[VarNames.CONNECTION_ID]
    offered_by_self = message[VarNames.WEBRTC_OPPONENT_ID] == self.id
    if not offered_by_self:
        self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
        return None
    return True
||
| 866 | |||
def send_client_delete_channel(self, message):
    """
    Post-process hook for DELETE_ROOM: unsubscribe from the room, remove this
    connection from the room's online set and forget the channel.

    :param message: decoded message containing the room id
    """
    room_id = message[VarNames.ROOM_ID]
    self.async_redis.unsubscribe((room_id,))
    # Online members are stored with SADD and read with SMEMBERS (a redis set),
    # so removal has to be SREM; HDEL targets hashes and cannot remove a set member.
    self.async_redis_publisher.srem(room_id, self.id)
    self.channels.remove(room_id)
||
| 872 | |||
def process_get_messages(self, data):
    """
    Send the newest non-deleted messages of a room to this socket, optionally
    limited to ids below a given header id.

    :type data: dict
    """
    header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
    count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
    room_id = data[VarNames.CHANNEL]
    self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
    criteria = [Q(room_id=room_id), Q(deleted=False)]
    if header_id is not None:
        # only messages older than the header
        criteria.insert(0, Q(id__lt=header_id))
    messages = Message.objects.filter(*criteria).order_by('-pk')[:count]
    images = self.do_db(self.get_message_images, messages)
    self.ws_write(self.get_messages(messages, room_id, images))
||
| 888 | |||
def get_message_images(self, messages):
    """
    Return the images of those ``messages`` that have a truthy ``symbol``.

    :return: an ``Image`` query filtered by message ids, or an empty list
        when no message has a symbol
    """
    ids_with_symbol = [m.id for m in messages if m.symbol]
    images = Image.objects.filter(message_id__in=ids_with_symbol) if ids_with_symbol else []
    self.logger.info('!! Messages have %d images', len(images))
    return images
||
| 897 | |||
def get_offline_messages(self):
    """
    Collect the current user's non-deleted messages whose id is greater than
    the room membership's ``last_read_message_id``.

    :return: dict mapping room id -> list of prepared messages
    """
    unread = Message.objects.filter(
        id__gt=F('room__roomusers__last_read_message_id'),
        deleted=False,
        room__roomusers__user_id=self.user_id
    )
    images = self.do_db(self.get_message_images, unread)
    by_room = {}
    for msg in unread:
        prepared = self.create_message(msg, self.prepare_img(images, msg.id))
        if msg.room_id not in by_room:
            by_room[msg.room_id] = []
        by_room[msg.room_id].append(prepared)
    return by_room
||
| 910 | |||
def get_users_in_current_user_rooms(self):
    """
    Build a mapping of the current user's enabled rooms to their name and users.

    Result shape (keys are the ``VarNames`` constants)::

        {
            room_id: {
                VarNames.ROOM_NAME: room name,
                VarNames.ROOM_USERS: {
                    user_id: entry written by set_js_user_structure,
                    ...
                }
            }
        }
    """
    user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
    res = {}
    for room in user_rooms:
        res[room['id']] = {
            VarNames.ROOM_NAME: room['name'],
            VarNames.ROOM_USERS: {}
        }
    room_ids = (key for key in res)
    members = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id')
    for member in members:
        room_users = res[member['rooms__id']][VarNames.ROOM_USERS]
        self.set_js_user_structure(room_users, member['id'], member['username'], member['sex'])
    return res
||
| 940 | |||
def set_js_user_structure(self, user_dict, user_id, name, sex):
    """
    Store the client-side description of a user in ``user_dict[user_id]``.

    :param sex: key into ``GENDERS``; the looked-up value is what gets stored
    """
    entry = {
        VarNames.USER: name,
        VarNames.GENDER: GENDERS[sex]
    }
    user_dict[user_id] = entry
||
| 946 | |||
def save_ip(self):
    """
    Record that the current user joined from ``self.ip`` unless such a
    ``UserJoinedInfo`` row already exists.
    """
    existing = UserJoinedInfo.objects.filter(Q(ip__ip=self.ip) & Q(user_id=self.user_id))
    if self.do_db(existing.exists):
        return
    ip_address = get_or_create_ip(self.ip, self.logger)
    UserJoinedInfo.objects.create(ip=ip_address, user_id=self.user_id)
||
| 956 | |||
def publish_logout(self, channel, log_data):
    """
    Publish a LOGOUT message to ``channel`` when no other connection of this
    user is found in the channel's online set.

    :param log_data: dict that receives the online snapshot under key ``channel``
    :return: ``True`` when the logout was published, otherwise ``None``
    """
    # seems like async solves problem with connection lost and wrong data status
    # http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
    online, is_online = self.get_online_from_redis(channel, True)
    log_data[channel] = {'online': online, 'is_online': is_online}
    if is_online:
        return None
    logout_message = self.room_online(online, Actions.LOGOUT, channel)
    self.publish(logout_message, channel)
    return True
||
| 966 | |||
| 1136 |