| Total Complexity | 44 |
| Total Lines | 218 |
| Duplicated Lines | 0 % |
Complex classes like SlackSensor often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | import json |
||
| 21 | |||
| 22 | class SlackSensor(PollingSensor): |
||
| 23 | DATASTORE_KEY_NAME = 'last_message_timestamp' |
||
| 24 | |||
def __init__(self, sensor_service, config=None, poll_interval=None):
    """
    Initialise sensor state. No connection to Slack is opened here.

    :param sensor_service: Service object forwarded to the parent class;
                           this class uses it for logging, datastore
                           access and trigger dispatch.
    :param config: Configuration forwarded to the parent class. Its
                   ``sensor`` section must contain ``token``;
                   ``strip_formatting`` is optional (default ``False``).
    :param poll_interval: Forwarded to the parent class unchanged.
    """
    super(SlackSensor, self).__init__(sensor_service=sensor_service,
                                      config=config,
                                      poll_interval=poll_interval)
    self._logger = self._sensor_service.get_logger(__name__)

    sensor_config = self._config['sensor']
    self._token = sensor_config['token']
    self._strip_formatting = sensor_config.get('strip_formatting', False)

    # Maps an event type to the callable which processes it.
    self._handlers = dict(message=self._handle_message_ignore_errors)

    # Lookup caches, each keyed by the id of the cached object.
    self._user_info_cache = {}
    self._channel_info_cache = {}
    self._group_info_cache = {}

    # None means "not loaded from the datastore yet".
    self._last_message_timestamp = None
||
| 42 | |||
def setup(self):
    """
    Create the Slack client, open the RTM connection and warm the caches.

    :raises Exception: If ``rtm_connect`` returns a falsy value.
    """
    self._client = SlackClient(self._token)

    if not self._client.rtm_connect():
        raise Exception('Failed to connect to the Slack API. Invalid token?')

    # Fetch in the same order as before: users, channels, then groups.
    users = self._api_call('users.list')
    channels = self._api_call('channels.list')
    groups = self._api_call('groups.list')

    self._populate_cache(user_data=users,
                         channel_data=channels,
                         group_data=groups)
||
| 54 | |||
def poll(self):
    """
    Read pending RTM events, process them and persist the newest timestamp.

    Nothing is processed when ``rtm_read`` returns an empty / falsy result,
    and nothing is persisted when the processed timestamp is falsy.
    """
    events = self._client.rtm_read()

    if events:
        newest_timestamp = self._handle_result(result=events)

        if newest_timestamp:
            self._set_last_message_timestamp(
                last_message_timestamp=newest_timestamp)
||
| 66 | |||
def cleanup(self):
    """Intentionally a no-op: this sensor performs no shutdown work."""
    return None
||
| 69 | |||
def add_trigger(self, trigger):
    """Intentionally a no-op: the added ``trigger`` is ignored."""
    return None
||
| 72 | |||
def update_trigger(self, trigger):
    """Intentionally a no-op: the updated ``trigger`` is ignored."""
    return None
||
| 75 | |||
def remove_trigger(self, trigger):
    """Intentionally a no-op: the removed ``trigger`` is ignored."""
    return None
||
| 78 | |||
def _get_last_message_timestamp(self):
    """
    Return the timestamp of the last processed message.

    The value is read from the datastore (key ``DATASTORE_KEY_NAME``) the
    first time it is needed and cached on the instance afterwards. When the
    datastore holds no value, ``0`` is returned and cached.

    :rtype: ``int``
    """
    # Test against None rather than truthiness: a cached 0 is a legitimate
    # value and must not cause another datastore read on every call.
    if self._last_message_timestamp is None:
        stored = self._sensor_service.get_value(name=self.DATASTORE_KEY_NAME)
        self._last_message_timestamp = int(stored) if stored else 0

    return self._last_message_timestamp
||
| 89 | |||
def _set_last_message_timestamp(self, last_message_timestamp):
    """
    Cache the timestamp on the instance and write it to the datastore.

    The datastore value is the ``str()`` form of the timestamp, stored under
    ``DATASTORE_KEY_NAME``.

    :return: The timestamp which was passed in.
    """
    self._last_message_timestamp = last_message_timestamp
    self._sensor_service.set_value(name=self.DATASTORE_KEY_NAME,
                                   value=str(last_message_timestamp))
    return last_message_timestamp
||
| 96 | |||
def _populate_cache(self, user_data, channel_data, group_data):
    """
    Fill the user, channel and group caches from API list responses.

    Entries are taken from the ``members`` key of ``user_data``, the
    ``channels`` key of ``channel_data`` and the ``groups`` key of
    ``group_data``. A missing key is treated as an empty list. Every entry
    is stored in the matching cache under its ``id``.
    """
    sources = (
        (self._user_info_cache, user_data, 'members'),
        (self._channel_info_cache, channel_data, 'channels'),
        (self._group_info_cache, group_data, 'groups'),
    )

    for cache, response, list_key in sources:
        for entry in response.get(list_key, []):
            cache[entry['id']] = entry
||
| 111 | |||
def _handle_result(self, result):
    """
    Handle / process the result and return timestamp of the last message.

    Each event newer than the stored timestamp is passed to the handler
    registered for its type in ``self._handlers``; events of any other type
    are passed to a do-nothing handler.

    :param result: Iterable of event dicts (as returned by ``rtm_read``).
    :return: The largest whole-second timestamp among the processed events,
             or the previously stored timestamp when none is newer.
    :rtype: ``int``
    """
    existing_last_message_timestamp = self._get_last_message_timestamp()
    new_last_message_timestamp = existing_last_message_timestamp

    for item in result:
        # .get() so an event without a "type" key falls through to the
        # no-op handler instead of aborting the whole batch with KeyError.
        item_type = item.get('type')

        # "ts" arrives as a string such as "1355517523.000005", which int()
        # cannot parse directly; go through float() and keep whole seconds.
        item_timestamp = int(float(item.get('ts', 0)))

        if (existing_last_message_timestamp and
                item_timestamp <= existing_last_message_timestamp):
            # We have already seen this message, skip it.
            # NOTE: the comparison has one-second granularity.
            continue

        if item_timestamp > new_last_message_timestamp:
            new_last_message_timestamp = item_timestamp

        handler_func = self._handlers.get(item_type, lambda data: data)
        handler_func(data=item)

    return new_last_message_timestamp
||
| 135 | |||
def _handle_message(self, data):
    """
    Build the ``slack.message`` trigger payload for an event and dispatch it.

    The event is silently skipped when its type is not in
    ``EVENT_TYPE_WHITELIST``, when it carries a ``subtype``, or when either
    the user or the channel / group cannot be resolved.

    :param data: Event dict. ``type``, ``user``, ``ts`` and ``text`` are
                 read directly; ``channel`` is optional.
    :raises KeyError: If one of the directly-read keys is missing from
                      ``data`` or from the resolved user / channel info.
    """
    trigger = 'slack.message'
    event_type = data['type']

    if event_type not in EVENT_TYPE_WHITELIST or 'subtype' in data:
        # Skip unsupported event
        return

    # Note: We resolve user and channel information to provide more context
    user_info = self._get_user_info(user_id=data['user'])
    channel_info = None
    channel_id = data.get('channel', '')
    # The id prefix selects the lookup: "C" -> channel, "G" -> group.
    # Any other prefix leaves channel_info as None and the event is skipped.
    if channel_id.startswith('C'):
        channel_info = self._get_channel_info(channel_id=channel_id)
    elif channel_id.startswith('G'):
        channel_info = self._get_group_info(group_id=channel_id)

    if not user_info or not channel_info:
        # Deleted user or channel
        return

    text = data['text']
    if self._strip_formatting:
        # Replace every "<http...|label>" with "label". The character
        # classes keep each match inside a single <...> pair, so several
        # links in one message are each replaced on their own.
        text = re.sub(r'<http[^|>]*[|]([^>]*)>', r'\1', text)

    profile = user_info['profile']
    payload = {
        'user': {
            'id': user_info['id'],
            'name': user_info['name'],
            'first_name': profile.get('first_name', 'Unknown'),
            'last_name': profile.get('last_name', 'Unknown'),
            'real_name': profile.get('real_name', 'Unknown'),
            'is_admin': user_info['is_admin'],
            'is_owner': user_info['is_owner']
        },
        'channel': {
            'id': channel_info['id'],
            'name': channel_info['name'],
            'topic': channel_info['topic']['value'],
            'is_group': channel_info.get('is_group', False),
        },
        # The raw "ts" string with its "." removed, as an integer.
        'timestamp': int(data['ts'].replace('.', '')),
        'timestamp_raw': data['ts'],
        'text': text
    }

    self._sensor_service.dispatch(trigger=trigger, payload=payload)
||
| 189 | |||
| 190 | def _handle_message_ignore_errors(self, data): |
||
| 191 | try: |
||
| 192 | self._handle_message(data) |
||
| 193 | except Exception as exc: |
||
| 194 | self._logger.info("Slack sensor encountered an error " |
||
| 195 | "handling message: %s" % exc) |
||
| 196 | pass |
||
| 197 | |||
def _get_user_info(self, user_id):
    """
    Return the info dict for ``user_id``, using the cache when possible.

    On a cache miss ``users.info`` is called and its ``user`` entry is
    cached. ``None`` is returned (and nothing cached) when the response has
    no ``user`` key.
    """
    cache = self._user_info_cache
    if user_id in cache:
        return cache[user_id]

    response = self._api_call('users.info', user=user_id)
    if 'user' not in response:
        # User doesn't exist or other error
        return None

    cache[user_id] = response['user']
    return cache[user_id]
||
| 210 | |||
def _get_channel_info(self, channel_id):
    """
    Return the info dict for ``channel_id``, using the cache when possible.

    On a cache miss ``channels.info`` is called and its ``channel`` entry is
    cached. ``None`` is returned (and nothing cached) when the response has
    no ``channel`` key.
    """
    cache = self._channel_info_cache
    if channel_id in cache:
        return cache[channel_id]

    response = self._api_call('channels.info', channel=channel_id)
    if 'channel' not in response:
        # Channel doesn't exist or other error
        return None

    cache[channel_id] = response['channel']
    return cache[channel_id]
||
| 223 | |||
def _get_group_info(self, group_id):
    """
    Return the info dict for ``group_id``, using the cache when possible.

    On a cache miss ``groups.info`` is called (the id is sent as the
    ``channel`` argument) and its ``group`` entry is cached. ``None`` is
    returned (and nothing cached) when the response has no ``group`` key.
    """
    if group_id not in self._group_info_cache:
        result = self._api_call('groups.info', channel=group_id)
        # Diagnostic dump of the raw response: debug level, with lazy
        # formatting, so normal operation does not emit a warning per lookup.
        self._logger.debug('GROUP DATA: %s', result)

        if 'group' not in result:
            # Group doesn't exist or other error
            return None

        self._group_info_cache[group_id] = result['group']

    return self._group_info_cache[group_id]
||
| 236 | |||
def _api_call(self, method, **kwargs):
    """
    Call a Slack API method through the client and decode the response.

    :param method: API method name, e.g. ``users.list``.
    :param kwargs: Passed through to the client's ``api_call`` unchanged.
    :return: The response decoded with ``json.loads``.
    """
    result = self._client.api_call(method, **kwargs)
    # Callers index into the return value (``.get(...)``, ``'user' in ...``),
    # so the decoded response has to be handed back.
    return json.loads(result)
||
| 241 |