| Total Complexity | 76 |
| Total Lines | 404 |
| Duplicated Lines | 4.95 % |
| Changes | 13 | ||
| Bugs | 2 | Features | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like MessagesHandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | import json |
||
| 249 | class MessagesHandler(MessagesCreator): |
||
| 250 | |||
def __init__(self, *args, **kwargs):
    """
    Sets up per-connection state: identifiers, redis clients and
    the dispatch tables that map actions to handler methods.
    """
    # assigned before the parent initializer runs, same as the original order
    self.parsable_prefix = 'p'
    super(MessagesHandler, self).__init__(*args, **kwargs)
    self.id = id(self)
    # last four decimal digits of the id, zero padded
    self.log_id = '{:04d}'.format(self.id % 10000)
    self.ip = None
    from chat import global_redis
    self.async_redis_publisher = global_redis.async_redis_publisher
    self.sync_redis = global_redis.sync_redis
    self.channels = []
    self.call_receiver_channel = None
    self.logger = None
    # every handler owns its own subscriber client
    self.async_redis = tornadoredis.Client()
    # action -> method invoked with the incoming message
    self.pre_process_message = {
        Actions.GET_MESSAGES: self.process_get_messages,
        Actions.SEND_MESSAGE: self.process_send_message,
        Actions.CALL: self.process_call,
        Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
        Actions.DELETE_ROOM: self.delete_channel,
        Actions.EDIT_MESSAGE: self.edit_message,
        Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
        Actions.INVITE_USER: self.invite_user,
    }
    # action -> method invoked by new_message() after a decoded message was written
    self.post_process_message = {
        Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel,
        Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel,
        Actions.DELETE_ROOM: self.send_client_delete_channel,
        Actions.INVITE_USER: self.send_client_new_channel,
        Actions.CALL: self.set_opponent_call_channel
    }
||
| 281 | |||
@tornado.gen.engine
def listen(self, channels):
    """
    Subscribes the async redis client to `channels` and, once the
    subscription task has finished, routes incoming messages to new_message.

    :param channels: channels to subscribe to
    """
    subscription = tornado.gen.Task(self.async_redis.subscribe, channels)
    yield subscription
    self.async_redis.listen(self.new_message)
||
| 287 | |||
@tornado.gen.engine
def add_channel(self, channel):
    """
    Remembers `channel` in self.channels and subscribes the async redis client to it.

    :param channel: channel to subscribe to
    """
    self.channels.append(channel)
    subscription = tornado.gen.Task(self.async_redis.subscribe, (channel,))
    yield subscription
||
| 293 | |||
def do_db(self, callback, *args, **kwargs):
    """
    Runs a database callable, retrying it once after closing the
    connection if the first attempt fails with a connection error.

    :param callback: callable to execute
    :param args: positional arguments passed to callback
    :param kwargs: keyword arguments passed to callback
    :return: whatever callback returns
    """
    try:
        return callback(*args, **kwargs)
    except (OperationalError, InterfaceError) as e:  # Connection has gone away
        # lazy logger arguments, consistent with the other logger calls of this class
        self.logger.warning('%s, reconnecting', e)  # TODO
        # closing the connection before the single retry below
        connection.close()
        return callback(*args, **kwargs)
||
| 301 | |||
def execute_query(self, query, *args, **kwargs):
    """
    Executes a raw query on the default connection and returns all rows.

    :param query: query string to execute
    :param args: extra positional arguments forwarded to cursor.execute
    :param kwargs: extra keyword arguments forwarded to cursor.execute
    :return: result of cursor.fetchall()
    """
    cursor = connection.cursor()
    try:
        cursor.execute(query, *args, **kwargs)
        return cursor.fetchall()
    finally:
        # the cursor used to be left open; release it even if execute fails
        cursor.close()
||
| 306 | |||
def get_online_from_redis(self, channel, check_user_id=None, check_hash=None):
    """
    Reads the redis hash stored under `channel` and collects the user ids
    found in its values.

    :param channel: redis key of the hash to read
    :param check_user_id: optional user id to look for among the entries
    :param check_hash: entry key to skip while looking for check_user_id
    :return: list of distinct user ids; if check_user_id is given, a tuple
        (list, bool) where the bool is True when check_user_id was found
        under a key different from check_hash
    """
    online = self.sync_redis.hgetall(channel)
    self.logger.debug('!! channel %s redis online: %s', channel, online)
    result = set()
    user_is_online = False
    # redis stores REDIS_USER_FORMAT, so parse them
    if online:
        for key_hash, raw_user_id in online.items():  # py2 iteritems
            user_id = int(raw_user_id.decode('utf-8'))
            if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')):
                user_is_online = True
            result.add(user_id)
    result = list(result)
    # compare with None so that a falsy id (0) still yields the tuple form
    if check_user_id is not None:
        return result, user_is_online
    return result
||
| 325 | |||
def add_online_user(self, room_id, offline_messages=None):
    """
    Stores this connection in the redis hash of the room and sends the
    list of online users: to the whole room when the user was not listed
    yet, otherwise only to this connection.
    online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }
    :return:
    """
    online = self.get_online_from_redis(room_id)
    self.async_redis_publisher.hset(room_id, self.id, self.stored_redis_user)
    if self.user_id in online:  # Send user names to self
        refresh_mes = self.room_online(online, Actions.REFRESH_USER, room_id)
        self.logger.info('!! Second tab, retrieving online for self')
        self.safe_write(refresh_mes)
        return
    # the user was not in the list yet
    online.append(self.user_id)
    login_mes = self.room_online(online, Actions.LOGIN, room_id)
    self.logger.info('!! First tab, sending refresh online for all')
    self.publish(login_mes, room_id)
    if offline_messages:
        self.safe_write(self.load_offline_message(offline_messages, room_id))
||
| 353 | |||
def publish(self, message, channel, parsable=False):
    """
    Serializes `message` to JSON and publishes it to a redis channel.

    :param message: JSON serializable object
    :param channel: redis channel to publish to
    :param parsable: if True the payload is marked with encode() first
    """
    payload = json.dumps(message)
    self.logger.debug('<%s> %s', channel, payload)
    outgoing = self.encode(payload) if parsable else payload
    self.async_redis_publisher.publish(channel, outgoing)
||
| 360 | |||
def encode(self, message):
    """
    Prepends the parsable prefix so that the receiver knows the message
    has to be decoded and processed before it is sent to the client
    @param message: message to mark
    @return: marked message
    """
    marked = self.parsable_prefix + message
    return marked
||
| 369 | |||
def decode(self, message):
    """
    Check if message should be processed by server before writing to client
    @param message: message to check
    @return: Object structure of message if it should be processed, None if not
    """
    prefix = self.parsable_prefix
    if message.startswith(prefix):
        # strip exactly the prefix instead of assuming it is one character long
        return json.loads(message[len(prefix):])
    return None
||
| 378 | |||
def new_message(self, message):
    """
    Handles a message received from redis: writes its body to the client
    (decoded when it carries the parsable prefix) and then runs the
    post-processing handler registered for the decoded event.

    :param message: object with a `body` attribute
    """
    body = message.body
    if type(body) is int:  # subscribe event
        return
    parsed = self.decode(body)
    self.safe_write(parsed if parsed else body)
    if parsed:
        handler = self.post_process_message[parsed[VarNames.EVENT]]
        handler(parsed)
||
| 388 | |||
def safe_write(self, message):
    """
    Placeholder for writing `message` to the client; this version always raises.

    :raises NotImplementedError: always
    """
    error = NotImplementedError('WebSocketHandler implements')
    raise error
||
| 391 | |||
def process_send_message(self, message):
    """
    Saves an incoming chat message and publishes it to its channel.

    :type message: dict
    """
    room_id = message[VarNames.CHANNEL]
    record = Message(sender_id=self.user_id, content=message[VarNames.CONTENT])
    record.room_id = room_id
    if VarNames.IMG in message:
        record.img = extract_photo(message[VarNames.IMG])
    self.do_db(record.save)  # exit on hacked id with exception
    outgoing = self.create_send_message(record)
    self.publish(outgoing, room_id)
||
| 407 | |||
def process_call(self, in_message):
    """
    Builds a call message and publishes it to the call receiver channel.
    For an offer the receiver is looked up first: the other member of the
    given room, which must have no name.

    :type in_message: dict
    """
    call_type = in_message.get(VarNames.CALL_TYPE)
    out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type)
    is_offer = call_type == CallType.OFFER
    if is_offer:
        room_id = in_message[VarNames.CHANNEL]
        opponent = User.rooms.through.objects.get(
            ~Q(user_id=self.user_id),
            Q(room_id=room_id),
            Q(room__name__isnull=True)
        )
        self.call_receiver_channel = RedisPrefix.generate_user(opponent.user_id)
        out_message[VarNames.CHANNEL] = room_id
    # TODO
    self.logger.info('!! Offering a call to user with id %s', self.call_receiver_channel)
    # only offers are published as parsable messages
    self.publish(out_message, self.call_receiver_channel, is_offer)
||
| 424 | |||
def create_new_room(self, message):
    """
    Creates a named room, adds the current user to it and publishes
    a subscribe message to the user's own channel.

    :param message: dict holding the room name
    :raises ValidationError: if the name is empty or longer than 16 characters
    """
    room_name = message[VarNames.ROOM_NAME]
    name_is_valid = bool(room_name) and len(room_name) <= 16
    if not name_is_valid:
        raise ValidationError('Incorrect room name "{}"'.format(room_name))
    new_room = Room(name=room_name)
    self.do_db(new_room.save)
    membership = RoomUsers(room_id=new_room.id, user_id=self.user_id)
    membership.save()
    self.publish(self.subscribe_room_channel_message(new_room.id, room_name), self.channel, True)
||
| 434 | |||
def invite_user(self, message):
    """
    Adds another user to a room the current connection is subscribed to,
    notifies the room and sends the invited user a subscribe message.

    :param message: dict holding the room id and the id of the invited user
    :raises ValidationError: if the room is not among self.channels,
        the room is private or the user is already a member
    """
    room_id = message[VarNames.ROOM_ID]
    invited_id = message[VarNames.USER_ID]
    if room_id not in self.channels:
        raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
    room = self.do_db(Room.objects.get, id=room_id)
    if room.is_private:
        raise ValidationError("You can't add users to direct room, create a new room instead")
    try:
        Room.users.through.objects.create(room_id=room_id, user_id=invited_id)
    except IntegrityError:
        raise ValidationError("User is already in channel")
    # collect all members, the freshly invited one included
    members = {}
    for member in room.users.all():
        self.set_js_user_structure(members, member.id, member.username, member.sex)
    joined_notice = self.add_user_to_room(room_id, invited_id, members[invited_id])
    self.publish(joined_notice, room_id)
    invitation = self.invite_room_channel_message(room_id, invited_id, room.name, members)
    self.publish(invitation, RedisPrefix.generate_user(invited_id), True)
||
| 453 | |||
def create_self_room(self, user_rooms):
    """
    Returns the id of the current user's room with himself: re-enables the
    one found by SELECT_SELF_ROOM or creates a new room.

    :param user_rooms: iterable of dicts with a 'room_id' key
    :return: id of the room
    :raises ValidationError: from update_room when the found room is not disabled
    """
    own_room_ids = [entry['room_id'] for entry in user_rooms]
    rows = self.execute_query(SELECT_SELF_ROOM, [own_room_ids])
    if len(rows) == 0:
        # nothing found: create the room and link the user to it
        new_room = Room()
        new_room.save()
        RoomUsers(user_id=self.user_id, room_id=new_room.id).save()
        return new_room.id
    first_row = rows[0]
    room_id = first_row[0]
    self.update_room(room_id, first_row[1])
    return room_id
||
| 467 | |||
def create_other_room(self, user_rooms, user_id):
    """
    Returns the id of the room shared with `user_id`: re-enables an
    existing one from `user_rooms` or creates a new room for both users.

    :param user_rooms: rooms of the current user to search in
    :param user_id: id of the other user
    :return: id of the room
    :raises ValidationError: from update_room when the found room is not disabled
    """
    shared = Room.users.through.objects.filter(
        user_id=user_id, room__in=user_rooms
    ).values('room__id', 'room__disabled')
    if len(shared) == 0:
        # no common room yet: create one and link both users
        new_room = Room()
        new_room.save()
        links = [
            RoomUsers(user_id=user_id, room_id=new_room.id),
            RoomUsers(user_id=self.user_id, room_id=new_room.id),
        ]
        RoomUsers.objects.bulk_create(links)
        return new_room.id
    existing = shared[0]
    room_id = existing['room__id']
    self.update_room(room_id, existing['room__disabled'])
    return room_id
||
| 483 | |||
def update_room(self, room_id, disabled):
    """
    Re-enables a disabled room.

    :param room_id: id of the room to enable
    :param disabled: current value of the room's disabled flag
    :raises ValidationError: if the room is not disabled
    """
    if disabled:
        Room.objects.filter(id=room_id).update(disabled=False)
        return
    raise ValidationError('This room already exist')
||
| 489 | |||
def create_user_channel(self, message):
    """
    Creates (or re-enables) a direct room with the given user and
    publishes the subscribe message to both users' channels.

    :param message: dict holding the id of the other user
    """
    user_id = message[VarNames.USER_ID]
    # get all self private rooms ids
    own_private_rooms = Room.users.through.objects.filter(
        user_id=self.user_id, room__name__isnull=True
    ).values('room_id')
    # get private room that contains another user from rooms above
    if self.user_id != user_id:
        room_id = self.create_other_room(own_private_rooms, user_id)
    else:
        room_id = self.create_self_room(own_private_rooms)
    subscribe_message = self.subscribe_direct_channel_message(room_id, user_id)
    self.publish(subscribe_message, self.channel, True)
    other_channel = RedisPrefix.generate_user(user_id)
    # do not publish twice when the user opens a room with himself
    if other_channel != self.channel:
        self.publish(subscribe_message, other_channel, True)
||
| 504 | |||
def delete_channel(self, message):
    """
    Removes the current user from a room: a room without a name is
    disabled, for a named room the user's membership link is deleted.
    Afterwards an unsubscribe message is published to the room channel.

    :param message: dict holding the room id
    :raises ValidationError: if the room is not among self.channels,
        equals ALL_ROOM_ID or is already disabled
    """
    room_id = message[VarNames.ROOM_ID]
    if room_id not in self.channels or room_id == ALL_ROOM_ID:
        raise ValidationError('You are not allowed to exit this room')
    room = self.do_db(Room.objects.get, id=room_id)
    if room.disabled:
        raise ValidationError('Room is already deleted')
    if room.name is None:  # if private then disable
        room.disabled = True
    else:  # if public -> leave the room, delete the link
        RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
        online = self.get_online_from_redis(room_id)
        # the redis list may not contain the user; list.remove would raise
        # ValueError and abort before room.save() and the unsubscribe publish
        if self.user_id in online:
            online.remove(self.user_id)
        self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id)
    room.save()
    message = self.unsubscribe_direct_message(room_id)
    self.publish(message, room_id, True)
||
| 522 | |||
def edit_message(self, data):
    """
    Edits a message of the current user, or marks it deleted when the
    new content is None, and publishes the result to the message's room.

    :param data: dict holding the message id and the new content
    :raises ValidationError: if the message belongs to another user,
        is older than 60000 ms or is already deleted
    """
    message_id = data[VarNames.MESSAGE_ID]
    message = Message.objects.get(id=message_id)
    if message.sender_id != self.user_id:
        raise ValidationError("You can only edit your messages")
    if message.time + 60000 < get_milliseconds():
        raise ValidationError("You can only edit messages that were send not more than 1 min ago")
    if message.deleted:
        raise ValidationError("Already deleted")
    message.content = data[VarNames.CONTENT]
    stored_rows = Message.objects.filter(id=message_id)
    if message.content is not None:
        action = Actions.EDIT_MESSAGE
        stored_rows.update(content=message.content)
    else:
        # content of None means the message is being removed
        stored_rows.update(deleted=True)
        action = Actions.DELETE_MESSAGE
    outgoing = self.create_send_message(message, action)
    self.publish(outgoing, message.room_id)
||
| 541 | |||
def send_client_new_channel(self, message):
    """
    Subscribes this connection to the room from `message` and
    registers the user as online there.

    :param message: dict holding the room id
    """
    new_room_id = message[VarNames.ROOM_ID]
    self.add_channel(new_room_id)
    # TODO doesnt work if already subscribed
    self.add_online_user(new_room_id)
||
| 546 | |||
def set_opponent_call_channel(self, message):
    """
    Stores the channel of the user given in `message` as the call receiver channel.

    :param message: dict holding the user id
    """
    opponent_id = message[VarNames.USER_ID]
    self.call_receiver_channel = RedisPrefix.generate_user(opponent_id)
||
| 549 | |||
def send_client_delete_channel(self, message):
    """
    Unsubscribes this connection from the room given in `message`, removes
    its entry from the room's redis hash and forgets the channel.

    :param message: dict holding the room id
    """
    removed_room_id = message[VarNames.ROOM_ID]
    channels_to_drop = (removed_room_id,)
    self.async_redis.unsubscribe(channels_to_drop)
    self.async_redis_publisher.hdel(removed_room_id, self.id)
    self.channels.remove(removed_room_id)
||
| 555 | |||
def process_get_messages(self, data):
    """
    Writes to the client up to `count` newest non-deleted messages of a
    room, optionally only those with an id lower than the header id.

    :type data: dict
    """
    header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
    count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
    room_id = data[VarNames.CHANNEL]
    self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
    conditions = [Q(room_id=room_id), Q(deleted=False)]
    if header_id is not None:
        # only messages older than the given header
        conditions.insert(0, Q(id__lt=header_id))
    messages = Message.objects.filter(*conditions).order_by('-pk')[:count]
    self.safe_write(self.do_db(self.get_messages, messages, room_id))
||
| 570 | |||
def get_offline_messages(self):
    """
    Collects non-deleted messages whose id is greater than the current
    user's last_read_message_id in the corresponding room.

    :return: dict mapping room id to a list of messages built by create_message
    """
    unread = Message.objects.filter(
        id__gt=F('room__roomusers__last_read_message_id'),
        deleted=False,
        room__roomusers__user_id=self.user_id
    )
    by_room = {}
    for message in unread:
        if message.room_id not in by_room:
            by_room[message.room_id] = []
        by_room[message.room_id].append(self.create_message(message))
    return by_room
||
| 581 | |||
def get_users_in_current_user_rooms(self):
    """
    Builds a dict of all enabled rooms of the current user together with
    the users of every room. Shape of the result (keys are taken from
    VarNames, values below are illustrative):
    {
        "ROOM_ID:1": {
            "name": "All",
            "users": {
                "USER_ID:admin": {
                    "name": "USER_NAME:admin",
                    "sex": "SEX:Secret"
                },
                "USER_ID_2": {
                    "name": "USER_NAME:Mike",
                    "sex": "Male"
                }
            }
        }
    }
    NOTE: earlier documentation also listed an "isPrivate" key, this method does not set it.

    :return: dict described above
    """
    user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
    res = {room['id']: {
        VarNames.ROOM_NAME: room['name'],
        VarNames.ROOM_USERS: {}
    } for room in user_rooms}
    # a concrete list instead of a single-use generator for the `__in` lookup
    room_ids = list(res)
    rooms_users = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id')
    for user in rooms_users:
        self.set_js_user_structure(res[user['rooms__id']][VarNames.ROOM_USERS], user['id'], user['username'], user['sex'])
    return res
||
| 611 | |||
def set_js_user_structure(self, user_dict, user_id, name, sex):
    """
    Stores the client-side representation of a user in `user_dict`.

    :param user_dict: dict to modify in place
    :param user_id: key the user is stored under
    :param name: user name
    :param sex: key into GENDERS
    """
    entry = {}
    entry[VarNames.USER] = name
    entry[VarNames.GENDER] = GENDERS[sex]
    user_dict[user_id] = entry
||
| 617 | |||
def save_ip(self):
    """
    Records that the current user joined from self.ip,
    unless such a record already exists.
    """
    already_recorded = UserJoinedInfo.objects.filter(
        Q(ip__ip=self.ip) & Q(user_id=self.user_id)
    ).exists
    if self.do_db(already_recorded):
        return
    ip_address = self.get_or_create_ip()
    UserJoinedInfo.objects.create(ip=ip_address, user_id=self.user_id)
||
| 627 | |||
def get_or_create_ip(self):
    """
    Returns the IpAddress record for self.ip. If there is none yet, the
    record is created with the info requested from api_url; when that
    fails for any reason a record holding only the ip is created instead.

    :return: IpAddress instance
    """
    # local import keeps this block self-contained
    from contextlib import closing
    try:
        ip_address = IpAddress.objects.get(ip=self.ip)
    except IpAddress.DoesNotExist:
        try:
            if not api_url:
                raise Exception('api url is absent')
            self.logger.debug("Creating ip record %s", self.ip)
            # closing() releases the response even if reading fails;
            # it was never closed before
            with closing(urlopen(api_url % self.ip)) as f:
                raw_response = f.read().decode("utf-8")
            response = json.loads(raw_response)
            if response['status'] != "success":
                raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
            ip_address = IpAddress.objects.create(
                ip=self.ip,
                isp=response['isp'],
                country=response['country'],
                region=response['regionName'],
                city=response['city'],
                country_code=response['countryCode']
            )
        except Exception as e:
            # deliberate best effort: fall back to a record without details
            self.logger.error("Error while creating ip with country info, because %s", e)
            ip_address = IpAddress.objects.create(ip=self.ip)
    return ip_address
||
| 653 | |||
| 797 |