Total Complexity | 78 |
Total Lines | 410 |
Duplicated Lines | 4.88 % |
Changes | 16 | ||
Bugs | 2 | Features | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like MessagesHandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | import json |
||
248 | class MessagesHandler(MessagesCreator): |
||
249 | |||
250 | def __init__(self, *args, **kwargs): |
||
251 | self.parsable_prefix = 'p' |
||
252 | super(MessagesHandler, self).__init__(*args, **kwargs) |
||
253 | self.id = id(self) |
||
254 | self.log_id = str(self.id % 10000).rjust(4, '0') |
||
255 | self.ip = None |
||
256 | from chat import global_redis |
||
257 | self.async_redis_publisher = global_redis.async_redis_publisher |
||
258 | self.sync_redis = global_redis.sync_redis |
||
259 | self.channels = [] |
||
260 | self.call_receiver_channel = None |
||
261 | self._logger = None |
||
262 | self.async_redis = tornadoredis.Client() |
||
263 | self.pre_process_message = { |
||
264 | Actions.GET_MESSAGES: self.process_get_messages, |
||
265 | Actions.SEND_MESSAGE: self.process_send_message, |
||
266 | Actions.CALL: self.process_call, |
||
267 | Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel, |
||
268 | Actions.DELETE_ROOM: self.delete_channel, |
||
269 | Actions.EDIT_MESSAGE: self.edit_message, |
||
270 | Actions.CREATE_ROOM_CHANNEL: self.create_new_room, |
||
271 | Actions.INVITE_USER: self.invite_user, |
||
272 | } |
||
273 | self.post_process_message = { |
||
274 | Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel, |
||
275 | Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel, |
||
276 | Actions.DELETE_ROOM: self.send_client_delete_channel, |
||
277 | Actions.INVITE_USER: self.send_client_new_channel, |
||
278 | Actions.CALL: self.set_opponent_call_channel |
||
279 | } |
||
280 | |||
281 | @tornado.gen.engine |
||
282 | def listen(self, channels): |
||
283 | yield tornado.gen.Task( |
||
284 | self.async_redis.subscribe, channels) |
||
285 | self.async_redis.listen(self.new_message) |
||
286 | |||
287 | @property |
||
288 | def logger(self): |
||
289 | return self._logger if self._logger else base_logger |
||
290 | |||
291 | @tornado.gen.engine |
||
292 | def add_channel(self, channel): |
||
293 | self.channels.append(channel) |
||
294 | yield tornado.gen.Task( |
||
295 | self.async_redis.subscribe, (channel,)) |
||
296 | |||
297 | def do_db(self, callback, *args, **kwargs): |
||
298 | try: |
||
299 | return callback(*args, **kwargs) |
||
300 | except (OperationalError, InterfaceError) as e: # Connection has gone away |
||
301 | self.logger.warning('%s, reconnecting' % e) # TODO |
||
302 | connection.close() |
||
303 | return callback(*args, **kwargs) |
||
304 | |||
305 | def execute_query(self, query, *args, **kwargs): |
||
306 | cursor = connection.cursor() |
||
307 | cursor.execute(query, *args, **kwargs) |
||
308 | return cursor.fetchall() |
||
309 | |||
310 | def get_online_from_redis(self, channel, check_user_id=None, check_hash=None): |
||
311 | """ |
||
312 | :rtype : dict |
||
313 | returns (dict, bool) if check_type is present |
||
314 | """ |
||
315 | online = self.sync_redis.hgetall(channel) |
||
316 | self.logger.debug('!! channel %s redis online: %s', channel, online) |
||
317 | result = set() |
||
318 | user_is_online = False |
||
319 | # redis stores REDIS_USER_FORMAT, so parse them |
||
320 | if online: |
||
321 | for key_hash, raw_user_id in online.items(): # py2 iteritems |
||
322 | user_id = int(raw_user_id.decode('utf-8')) |
||
323 | if user_id == check_user_id and check_hash != int(key_hash.decode('utf-8')): |
||
324 | user_is_online = True |
||
325 | result.add(user_id) |
||
326 | result = list(result) |
||
327 | return (result, user_is_online) if check_user_id else result |
||
328 | |||
329 | def add_online_user(self, room_id, offline_messages=None): |
||
330 | """ |
||
331 | adds to redis |
||
332 | online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 } |
||
333 | :return: |
||
334 | """ |
||
335 | self.async_redis_publisher.hset(room_id, self.id, self.stored_redis_user) |
||
336 | # since we add user to online first, latest trigger will always show correct online |
||
337 | online, is_online = self.get_online_from_redis(room_id, self.user_id, self.id) |
||
338 | if not is_online: # if a new tab has been opened |
||
339 | online.append(self.user_id) |
||
340 | online_user_names_mes = self.room_online( |
||
341 | online, |
||
342 | Actions.LOGIN, |
||
343 | room_id |
||
344 | ) |
||
345 | self.logger.info('!! First tab, sending refresh online for all') |
||
346 | self.publish(online_user_names_mes, room_id) |
||
347 | if offline_messages: |
||
348 | self.safe_write(self.load_offline_message(offline_messages, room_id)) |
||
349 | else: # Send user names to self |
||
350 | online_user_names_mes = self.room_online( |
||
351 | online, |
||
352 | Actions.REFRESH_USER, |
||
353 | room_id |
||
354 | ) |
||
355 | self.logger.info('!! Second tab, retrieving online for self') |
||
356 | self.safe_write(online_user_names_mes) |
||
357 | |||
358 | def publish(self, message, channel, parsable=False): |
||
359 | jsoned_mess = json.dumps(message) |
||
360 | self.logger.debug('<%s> %s', channel, jsoned_mess) |
||
361 | if parsable: |
||
362 | jsoned_mess = self.encode(jsoned_mess) |
||
363 | self.async_redis_publisher.publish(channel, jsoned_mess) |
||
364 | |||
365 | def encode(self, message): |
||
366 | """ |
||
367 | Marks message with prefix to specify that |
||
368 | it should be decoded and proccesed before sending to client |
||
369 | @param message: message to mark |
||
370 | @return: marked message |
||
371 | """ |
||
372 | return self.parsable_prefix + message |
||
373 | |||
374 | def decode(self, message): |
||
375 | """ |
||
376 | Check if message should be proccessed by server before writing to client |
||
377 | @param message: message to check |
||
378 | @type message: str |
||
379 | @return: Object structure of message if it should be processed, None if not |
||
380 | """ |
||
381 | if message.startswith(self.parsable_prefix): |
||
382 | return json.loads(message[1:]) |
||
383 | |||
384 | def new_message(self, message): |
||
385 | data = message.body |
||
386 | if isinstance(data, str_type): # subscribe event |
||
387 | decoded = self.decode(data) |
||
388 | if decoded: |
||
389 | data = decoded |
||
390 | self.safe_write(data) |
||
391 | if decoded: |
||
392 | self.post_process_message[decoded[VarNames.EVENT]](decoded) |
||
393 | |||
394 | def safe_write(self, message): |
||
395 | raise NotImplementedError('WebSocketHandler implements') |
||
396 | |||
397 | def process_send_message(self, message): |
||
398 | """ |
||
399 | :type message: dict |
||
400 | """ |
||
401 | channel = message[VarNames.CHANNEL] |
||
402 | message_db = Message( |
||
403 | sender_id=self.user_id, |
||
404 | content=message[VarNames.CONTENT] |
||
405 | ) |
||
406 | message_db.room_id = channel |
||
407 | if VarNames.IMG in message: |
||
408 | message_db.img = extract_photo(message[VarNames.IMG]) |
||
409 | self.do_db(message_db.save) # exit on hacked id with exception |
||
410 | prepared_message = self.create_send_message(message_db) |
||
411 | self.publish(prepared_message, channel) |
||
412 | |||
413 | def process_call(self, in_message): |
||
414 | """ |
||
415 | :type in_message: dict |
||
416 | """ |
||
417 | call_type = in_message.get(VarNames.CALL_TYPE) |
||
418 | set_opponent_channel = False |
||
419 | out_message = self.offer_call(in_message.get(VarNames.CONTENT), call_type) |
||
420 | if call_type == CallType.OFFER: |
||
421 | room_id = in_message[VarNames.CHANNEL] |
||
422 | user = User.rooms.through.objects.get(~Q(user_id=self.user_id), Q(room_id=room_id), Q(room__name__isnull=True)) |
||
423 | self.call_receiver_channel = RedisPrefix.generate_user(user.user_id) |
||
424 | set_opponent_channel = True |
||
425 | out_message[VarNames.CHANNEL] = room_id |
||
426 | # TODO |
||
427 | self.logger.info('!! Offering a call to user with id %s', self.call_receiver_channel) |
||
428 | self.publish(out_message, self.call_receiver_channel, set_opponent_channel) |
||
429 | |||
430 | def create_new_room(self, message): |
||
431 | room_name = message[VarNames.ROOM_NAME] |
||
432 | if not room_name or len(room_name) > 16: |
||
433 | raise ValidationError('Incorrect room name "{}"'.format(room_name)) |
||
434 | room = Room(name=room_name) |
||
435 | self.do_db(room.save) |
||
436 | RoomUsers(room_id=room.id, user_id=self.user_id).save() |
||
437 | subscribe_message = self.subscribe_room_channel_message(room.id, room_name) |
||
438 | self.publish(subscribe_message, self.channel, True) |
||
439 | |||
440 | def invite_user(self, message): |
||
441 | room_id = message[VarNames.ROOM_ID] |
||
442 | user_id = message[VarNames.USER_ID] |
||
443 | if room_id not in self.channels: |
||
444 | raise ValidationError("Access denied, only allowed for channels {}".format(self.channels)) |
||
445 | room = self.do_db(Room.objects.get, id=room_id) |
||
446 | if room.is_private: |
||
447 | raise ValidationError("You can't add users to direct room, create a new room instead") |
||
448 | try: |
||
449 | Room.users.through.objects.create(room_id=room_id, user_id=user_id) |
||
450 | except IntegrityError: |
||
451 | raise ValidationError("User is already in channel") |
||
452 | users_in_room = {} |
||
453 | for user in room.users.all(): |
||
454 | self.set_js_user_structure(users_in_room, user.id, user.username, user.sex) |
||
455 | self.publish(self.add_user_to_room(room_id, user_id, users_in_room[user_id]), room_id) |
||
456 | subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room) |
||
457 | View Code Duplication | self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True) |
|
|
|||
458 | |||
459 | def create_self_room(self, user_rooms): |
||
460 | rooms_ids = list([room['room_id'] for room in user_rooms]) |
||
461 | query_res = self.execute_query(SELECT_SELF_ROOM, [rooms_ids,]) |
||
462 | if len(query_res) > 0: |
||
463 | room = query_res[0] |
||
464 | room_id = room[0] |
||
465 | self.update_room(room_id, room[1]) |
||
466 | else: |
||
467 | room = Room() |
||
468 | room.save() |
||
469 | room_id = room.id |
||
470 | View Code Duplication | RoomUsers(user_id=self.user_id, room_id=room_id).save() |
|
471 | return room_id |
||
472 | |||
473 | def create_other_room(self, user_rooms, user_id): |
||
474 | query_res = Room.users.through.objects.filter(user_id=user_id, room__in=user_rooms).values('room__id', 'room__disabled') |
||
475 | if len(query_res) > 0: |
||
476 | room = query_res[0] |
||
477 | room_id = room['room__id'] |
||
478 | self.update_room(room_id, room['room__disabled']) |
||
479 | else: |
||
480 | room = Room() |
||
481 | room.save() |
||
482 | room_id = room.id |
||
483 | RoomUsers.objects.bulk_create([ |
||
484 | RoomUsers(user_id=user_id, room_id=room_id), |
||
485 | RoomUsers(user_id=self.user_id, room_id=room_id), |
||
486 | ]) |
||
487 | return room_id |
||
488 | |||
489 | def update_room(self, room_id, disabled): |
||
490 | if not disabled: |
||
491 | raise ValidationError('This room already exist') |
||
492 | else: |
||
493 | Room.objects.filter(id=room_id).update(disabled=False) |
||
494 | |||
495 | def create_user_channel(self, message): |
||
496 | user_id = message[VarNames.USER_ID] |
||
497 | # get all self private rooms ids |
||
498 | user_rooms = Room.users.through.objects.filter(user_id=self.user_id, room__name__isnull=True).values('room_id') |
||
499 | # get private room that contains another user from rooms above |
||
500 | if self.user_id == user_id: |
||
501 | room_id = self.create_self_room(user_rooms) |
||
502 | else: |
||
503 | room_id = self.create_other_room(user_rooms, user_id) |
||
504 | subscribe_message = self.subscribe_direct_channel_message(room_id, user_id) |
||
505 | self.publish(subscribe_message, self.channel, True) |
||
506 | other_channel = RedisPrefix.generate_user(user_id) |
||
507 | if self.channel != other_channel: |
||
508 | self.publish(subscribe_message, other_channel, True) |
||
509 | |||
510 | def delete_channel(self, message): |
||
511 | room_id = message[VarNames.ROOM_ID] |
||
512 | if room_id not in self.channels or room_id == ALL_ROOM_ID: |
||
513 | raise ValidationError('You are not allowed to exit this room') |
||
514 | room = self.do_db(Room.objects.get, id=room_id) |
||
515 | if room.disabled: |
||
516 | raise ValidationError('Room is already deleted') |
||
517 | if room.name is None: # if private then disable |
||
518 | room.disabled = True |
||
519 | else: # if public -> leave the room, delete the link |
||
520 | RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete() |
||
521 | online = self.get_online_from_redis(room_id) |
||
522 | online.remove(self.user_id) |
||
523 | self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id) |
||
524 | room.save() |
||
525 | message = self.unsubscribe_direct_message(room_id) |
||
526 | self.publish(message, room_id, True) |
||
527 | |||
528 | def edit_message(self, data): |
||
529 | message_id = data[VarNames.MESSAGE_ID] |
||
530 | message = Message.objects.get(id=message_id) |
||
531 | if message.sender_id != self.user_id: |
||
532 | raise ValidationError("You can only edit your messages") |
||
533 | if message.time + 60000 < get_milliseconds(): |
||
534 | raise ValidationError("You can only edit messages that were send not more than 1 min ago") |
||
535 | if message.deleted: |
||
536 | raise ValidationError("Already deleted") |
||
537 | message.content = data[VarNames.CONTENT] |
||
538 | selector = Message.objects.filter(id=message_id) |
||
539 | if message.content is None: |
||
540 | selector.update(deleted=True) |
||
541 | action = Actions.DELETE_MESSAGE |
||
542 | else: |
||
543 | action = Actions.EDIT_MESSAGE |
||
544 | selector.update(content=message.content) |
||
545 | self.publish(self.create_send_message(message, action), message.room_id) |
||
546 | |||
547 | def send_client_new_channel(self, message): |
||
548 | room_id = message[VarNames.ROOM_ID] |
||
549 | self.add_channel(room_id) |
||
550 | self.add_online_user(room_id) |
||
551 | |||
552 | def set_opponent_call_channel(self, message): |
||
553 | self.call_receiver_channel = RedisPrefix.generate_user(message[VarNames.USER_ID]) |
||
554 | |||
555 | def send_client_delete_channel(self, message): |
||
556 | room_id = message[VarNames.ROOM_ID] |
||
557 | self.async_redis.unsubscribe((room_id,)) |
||
558 | self.async_redis_publisher.hdel(room_id, self.id) |
||
559 | self.channels.remove(room_id) |
||
560 | |||
561 | def process_get_messages(self, data): |
||
562 | """ |
||
563 | :type data: dict |
||
564 | """ |
||
565 | header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None) |
||
566 | count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10)) |
||
567 | room_id = data[VarNames.CHANNEL] |
||
568 | self.logger.info('!! Fetching %d messages starting from %s', count, header_id) |
||
569 | if header_id is None: |
||
570 | messages = Message.objects.filter(Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count] |
||
571 | else: |
||
572 | messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count] |
||
573 | response = self.do_db(self.get_messages, messages, room_id) |
||
574 | self.safe_write(response) |
||
575 | |||
576 | def get_offline_messages(self): |
||
577 | res = {} |
||
578 | offline_messages = Message.objects.filter( |
||
579 | id__gt=F('room__roomusers__last_read_message_id'), |
||
580 | deleted=False, |
||
581 | room__roomusers__user_id=self.user_id |
||
582 | ) |
||
583 | for message in offline_messages: |
||
584 | res.setdefault(message.room_id, []).append(self.create_message(message)) |
||
585 | return res |
||
586 | |||
587 | def get_users_in_current_user_rooms(self): |
||
588 | """ |
||
589 | { |
||
590 | "ROOM_ID:1": { |
||
591 | "name": "All", |
||
592 | "users": { |
||
593 | "USER_ID:admin": { |
||
594 | "name": "USER_NAME:admin", |
||
595 | "sex": "SEX:Secret" |
||
596 | }, |
||
597 | "USER_ID_2": { |
||
598 | "name": "USER_NAME:Mike", |
||
599 | "sex": "Male" |
||
600 | } |
||
601 | }, |
||
602 | "isPrivate": true |
||
603 | } |
||
604 | } |
||
605 | """ |
||
606 | user_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name') |
||
607 | res = {room['id']: { |
||
608 | VarNames.ROOM_NAME: room['name'], |
||
609 | VarNames.ROOM_USERS: {} |
||
610 | } for room in user_rooms} |
||
611 | room_ids = (room_id for room_id in res) |
||
612 | rooms_users = User.objects.filter(rooms__in=room_ids).values('id', 'username', 'sex', 'rooms__id') |
||
613 | for user in rooms_users: |
||
614 | self.set_js_user_structure(res[user['rooms__id']][VarNames.ROOM_USERS], user['id'], user['username'], user['sex']) |
||
615 | return res |
||
616 | |||
617 | def set_js_user_structure(self, user_dict, user_id, name, sex): |
||
618 | user_dict[user_id] = { |
||
619 | VarNames.USER: name, |
||
620 | VarNames.GENDER: GENDERS[sex] |
||
621 | } |
||
622 | |||
623 | def save_ip(self): |
||
624 | if (self.do_db(UserJoinedInfo.objects.filter( |
||
625 | Q(ip__ip=self.ip) & Q(user_id=self.user_id)).exists)): |
||
626 | return |
||
627 | ip_address = self.get_or_create_ip() |
||
628 | UserJoinedInfo.objects.create( |
||
629 | ip=ip_address, |
||
630 | user_id=self.user_id |
||
631 | ) |
||
632 | |||
633 | def get_or_create_ip(self): |
||
634 | try: |
||
635 | ip_address = IpAddress.objects.get(ip=self.ip) |
||
636 | except IpAddress.DoesNotExist: |
||
637 | try: |
||
638 | if not api_url: |
||
639 | raise Exception('api url is absent') |
||
640 | self.logger.debug("Creating ip record %s", self.ip) |
||
641 | f = urlopen(api_url % self.ip) |
||
642 | raw_response = f.read().decode("utf-8") |
||
643 | response = json.loads(raw_response) |
||
644 | if response['status'] != "success": |
||
645 | raise Exception("Creating iprecord failed, server responded: %s" % raw_response) |
||
646 | ip_address = IpAddress.objects.create( |
||
647 | ip=self.ip, |
||
648 | isp=response['isp'], |
||
649 | country=response['country'], |
||
650 | region=response['regionName'], |
||
651 | city=response['city'], |
||
652 | country_code=response['countryCode'] |
||
653 | ) |
||
654 | except Exception as e: |
||
655 | self.logger.error("Error while creating ip with country info, because %s", e) |
||
656 | ip_address = IpAddress.objects.create(ip=self.ip) |
||
657 | return ip_address |
||
658 | |||
802 |