Total Complexity | 47 |
Total Lines | 253 |
Duplicated Lines | 0 % |
Complex classes like MessagesHandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | import json |
||
212 | class MessagesHandler(MessagesCreator): |
||
213 | |||
def __init__(self, *args, **kwargs):
    """
    Set up per-connection state: a short log id, a logger adapter,
    an async redis client and the event -> handler dispatch table.
    """
    super(MessagesHandler, self).__init__(*args, **kwargs)
    # 4-character, zero-padded tag derived from the object id; passed to the logger as 'id'
    self.log_id = str(id(self) % 10000).zfill(4)
    self.ip = None
    # placeholder logging context used while the connection is being initialised
    self.logger = logging.LoggerAdapter(logger, {
        'username': '00000000',
        'id': self.log_id,
        'ip': 'initializing',
    })
    self.async_redis = tornadoredis.Client()
    # incoming event name -> bound method that handles it
    handlers = {}
    handlers[GET_MINE_USERNAME_EVENT] = self.process_change_username
    handlers[GET_MESSAGES_EVENT] = self.process_get_messages
    handlers[SEND_MESSAGE_EVENT] = self.process_send_message
    self.process_message = handlers
||
230 | |||
def do_db(self, callback, *arg, **args):
    """
    Run a database callable, retrying it once if the connection was lost.

    :param callback: callable that performs the database work
    :param arg: positional arguments forwarded to ``callback``
    :param args: keyword arguments forwarded to ``callback``
    :return: whatever ``callback`` returns
    :raises: anything ``callback`` raises other than OperationalError / InterfaceError
        on the first attempt, and anything it raises on the retry
    """
    try:
        return callback(*arg, **args)
    except (OperationalError, InterfaceError) as e:  # Connection has gone away
        # pass the exception as a logging argument so formatting is left to the logger
        self.logger.warning('%s, reconnecting', e)  # TODO
        # drop the broken connection before the single retry
        # (presumably a fresh one is opened on next use - confirm)
        connection.close()
        return callback(*arg, **args)
||
238 | |||
def get_online_from_redis(self, check_name=None, check_id=None):
    """
    Read the online-users hash from redis and convert it with online_js_structure.

    :param check_name: user name to look for among the stored entries
    :param check_id: key of the entry to ignore while looking for ``check_name``
    :rtype : dict
    returns (dict, bool) if check_id is truthy; the bool is True when an entry
    stored under a key other than ``check_id`` carries the name ``check_name``
    """
    stored = sync_redis.hgetall(REDIS_ONLINE_USERS)
    self.logger.debug('!! redis online: %s', stored)
    users = {}
    name_in_use = False
    # each value is a 'name:sex:user_id' string (REDIS_USER_FORMAT per the original note)
    for conn_key, raw_user in (stored or {}).items():  # py2 iteritems
        name, sex, user_id = raw_user.decode('utf-8').split(':')
        if name == check_name and check_id != int(conn_key.decode('utf-8')):
            name_in_use = True
        users.update(self.online_js_structure(name, sex, user_id))
    if not check_id:
        return users
    return users, name_in_use
||
259 | |||
def add_online_user(self):
    """
    Store this connection in the redis online-users hash and send out the online list.

    online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 }

    If the sender name was not online yet, the list is published as a login event;
    otherwise it is written to this connection only. The sender name is then
    written to this connection in both cases.
    :return:
    """
    users = self.get_online_from_redis()
    async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user)
    if self.sender_name not in users:
        # name was not online before this connection: login event, sent to all
        users.update(self.online_self_js_structure)
        names_message = self.online_user_names(users, LOGIN_EVENT)
        self.logger.info('!! First tab, sending refresh online for all')
        self.publish(names_message)
    else:
        # name is already online (another tab): refresh only this connection
        names_message = self.online_user_names(users, REFRESH_USER_EVENT)
        self.logger.info('!! Second tab, retrieving online for self')
        self.safe_write(names_message)
    # tell this connection its own user name
    self.safe_write(self.default(self.sender_name, GET_MINE_USERNAME_EVENT))
||
284 | |||
def set_username(self, session_key):
    """
    Resolve the user behind ``session_key`` and send them their room list.

    Case registered: Fetch userName and its channels from database. returns them
    Case anonymous: generates a new name and saves it to session. returns default channel

    :param session_key: key used to open the SessionStore
    :return: channels user should subscribe
    :raises: any error other than KeyError / User.DoesNotExist propagates unchanged
    """
    session = SessionStore(session_key)
    try:
        user_id = int(session["_auth_user_id"])
        user_db = self.do_db(User.objects.get, id=user_id)  # everything but 0 is a registered user
        # only remember the id once the user is known to exist, so a stale session id
        # does not leak into the anonymous path below
        self.user_id = user_id
        self.sender_name = user_db.username
        self.sex = user_db.sex_str
        rooms = user_db.rooms.all()  # do_db is used already
        room_names = {}
        channels = [self.channel, ]
        for room in rooms:
            room_names[room.id] = room.name
            channels.append(REDIS_ROOM_CHANNEL_PREFIX % room.id)
        rooms_message = self.default(room_names, ROOMS_EVENT)
        self.logger.info("!! User %s subscribes for %s", self.sender_name, room_names)
    except (KeyError, User.DoesNotExist):
        # Anonymous
        self.sender_name = session.get(SESSION_USER_VAR_KEY)
        if self.sender_name is None:
            self.sender_name = id_generator(8)
            session[SESSION_USER_VAR_KEY] = self.sender_name
            session.save()
            self.logger.info("!! A new user log in, created username %s", self.sender_name)
        else:
            self.logger.info("!! Anonymous with name %s has logged", self.sender_name)
        channels = [ANONYMOUS_REDIS_CHANNEL, self.channel]
        rooms_message = self.default(ANONYMOUS_ROOM_NAMES, ROOMS_EVENT)
    # written after the try block rather than in a ``finally``: on an unexpected error
    # rooms_message is unbound and the resulting NameError would hide the real failure
    self.safe_write(rooms_message)
    return channels
||
320 | |||
def publish(self, message, channel=ANONYMOUS_REDIS_CHANNEL):
    """
    Serialize ``message`` to JSON and publish it on a redis channel.

    :param message: JSON-serializable payload
    :param channel: target channel, ANONYMOUS_REDIS_CHANNEL by default
    """
    payload = json.dumps(message)
    self.logger.debug('<%s> %s', channel, payload)
    async_redis_publisher.publish(channel, payload)
||
325 | |||
326 | # TODO really parse every single message for 1 action? |
||
def check_and_finish_change_name(self, message):
    """
    Complete an anonymous rename when ``message`` announces it for this user.

    Does nothing unless self.sex is ANONYMOUS_GENDER and the message is a
    CHANGE_ANONYMOUS_NAME_EVENT whose old name equals the current sender name.

    :param message: JSON string received from a subscribed channel
    """
    if self.sex != ANONYMOUS_GENDER:
        return
    event = json.loads(message)
    if event[EVENT_VAR_NAME] != CHANGE_ANONYMOUS_NAME_EVENT:
        return
    if event[OLD_NAME_VAR_NAME] != self.sender_name:
        return
    # move the subscription from the old per-name channel to the new one
    self.async_redis.unsubscribe(REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name)
    self.sender_name = event[USER_VAR_NAME]
    self.async_redis.subscribe(REDIS_USERNAME_CHANNEL_PREFIX % self.sender_name)
    # rewrite this connection's entry in the online-users hash
    async_redis_publisher.hset(REDIS_ONLINE_USERS, id(self), self.stored_redis_user)
||
336 | |||
def new_message(self, message):
    """
    Forward a message received from redis to this connection.

    :param message: object with a ``body`` attribute; an int body marks a
        subscribe event and is ignored
    """
    body = message.body
    if type(body) is int:  # subscribe event
        return
    self.safe_write(body)
    self.check_and_finish_change_name(body)
||
341 | |||
def safe_write(self, message):
    """
    Write ``message`` to the client; not implemented at this level.

    :raises NotImplementedError: always
    """
    raise NotImplementedError('WebSocketHandler implements')
||
344 | |||
def process_send_message(self, message):
    """
    Handle a send-message event: optionally store the message, then publish it.

    A message without a receiver id is published on the default channel.
    Otherwise it is published on the sender's own channel and, when it
    differs, on the receiver's channel.

    :type message: dict
    """
    content = message[CONTENT_VAR_NAME]
    receiver_id = message.get(RECEIVER_USERID_VAR_NAME)  # None means the message is public
    receiver_name = message.get(RECEIVER_USERNAME_VAR_NAME)
    self.logger.info('!! Sending message %s to username:%s, id:%s', content, receiver_name, receiver_id)
    save_to_db = True
    # stays None when neither a non-zero receiver id nor a receiver name was supplied
    receiver_channel = None
    if receiver_id is not None and receiver_id != 0:
        receiver_channel = REDIS_USERID_CHANNEL_PREFIX % receiver_id
    elif receiver_name is not None:
        # receiver addressed by name only: such messages are not stored
        receiver_channel = REDIS_USERNAME_CHANNEL_PREFIX % receiver_name
        save_to_db = False

    if self.user_id != 0 and save_to_db:
        self.logger.debug('!! Saving it to db')
        message_db = Message(sender_id=self.user_id, content=content, receiver_id=receiver_id)
        self.do_db(message_db.save)  # exit on hacked id with exception
        prepared_message = self.create_send_message(message_db)
    else:
        self.logger.debug('!! NOT saving it')
        prepared_message = self.send_anonymous(content, receiver_name, receiver_id)

    if receiver_id is None:
        self.logger.debug('!! Detected as public')
        self.publish(prepared_message)
        return

    # private: echo to the sender's own channel first
    self.publish(prepared_message, self.channel)
    self.logger.debug('!! Detected as private, channel %s', receiver_channel)
    if receiver_channel is None:
        # receiver id 0 without a receiver name: there is no channel to deliver to
        self.logger.warning('!! No receiver channel for private message, delivered to sender only')
    elif receiver_channel != self.channel:
        self.publish(prepared_message, receiver_channel)
||
377 | |||
def process_change_username(self, message):
    """
    Handle a request to change the user name stored in the session.

    Validates the new name, stores it in the session and publishes the rename.
    self.sender_name itself is left unchanged here; see check_and_finish_change_name.
    A ValidationError is reported back to this connection as a growl message.

    :type message: dict
    """
    new_username = message[CONTENT_VAR_NAME]
    self.logger.info('!! Changing username to %s', new_username)
    try:
        self.do_db(check_user, new_username)
        online = self.get_online_from_redis()
        if new_username in online:
            self.logger.info('!! This name is already used')
            raise ValidationError('This name is already used by another anonymous!')
        session_key = self.get_cookie(settings.SESSION_COOKIE_NAME)
        session = SessionStore(session_key)
        session[SESSION_USER_VAR_KEY] = new_username
        session.save()
        # the old name may already be gone if two or more change_username events in fast time
        online.pop(self.sender_name, None)
        old_username = self.sender_name
        # temporary set username for creating messages with self.online_self_js_structure, change_user_nickname
        self.sender_name = new_username
        try:
            online.update(self.online_self_js_structure)
            message_all = self.change_user_nickname(old_username, online)
        finally:
            # always restore the old name, even if building the message failed
            self.sender_name = old_username  # see check_and_finish_change_name
        self.publish(message_all)
    except ValidationError as e:
        self.safe_write(self.default(str(e.message), event=GROWL_MESSAGE_EVENT))
||
407 | |||
def process_get_messages(self, data):
    """
    Fetch a page of messages and write it to this connection.

    Reads an optional header id (only messages with a smaller id are returned)
    and a count (default 10) from ``data``; newest messages come first.

    :type data: dict
    """
    header_id = data.get(HEADER_ID_VAR_NAME, None)
    count = int(data.get(COUNT_VAR_NAME, 10))
    self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
    # messages without a receiver, or sent by / addressed to this user
    visible = Q(receiver=None) | Q(sender=self.user_id) | Q(receiver=self.user_id)
    if header_id is None:
        conditions = (visible,)
    else:
        conditions = (Q(id__lt=header_id), visible)
    messages = Message.objects.filter(*conditions).order_by('-pk')[:count]
    response = self.do_db(self.get_messages, messages)
    self.safe_write(response)
||
427 | |||
def save_ip(self):
    """
    Record which user (or anonymous name) joined from self.ip.

    Nothing is created when a UserJoinedInfo row with the same ip, anonymous
    name and user id already exists.
    """
    if self.user_id == 0:
        # user id 0: identified by name only
        user_id = None
        anon_name = self.sender_name
    else:
        user_id = self.user_id
        anon_name = None
    same_visit = UserJoinedInfo.objects.filter(
        Q(ip__ip=self.ip) & Q(anon_name=anon_name) & Q(user_id=user_id))
    if self.do_db(same_visit.exists):
        return
    ip_address = self.get_or_create_ip()
    UserJoinedInfo.objects.create(ip=ip_address, user_id=user_id, anon_name=anon_name)
||
440 | |||
def get_or_create_ip(self):
    """
    Return the IpAddress row for self.ip, creating it when it does not exist.

    A new row is filled with isp/country/region/city taken from the JSON
    answer of ``api_url``. If that lookup fails for any reason, the error is
    logged and a row holding only the ip is created instead.

    :return: the existing or newly created IpAddress
    """
    try:
        ip_address = IpAddress.objects.get(ip=self.ip)
    except IpAddress.DoesNotExist:
        try:
            if not api_url:
                raise Exception('api url is absent')
            self.logger.debug("Creating ip record %s", self.ip)
            f = urlopen(api_url % self.ip)
            try:
                raw_response = f.read().decode("utf-8")
            finally:
                # release the HTTP response even when reading or decoding fails
                f.close()
            response = json.loads(raw_response)
            if response['status'] != "success":
                raise Exception("Creating iprecord failed, server responded: %s" % raw_response)
            ip_address = IpAddress.objects.create(
                ip=self.ip,
                isp=response['isp'],
                country=response['country'],
                region=response['regionName'],
                city=response['city']
            )
        except Exception as e:
            # deliberate best effort: fall back to a record without location details
            self.logger.error("Error while creating ip with country info, because %s", e)
            ip_address = IpAddress.objects.create(ip=self.ip)
    return ip_address
||
465 | |||
601 |