Total Complexity | 125 |
Total Lines | 627 |
Duplicated Lines | 10.85 % |
Changes | 37 | ||
Bugs | 4 | Features | 0 |
Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like MessagesHandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | import json |
||
321 | class MessagesHandler(MessagesCreator): |
||
322 | |||
def __init__(self, *args, **kwargs):
    """
    Initialise per-connection state, redis clients and the dispatch tables.

    ``closed_channels`` and ``parsable_prefix`` are assigned before the parent
    constructor runs; all remaining attributes are set afterwards.

    :param args: positional arguments forwarded to the parent constructor
    :param kwargs: keyword arguments forwarded to the parent constructor
    """
    self.closed_channels = None
    # marker added by encode() and stripped by remove_parsable_prefix()
    self.parsable_prefix = 'p'
    super(MessagesHandler, self).__init__(*args, **kwargs)
    self.webrtc_ids = {}
    self.ip = None
    # imported inside the method rather than at module level --
    # presumably to avoid a circular import; TODO confirm
    from chat import global_redis
    self.async_redis_publisher = global_redis.async_redis_publisher
    self.sync_redis = global_redis.sync_redis
    self.channels = []
    self._logger = None
    self.async_redis = tornadoredis.Client(port=TORNADO_REDIS_PORT)
    self.patch_tornadoredis()

    # action -> handler; the lookup itself is outside this section
    # (presumably used for messages arriving from the client)
    offer_handler = self.offer_webrtc_connection
    incoming_handlers = {
        Actions.GET_MESSAGES: self.process_get_messages,
        Actions.SEND_MESSAGE: self.process_send_message,
        Actions.WEBRTC: self.proxy_webrtc,
        Actions.CLOSE_FILE_CONNECTION: self.close_file_connection,
        Actions.CLOSE_CALL_CONNECTION: self.close_call_connection,
        Actions.CANCEL_CALL_CONNECTION: self.cancel_call_connection,
        Actions.ACCEPT_CALL: self.accept_call,
        Actions.ACCEPT_FILE: self.accept_file,
        Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel,
        Actions.DELETE_ROOM: self.delete_channel,
        Actions.EDIT_MESSAGE: self.edit_message,
        Actions.CREATE_ROOM_CHANNEL: self.create_new_room,
        Actions.INVITE_USER: self.invite_user,
        Actions.OFFER_FILE_CONNECTION: offer_handler,
        Actions.OFFER_CALL_CONNECTION: offer_handler,
        Actions.REPLY_FILE_CONNECTION: self.reply_file_connection,
        Actions.REPLY_CALL_CONNECTION: self.reply_call_connection,
    }
    self.pre_process_message = incoming_handlers

    # action -> handler used by pub_sub_message() for prefixed redis messages
    new_channel_handler = self.send_client_new_channel
    call_channel_handler = self.set_opponent_call_channel
    published_handlers = {
        Actions.CREATE_DIRECT_CHANNEL: new_channel_handler,
        Actions.CREATE_ROOM_CHANNEL: new_channel_handler,
        Actions.DELETE_ROOM: self.send_client_delete_channel,
        Actions.INVITE_USER: new_channel_handler,
        Actions.OFFER_FILE_CONNECTION: call_channel_handler,
        Actions.OFFER_CALL_CONNECTION: call_channel_handler,
    }
    self.post_process_message = published_handlers
||
363 | |||
def patch_tornadoredis(self):  # TODO remove this
    """
    Wrap ``self.async_redis.connection.readline`` with diagnostic logging.

    The original bound method is stored on the connection as ``old_read``.
    The replacement delegates to it; when it raises, the exception and a
    snapshot of this handler's state are logged and the same exception is
    re-raised with its original traceback.
    """
    redis_connection = self.async_redis.connection
    original_readline = redis_connection.readline
    # type of a bound method; used to bind new_read to the connection object
    bind_method = type(original_readline)
    redis_connection.old_read = original_readline

    def new_read(new_self, callback=None):
        try:
            return new_self.old_read(callback=callback)
        except Exception as e:
            current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL)
            self.logger.error(e)
            self.logger.error(
                "Exception info: "
                "self.id: %s ;;; "
                "self.connected = '%s';;; "
                "Redis default channel online = '%s';;; "
                "self.channels = '%s';;; "
                "self.closed_channels = '%s';;;",
                self.id, self.connected, current_online, self.channels, self.closed_channels
            )
            # bare raise keeps the traceback of the original failure
            raise

    redis_connection.readline = bind_method(new_read, redis_connection)
||
385 | |||
@property
def connected(self):
    """
    Connection flag; this class provides no implementation.

    :raises NotImplementedError: always
    """
    # NotImplemented is a comparison sentinel, not an exception class;
    # raising it produces a TypeError instead of the intended error
    raise NotImplementedError('subclasses must implement connected')


@connected.setter
def connected(self, value):
    """
    Setter counterpart of ``connected``; this class provides no implementation.

    :param value: new value of the flag
    :raises NotImplementedError: always
    """
    raise NotImplementedError('subclasses must implement connected')
||
393 | |||
@tornado.gen.engine
def listen(self, channels):
    """
    Subscribe the async redis client to ``channels`` and start listening.

    Once the subscribe task has finished, every incoming message is handed
    to ``pub_sub_message``.

    :param channels: channels passed to ``async_redis.subscribe``
    """
    subscribe_task = tornado.gen.Task(self.async_redis.subscribe, channels)
    yield subscribe_task
    self.async_redis.listen(self.pub_sub_message)
||
399 | |||
@property
def logger(self):
    """
    Logger used by this handler.

    :return: ``self._logger`` when it is truthy, otherwise ``base_logger``
    """
    return self._logger or base_logger
||
403 | |||
@tornado.gen.engine
def add_channel(self, channel):
    """
    Remember ``channel`` in ``self.channels`` and subscribe to it in redis.

    :param channel: channel to subscribe to
    """
    self.channels.append(channel)
    single_channel = (channel,)
    yield tornado.gen.Task(self.async_redis.subscribe, single_channel)
||
409 | |||
def evaluate(self, query_set):
    """
    Force ``query_set`` to be evaluated through ``do_db``.

    ``len`` is applied only for its side effect; the length is discarded.

    :param query_set: object to evaluate
    :return: the same ``query_set`` object
    """
    self.do_db(len, query_set)  # result intentionally ignored
    return query_set
||
413 | |||
def do_db(self, callback, *args, **kwargs):
    """
    Run ``callback`` and retry it once if the MySQL connection was dropped.

    :param callback: callable to execute
    :param args: positional arguments for ``callback``
    :param kwargs: keyword arguments for ``callback``
    :return: whatever ``callback`` returns
    :raises OperationalError, InterfaceError: re-raised unless the message
        contains 'MySQL server has gone away'
    """
    try:
        return callback(*args, **kwargs)
    except (OperationalError, InterfaceError) as e:
        if 'MySQL server has gone away' not in str(e):
            raise e
        self.logger.warning('%s, reconnecting' % e)
        # close the connection before the single retry
        connection.close()
        return callback(*args, **kwargs)
||
424 | |||
def execute_query(self, query, *args, **kwargs):
    """
    Execute a raw query and return its rows as dictionaries.

    :param query: query passed to ``cursor.execute``
    :param args: extra positional arguments for ``cursor.execute``
    :param kwargs: extra keyword arguments for ``cursor.execute``
    :return: list of dicts mapping column name to value, one per row
    """
    cursor = connection.cursor()
    try:
        cursor.execute(query, *args, **kwargs)
        # column names are the same for every row, so build them once;
        # description may be None for statements that return no rows
        columns = [col[0] for col in cursor.description or ()]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        # release the cursor even when execute/fetch raises
        cursor.close()
||
433 | |||
def get_online_from_redis(self, channel, check_self_online=False):
    """
    Read the members of the redis set ``channel`` and extract user ids.

    Every member is decoded as utf-8; the integer before the first ':' is
    the user id.

    :param channel: redis key passed to ``smembers``
    :param check_self_online: when true also report whether this user has
        another entry whose full value differs from ``self.id``
    :return: list of distinct user ids, or ``(list, bool)`` when
        ``check_self_online`` is true
    """
    members = self.sync_redis.smembers(channel)
    self.logger.debug('!! channel %s redis online: %s', channel, members)
    user_ids = set()
    seen_elsewhere = False
    for entry in members or ():
        text = entry.decode('utf-8')
        # : char specified in cookies_middleware.py.create_id
        uid = int(text.partition(':')[0])
        if uid == self.user_id and text != self.id:
            seen_elsewhere = True
        user_ids.add(uid)
    online_list = list(user_ids)
    if check_self_online:
        return online_list, seen_elsewhere
    return online_list
||
454 | |||
def add_online_user(self, room_id, offline_messages=None):
    """
    Add this connection to the online set of ``room_id`` and announce it.

    If the user has no other connection in the room, a LOGIN online list
    is published to the room and, when given, offline messages are written
    to this socket. Otherwise only this socket receives a REFRESH_USER
    online list.

    :param room_id: redis key of the room's online set
    :param offline_messages: optional value forwarded to
        ``load_offline_message``
    """
    self.async_redis_publisher.sadd(room_id, self.id)
    # since we add user to online first, latest trigger will always show correct online
    online, is_online = self.get_online_from_redis(room_id, True)
    if is_online:
        # Send user names to self
        online_user_names_mes = self.room_online(online, Actions.REFRESH_USER, room_id)
        self.logger.info('!! Second tab, retrieving online for self')
        self.ws_write(online_user_names_mes)
        return
    # the sadd above is asynchronous: the synchronous read may or may not
    # already contain this user, so add the id only when it is missing
    if self.user_id not in online:
        online.append(self.user_id)
    online_user_names_mes = self.room_online(online, Actions.LOGIN, room_id)
    self.logger.info('!! First tab, sending refresh online for all')
    self.publish(online_user_names_mes, room_id)
    if offline_messages:
        self.ws_write(self.load_offline_message(offline_messages, room_id))
||
483 | |||
def publish(self, message, channel, parsable=False):
    """
    Serialise ``message`` to JSON and publish it to a redis channel.

    :param message: JSON-serialisable object
    :param channel: redis channel to publish to
    :param parsable: when true the payload is passed through ``encode``
        before publishing
    """
    payload = json.dumps(message)
    # the unprefixed payload is what gets logged
    self.logger.debug('<%s> %s', channel, payload)
    outgoing = self.encode(payload) if parsable else payload
    self.async_redis_publisher.publish(channel, outgoing)
||
490 | |||
def encode(self, message):
    """
    Mark ``message`` as needing processing before it reaches the client.

    :param message: message to mark
    :return: ``message`` with ``self.parsable_prefix`` prepended
    """
    prefix = self.parsable_prefix
    return prefix + message
||
499 | |||
def remove_parsable_prefix(self, message):
    """
    Strip ``self.parsable_prefix`` from the start of ``message``.

    :param message: string to inspect
    :return: the message without the prefix, or ``None`` when the message
        does not start with the prefix
    """
    prefix = self.parsable_prefix
    if message.startswith(prefix):
        # slice by the real prefix length instead of assuming one character
        return message[len(prefix):]
    return None
||
503 | |||
def pub_sub_message(self, message):
    """
    Handle a message delivered by the redis subscription.

    Bodies that are not ``str_type`` are ignored. A body without the
    parsable prefix is written to the socket as is. A prefixed body is
    decoded and passed to the handler registered in
    ``post_process_message``; the unprefixed text is written to the socket
    unless that handler returns a truthy value.

    :param message: object whose ``body`` attribute holds the payload
    """
    body = message.body
    if not isinstance(body, str_type):  # subscribe event
        return
    stripped = self.remove_parsable_prefix(body)
    if not stripped:
        self.ws_write(body)
        return
    parsed = json.loads(stripped)
    handler = self.post_process_message[parsed[VarNames.EVENT]]
    handled = handler(parsed)
    if not handled:
        self.ws_write(stripped)
||
515 | |||
def ws_write(self, message):
    """
    Write ``message`` to the client socket; not implemented in this class.

    :param message: payload to send
    :raises NotImplementedError: always
    """
    not_implemented = NotImplementedError('WebSocketHandler implements')
    raise not_implemented
||
518 | |||
def process_send_message(self, message):
    """
    Store a new chat message, save its images and publish it to the room.

    :type message: dict
    :param message: incoming payload; the channel and content keys are
        required, the image key is optional
    """
    room_channel = message[VarNames.CHANNEL]
    stored = Message(
        sender_id=self.user_id,
        content=message[VarNames.CONTENT]
    )
    stored.room_id = room_channel
    self.do_db(stored.save)
    # the message id used below is read after the save
    saved_images = self.parse_imgs(message.get(VarNames.IMG), stored.id)
    prepared_images = self.prepare_img(saved_images, stored.id)
    outgoing = self.create_send_message(stored, Actions.PRINT_MESSAGE, prepared_images)
    self.publish(outgoing, room_channel)
||
533 | |||
def parse_imgs(self, imgs, mess_id):
    """
    Build ``Image`` rows for a message from the incoming image mapping.

    Entries carrying base64 data become new ``Image`` objects and are bulk
    created. If at least one entry has no base64 data, the images already
    stored for the message are fetched and appended to the result.

    :param imgs: mapping of symbol to image description, or a falsy value
    :param mess_id: id of the message the images belong to
    :return: list of new ``Image`` objects followed by any fetched ones
    """
    if not imgs:
        return []
    created = []
    needs_fetch = False
    for symbol in imgs:
        entry = imgs[symbol]
        if entry.get(VarNames.IMG_B64):
            photo = extract_photo(entry[VarNames.IMG_B64], entry[VarNames.IMG_FILE_NAME])
            created.append(Image(message_id=mess_id, img=photo, symbol=symbol))
        else:
            needs_fetch = True
    existing = None
    if needs_fetch:
        existing = Image.objects.filter(message_id=mess_id)
        # evaluate now, before bulk_create adds the new rows
        len(existing)  # fetch
    if created:
        Image.objects.bulk_create(created)
    if existing:
        created.extend(existing)
    return created
||
555 | |||
def close_file_connection(self, in_message):
    """
    Close this socket's side of a file-transfer connection.

    Nothing happens when this socket's status is already CLOSED. Otherwise
    the sender or receiver close routine runs, depending on whether this
    socket is the one stored under WEBRTC_CONNECTION, and the status is set
    to CLOSED.

    :param in_message: incoming payload containing the connection id
    :raises Exception: "Access Denied" when this socket has no status for
        the connection
    """
    connection_id = in_message[VarNames.CONNECTION_ID]
    own_status = self.sync_redis.shget(connection_id, self.id)
    if not own_status:
        raise Exception("Access Denied")
    if own_status == WebRtcRedisStates.CLOSED:
        return
    sender_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
    if sender_id != self.id:
        self.close_file_receiver(connection_id, in_message, sender_id)
    else:
        self.close_file_sender(connection_id)
    self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
||
568 | |||
def close_call_connection(self, in_message):
    """
    Leave a call and notify every participant that is not CLOSED.

    :param in_message: incoming payload containing the connection id
    :raises ValidationError: when this socket's status is neither READY nor
        RESPONDED
    """
    connection_id = in_message[VarNames.CONNECTION_ID]
    participants = self.sync_redis.shgetall(connection_id)
    allowed = (WebRtcRedisStates.READY, WebRtcRedisStates.RESPONDED)
    if participants[self.id] not in allowed:
        raise ValidationError("Invalid channel status.")
    self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.CLOSED)
    # drop ourselves so the notification goes only to the other sockets
    del participants[self.id]
    notification = {
        VarNames.EVENT: Actions.CLOSE_CALL_CONNECTION,
        VarNames.CONNECTION_ID: connection_id,
        VarNames.USER_ID: self.user_id,
        VarNames.WEBRTC_OPPONENT_ID: self.id,
        VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
    }
    for ws_id, status in participants.items():
        if status != WebRtcRedisStates.CLOSED:
            self.publish(notification, ws_id)
||
587 | |||
def cancel_call_connection(self, in_message, reply_action=None):
    """
    Decline an offered call: set this socket's status to CLOSED and notify
    the other participants with CANCEL_CALL_CONNECTION.

    :param in_message: incoming payload containing the connection id
    :param reply_action: unused; kept for backward compatibility. It now
        defaults to ``None`` so the method can be called with the message
        alone, like every other handler in ``pre_process_message``.
    """
    self.send_call_answer(in_message, WebRtcRedisStates.CLOSED, Actions.CANCEL_CALL_CONNECTION)
||
590 | |||
def close_file_receiver(self, connection_id, in_message, sender_id):
    """
    Forward a receiver's close request to the sender of a file transfer.

    The message is forwarded only when the sender's status is not CLOSED.

    :param connection_id: id of the webrtc connection
    :param in_message: incoming payload; it is modified in place and then
        published to the sender
    :param sender_id: socket id of the sender
    :raises Exception: "Access denied" when the sender has no status for
        the connection
    """
    status_of_sender = self.sync_redis.shget(connection_id, sender_id)
    if not status_of_sender:
        raise Exception("Access denied")
    if status_of_sender == WebRtcRedisStates.CLOSED:
        return
    in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
    in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
    self.publish(in_message, sender_id)
||
599 | |||
def close_file_sender(self, connection_id):
    """
    Tell every other socket of the connection that is not CLOSED that the
    sender closed the file transfer.

    :param connection_id: id of the webrtc connection
    """
    statuses = self.sync_redis.shgetall(connection_id)
    del statuses[self.id]
    for ws_id, status in statuses.items():
        if status != WebRtcRedisStates.CLOSED:
            self.publish({
                VarNames.EVENT: Actions.CLOSE_FILE_CONNECTION,
                VarNames.CONNECTION_ID: connection_id,
                VarNames.WEBRTC_OPPONENT_ID: self.id,
                VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
            }, ws_id)
||
612 | |||
def accept_file(self, in_message):
    """
    Accept an offered file: mark this socket READY and notify the sender.

    :param in_message: incoming payload containing the connection id
    :raises ValidationError: unless the sender is READY and this socket is
        RESPONDED
    """
    connection_id = in_message[VarNames.CONNECTION_ID]  # TODO accept all if call
    sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
    sender_status = self.sync_redis.shget(connection_id, sender_ws_id)
    own_status = self.sync_redis.shget(connection_id, self.id)
    sender_ready = sender_status == WebRtcRedisStates.READY
    if not (sender_ready and own_status == WebRtcRedisStates.RESPONDED):
        raise ValidationError("Invalid channel status")
    self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
    acceptance = {
        VarNames.EVENT: Actions.ACCEPT_FILE,
        VarNames.CONNECTION_ID: connection_id,
        VarNames.WEBRTC_OPPONENT_ID: self.id,
        VarNames.HANDLER_NAME: HandlerNames.PEER_CONNECTION,
    }
    self.publish(acceptance, sender_ws_id)
||
628 | |||
629 | # todo if we shgetall and only then do async hset |
||
630 | # todo we can catch an issue when 2 concurrent users accepted the call |
||
631 | # todo but we didn't send them ACCEPT_CALL as they both were in status 'offered' |
||
632 | # def accept_call(self, in_message): |
||
633 | # connection_id = in_message[VarNames.CONNECTION_ID] |
||
634 | # channel_status = self.sync_redis.shgetall(connection_id) |
||
635 | # if channel_status and channel_status[self.id] == WebRtcRedisStates.RESPONDED: |
||
636 | # self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.READY) |
||
637 | # for key in channel_status: # del channel_status[self.id] not needed as self in responded |
||
638 | # if channel_status[key] == WebRtcRedisStates.READY: |
||
639 | # self.publish({ |
||
640 | # VarNames.EVENT: Actions.ACCEPT_CALL, |
||
641 | # VarNames.CONNECTION_ID: connection_id, |
||
642 | # VarNames.WEBRTC_OPPONENT_ID: self.id, |
||
643 | # VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER, |
||
644 | # }, key) |
||
645 | View Code Duplication | # else: |
|
646 | # raise ValidationError("Invalid channel status") |
||
647 | |||
def accept_call(self, in_message):
    """
    Accept a call: mark this socket READY and notify every other socket of
    the connection that is not CLOSED.

    The status is written with the synchronous client and the participant
    map is read afterwards.

    :param in_message: incoming payload containing the connection id
    :raises ValidationError: when this socket's status is not RESPONDED
    """
    connection_id = in_message[VarNames.CONNECTION_ID]
    own_status = self.sync_redis.shget(connection_id, self.id)
    if own_status != WebRtcRedisStates.RESPONDED:
        raise ValidationError("Invalid channel status")
    self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.READY)
    others = self.sync_redis.shgetall(connection_id)
    del others[self.id]
    notification = {
        VarNames.EVENT: Actions.ACCEPT_CALL,
        VarNames.USER_ID: self.user_id,
        VarNames.CONNECTION_ID: connection_id,
        VarNames.WEBRTC_OPPONENT_ID: self.id,
        VarNames.HANDLER_NAME: HandlerNames.WEBRTC_TRANSFER,
    }
    for ws_id, status in others.items():
        if status != WebRtcRedisStates.CLOSED:
            self.publish(notification, ws_id)
||
667 | |||
def offer_webrtc_connection(self, in_message):
    """
    Start a webrtc connection (file or call) in a room.

    A new connection id is generated and stored under WEBRTC_CONNECTION
    with this socket as its value; this socket's status is set to READY.
    The connection id is written back to this socket and the offer is
    published to the room as a parsable message.

    :param in_message: incoming payload; channel, queued id and event keys
        are required, content is optional
    """
    room_id = in_message[VarNames.CHANNEL]
    content = in_message.get(VarNames.CONTENT)
    qued_id = in_message[VarNames.WEBRTC_QUED_ID]
    connection_id = id_generator(RedisPrefix.CONNECTION_ID_LENGTH)
    publisher = self.async_redis_publisher
    # remember the offerer, then mark it READY
    publisher.hset(WEBRTC_CONNECTION, connection_id, self.id)
    publisher.hset(connection_id, self.id, WebRtcRedisStates.READY)
    offer_for_room = self.offer_webrtc(content, connection_id, room_id, in_message[VarNames.EVENT])
    self.ws_write(self.set_connection_id(qued_id, connection_id))
    self.logger.info('!! Offering a webrtc, connection_id %s', connection_id)
    self.publish(offer_for_room, room_id, True)
||
681 | |||
def reply_call_connection(self, in_message):
    """
    Answer a call offer: set this socket's status to RESPONDED and notify
    the other participants with REPLY_CALL_CONNECTION.

    :param in_message: incoming payload containing the connection id
    """
    new_status = WebRtcRedisStates.RESPONDED
    action = Actions.REPLY_CALL_CONNECTION
    self.send_call_answer(in_message, new_status, action)
||
684 | |||
def send_call_answer(self, in_message, status_set, reply_action):
    """
    Move this socket from OFFERED to ``status_set`` and notify every other
    socket of the connection that is not CLOSED.

    :param in_message: incoming payload containing the connection id
    :param status_set: status to store for this socket
    :param reply_action: action passed to ``reply_webrtc`` to build the
        notification
    :raises ValidationError: when this socket's status is not OFFERED
    """
    connection_id = in_message[VarNames.CONNECTION_ID]
    participants = self.sync_redis.shgetall(connection_id)
    if participants[self.id] != WebRtcRedisStates.OFFERED:
        raise ValidationError("Invalid channel status.")
    self.async_redis_publisher.hset(connection_id, self.id, status_set)
    del participants[self.id]
    answer = self.reply_webrtc(reply_action, connection_id)
    for ws_id, status in participants.items():
        if status != WebRtcRedisStates.CLOSED:
            self.publish(answer, ws_id)
||
697 | |||
def reply_file_connection(self, in_message):
    """
    Answer a file offer: mark this socket RESPONDED and notify the sender.

    :param in_message: incoming payload containing the connection id
    :raises ValidationError: unless the sender is READY and this socket is
        OFFERED
    """
    connection_id = in_message[VarNames.CONNECTION_ID]
    sender_ws_id = self.sync_redis.shget(WEBRTC_CONNECTION, connection_id)
    sender_status = self.sync_redis.shget(connection_id, sender_ws_id)
    own_status = self.sync_redis.shget(connection_id, self.id)
    sender_ready = sender_status == WebRtcRedisStates.READY
    if not (sender_ready and own_status == WebRtcRedisStates.OFFERED):
        raise ValidationError("Invalid channel status.")
    self.async_redis_publisher.hset(connection_id, self.id, WebRtcRedisStates.RESPONDED)
    reply = self.reply_webrtc(Actions.REPLY_FILE_CONNECTION, connection_id)
    self.publish(reply, sender_ws_id)
||
708 | |||
def proxy_webrtc(self, in_message):
    """
    Forward a webrtc message to the opponent socket named in it.

    Both this socket and the opponent must be READY for the connection.
    Before forwarding, the opponent id in the message is replaced with this
    socket's id.

    :type in_message: dict
    :raises ValidationError: when either side is not READY
    """
    connection_id = in_message[VarNames.CONNECTION_ID]
    opponent = in_message.get(VarNames.WEBRTC_OPPONENT_ID)
    own_status = self.sync_redis.shget(connection_id, self.id)
    opponent_status = self.sync_redis.shget(connection_id, opponent)
    ready = WebRtcRedisStates.READY
    if own_status != ready or opponent_status != ready:
        raise ValidationError('Error in connection status, your status is {} while opponent is {}'.format(
            own_status, opponent_status
        ))  # todo receiver should only accept proxy_webrtc from sender, sender can accept all
    # I mean somebody if there're 3 ppl in 1 channel and first is initing transfer to 2nd and 3rd,
    # 2nd guy can fraud 3rd guy webrtc traffic, which is allowed during the call, but not while transering file
    in_message[VarNames.WEBRTC_OPPONENT_ID] = self.id
    in_message[VarNames.HANDLER_NAME] = HandlerNames.PEER_CONNECTION
    self.logger.debug(
        "Forwarding message to channel %s, self %s, other status %s",
        opponent, own_status, opponent_status
    )
    self.publish(in_message, opponent)
||
729 | |||
def create_new_room(self, message):
    """
    Create a named room, add the current user to it and publish the
    subscribe message on the user's own channel.

    :param message: incoming payload containing the room name
    :raises ValidationError: when the name is empty or longer than 16
        characters
    """
    name = message[VarNames.ROOM_NAME]
    if not name or len(name) > 16:
        raise ValidationError('Incorrect room name "{}"'.format(name))
    new_room = Room(name=name)
    self.do_db(new_room.save)
    membership = RoomUsers(room_id=new_room.id, user_id=self.user_id)
    membership.save()
    self.publish(self.subscribe_room_channel_message(new_room.id, name), self.channel, True)
||
739 | |||
def invite_user(self, message):
    """
    Add another user to a room the current connection is subscribed to.

    The room receives an add-user message; the invited user's channel
    receives a parsable invite message.

    :param message: incoming payload containing the room id and user id
    :raises ValidationError: when the room is not in ``self.channels``,
        the room is private, or the user is already a member
    """
    room_id = message[VarNames.ROOM_ID]
    invited_id = message[VarNames.USER_ID]
    if room_id not in self.channels:
        raise ValidationError("Access denied, only allowed for channels {}".format(self.channels))
    room = self.do_db(Room.objects.get, id=room_id)
    if room.is_private:
        raise ValidationError("You can't add users to direct room, create a new room instead")
    try:
        Room.users.through.objects.create(room_id=room_id, user_id=invited_id)
    except IntegrityError:
        raise ValidationError("User is already in channel")
    members = {}
    for member in room.users.all():
        self.set_js_user_structure(members, member.id, member.username, member.sex)
    room_notice = self.add_user_to_room(room_id, invited_id, members[invited_id])
    self.publish(room_notice, room_id)
    invitation = self.invite_room_channel_message(room_id, invited_id, room.name, members)
    self.publish(invitation, RedisPrefix.generate_user(invited_id), True)
||
758 | |||
def create_room(self, user_rooms, user_id):
    """
    Find or create the private room shared with ``user_id``.

    When a matching room is found it is passed to ``update_room``. When the
    lookup raises ``RoomUsers.DoesNotExist`` a new room is created together
    with its membership rows.

    :param user_rooms: query of the current user's private room ids
    :param user_id: id of the other user; may equal ``self.user_id``
    :return: id of the found or newly created room
    """
    talking_to_self = self.user_id == user_id
    if talking_to_self:
        room_ids = [room['room_id'] for room in self.evaluate(user_rooms)]
        # NOTE(review): execute_query returns a plain list, which has no
        # ``.get`` -- the lookup below looks broken for this branch; confirm
        query_res = self.execute_query(SELECT_SELF_ROOM, [room_ids, ])
    else:
        memberships = RoomUsers.objects.filter(user_id=user_id, room__in=user_rooms)
        query_res = memberships.values('room__id', 'room__disabled')
    try:
        found = self.do_db(query_res.get)
        room_id = found['room__id']
        self.update_room(room_id, found['room__disabled'])
    except RoomUsers.DoesNotExist:
        fresh_room = Room()
        fresh_room.save()
        room_id = fresh_room.id
        if talking_to_self:
            RoomUsers(user_id=self.user_id, room_id=room_id).save()
        else:
            RoomUsers.objects.bulk_create([
                RoomUsers(user_id=user_id, room_id=room_id),
                RoomUsers(user_id=self.user_id, room_id=room_id),
            ])
    return room_id
||
782 | |||
def update_room(self, room_id, disabled):
    """
    Re-enable a disabled room.

    :param room_id: id of the room
    :param disabled: current value of the room's disabled flag
    :raises ValidationError: when the room is not disabled
    """
    if not disabled:
        raise ValidationError('This room already exist')
    Room.objects.filter(id=room_id).update(disabled=False)
||
788 | |||
def create_user_channel(self, message):
    """
    Open a direct room with another user and publish the subscribe message.

    The message goes to the current user's channel and, when it is a
    different channel, to the other user's channel as well.

    :param message: incoming payload containing the other user's id
    """
    other_user_id = message[VarNames.USER_ID]
    # get all self private rooms ids
    own_private_rooms = Room.users.through.objects.filter(
        user_id=self.user_id, room__name__isnull=True
    ).values('room_id')
    # get private room that contains another user from rooms above
    room_id = self.create_room(own_private_rooms, other_user_id)
    subscription = self.subscribe_direct_channel_message(room_id, other_user_id)
    self.publish(subscription, self.channel, True)
    their_channel = RedisPrefix.generate_user(other_user_id)
    if their_channel != self.channel:
        self.publish(subscription, their_channel, True)
||
800 | |||
def delete_channel(self, message):
    """
    Leave or disable a room and publish the unsubscribe message to it.

    A room without a name is disabled. For a named room the current user's
    membership row is deleted and a LOGOUT online list without this user is
    published to the room.

    :param message: incoming payload containing the room id
    :raises ValidationError: when the room is not in ``self.channels``, is
        ALL_ROOM_ID, or is already disabled
    """
    room_id = message[VarNames.ROOM_ID]
    if room_id not in self.channels or room_id == ALL_ROOM_ID:
        raise ValidationError('You are not allowed to exit this room')
    room = self.do_db(Room.objects.get, id=room_id)
    if room.disabled:
        raise ValidationError('Room is already deleted')
    if room.name is not None:  # if public -> leave the room, delete the link
        RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete()
        still_online = self.get_online_from_redis(room_id)
        still_online.remove(self.user_id)
        logout_notice = self.room_online(still_online, Actions.LOGOUT, room_id)
        self.publish(logout_notice, room_id)
    else:  # if private then disable
        room.disabled = True
    room.save()
    self.publish(self.unsubscribe_direct_message(room_id), room_id, True)
||
818 | |||
def edit_message(self, data):
    """
    Edit or delete one of the current user's messages and publish the change.

    A ``None`` content marks the message as deleted; any other content
    replaces the stored content and its images are parsed again.

    :param data: incoming payload containing the message id and content;
        the image key is optional
    :raises ValidationError: when the message belongs to another user, is
        older than 10 minutes, or is already deleted
    """
    message_id = data[VarNames.MESSAGE_ID]
    message = Message.objects.get(id=message_id)
    if message.sender_id != self.user_id:
        raise ValidationError("You can only edit your messages")
    # 600000 ms = 10 minutes
    if message.time + 600000 < get_milliseconds():
        raise ValidationError("You can only edit messages that were send not more than 10 min ago")
    if message.deleted:
        raise ValidationError("Already deleted")
    message.content = data[VarNames.CONTENT]
    selector = Message.objects.filter(id=message_id)
    if message.content is None:
        # prep_imgs used to be unbound on this branch, so publishing a
        # deletion failed. Assumes create_send_message accepts None for
        # the images -- TODO confirm
        prep_imgs = None
        selector.update(deleted=True)
        action = Actions.DELETE_MESSAGE
    else:
        imgs = self.parse_imgs(data.get(VarNames.IMG), message.id)
        prep_imgs = self.prepare_img(imgs, message_id)
        action = Actions.EDIT_MESSAGE
        selector.update(content=message.content)
    self.publish(self.create_send_message(message, action, prep_imgs), message.room_id)
||
840 | |||
def send_client_new_channel(self, message):
    """
    Subscribe this connection to the room named in ``message`` and mark it
    online there.

    :param message: decoded message containing the room id
    """
    new_room_id = message[VarNames.ROOM_ID]
    self.add_channel(new_room_id)
    self.add_online_user(new_room_id)
||
845 | |||
def set_opponent_call_channel(self, message):
    """
    Mark this socket as OFFERED for a connection started by another socket.

    :param message: decoded offer containing the connection id and the
        offering socket's id
    :return: ``True`` when this socket is the offerer; in that case nothing
        is stored. Otherwise ``None``.
    """
    connection_id = message[VarNames.CONNECTION_ID]
    offered_by_self = message[VarNames.WEBRTC_OPPONENT_ID] == self.id
    if offered_by_self:
        # pub_sub_message does not forward the message when a handler
        # returns a truthy value
        return True
    self.sync_redis.hset(connection_id, self.id, WebRtcRedisStates.OFFERED)
||
851 | |||
def send_client_delete_channel(self, message):
    """
    Unsubscribe this connection from a room and remove it from the room's
    online set.

    :param message: decoded message containing the room id
    """
    room_id = message[VarNames.ROOM_ID]
    self.async_redis.unsubscribe((room_id,))
    # the online list is a redis set (written with sadd, read with
    # smembers), so the member is removed with srem, not hdel
    self.async_redis_publisher.srem(room_id, self.id)
    self.channels.remove(room_id)
||
857 | |||
def process_get_messages(self, data):
    """
    Send a page of a room's non-deleted messages, newest first, to this
    socket.

    :type data: dict
    :param data: incoming payload; the channel is required, the header id
        (only messages with a smaller id are returned) and the count
        (default 10) are optional
    """
    header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None)
    count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10))
    room_id = data[VarNames.CHANNEL]
    self.logger.info('!! Fetching %d messages starting from %s', count, header_id)
    conditions = [Q(room_id=room_id), Q(deleted=False)]
    if header_id is not None:
        conditions.insert(0, Q(id__lt=header_id))
    messages = Message.objects.filter(*conditions).order_by('-pk')[:count]
    images = self.do_db(self.get_message_images, messages)
    self.ws_write(self.get_messages(messages, room_id, images))
||
873 | |||
def get_message_images(self, messages):
    """
    Fetch the images attached to the given messages.

    :param messages: iterable of objects with an ``id`` attribute
    :return: ``Image`` query filtered by the message ids
    """
    message_ids = [item.id for item in messages]
    found = Image.objects.filter(message_id__in=message_ids)
    self.logger.info('!! Messages have %d images', len(found))
    return found
||
879 | |||
def get_offline_messages(self):
    """
    Collect the non-deleted messages whose id is greater than the current
    user's last read message id in each room.

    :return: dict mapping room id to a list of prepared messages
    """
    unread = Message.objects.filter(
        id__gt=F('room__roomusers__last_read_message_id'),
        deleted=False,
        room__roomusers__user_id=self.user_id
    )
    images = self.do_db(self.get_message_images, unread)
    by_room = {}
    for item in unread:
        prepared = self.create_message(item, self.prepare_img(images, item.id))
        if item.room_id not in by_room:
            by_room[item.room_id] = []
        by_room[item.room_id].append(prepared)
    return by_room
||
892 | |||
def get_users_in_current_user_rooms(self):
    """
    Describe every enabled room of the current user together with its users.

    Shape of the result (keys come from VarNames)::

        {
            room_id: {
                ROOM_NAME: room name,
                ROOM_USERS: {user_id: {USER: username, GENDER: gender}}
            }
        }

    :return: dict keyed by room id
    """
    own_rooms = Room.objects.filter(users__id=self.user_id, disabled=False).values('id', 'name')
    rooms = {}
    for room in own_rooms:
        rooms[room['id']] = {
            VarNames.ROOM_NAME: room['name'],
            VarNames.ROOM_USERS: {}
        }
    members = User.objects.filter(rooms__in=list(rooms)).values('id', 'username', 'sex', 'rooms__id')
    for member in members:
        room_users = rooms[member['rooms__id']][VarNames.ROOM_USERS]
        self.set_js_user_structure(room_users, member['id'], member['username'], member['sex'])
    return rooms
||
922 | |||
def set_js_user_structure(self, user_dict, user_id, name, sex):
    """
    Store a user's client-side description in ``user_dict`` under
    ``user_id``.

    :param user_dict: dict that is modified in place
    :param user_id: key of the new entry
    :param name: value stored under VarNames.USER
    :param sex: key into GENDERS; the looked-up value is stored under
        VarNames.GENDER
    """
    description = {
        VarNames.USER: name,
        VarNames.GENDER: GENDERS[sex]
    }
    user_dict[user_id] = description
||
928 | |||
def save_ip(self):
    """
    Record that the current user connected from ``self.ip``, unless such a
    record already exists.
    """
    already_recorded = UserJoinedInfo.objects.filter(
        Q(ip__ip=self.ip) & Q(user_id=self.user_id))
    if self.do_db(already_recorded.exists):
        return
    ip_record = get_or_create_ip(self.ip, self.logger)
    UserJoinedInfo.objects.create(ip=ip_record, user_id=self.user_id)
||
938 | |||
def publish_logout(self, channel, log_data):
    """
    Publish a LOGOUT online list to ``channel`` when the user has no other
    connection there.

    :param channel: redis key of the room's online set
    :param log_data: dict that receives the online list and the flag under
        ``channel``
    :return: ``True`` when the logout was published, otherwise ``None``
    """
    # seems like async solves problem with connection lost and wrong data status
    # http://programmers.stackexchange.com/questions/294663/how-to-store-online-status
    online, is_online = self.get_online_from_redis(channel, True)
    log_data[channel] = {'online': online, 'is_online': is_online}
    if is_online:
        return None
    self.publish(self.room_online(online, Actions.LOGOUT, channel), channel)
    return True
||
948 | |||
1118 |