Total Complexity | 51 |
Total Lines | 267 |
Duplicated Lines | 0 % |
Complex classes like MessagesHandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | import json |
||
class MessagesHandler(MessagesCreator):
    """
    Chat logic behind a single connection: keeps the redis "online users"
    hash up to date, resolves the user behind a session and handles the
    client events registered in ``self.process_message``.

    ``safe_write`` is deliberately left unimplemented here; per its error
    text the WebSocketHandler is expected to provide it.
    Attributes such as ``sync_redis``, ``async_redis_publisher``, ``channel``,
    ``sender_name``, ``sex`` and ``user_id`` are not defined in this class -
    presumably they come from MessagesCreator or a subclass; TODO confirm.
    """

    def __init__(self, *args, **kwargs):
        super(MessagesHandler, self).__init__(*args, **kwargs)
        # 4-character, zero-padded tag derived from id(self), used in log records
        self.log_id = str(id(self) % 10000).rjust(4, '0')
        self.ip = None
        log_params = {
            'username': '00000000',
            'id': self.log_id,
            'ip': 'initializing'
        }
        self.logger = logging.LoggerAdapter(logger, log_params)
        self.async_redis = tornadoredis.Client()
        # event name -> bound method that handles it
        self.process_message = {
            GET_MINE_USERNAME_EVENT: self.process_change_username,
            GET_MESSAGES_EVENT: self.process_get_messages,
            SEND_MESSAGE_EVENT: self.process_send_message,
            CALL_EVENT: self.process_call
        }

    def do_db(self, callback, *arg, **args):
        """
        Calls ``callback(*arg, **args)`` and returns its result.

        If the call raises OperationalError or InterfaceError the connection
        is closed and the callback is invoked exactly once more; an error
        raised by that second attempt propagates to the caller.
        """
        try:
            return callback(*arg, **args)
        except (OperationalError, InterfaceError) as e:  # Connection has gone away
            # lazy logger args instead of eager '%' formatting
            self.logger.warning('%s, reconnecting', e)  # TODO
            connection.close()
            return callback(*arg, **args)

    def get_online_from_redis(self, check_name=None, check_id=None):
        """
        Reads the REDIS_ONLINE_USERS hash and converts it to the structure
        produced by ``online_js_structure``.

        :param check_name: username to look for among the other connections
        :param check_id: connection key to exclude from that search
        :rtype : dict
        returns (dict, bool) if check_id is truthy; the bool tells whether
        ``check_name`` is online under a key different from ``check_id``
        """
        online = self.sync_redis.hgetall(REDIS_ONLINE_USERS)
        self.logger.debug('!! redis online: %s', online)
        result = {}
        user_is_online = False
        # redis stores REDIS_USER_FORMAT, so parse them
        if online:
            for key, raw_user_sex in online.items():  # py2 iteritems
                (name, sex, user_id) = raw_user_sex.decode('utf-8').split(':')
                if name == check_name and check_id != int(key.decode('utf-8')):
                    user_is_online = True
                result.update(self.online_js_structure(name, sex, user_id))
        if check_id:
            return result, user_is_online
        return result

    def add_online_user(self):
        """
        adds to redis
        online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }

        If the username was not online yet, the refreshed list is published
        for everybody; otherwise (another tab of the same user) the list is
        written to this connection only.
        :return:
        """
        # snapshot taken BEFORE registering this connection
        online = self.get_online_from_redis()
        self.async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user)
        if self.sender_name not in online:  # if a new tab has been opened
            online.update(self.online_self_js_structure)
            online_user_names_mes = self.online_user_names(online, LOGIN_EVENT)
            self.logger.info('!! First tab, sending refresh online for all')
            self.publish(online_user_names_mes)
        else:  # Send user names to self
            online_user_names_mes = self.online_user_names(online, REFRESH_USER_EVENT)
            self.logger.info('!! Second tab, retrieving online for self')
            self.safe_write(online_user_names_mes)
            # send usernamechat
            username_message = self.default(self.sender_name, GET_MINE_USERNAME_EVENT)
            self.safe_write(username_message)

    def set_username(self, session_key):
        """
        Case registered: Fetch userName and its channels from database. returns them
        Case anonymous: generates a new name and saves it to session. returns default channel

        In both cases the rooms message is written to this connection.
        :param session_key: key passed to SessionStore
        :return: channels user should subscribe
        """
        session = SessionStore(session_key)
        try:
            user_id = int(session["_auth_user_id"])
            user_db = self.do_db(User.objects.get, id=user_id)  # everything but 0 is a registered user
            # assigned only after the lookup succeeded, so a session that
            # points to a missing user does not leave a stale id on self
            self.user_id = user_id
            self.sender_name = user_db.username
            self.sex = user_db.sex_str
            rooms = user_db.rooms.all()  # do_db is used already
            room_names = {}
            channels = [self.channel, ]
            for room in rooms:
                room_names[room.id] = room.name
                channels.append(REDIS_ROOM_CHANNEL_PREFIX % room.id)
            rooms_message = self.default(room_names, ROOMS_EVENT)
            self.logger.info("!! User %s subscribes for %s", self.sender_name, room_names)
        except (KeyError, User.DoesNotExist):
            # Anonymous
            self.sender_name = session.get(SESSION_USER_VAR_KEY)
            if self.sender_name is None:
                self.sender_name = id_generator(8)
                session[SESSION_USER_VAR_KEY] = self.sender_name
                session.save()
                self.logger.info("!! A new user log in, created username %s", self.sender_name)
            else:
                self.logger.info("!! Anonymous with name %s has logged", self.sender_name)
            channels = [ANONYMOUS_REDIS_CHANNEL, self.channel]
            rooms_message = self.default(ANONYMOUS_ROOM_NAMES, ROOMS_EVENT)
        self.safe_write(rooms_message)
        return channels

    def publish(self, message, channel=ANONYMOUS_REDIS_CHANNEL):
        """
        Serializes ``message`` to JSON and publishes it to ``channel``.

        :param message: JSON-serializable object
        :param channel: target channel, ANONYMOUS_REDIS_CHANNEL when omitted
        """
        jsoned_mess = json.dumps(message)
        self.logger.debug('<%s> %s', channel, jsoned_mess)
        self.async_redis_publisher.publish(channel, jsoned_mess)

    # TODO really parse every single message for 1 action?
    def check_and_finish_change_name(self, message):
        """
        Completes a rename of this anonymous user: when ``message`` is a
        CHANGE_ANONYMOUS_NAME_EVENT whose old name equals ``self.sender_name``,
        moves the username-channel subscription to the new name and rewrites
        this connection's entry in REDIS_ONLINE_USERS.

        :param message: JSON text of an incoming message
        """
        if self.sex != ANONYMOUS_GENDER:
            return
        parsed_message = json.loads(message)
        is_own_rename = (
            parsed_message[EVENT_VAR_NAME] == CHANGE_ANONYMOUS_NAME_EVENT
            and parsed_message[OLD_NAME_VAR_NAME] == self.sender_name
        )
        if not is_own_rename:
            return
        self.async_redis.unsubscribe(REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name)
        self.sender_name = parsed_message[USER_VAR_NAME]
        self.async_redis.subscribe(REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name)
        self.async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user)

    def new_message(self, message):
        """
        Forwards the body of ``message`` to the client and checks whether it
        finishes a rename of this user. Bodies of type int are skipped.

        :param message: object with a ``body`` attribute
        """
        # exact type check kept on purpose: an int body marks a subscribe event
        if type(message.body) is not int:  # subscribe event
            self.safe_write(message.body)
            self.check_and_finish_change_name(message.body)

    def safe_write(self, message):
        raise NotImplementedError('WebSocketHandler implements')

    @staticmethod
    def _receiver_channel(receiver_id, receiver_name):
        """
        Picks the channel of a message receiver.

        :return: (channel, addressed_by_name); channel is built from the id
        when it is neither None nor 0, else from the name when one is given,
        else it is None
        """
        if receiver_id is not None and receiver_id != 0:
            return REDIS_USERID_CHANNEL_PREFIX % receiver_id, False
        if receiver_name is not None:
            return REDIS_USERNAME_CHANNEL_PREFIX % receiver_name, True
        return None, False

    def process_send_message(self, message):
        """
        Handles a "send message" event: resolves the receiver channel and
        hands the content over to ``publish_message``.

        :type message: dict
        """
        content = message[CONTENT_VAR_NAME]
        # receiver_id None means a public message (see publish_message)
        receiver_id = message.get(RECEIVER_USERID_VAR_NAME)
        receiver_name = message.get(RECEIVER_USERNAME_VAR_NAME)
        self.logger.info('!! Sending message %s to username:%s, id:%s', content, receiver_name, receiver_id)
        receiver_channel, addressed_by_name = self._receiver_channel(receiver_id, receiver_name)
        # messages addressed by username only are not saved to db
        save_to_db = not addressed_by_name
        self.publish_message(content, receiver_channel, receiver_id, receiver_name, save_to_db)

    def process_call(self, message):
        """
        Handles a call event: publishes the ``offer_call`` message to the
        receiver's channel.

        :type message: dict
        """
        receiver_id = message.get(RECEIVER_USERID_VAR_NAME)
        receiver_name = message.get(RECEIVER_USERNAME_VAR_NAME)
        call_type = message.get(CALL_TYPE_VAR_NAME)
        self.logger.info('!! Offering a call to username:%s, id:%s', receiver_name, receiver_id)
        receiver_channel, _ = self._receiver_channel(receiver_id, receiver_name)
        # NOTE(review): without a receiver the channel is None, and passing it
        # explicitly overrides publish()'s default channel - confirm intent
        self.publish(self.offer_call(message.get(CONTENT_VAR_NAME), call_type), receiver_channel)

    def publish_message(self, content, receiver_channel, receiver_id, receiver_name, save_to_db):
        """
        Builds the outgoing message (saving it to db first when the sender's
        ``user_id`` is not 0 and ``save_to_db`` is set) and publishes it.

        ``receiver_id is None`` -> published to the default channel;
        otherwise -> published to this user's own channel and, if it differs,
        to ``receiver_channel``.
        """
        if self.user_id != 0 and save_to_db:
            self.logger.debug('!! Saving it to db')
            message_db = Message(sender_id=self.user_id, content=content, receiver_id=receiver_id)
            self.do_db(message_db.save)  # exit on hacked id with exception
            prepared_message = self.create_send_message(message_db)
        else:
            self.logger.debug('!! NOT saving it')
            prepared_message = self.send_anonymous(content, receiver_name, receiver_id)
        if receiver_id is None:
            self.logger.debug('!! Detected as public')
            self.publish(prepared_message)
        else:
            # the sender gets a copy on its own channel
            self.publish(prepared_message, self.channel)
            self.logger.debug('!! Detected as private, channel %s', receiver_channel)
            # NOTE(review): receiver_channel may be None here (receiver_id == 0
            # without a receiver name) - confirm callers never send that
            if receiver_channel != self.channel:
                self.publish(prepared_message, receiver_channel)

    def process_change_username(self, message):
        """
        Handles a rename request: validates the new name, stores it in the
        session and publishes the nickname change. ``self.sender_name`` itself
        is switched later, in ``check_and_finish_change_name``.
        A ValidationError is reported back to this connection as a
        GROWL_MESSAGE_EVENT instead of being raised.

        :type message: dict
        """
        new_username = message[CONTENT_VAR_NAME]
        self.logger.info('!! Changing username to %s', new_username)
        try:
            self.do_db(check_user, new_username)
            online = self.get_online_from_redis()
            if new_username in online:
                self.logger.info('!! This name is already used')
                raise ValidationError('This name is already used by another anonymous!')
            session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
            session = SessionStore(session_key)
            session[SESSION_USER_VAR_KEY] = new_username
            session.save()
            # the old name may already be gone if two or more
            # change_username events arrive in fast time
            online.pop(self.sender_name, None)
            old_username = self.sender_name
            # temporary set username for creating messages with self.online_self_js_structure, change_user_nickname
            self.sender_name = new_username
            try:
                online.update(self.online_self_js_structure)
                message_all = self.change_user_nickname(old_username, online)
            finally:
                # restored even on failure, see check_and_finish_change_name
                self.sender_name = old_username
            self.publish(message_all)
        except ValidationError as e:
            self.safe_write(self.default(str(e.message), event=GROWL_MESSAGE_EVENT))

    def process_get_messages(self, data):
        """
        Writes to this connection up to COUNT_VAR_NAME (default 10) newest
        messages visible to the user; when HEADER_ID_VAR_NAME is given only
        messages with a smaller id are considered.

        :type data: dict
        """
        header_id = data.get(HEADER_ID_VAR_NAME, None)
        count = int(data.get(COUNT_VAR_NAME, 10))
        self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
        # Only public messages, or private ones sent by / addressed to this user
        conditions = [Q(receiver=None) | Q(sender=self.user_id) | Q(receiver=self.user_id)]
        if header_id is not None:
            conditions.insert(0, Q(id__lt=header_id))
        messages = Message.objects.filter(*conditions).order_by('-pk')[:count]
        response = self.do_db(self.get_messages, messages)
        self.safe_write(response)

    def save_ip(self):
        """
        Creates a UserJoinedInfo row for this connection's ip unless a row
        with the same ip, user_id and anon_name already exists.
        For ``user_id == 0`` the row carries ``anon_name`` and no user_id,
        otherwise it carries user_id and no anon_name.
        """
        is_anonymous = self.user_id == 0
        user_id = None if is_anonymous else self.user_id
        anon_name = self.sender_name if is_anonymous else None
        already_saved = self.do_db(UserJoinedInfo.objects.filter(
            Q(ip__ip=self.ip) & Q(anon_name=anon_name) & Q(user_id=user_id)).exists)
        if already_saved:
            return
        ip_address = self.get_or_create_ip()
        UserJoinedInfo.objects.create(
            ip=ip_address,
            user_id=user_id,
            anon_name=anon_name
        )

    def get_or_create_ip(self):
        """
        Returns the IpAddress record for ``self.ip``, creating it if absent.

        A new record is filled from the JSON served at ``api_url % self.ip``;
        if that lookup fails for any reason the error is logged and a record
        holding only the ip is created instead (best effort).
        """
        from contextlib import closing  # local import, keeps the block self-contained

        try:
            ip_address = IpAddress.objects.get(ip=self.ip)
        except IpAddress.DoesNotExist:
            try:
                if not api_url:
                    raise Exception('api url is absent')
                self.logger.debug("Creating ip record %s", self.ip)
                # closing() releases the connection even when read() fails
                with closing(urlopen(api_url % self.ip)) as f:
                    raw_response = f.read().decode("utf-8")
                response = json.loads(raw_response)
                if response['status'] != "success":
                    raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
                ip_address = IpAddress.objects.create(
                    ip=self.ip,
                    isp=response['isp'],
                    country=response['country'],
                    region=response['regionName'],
                    city=response['city']
                )
            except Exception as e:
                self.logger.error("Error while creating ip with country info, because %s", e)
                ip_address = IpAddress.objects.create(ip=self.ip)
        return ip_address
||
488 | |||
623 |