Total Complexity | 68 |
Total Lines | 347 |
Duplicated Lines | 4.32 % |
Changes | 5 | ||
Bugs | 0 | Features | 1 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like MessagesHandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | import json |
||
36 | class MessagesHandler(MessagesCreator): |
||
37 | |||
38 | def __init__(self, *args, **kwargs): |
||
39 | self.closed_channels = None |
||
40 | self.parsable_prefix = 'p' |
||
41 | super(MessagesHandler, self).__init__() |
||
42 | self.webrtc_ids = {} |
||
43 | self.id = None # child init |
||
44 | self.sex = None |
||
45 | self.sender_name = None |
||
46 | self.user_id = 0 # anonymous by default |
||
47 | self.ip = None |
||
48 | from chat import global_redis |
||
49 | self.async_redis_publisher = global_redis.async_redis_publisher |
||
50 | self.sync_redis = global_redis.sync_redis |
||
51 | self.channels = [] |
||
52 | self._logger = None |
||
53 | self.async_redis = Client(port=TORNADO_REDIS_PORT) |
||
54 | self.patch_tornadoredis() |
||
55 | self.pre_process_message = { |
||
56 | Actions.GET_MESSAGES: self.process_get_messages, |
||
57 | Actions.SEND_MESSAGE: self.process_send_message, |
||
58 | Actions.CREATE_DIRECT_CHANNEL: self.create_user_channel, |
||
59 | Actions.DELETE_ROOM: self.delete_channel, |
||
60 | Actions.EDIT_MESSAGE: self.edit_message, |
||
61 | Actions.CREATE_ROOM_CHANNEL: self.create_new_room, |
||
62 | Actions.INVITE_USER: self.invite_user, |
||
63 | Actions.PING: self.respond_ping |
||
64 | } |
||
65 | self.post_process_message = { |
||
66 | Actions.CREATE_DIRECT_CHANNEL: self.send_client_new_channel, |
||
67 | Actions.CREATE_ROOM_CHANNEL: self.send_client_new_channel, |
||
68 | Actions.DELETE_ROOM: self.send_client_delete_channel, |
||
69 | Actions.INVITE_USER: self.send_client_new_channel |
||
70 | } |
||
71 | |||
72 | def patch_tornadoredis(self): # TODO remove this |
||
73 | fabric = type(self.async_redis.connection.readline) |
||
74 | self.async_redis.connection.old_read = self.async_redis.connection.readline |
||
75 | |||
76 | def new_read(new_self, callback=None): |
||
77 | try: |
||
78 | return new_self.old_read(callback=callback) |
||
79 | except Exception as e: |
||
80 | current_online = self.get_online_from_redis(RedisPrefix.DEFAULT_CHANNEL) |
||
81 | self.logger.error(e) |
||
82 | self.logger.error( |
||
83 | "Exception info: " |
||
84 | "self.id: %s ;;; " |
||
85 | "self.connected = '%s';;; " |
||
86 | "Redis default channel online = '%s';;; " |
||
87 | "self.channels = '%s';;; " |
||
88 | "self.closed_channels = '%s';;;", |
||
89 | self.id, self.connected, current_online, self.channels, self.closed_channels |
||
90 | ) |
||
91 | raise e |
||
92 | |||
93 | self.async_redis.connection.readline = fabric(new_read, self.async_redis.connection) |
||
94 | |||
95 | @property |
||
96 | def connected(self): |
||
97 | raise NotImplemented |
||
98 | |||
99 | @connected.setter |
||
100 | def connected(self, value): |
||
101 | raise NotImplemented |
||
102 | |||
103 | @property |
||
104 | def http_client(self): |
||
105 | raise NotImplemented |
||
106 | |||
107 | @engine |
||
108 | def listen(self, channels): |
||
109 | yield Task( |
||
110 | self.async_redis.subscribe, channels) |
||
111 | self.async_redis.listen(self.pub_sub_message) |
||
112 | |||
113 | @property |
||
114 | def logger(self): |
||
115 | return self._logger if self._logger else base_logger |
||
116 | |||
117 | @engine |
||
118 | def add_channel(self, channel): |
||
119 | self.channels.append(channel) |
||
120 | yield Task(self.async_redis.subscribe, (channel,)) |
||
121 | |||
122 | def get_online_from_redis(self, channel): |
||
123 | return self.get_online_and_status_from_redis(channel)[1] |
||
124 | |||
125 | def get_online_and_status_from_redis(self, channel): |
||
126 | """ |
||
127 | :rtype : (bool, list) |
||
128 | """ |
||
129 | online = self.sync_redis.ssmembers(channel) |
||
130 | self.logger.debug('!! channel %s redis online: %s', channel, online) |
||
131 | return self.parse_redis_online(online) if online else (False, []) |
||
132 | |||
133 | def parse_redis_online(self, online): |
||
134 | """ |
||
135 | :rtype : (bool, list) |
||
136 | """ |
||
137 | result = set() |
||
138 | user_is_online = False |
||
139 | for decoded in online: # py2 iteritems |
||
140 | # : char specified in cookies_middleware.py.create_id |
||
141 | user_id = int(decoded.split(':')[0]) |
||
142 | if user_id == self.user_id and decoded != self.id: |
||
143 | user_is_online = True |
||
144 | result.add(user_id) |
||
145 | return user_is_online, list(result) |
||
146 | |||
147 | def add_online_user(self, room_id, offline_messages=None): |
||
148 | """ |
||
149 | adds to redis |
||
150 | online_users = { connection_hash1 = stored_redis_user1, connection_hash_2 = stored_redis_user2 } |
||
151 | :return: |
||
152 | """ |
||
153 | self.async_redis_publisher.sadd(room_id, self.id) |
||
154 | # since we add user to online first, latest trigger will always show correct online |
||
155 | is_online, online = self.get_online_and_status_from_redis(room_id) |
||
156 | if is_online: # Send user names to self |
||
157 | online_user_names_mes = self.room_online(online, Actions.REFRESH_USER, room_id) |
||
158 | self.logger.info('!! Second tab, retrieving online for self') |
||
159 | self.ws_write(online_user_names_mes) |
||
160 | else: # if a new tab has been opened |
||
161 | online.append(self.user_id) |
||
162 | online_user_names_mes = self.room_online(online, Actions.LOGIN, room_id) |
||
163 | self.logger.info('!! First tab, sending refresh online for all') |
||
164 | self.publish(online_user_names_mes, room_id) |
||
165 | if offline_messages: |
||
166 | self.ws_write(self.load_offline_message(offline_messages, room_id)) |
||
167 | |||
168 | def publish(self, message, channel, parsable=False): |
||
169 | jsoned_mess = json.dumps(message) |
||
170 | self.logger.debug('<%s> %s', channel, jsoned_mess) |
||
171 | if parsable: |
||
172 | jsoned_mess = self.encode(jsoned_mess) |
||
173 | self.async_redis_publisher.publish(channel, jsoned_mess) |
||
174 | |||
175 | def encode(self, message): |
||
176 | """ |
||
177 | Marks message with prefix to specify that |
||
178 | it should be decoded and proccesed before sending to client |
||
179 | @param message: message to mark |
||
180 | @return: marked message |
||
181 | """ |
||
182 | return self.parsable_prefix + message |
||
183 | |||
184 | def remove_parsable_prefix(self, message): |
||
185 | if message.startswith(self.parsable_prefix): |
||
186 | return message[1:] |
||
187 | |||
188 | def pub_sub_message(self, message): |
||
189 | data = message.body |
||
190 | if isinstance(data, str_type): # subscribe event |
||
191 | prefixless_str = self.remove_parsable_prefix(data) |
||
192 | if prefixless_str: |
||
193 | dict_message = json.loads(prefixless_str) |
||
194 | res = self.post_process_message[dict_message[VarNames.EVENT]](dict_message) |
||
195 | if not res: |
||
196 | self.ws_write(prefixless_str) |
||
197 | else: |
||
198 | self.ws_write(data) |
||
199 | |||
200 | def ws_write(self, message): |
||
201 | raise NotImplementedError('WebSocketHandler implements') |
||
202 | |||
203 | @asynchronous |
||
204 | def search_giphy(self, message, query, cb): |
||
205 | self.logger.debug("!! Asking giphy for: %s", query) |
||
206 | def on_giphy_reply(response): |
||
207 | try: |
||
208 | self.logger.debug("!! Got giphy response: " + str(response.body)) |
||
209 | res = json.loads(response.body) |
||
210 | giphy = res['data']['image_url'] |
||
211 | except: |
||
212 | giphy = None |
||
213 | cb(message, giphy) |
||
214 | url = GIPHY_URL.format(GIPHY_API_KEY, quote(query, safe='')) |
||
215 | self.http_client.fetch(url, callback=on_giphy_reply) |
||
216 | |||
217 | def notify_offline(self, channel): |
||
218 | if FIREBASE_API_KEY is None: |
||
219 | return |
||
220 | online = self.get_online_from_redis(channel) |
||
221 | if channel == ALL_ROOM_ID: |
||
222 | return |
||
223 | offline_users = UserProfile.objects.filter(rooms__id=channel, notifications=True).exclude(id__in=online).only('id') |
||
224 | reg_ids = evaluate(Subscription.objects.filter(user__in=offline_users).values_list('registration_id', flat=True)) |
||
225 | if len(reg_ids) == 0: |
||
226 | return |
||
227 | self.post_firebase(list(reg_ids)) |
||
228 | |||
229 | @asynchronous |
||
230 | def post_firebase(self, reg_ids): |
||
231 | def on_reply(response): |
||
232 | try: |
||
233 | self.logger.debug("!! FireBase response: " + str(response.body)) |
||
234 | except Exception as e: |
||
235 | self.logger.error("Unable to parse response" + str(e)) |
||
236 | pass |
||
237 | |||
238 | headers = {"Content-Type": "application/json", "Authorization": "key=%s" % FIREBASE_API_KEY} |
||
239 | body = json.dumps({"registration_ids": reg_ids}) |
||
240 | self.logger.debug("!! post_fire_message %s", body) |
||
241 | r = HTTPRequest(FIREBASE_URL, method="POST", headers=headers, body=body) |
||
242 | self.http_client.fetch(r, callback=on_reply) |
||
243 | |||
244 | def isGiphy(self, content): |
||
245 | if GIPHY_API_KEY is not None: |
||
246 | giphy_match = re.search(GIPHY_REGEX, content) |
||
247 | return giphy_match.group(1) if giphy_match is not None else None |
||
248 | |||
249 | def process_send_message(self, message): |
||
250 | """ |
||
251 | :type message: dict |
||
252 | """ |
||
253 | content = message.get(VarNames.CONTENT) |
||
254 | giphy_match = self.isGiphy(content) |
||
255 | def send_message(message, giphy=None): |
||
256 | raw_imgs = message.get(VarNames.IMG) |
||
257 | channel = message[VarNames.CHANNEL] |
||
258 | message_db = Message( |
||
259 | sender_id=self.user_id, |
||
260 | content=message[VarNames.CONTENT], |
||
261 | symbol=get_max_key(raw_imgs), |
||
262 | giphy=giphy |
||
263 | ) |
||
264 | message_db.room_id = channel |
||
265 | do_db(message_db.save) |
||
266 | db_images = save_images(raw_imgs, message_db.id) |
||
267 | prepared_message = self.create_send_message( |
||
268 | message_db, |
||
269 | Actions.PRINT_MESSAGE, |
||
270 | prepare_img(db_images, message_db.id) |
||
271 | ) |
||
272 | self.publish(prepared_message, channel) |
||
273 | self.notify_offline(channel) |
||
274 | if giphy_match is not None: |
||
275 | self.search_giphy(message, giphy_match, send_message) |
||
276 | else: |
||
277 | send_message(message) |
||
278 | |||
279 | def create_new_room(self, message): |
||
280 | room_name = message[VarNames.ROOM_NAME] |
||
281 | if not room_name or len(room_name) > 16: |
||
282 | raise ValidationError('Incorrect room name "{}"'.format(room_name)) |
||
283 | room = Room(name=room_name) |
||
284 | do_db(room.save) |
||
285 | RoomUsers(room_id=room.id, user_id=self.user_id).save() |
||
286 | subscribe_message = self.subscribe_room_channel_message(room.id, room_name) |
||
287 | self.publish(subscribe_message, self.channel, True) |
||
288 | |||
289 | def invite_user(self, message): |
||
290 | room_id = message[VarNames.ROOM_ID] |
||
291 | user_id = message[VarNames.USER_ID] |
||
292 | room = get_or_create_room(self.channels, room_id, user_id) |
||
293 | users_in_room = { |
||
294 | user.id: RedisPrefix.set_js_user_structure(user.username, user.sex) |
||
295 | for user in room.users.all() |
||
296 | } |
||
297 | self.publish(self.add_user_to_room(room_id, user_id, users_in_room[user_id]), room_id) |
||
298 | subscribe_message = self.invite_room_channel_message(room_id, user_id, room.name, users_in_room) |
||
299 | self.publish(subscribe_message, RedisPrefix.generate_user(user_id), True) |
||
300 | |||
301 | def respond_ping(self, message): |
||
302 | self.ws_write(self.responde_pong()) |
||
303 | |||
304 | def create_user_channel(self, message): |
||
305 | user_id = message[VarNames.USER_ID] |
||
306 | room_id = create_room(self.user_id, user_id) |
||
307 | subscribe_message = self.subscribe_direct_channel_message(room_id, user_id) |
||
308 | self.publish(subscribe_message, self.channel, True) |
||
309 | other_channel = RedisPrefix.generate_user(user_id) |
||
310 | if self.channel != other_channel: |
||
311 | self.publish(subscribe_message, other_channel, True) |
||
312 | |||
313 | def delete_channel(self, message): |
||
314 | room_id = message[VarNames.ROOM_ID] |
||
315 | if room_id not in self.channels or room_id == ALL_ROOM_ID: |
||
316 | raise ValidationError('You are not allowed to exit this room') |
||
317 | room = do_db(Room.objects.get, id=room_id) |
||
318 | if room.disabled: |
||
319 | raise ValidationError('Room is already deleted') |
||
320 | if room.name is None: # if private then disable |
||
321 | room.disabled = True |
||
322 | else: # if public -> leave the room, delete the link |
||
323 | RoomUsers.objects.filter(room_id=room.id, user_id=self.user_id).delete() |
||
324 | online = self.get_online_from_redis(room_id) |
||
325 | online.remove(self.user_id) |
||
326 | self.publish(self.room_online(online, Actions.LOGOUT, room_id), room_id) |
||
327 | room.save() |
||
328 | message = self.unsubscribe_direct_message(room_id) |
||
329 | self.publish(message, room_id, True) |
||
330 | |||
331 | |||
332 | def edit_message(self, data): |
||
333 | message_id = data[VarNames.MESSAGE_ID] |
||
334 | message = do_db(Message.objects.get, id=message_id) |
||
335 | validate_edit_message(self.user_id, message) |
||
336 | message.content = data[VarNames.CONTENT] |
||
337 | selector = Message.objects.filter(id=message_id) |
||
338 | giphy_match = self.isGiphy(data[VarNames.CONTENT]) |
||
339 | if message.content is None: |
||
340 | action = Actions.DELETE_MESSAGE |
||
341 | prep_imgs = None |
||
342 | selector.update(deleted=True) |
||
343 | elif giphy_match is not None: |
||
344 | def edit_glyphy(message, giphy): |
||
345 | do_db(selector.update, content=message.content, symbol=message.symbol, giphy=giphy) |
||
346 | message.giphy = giphy |
||
347 | self.publish(self.create_send_message(message, Actions.EDIT_MESSAGE, None), message.room_id) |
||
348 | self.search_giphy(message, giphy_match, edit_glyphy) |
||
349 | return |
||
350 | else: |
||
351 | action = Actions.EDIT_MESSAGE |
||
352 | message.giphy = None |
||
353 | prep_imgs = process_images(data.get(VarNames.IMG), message) |
||
354 | selector.update(content=message.content, symbol=message.symbol, giphy=None) |
||
355 | self.publish(self.create_send_message(message, action, prep_imgs), message.room_id) |
||
356 | |||
357 | def send_client_new_channel(self, message): |
||
358 | room_id = message[VarNames.ROOM_ID] |
||
359 | self.add_channel(room_id) |
||
360 | self.add_online_user(room_id) |
||
361 | View Code Duplication | ||
1 ignored issue
–
show
|
|||
362 | def send_client_delete_channel(self, message): |
||
363 | room_id = message[VarNames.ROOM_ID] |
||
364 | self.async_redis.unsubscribe((room_id,)) |
||
365 | self.async_redis_publisher.hdel(room_id, self.id) |
||
366 | self.channels.remove(room_id) |
||
367 | |||
368 | def process_get_messages(self, data): |
||
369 | """ |
||
370 | :type data: dict |
||
371 | """ |
||
372 | header_id = data.get(VarNames.GET_MESSAGES_HEADER_ID, None) |
||
373 | count = int(data.get(VarNames.GET_MESSAGES_COUNT, 10)) |
||
374 | room_id = data[VarNames.CHANNEL] |
||
375 | self.logger.info('!! Fetching %d messages starting from %s', count, header_id) |
||
376 | if header_id is None: |
||
377 | messages = Message.objects.filter(Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count] |
||
378 | else: |
||
379 | messages = Message.objects.filter(Q(id__lt=header_id), Q(room_id=room_id), Q(deleted=False)).order_by('-pk')[:count] |
||
380 | images = do_db(get_message_images, messages) |
||
381 | response = self.get_messages(messages, room_id, images, prepare_img) |
||
382 | self.ws_write(response) |
||
383 | |||
585 | self.publish(message, user) |