| Total Complexity | 46 |
| Total Lines | 246 |
| Duplicated Lines | 0 % |
Complex classes like MessagesHandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | import json |
||
class MessagesHandler(MessagesCreator):
    """
    Chat logic of a single connection: keeps the online-users hash in redis
    up to date, resolves the user name from the session, handles incoming
    events and records client ip addresses.

    ``safe_write`` is abstract here; its error message names WebSocketHandler
    as the class expected to implement it.
    """

    def __init__(self, *args, **kwargs):
        """
        Creates the per-connection logger adapter, the async redis client and
        the event -> handler dispatch table ``process_message``.
        """
        super(MessagesHandler, self).__init__(*args, **kwargs)
        # short zero-padded id that tells connections apart in the log
        self.log_id = str(id(self) % 10000).rjust(4, '0')
        self.ip = None
        log_params = {
            'username': '00000000',
            'id': self.log_id,
            'ip': 'initializing',
        }
        self.logger = logging.LoggerAdapter(logger, log_params)
        self.async_redis = tornadoredis.Client()
        self.process_message = {
            GET_MINE_USERNAME_EVENT: self.process_change_username,
            GET_MESSAGES_EVENT: self.process_get_messages,
            SEND_MESSAGE_EVENT: self.process_send_message,
        }

    def do_db(self, callback, *args, **kwargs):
        """
        Calls ``callback(*args, **kwargs)``; if it fails with OperationalError
        or InterfaceError the db connection is closed and the call is repeated
        exactly once.

        :param callback: callable that performs the database work
        :return: the value returned by ``callback``
        """
        try:
            return callback(*args, **kwargs)
        except (OperationalError, InterfaceError) as e:  # Connection has gone away
            # lazy logger args instead of eager '%' formatting, same output
            self.logger.warning('%s, reconnecting', e)  # TODO
            connection.close()
            return callback(*args, **kwargs)

    def get_online_from_redis(self, check_name=None, check_id=None):
        """
        Reads the REDIS_ONLINE_USERS hash and merges every entry into a single
        structure built by ``online_js_structure``.

        :param check_name: user name to look for among the stored connections
        :param check_id: connection key that is ignored while looking for check_name
        :rtype : dict
        :return: the merged structure; ``(dict, bool)`` if check_id is truthy,
         where the bool says whether check_name was found under a key other
         than check_id
        """
        online = sync_redis.hgetall(REDIS_ONLINE_USERS)
        self.logger.debug('!! redis online: %s', online)
        result = {}
        user_is_online = False
        # redis stores REDIS_USER_FORMAT, so parse them
        if online:
            for key, raw_user_sex in online.items():  # py2 iteritems
                name, sex, user_id = raw_user_sex.decode('utf-8').split(':')
                if name == check_name and check_id != int(key.decode('utf-8')):
                    user_is_online = True
                result.update(self.online_js_structure(name, sex, user_id))
        if check_id:
            return result, user_is_online
        return result

    def add_online_user(self):
        """
        adds to redis
        online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }

        If the user name was not online yet, the refreshed list is published to
        everyone as a login event; otherwise the list is written to this
        connection only. The user's own name is written to this connection
        afterwards.
        :return:
        """
        # snapshot is taken before this connection is stored in redis
        online = self.get_online_from_redis()
        async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user)

        if self.sender_name not in online:  # Login event, sent user names to all
            online.update(self.online_self_js_structure)
            online_user_names_mes = self.online_user_names(online, LOGIN_EVENT)
            self.logger.info('!! First tab, sending refresh online for all')
            self.publish(online_user_names_mes)
        else:  # a new tab has been opened, send user names to self
            online_user_names_mes = self.online_user_names(online, REFRESH_USER_EVENT)
            self.logger.info('!! Second tab, retrieving online for self')
            self.safe_write(online_user_names_mes)
        # send usernamechat
        username_message = self.default(self.sender_name, GET_MINE_USERNAME_EVENT)
        self.safe_write(username_message)

    def set_username(self, session_key):
        """
        Case registered: Fetch userName and its channels from database. returns them
        Case anonymous: generates a new name and saves it to session. returns default channel

        The rooms message is written to the connection in both cases.
        :param session_key: key of the session to load
        :return: channels user should subscribe
        """
        session = SessionStore(session_key)
        try:
            user_id = int(session["_auth_user_id"])
            user_db = self.do_db(User.objects.get, id=user_id)  # everything but 0 is a registered user
            # assign only after the lookup succeeded, so a stale session id
            # never sticks to a connection that ends up anonymous
            self.user_id = user_id
            self.sender_name = user_db.username
            self.sex = user_db.sex_str
            room_names = {}
            channels = [self.channel, ]
            for room in user_db.rooms.all():  # do_db is used already
                room_names[room.id] = room.name
                channels.append(REDIS_ROOM_CHANNEL_PREFIX % room.id)
            rooms_message = self.default(room_names, ROOMS_EVENT)
            self.logger.info("!! User %s subscribes for %s", self.sender_name, room_names)
        except (KeyError, User.DoesNotExist):
            # Anonymous
            self.sender_name = session.get(SESSION_USER_VAR_KEY)
            if self.sender_name is None:
                self.sender_name = id_generator(8)
                session[SESSION_USER_VAR_KEY] = self.sender_name
                session.save()
                self.logger.info("!! A new user log in, created username %s", self.sender_name)
            else:
                self.logger.info("!! Anonymous with name %s has logged", self.sender_name)
            channels = [ANONYMOUS_REDIS_CHANNEL, self.channel]
            rooms_message = self.default(ANONYMOUS_ROOM_NAMES, ROOMS_EVENT)
        # Deliberately not in a ``finally``: on an unexpected exception neither
        # name is bound, and touching them there would hide the real error.
        self.safe_write(rooms_message)
        return channels

    def publish(self, message, channel=ANONYMOUS_REDIS_CHANNEL):
        """
        Serializes ``message`` to json and publishes it to a redis channel.

        :param message: json-serializable payload
        :param channel: target channel, ANONYMOUS_REDIS_CHANNEL by default
        """
        jsoned_mess = json.dumps(message)
        self.logger.debug('<%s> %s', channel, jsoned_mess)
        async_redis_publisher.publish(channel, jsoned_mess)

    # TODO really parse every single message for 1 action?
    def check_and_finish_change_name(self, message):
        """
        For a connection whose sex is ANONYMOUS_GENDER: when ``message`` is a
        GET_MINE_USERNAME_EVENT, moves the redis subscription from the old
        username channel to the new one and rewrites this connection's entry
        in the online hash.

        :param message: json string as received from redis
        """
        if self.sex != ANONYMOUS_GENDER:
            return
        parsed_message = json.loads(message)
        if parsed_message[EVENT_VAR_NAME] != GET_MINE_USERNAME_EVENT:
            return
        self.async_redis.unsubscribe(REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name)  # TODO is it allowed?
        self.sender_name = parsed_message[CONTENT_VAR_NAME]
        self.async_redis.subscribe(REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name)
        async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user)

    def new_message(self, message):
        """
        Forwards a redis message body to the connection and lets
        ``check_and_finish_change_name`` inspect it. Integer bodies
        (subscribe events) are skipped.

        :param message: object exposing the payload as ``.body``
        """
        if isinstance(message.body, int):  # subscribe event
            return
        self.safe_write(message.body)
        self.check_and_finish_change_name(message.body)

    def safe_write(self, message):
        """
        Writes ``message`` to the client. Abstract in this class.

        :raises NotImplementedError: always
        """
        raise NotImplementedError('WebSocketHandler implements')

    def process_send_message(self, message):
        """
        Handles a "send message" event: optionally stores the message, echoes
        it and delivers it to the receiver.

        Messages are stored only when the sender is registered (user_id != 0)
        and the receiver is not addressed by user name.
        :type message: dict
        """
        content = message[CONTENT_VAR_NAME]
        receiver_id = message.get(RECEIVER_USERID_VAR_NAME)  # if receiver_id is None then its a public message
        receiver_name = message.get(RECEIVER_USERNAME_VAR_NAME)
        self.logger.info('!! Sending message %s to username:%s, id:%s', content, receiver_name, receiver_id)
        save_to_db = True
        # stays None when receiver_id is 0 and no receiver name was supplied
        receiver_channel = None
        if receiver_id is not None and receiver_id != 0:
            receiver_channel = REDIS_USERID_CHANNEL_PREFIX % receiver_id
        elif receiver_name is not None:
            receiver_channel = REDIS_USERNAME_CHANNEL_PREFIX % receiver_name
            save_to_db = False

        if self.user_id != 0 and save_to_db:
            self.logger.debug('!! Saving it to db')
            message_db = Message(sender_id=self.user_id, content=content, receiver_id=receiver_id)
            self.do_db(message_db.save)  # exit on hacked id with exception
            prepared_message = self.create_send_message(message_db)
        else:
            self.logger.debug('!! NOT saving it')
            prepared_message = self.send_anonymous(content, receiver_name, receiver_id)

        if receiver_id is None:
            self.logger.debug('!! Detected as public')
            self.publish(prepared_message)
            return

        # private: echo to the sender first, then deliver to the receiver
        self.publish(prepared_message, self.channel)
        self.logger.debug('!! Detected as private, channel %s', receiver_channel)
        if receiver_channel is None:
            # previously an UnboundLocalError was raised at this point
            self.logger.warning('!! No receiver channel for id:%s, message is not delivered', receiver_id)
        elif receiver_channel != self.channel:
            self.publish(prepared_message, receiver_channel)

    def process_change_username(self, message):
        """
        Handles a "change username" event: validates the name, stores it in
        the session and publishes the change. A ValidationError is reported
        back to this connection only.
        :type message: dict
        """
        new_username = message[CONTENT_VAR_NAME]
        self.logger.info('!! Changing username to %s', new_username)
        try:
            check_user(new_username)
            online = self.get_online_from_redis()
            if new_username in online:
                self.logger.info('!! This name is already used')
                raise ValidationError('Anonymous already has this name')
            session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
            session = SessionStore(session_key)
            session[SESSION_USER_VAR_KEY] = new_username
            session.save()

            old_name = self.sender_name
            old_channel = self.channel
            # pop instead of del: the session is already saved, so a missing
            # redis entry must not abort the rename halfway with a KeyError
            online.pop(old_name, None)
            self.sender_name = new_username  # change_user_name required new_username in sender_name
            online.update(self.online_self_js_structure)
            message_all = self.change_user_nickname(old_name, online)
            message_me = self.default(new_username, GET_MINE_USERNAME_EVENT)
            # TODO perform ton of checks or emit twice ?
            self.publish(message_me, self.channel)  # to new user channel
            self.publish(message_me, old_channel)  # to old user channel
            self.publish(message_all)
        except ValidationError as e:
            self.safe_write(self.default(str(e.message)))

    def process_get_messages(self, data):
        """
        Handles a "get messages" event: writes to the connection up to
        COUNT_VAR_NAME (default 10) newest messages that are public or
        sent/received by this user, optionally only those with an id below
        HEADER_ID_VAR_NAME.
        :type data: dict
        """
        header_id = data.get(HEADER_ID_VAR_NAME, None)
        count = int(data.get(COUNT_VAR_NAME, 10))
        self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
        visible = (
            Q(receiver=None)  # Only public
            | Q(sender=self.user_id)  # private sent
            | Q(receiver=self.user_id)  # and private received
        )
        if header_id is None:
            filters = [visible]
        else:
            filters = [Q(id__lt=header_id), visible]
        messages = Message.objects.filter(*filters).order_by('-pk')[:count]
        response = self.do_db(self.get_messages, messages)
        self.safe_write(response)

    def save_ip(self):
        """
        Stores an IpAddress record for the current user (or anonymous name)
        and ``self.ip`` unless one already exists. Location fields come from
        the service at ``api_url``; if anything in that step fails, a record
        without them is created instead.
        """
        from contextlib import closing  # used only here; works on py2 and py3

        is_anonymous = self.user_id == 0
        user_id = None if is_anonymous else self.user_id
        anon_name = self.sender_name if is_anonymous else None
        if ((user_id and self.do_db(IpAddress.objects.filter(user_id=user_id, ip=self.ip).exists)) or
                # << ip record exist for existed user, ip record exist for anonymous >>
                (anon_name and self.do_db(IpAddress.objects.filter(anon_name=anon_name, ip=self.ip).exists))):
            return
        try:
            if not api_url:
                raise Exception('api url is absent')
            self.logger.debug("Creating ip record %s", self.ip)
            # closing() releases the http response even when read() fails
            with closing(urlopen(api_url % self.ip)) as f:
                raw_response = f.read().decode("utf-8")
            response = json.loads(raw_response)
            if response['status'] != "success":
                raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
            IpAddress.objects.create(
                isp=response['isp'],
                country=response['country'],
                region=response['regionName'],
                city=response['city'],
                user_id=user_id,
                anon_name=anon_name,
                ip=self.ip)
        except Exception as e:
            # best effort: keep the ip itself even without the location details
            self.logger.error("Error while creating ip with country info, because %s", e)
            IpAddress.objects.create(user_id=user_id, ip=self.ip, anon_name=anon_name)
| 457 | |||
| 594 |