| Total Complexity | 47 |
| Total Lines | 245 |
| Duplicated Lines | 0 % |
Complex classes like SlackSensor often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | import json |
||
| 21 | class SlackSensor(PollingSensor): |
||
| 22 | DATASTORE_KEY_NAME = 'last_message_timestamp' |
||
| 23 | |||
| 24 | def __init__(self, sensor_service, config=None, poll_interval=None): |
||
| 25 | super(SlackSensor, self).__init__(sensor_service=sensor_service, |
||
| 26 | config=config, |
||
| 27 | poll_interval=poll_interval) |
||
| 28 | self._logger = self._sensor_service.get_logger(__name__) |
||
| 29 | self._token = self._config['sensor']['token'] |
||
| 30 | self._strip_formatting = self._config['sensor'].get('strip_formatting', |
||
| 31 | False) |
||
| 32 | self._handlers = { |
||
| 33 | 'message': self._handle_message_ignore_errors, |
||
| 34 | } |
||
| 35 | self._message_subtype_handlers = { |
||
| 36 | 'channel_join': self._handle_message_channel_join, |
||
| 37 | } |
||
| 38 | |||
| 39 | self._user_info_cache = {} |
||
| 40 | self._channel_info_cache = {} |
||
| 41 | self._group_info_cache = {} |
||
| 42 | |||
| 43 | self._last_message_timestamp = None |
||
| 44 | |||
| 45 | def setup(self): |
||
| 46 | self._client = SlackClient(self._token) |
||
| 47 | data = self._client.rtm_connect() |
||
| 48 | |||
| 49 | if not data: |
||
| 50 | msg = 'Failed to connect to the Slack API. Invalid token?' |
||
| 51 | raise Exception(msg) |
||
| 52 | |||
| 53 | self._populate_cache(user_data=self._api_call('users.list'), |
||
| 54 | channel_data=self._api_call('channels.list'), |
||
| 55 | group_data=self._api_call('groups.list'),) |
||
| 56 | |||
| 57 | def poll(self): |
||
| 58 | result = self._client.rtm_read() |
||
| 59 | |||
| 60 | if not result: |
||
| 61 | return |
||
| 62 | |||
| 63 | last_message_timestamp = self._handle_result(result=result) |
||
| 64 | |||
| 65 | if last_message_timestamp: |
||
| 66 | self._set_last_message_timestamp( |
||
| 67 | last_message_timestamp=last_message_timestamp) |
||
| 68 | |||
| 69 | def cleanup(self): |
||
| 70 | pass |
||
| 71 | |||
| 72 | def add_trigger(self, trigger): |
||
| 73 | pass |
||
| 74 | |||
| 75 | def update_trigger(self, trigger): |
||
| 76 | pass |
||
| 77 | |||
| 78 | def remove_trigger(self, trigger): |
||
| 79 | pass |
||
| 80 | |||
| 81 | def _get_last_message_timestamp(self): |
||
| 82 | """ |
||
| 83 | :rtype: ``int`` |
||
| 84 | """ |
||
| 85 | if not self._last_message_timestamp: |
||
| 86 | name = self.DATASTORE_KEY_NAME |
||
| 87 | value = self._sensor_service.get_value(name=name) |
||
| 88 | self._last_message_timestamp = int(value) if value else 0 |
||
| 89 | |||
| 90 | return self._last_message_timestamp |
||
| 91 | |||
| 92 | def _set_last_message_timestamp(self, last_message_timestamp): |
||
| 93 | self._last_message_timestamp = last_message_timestamp |
||
| 94 | name = self.DATASTORE_KEY_NAME |
||
| 95 | value = str(last_message_timestamp) |
||
| 96 | self._sensor_service.set_value(name=name, value=value) |
||
| 97 | return last_message_timestamp |
||
| 98 | |||
| 99 | def _populate_cache(self, user_data, channel_data, group_data): |
||
| 100 | """ |
||
| 101 | Populate users, channels and group cache from info which is returned on |
||
| 102 | rtm.start |
||
| 103 | """ |
||
| 104 | |||
| 105 | for user in user_data.get('members', []): |
||
| 106 | self._user_info_cache[user['id']] = user |
||
| 107 | |||
| 108 | for channel in channel_data.get('channels', []): |
||
| 109 | self._channel_info_cache[channel['id']] = channel |
||
| 110 | |||
| 111 | for group in group_data.get('groups', []): |
||
| 112 | self._group_info_cache[group['id']] = group |
||
| 113 | |||
| 114 | def _handle_result(self, result): |
||
| 115 | """ |
||
| 116 | Handle / process the result and return timestamp of the last message. |
||
| 117 | """ |
||
| 118 | existing_last_message_timestamp = self._get_last_message_timestamp() |
||
| 119 | new_last_message_timestamp = existing_last_message_timestamp |
||
| 120 | |||
| 121 | for item in result: |
||
| 122 | item_type = item['type'] |
||
| 123 | item_timestamp = int(float(item.get('ts', 0))) |
||
| 124 | |||
| 125 | if (existing_last_message_timestamp and |
||
| 126 | item_timestamp <= existing_last_message_timestamp): |
||
| 127 | # We have already seen this message, skip it |
||
| 128 | continue |
||
| 129 | |||
| 130 | if item_timestamp > new_last_message_timestamp: |
||
| 131 | new_last_message_timestamp = item_timestamp |
||
| 132 | |||
| 133 | handler_func = self._handlers.get(item_type, lambda data: data) |
||
| 134 | handler_func(data=item) |
||
| 135 | |||
| 136 | return new_last_message_timestamp |
||
| 137 | |||
| 138 | def _handle_message(self, data): |
||
| 139 | trigger = 'slack.message' |
||
| 140 | event_type = data['type'] |
||
| 141 | |||
| 142 | if event_type not in EVENT_TYPE_WHITELIST or 'subtype' in data: |
||
| 143 | # Skip unsupported event |
||
| 144 | return |
||
| 145 | |||
| 146 | subtype = data.get('subtype', None) |
||
| 147 | if subtype in self._message_subtype_handlers: |
||
| 148 | subtype = data['subtype'] |
||
| 149 | handler = self._message_subtype_handlers.get(subtype) |
||
| 150 | handler(data) |
||
| 151 | else: |
||
| 152 | # Note: We resolve user and channel information to provide more context |
||
| 153 | user_info = self._get_user_info(user_id=data['user']) |
||
| 154 | channel_info = None |
||
| 155 | channel_id = data.get('channel', '') |
||
| 156 | # Grabbing info based on the type of channel the message is in. |
||
| 157 | if channel_id.startswith('C'): |
||
| 158 | channel_info = self._get_channel_info(channel_id=channel_id) |
||
| 159 | elif channel_id.startswith('G'): |
||
| 160 | channel_info = self._get_group_info(group_id=channel_id) |
||
| 161 | |||
| 162 | if not user_info or not channel_info: |
||
| 163 | # Deleted user or channel |
||
| 164 | return |
||
| 165 | |||
| 166 | # Removes formatting from messages if enabled by the user in config |
||
| 167 | if self._strip_formatting: |
||
| 168 | text = re.sub("<http.*[|](.*)>", "\\1", data['text']) |
||
| 169 | else: |
||
| 170 | text = data['text'] |
||
| 171 | |||
| 172 | common_payload = self._get_message_common_payload(data) |
||
| 173 | channel_payload = { |
||
| 174 | 'channel': { |
||
| 175 | 'id': channel_info['id'], |
||
| 176 | 'name': channel_info['name'], |
||
| 177 | 'topic': channel_info['topic']['value'], |
||
| 178 | 'is_group': channel_info.get('is_group', False), |
||
| 179 | }, |
||
| 180 | } |
||
| 181 | payload = common_payload.copy() |
||
| 182 | payload.update(channel_payload) |
||
| 183 | |||
| 184 | self._sensor_service.dispatch(trigger=trigger, payload=payload) |
||
| 185 | |||
| 186 | def _handle_message_ignore_errors(self, data): |
||
| 187 | try: |
||
| 188 | self._handle_message(data) |
||
| 189 | except Exception as exc: |
||
| 190 | self._logger.info("Slack sensor encountered an error " |
||
| 191 | "handling message: %s" % exc) |
||
| 192 | pass |
||
| 193 | |||
| 194 | def _get_user_info(self, user_id): |
||
| 195 | if user_id not in self._user_info_cache: |
||
| 196 | result = self._api_call('users.info', user=user_id) |
||
| 197 | |||
| 198 | if 'user' not in result: |
||
| 199 | # User doesn't exist or other error |
||
| 200 | return None |
||
| 201 | |||
| 202 | result = result['user'] |
||
| 203 | self._user_info_cache[user_id] = result |
||
| 204 | |||
| 205 | return self._user_info_cache[user_id] |
||
| 206 | |||
| 207 | def _get_channel_info(self, channel_id): |
||
| 208 | if channel_id not in self._channel_info_cache: |
||
| 209 | result = self._api_call('channels.info', channel=channel_id) |
||
| 210 | |||
| 211 | if 'channel' not in result: |
||
| 212 | # Channel doesn't exist or other error |
||
| 213 | return None |
||
| 214 | |||
| 215 | result = result['channel'] |
||
| 216 | self._channel_info_cache[channel_id] = result |
||
| 217 | |||
| 218 | return self._channel_info_cache[channel_id] |
||
| 219 | |||
| 220 | def _get_group_info(self, group_id): |
||
| 221 | if group_id not in self._group_info_cache: |
||
| 222 | result = self._api_call('groups.info', channel=group_id) |
||
| 223 | self._logger.warn('GROUP DATA: %s' % result) |
||
| 224 | if 'group' not in result: |
||
| 225 | # Group doesn't exist or other error |
||
| 226 | return None |
||
| 227 | |||
| 228 | result = result['group'] |
||
| 229 | self._group_info_cache[group_id] = result |
||
| 230 | |||
| 231 | return self._group_info_cache[group_id] |
||
| 232 | |||
| 233 | def _api_call(self, method, **kwargs): |
||
| 234 | result = self._client.api_call(method, **kwargs) |
||
| 235 | result = json.loads(result) |
||
| 236 | return result |
||
| 237 | |||
| 238 | def _handle_message_channel_join(self, data): |
||
| 239 | trigger = 'slack.message_channel_join' |
||
| 240 | user_info = self._get_user_info(user_id=data['user']) |
||
| 241 | payload = self._get_message_common_payload(data) |
||
| 242 | |||
| 243 | self._sensor_service.dispatch(trigger=trigger, payload=payload) |
||
| 244 | |||
| 245 | def _get_message_common_payload(self, data): |
||
| 246 | user_info = self._get_user_info(user_id=data['user']) |
||
| 247 | payload = { |
||
| 248 | 'user': { |
||
| 249 | 'id': user_info['id'], |
||
| 250 | 'name': user_info['name'], |
||
| 251 | 'first_name': user_info['profile'].get('first_name', |
||
| 252 | 'Unknown'), |
||
| 253 | 'last_name': user_info['profile'].get('last_name', |
||
| 254 | 'Unknown'), |
||
| 255 | 'real_name': user_info['profile'].get('real_name', |
||
| 256 | 'Unknown'), |
||
| 257 | 'is_admin': user_info['is_admin'], |
||
| 258 | 'is_owner': user_info['is_owner'] |
||
| 259 | }, |
||
| 260 | 'timestamp': int(float(data['ts'])), |
||
| 261 | 'timestamp_raw': data['ts'], |
||
| 262 | 'text': text |
||
| 263 | } |
||
| 264 | |||
| 265 | return payload |
||
| 266 |