| Total Complexity | 47 | 
| Total Lines | 378 | 
| Duplicated Lines | 0 % | 
Complex classes like st2reactor.container.ProcessSensorContainer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more | 
            ||
class ProcessSensorContainer(object):
    """
    Sensor container which runs sensors in a separate process.
    """

    def __init__(self, sensors, poll_interval=5, dispatcher=None):
        """
        :param sensors: A list of sensor dicts.
        :type sensors: ``list`` of ``dict``

        :param poll_interval: How long to sleep between each poll for running / dead sensors.
        :type poll_interval: ``float``

        :param dispatcher: Dispatcher used to emit sensor spawn / exit triggers. When not
                           provided, ``TriggerDispatcher(LOG)`` is used.
        """
        self._poll_interval = poll_interval

        self._sensors = {}  # maps sensor_id -> sensor object
        self._processes = {}  # maps sensor_id -> sensor process

        if not dispatcher:
            dispatcher = TriggerDispatcher(LOG)
        self._dispatcher = dispatcher

        self._stopped = False

        sensors = sensors or []
        for sensor_obj in sensors:
            sensor_id = self._get_sensor_id(sensor=sensor_obj)
            self._sensors[sensor_id] = sensor_obj

        # Stores information needed for respawning dead sensors
        self._sensor_start_times = {}  # maps sensor_id -> sensor start time
        self._sensor_respawn_counts = defaultdict(int)  # maps sensor_id -> number of respawns

        # A list of all the instance variables which hold internal state information about a
        # particular sensor
        # Note: We don't clear respawn counts since we want to track this through the whole life
        # cycle of the container manager
        # Note: This list holds references to the dict objects themselves, so those dicts must
        # only ever be mutated in place (never rebound to a new dict).
        self._internal_sensor_state_variables = [
            self._processes,
            self._sensors,
            self._sensor_start_times,
        ]

    def run(self):
        """
        Spawn all the registered sensors and poll them until the container is stopped.

        :return: ``SUCCESS_EXIT_CODE`` when the loop ends or the greenlet is killed,
                 ``FAILURE_EXIT_CODE`` when any other exception escapes the loop.
        """
        self._run_all_sensors()

        try:
            while not self._stopped:
                # Poll for all running processes. Take a snapshot of the ids since polling
                # deletes dead sensors from self._sensors while we iterate.
                sensor_ids = list(self._sensors.keys())

                if len(sensor_ids) >= 1:
                    LOG.debug('%d active sensor(s)', len(sensor_ids))
                    self._poll_sensors_for_results(sensor_ids)
                else:
                    LOG.debug('No active sensors')

                eventlet.sleep(self._poll_interval)
        except greenlet.GreenletExit:
            # This exception is thrown when sensor container manager
            # kills the thread which runs process container. Not sure
            # if this is the best thing to do.
            self._stopped = True
            return SUCCESS_EXIT_CODE
        except:
            # Deliberately broad: any failure of the main loop is logged and reported through
            # the exit code instead of propagating to the caller.
            LOG.exception('Container failed to run sensors.')
            self._stopped = True
            return FAILURE_EXIT_CODE

        self._stopped = True
        LOG.error('Process container quit. It shouldn\'t.')
        return SUCCESS_EXIT_CODE

    def _poll_sensors_for_results(self, sensor_ids):
        """
        Main loop which polls sensor for results and detects dead sensors.

        :param sensor_ids: IDs of the sensors to poll.
        """
        for sensor_id in sensor_ids:
            now = int(time.time())

            process = self._processes[sensor_id]
            status = process.poll()

            if status is not None:
                # Dead process detected
                LOG.info('Process for sensor %s has exited with code %s', sensor_id, status)

                sensor = self._sensors[sensor_id]
                self._delete_sensor(sensor_id)

                self._dispatch_trigger_for_sensor_exit(sensor=sensor,
                                                       exit_code=status)

                # Try to respawn a dead process (maybe it was a simple failure which can be
                # resolved with a restart)
                eventlet.spawn_n(self._respawn_sensor, sensor_id=sensor_id, sensor=sensor,
                                 exit_code=status)
            else:
                sensor_start_time = self._sensor_start_times[sensor_id]
                sensor_respawn_count = self._sensor_respawn_counts[sensor_id]
                successfully_started = (now - sensor_start_time) >= SENSOR_SUCCESSFUL_START_THRESHOLD

                if successfully_started and sensor_respawn_count >= 1:
                    # Sensor has been successfully running more than threshold seconds, clear the
                    # respawn counter so we can try to restart the sensor if it dies later on
                    self._sensor_respawn_counts[sensor_id] = 0

    def running(self):
        """
        Return the number of sensor processes currently tracked by the container.
        """
        return len(self._processes)

    def stopped(self):
        """
        Return True if the container has been stopped, False otherwise.
        """
        return self._stopped

    def shutdown(self, force=False):
        """
        Stop all the sensor processes and mark the container as stopped.

        :param force: If True, don't wait for the processes to exit after terminating them
                      (an exit timeout of 0 is used instead of ``PROCESS_EXIT_TIMEOUT``).
        :type force: ``bool``
        """
        LOG.info('Container shutting down. Invoking cleanup on sensors.')
        self._stopped = True

        if force:
            exit_timeout = 0
        else:
            exit_timeout = PROCESS_EXIT_TIMEOUT

        # Iterate over a snapshot since stopping a sensor deletes it from self._sensors
        sensor_ids = list(self._sensors.keys())
        for sensor_id in sensor_ids:
            self._stop_sensor_process(sensor_id=sensor_id, exit_timeout=exit_timeout)

        LOG.info('All sensors are shut down.')

        # Clear in place instead of rebinding so the dicts referenced by
        # self._internal_sensor_state_variables stay the live ones
        self._sensors.clear()
        self._processes.clear()

    def add_sensor(self, sensor):
        """
        Add a new sensor to the container.

        :type sensor: ``dict``

        :return: False if a sensor with the same id is already registered, True otherwise.
        """
        sensor_id = self._get_sensor_id(sensor=sensor)

        if sensor_id in self._sensors:
            LOG.warning('Sensor %s already exists and running.', sensor_id)
            return False

        self._spawn_sensor_process(sensor=sensor)
        LOG.debug('Sensor %s started.', sensor_id)
        self._sensors[sensor_id] = sensor
        return True

    def remove_sensor(self, sensor):
        """
        Remove an existing sensor from the container.

        :type sensor: ``dict``

        :return: False if the sensor is not registered in this container, True otherwise.
        """
        sensor_id = self._get_sensor_id(sensor=sensor)

        if sensor_id not in self._sensors:
            LOG.warning('Sensor %s isn\'t running in this container.', sensor_id)
            return False

        self._stop_sensor_process(sensor_id=sensor_id)
        LOG.debug('Sensor %s stopped.', sensor_id)
        return True

    def _run_all_sensors(self):
        """
        Spawn a process for every registered sensor.

        Sensors which fail to start are logged and removed from the container.
        """
        # Iterate over a snapshot since sensors which fail to start are deleted in the loop
        sensor_ids = list(self._sensors.keys())

        for sensor_id in sensor_ids:
            sensor_obj = self._sensors[sensor_id]
            LOG.info('Running sensor %s', sensor_id)

            try:
                self._spawn_sensor_process(sensor=sensor_obj)
            except Exception as e:
                LOG.warning(str(e), exc_info=True)

                # Disable sensor which we are unable to start
                self._sensors.pop(sensor_id, None)
                continue

            LOG.info('Sensor %s started', sensor_id)

    def _spawn_sensor_process(self, sensor):
        """
        Spawn a new process for the provided sensor.

        New process uses isolated Python binary from a virtual environment
        belonging to the sensor pack.

        :type sensor: ``dict``

        :return: The spawned process object.
        :raises Exception: If the pack virtualenv directory doesn't exist or if the process
                           can't be spawned.
        """
        sensor_id = self._get_sensor_id(sensor=sensor)
        virtualenv_path = get_sandbox_virtualenv_path(pack=sensor['pack'])
        python_path = get_sandbox_python_binary_path(pack=sensor['pack'])

        if virtualenv_path and not os.path.isdir(virtualenv_path):
            format_values = {'pack': sensor['pack'], 'virtualenv_path': virtualenv_path}
            msg = PACK_VIRTUALENV_DOESNT_EXIST % format_values
            raise Exception(msg)

        trigger_type_refs = sensor['trigger_types'] or []
        trigger_type_refs = ','.join(trigger_type_refs)

        parent_args = json.dumps(sys.argv[1:])

        args = [
            python_path,
            WRAPPER_SCRIPT_PATH,
            '--pack=%s' % (sensor['pack']),
            '--file-path=%s' % (sensor['file_path']),
            '--class-name=%s' % (sensor['class_name']),
            '--trigger-type-refs=%s' % (trigger_type_refs),
            '--parent-args=%s' % (parent_args)
        ]

        if sensor['poll_interval']:
            args.append('--poll-interval=%s' % (sensor['poll_interval']))

        env = os.environ.copy()
        env['PYTHONPATH'] = get_sandbox_python_path(inherit_from_parent=True,
                                                    inherit_parent_virtualenv=True)

        # Include full api URL and API token specific to that sensor
        ttl = (24 * 60 * 60)
        temporary_token = create_token(username='sensors_container', ttl=ttl)

        env[API_URL_ENV_VARIABLE_NAME] = get_full_public_api_url()
        env[AUTH_TOKEN_ENV_VARIABLE_NAME] = temporary_token.token

        # TODO 1: Purge temporary token when service stops or sensor process dies
        # TODO 2: Store metadata (wrapper process id) with the token and delete
        # tokens for old, dead processes on startup
        cmd = ' '.join(args)
        LOG.debug('Running sensor subprocess (cmd="%s")', cmd)

        # TODO: Intercept stdout and stderr for aggregated logging purposes
        try:
            process = subprocess.Popen(args=args, stdin=None, stdout=None,
                                       stderr=None, shell=False, env=env,
                                       preexec_fn=on_parent_exit('SIGTERM'))
        except Exception as e:
            message = ('Failed to spawn process for sensor %s ("%s"): %s' %
                       (sensor_id, cmd, str(e)))
            raise Exception(message)

        self._processes[sensor_id] = process
        self._sensors[sensor_id] = sensor
        self._sensor_start_times[sensor_id] = int(time.time())

        self._dispatch_trigger_for_sensor_spawn(sensor=sensor, process=process, cmd=cmd)

        return process

    def _stop_sensor_process(self, sensor_id, exit_timeout=PROCESS_EXIT_TIMEOUT):
        """
        Stop a sensor process for the provided sensor.

        :param sensor_id: Sensor ID.
        :type sensor_id: ``str``

        :param exit_timeout: How long to wait for process to exit after
                             sending SIGTERM signal. If the process doesn't
                             exit in this amount of seconds, SIGKILL signal
                             will be sent to the process.
        :type exit_timeout: ``int``
        """
        process = self._processes[sensor_id]

        # Delete sensor before terminating process so that it will not be
        # respawned during termination
        self._delete_sensor(sensor_id)

        # Terminate the process and wait for up to exit_timeout seconds for the
        # process to exit
        process.terminate()

        # Poll once up front so ``status`` is always bound, even when exit_timeout is 0
        # (forced shutdown) and the wait loop below never runs
        status = process.poll()
        waited = 0
        sleep_delay = 1
        while status is None and waited < exit_timeout:
            time.sleep(sleep_delay)
            waited += sleep_delay
            status = process.poll()

        if status is None:
            # Process hasn't exited yet, forcefully kill it
            process.kill()

    def _respawn_sensor(self, sensor_id, sensor, exit_code):
        """
        Method for respawning a sensor which died with a non-zero exit code.

        :param sensor_id: Sensor ID.
        :param sensor: Sensor dict.
        :param exit_code: Exit code of the dead sensor process.
        """
        extra = {'sensor_id': sensor_id, 'sensor': sensor}

        if self._stopped:
            LOG.debug('Stopped, not respawning a dead sensor', extra=extra)
            return

        should_respawn = self._should_respawn_sensor(sensor_id=sensor_id, sensor=sensor,
                                                     exit_code=exit_code)

        if not should_respawn:
            LOG.debug('Not respawning a dead sensor', extra=extra)
            return

        LOG.debug('Respawning dead sensor', extra=extra)

        # Delay grows linearly with the number of respawn attempts
        self._sensor_respawn_counts[sensor_id] += 1
        sleep_delay = (SENSOR_RESPAWN_DELAY * self._sensor_respawn_counts[sensor_id])
        eventlet.sleep(sleep_delay)

        try:
            self._spawn_sensor_process(sensor=sensor)
        except Exception as e:
            LOG.warning(str(e), exc_info=True)

            # Disable sensor which we are unable to start. The entry has usually already been
            # removed when the dead process was detected, so a missing key is not an error.
            self._sensors.pop(sensor_id, None)

    def _should_respawn_sensor(self, sensor_id, sensor, exit_code):
        """
        Return True if the provided sensor should be respawned, False otherwise.
        """
        if exit_code == 0:
            # We only try to respawn sensors which exited with non-zero status code
            return False

        respawn_count = self._sensor_respawn_counts[sensor_id]
        if respawn_count >= SENSOR_MAX_RESPAWN_COUNTS:
            LOG.debug('Sensor has already been respawned max times, giving up')
            return False

        return True

    def _get_sensor_id(self, sensor):
        """
        Return unique identifier for the provided sensor dict.

        :type sensor: ``dict``
        """
        sensor_id = sensor['ref']
        return sensor_id

    def _dispatch_trigger_for_sensor_spawn(self, sensor, process, cmd):
        """
        Dispatch the sensor spawn trigger for the provided sensor and process.
        """
        trigger = ResourceReference.to_string_reference(
            name=SENSOR_SPAWN_TRIGGER['name'],
            pack=SENSOR_SPAWN_TRIGGER['pack'])
        now = int(time.time())
        payload = {
            'id': sensor['class_name'],
            'timestamp': now,
            'pid': process.pid,
            'cmd': cmd
        }
        self._dispatcher.dispatch(trigger, payload=payload)

    def _dispatch_trigger_for_sensor_exit(self, sensor, exit_code):
        """
        Dispatch the sensor exit trigger for the provided sensor and exit code.
        """
        trigger = ResourceReference.to_string_reference(
            name=SENSOR_EXIT_TRIGGER['name'],
            pack=SENSOR_EXIT_TRIGGER['pack'])
        now = int(time.time())
        payload = {
            'id': sensor['class_name'],
            'timestamp': now,
            'exit_code': exit_code
        }
        self._dispatcher.dispatch(trigger, payload=payload)

    def _delete_sensor(self, sensor_id):
        """
        Delete / reset all the internal state about a particular sensor.
        """
        for var in self._internal_sensor_state_variables:
            if sensor_id in var:
                del var[sensor_id]
            ||
| 451 |