| Total Complexity | 47 |
| Total Lines | 378 |
| Duplicated Lines | 0 % |
Complex classes like st2reactor.container.ProcessSensorContainer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
class ProcessSensorContainer(object):
    """
    Sensor container which runs sensors in a separate process.

    Each sensor is started through the wrapper script in its own subprocess. ``run`` polls
    those processes, dispatches spawn / exit triggers and tries to respawn sensors which
    exited with a non-zero exit code.
    """

    def __init__(self, sensors, poll_interval=5, dispatcher=None):
        """
        :param sensors: A list of sensor dicts.
        :type sensors: ``list`` of ``dict``

        :param poll_interval: How long to sleep between each poll for running / dead sensors.
        :type poll_interval: ``float``

        :param dispatcher: Object used to dispatch the sensor spawn / exit triggers. When not
                           provided, a ``TriggerDispatcher`` is created.
        """
        self._poll_interval = poll_interval

        self._sensors = {}  # maps sensor_id -> sensor object
        self._processes = {}  # maps sensor_id -> sensor process

        if not dispatcher:
            dispatcher = TriggerDispatcher(LOG)
        self._dispatcher = dispatcher

        self._stopped = False

        for sensor_obj in sensors or []:
            sensor_id = self._get_sensor_id(sensor=sensor_obj)
            self._sensors[sensor_id] = sensor_obj

        # Stores information needed for respawning dead sensors
        self._sensor_start_times = {}  # maps sensor_id -> sensor start time
        self._sensor_respawn_counts = defaultdict(int)  # maps sensor_id -> number of respawns

        # A list of all the instance variables which hold internal state information about a
        # particular sensor.
        # Note: We don't clear respawn counts since we want to track this through the whole life
        # cycle of the container manager.
        # Note: The list holds references to the dicts themselves, so those dicts must only be
        # mutated in place and never rebound to new objects.
        self._internal_sensor_state_variables = [
            self._processes,
            self._sensors,
            self._sensor_start_times,
        ]

    def run(self):
        """
        Start all the registered sensors and keep polling them until the container is stopped.

        :return: ``SUCCESS_EXIT_CODE`` when the loop finishes or the greenlet is killed,
                 ``FAILURE_EXIT_CODE`` when any other error escapes the poll loop.
        """
        self._run_all_sensors()

        try:
            while not self._stopped:
                # Poll for all running processes. A snapshot of the ids is used since polling
                # removes dead sensors from self._sensors.
                sensor_ids = list(self._sensors.keys())

                if sensor_ids:
                    LOG.debug('%d active sensor(s)', len(sensor_ids))
                    self._poll_sensors_for_results(sensor_ids)
                else:
                    LOG.debug('No active sensors')

                eventlet.sleep(self._poll_interval)
        except greenlet.GreenletExit:
            # This exception is thrown when sensor container manager
            # kills the thread which runs process container. Not sure
            # if this is the best thing to do.
            self._stopped = True
            return SUCCESS_EXIT_CODE
        except:
            # NOTE(review): deliberately left as a bare except so every failure is reported
            # through the exit code; narrowing it changes which errors reach the caller.
            LOG.exception('Container failed to run sensors.')
            self._stopped = True
            return FAILURE_EXIT_CODE

        self._stopped = True
        LOG.error('Process container quit. It shouldn\'t.')
        return SUCCESS_EXIT_CODE

    def _poll_sensors_for_results(self, sensor_ids):
        """
        Main loop which polls sensor for results and detects dead sensors.

        :param sensor_ids: IDs of the sensors to poll.
        """
        for sensor_id in sensor_ids:
            process = self._processes.get(sensor_id)

            if process is None:
                # No process is tracked for this id (e.g. the sensor has been removed since
                # the ids were collected), nothing to poll
                continue

            status = process.poll()

            if status is not None:
                # Dead process detected
                LOG.info('Process for sensor %s has exited with code %s', sensor_id, status)

                sensor = self._sensors[sensor_id]
                self._delete_sensor(sensor_id)

                self._dispatch_trigger_for_sensor_exit(sensor=sensor,
                                                       exit_code=status)

                # Try to respawn a dead process (maybe it was a simple failure which can be
                # resolved with a restart)
                eventlet.spawn_n(self._respawn_sensor, sensor_id=sensor_id, sensor=sensor,
                                 exit_code=status)
            else:
                now = int(time.time())
                sensor_start_time = self._sensor_start_times[sensor_id]
                sensor_respawn_count = self._sensor_respawn_counts[sensor_id]
                successfully_started = ((now - sensor_start_time) >=
                                        SENSOR_SUCCESSFUL_START_THRESHOLD)

                if successfully_started and sensor_respawn_count >= 1:
                    # Sensor has been successfully running more than threshold seconds, clear
                    # the respawn counter so we can try to restart the sensor if it dies later
                    self._sensor_respawn_counts[sensor_id] = 0

    def running(self):
        """
        Return the number of sensor processes which are currently tracked by the container.
        """
        return len(self._processes)

    def stopped(self):
        """
        Return True if the container has been stopped, False otherwise.
        """
        return self._stopped

    def shutdown(self, force=False):
        """
        Stop the container and all the sensor processes.

        :param force: True to skip the grace period and kill processes which are still
                      running right after SIGTERM has been sent.
        :type force: ``bool``
        """
        LOG.info('Container shutting down. Invoking cleanup on sensors.')
        self._stopped = True

        exit_timeout = 0 if force else PROCESS_EXIT_TIMEOUT

        # Iterate over a snapshot since stopping a sensor removes it from self._sensors
        for sensor_id in list(self._sensors.keys()):
            self._stop_sensor_process(sensor_id=sensor_id, exit_timeout=exit_timeout)

        LOG.info('All sensors are shut down.')

        # Clear in place instead of rebinding, self._internal_sensor_state_variables holds
        # references to these dict objects
        self._sensors.clear()
        self._processes.clear()

    def add_sensor(self, sensor):
        """
        Add a new sensor to the container.

        :type sensor: ``dict``

        :return: True if the sensor has been started, False if it is already registered.
        :rtype: ``bool``
        """
        sensor_id = self._get_sensor_id(sensor=sensor)

        if sensor_id in self._sensors:
            LOG.warning('Sensor %s already exists and running.', sensor_id)
            return False

        self._spawn_sensor_process(sensor=sensor)
        LOG.debug('Sensor %s started.', sensor_id)
        self._sensors[sensor_id] = sensor
        return True

    def remove_sensor(self, sensor):
        """
        Remove an existing sensor from the container.

        :type sensor: ``dict``

        :return: True if the sensor has been stopped, False if it isn't registered.
        :rtype: ``bool``
        """
        sensor_id = self._get_sensor_id(sensor=sensor)

        if sensor_id not in self._sensors:
            LOG.warning('Sensor %s isn\'t running in this container.', sensor_id)
            return False

        self._stop_sensor_process(sensor_id=sensor_id)
        LOG.debug('Sensor %s stopped.', sensor_id)
        return True

    def _run_all_sensors(self):
        """
        Spawn a process for every registered sensor.

        Sensors which fail to start are logged and removed from the container.
        """
        # Iterate over a snapshot since sensors which fail to start are removed in the loop
        for sensor_id in list(self._sensors.keys()):
            sensor_obj = self._sensors[sensor_id]
            LOG.info('Running sensor %s', sensor_id)

            try:
                self._spawn_sensor_process(sensor=sensor_obj)
            except Exception as e:
                LOG.warning(self._get_exception_message(e), exc_info=True)

                # Disable sensor which we are unable to start
                del self._sensors[sensor_id]
                continue

            LOG.info('Sensor %s started', sensor_id)

    def _spawn_sensor_process(self, sensor):
        """
        Spawn a new process for the provided sensor.

        New process uses isolated Python binary from a virtual environment
        belonging to the sensor pack.

        :param sensor: Sensor dict. The keys read here are ``ref``, ``pack``, ``file_path``,
                       ``class_name``, ``trigger_types`` and ``poll_interval``.
        :type sensor: ``dict``

        :return: The spawned process.
        :raises Exception: If the pack virtualenv directory doesn't exist or the process
                           can't be spawned.
        """
        sensor_id = self._get_sensor_id(sensor=sensor)
        virtualenv_path = get_sandbox_virtualenv_path(pack=sensor['pack'])
        python_path = get_sandbox_python_binary_path(pack=sensor['pack'])

        if virtualenv_path and not os.path.isdir(virtualenv_path):
            format_values = {'pack': sensor['pack'], 'virtualenv_path': virtualenv_path}
            msg = PACK_VIRTUALENV_DOESNT_EXIST % format_values
            raise Exception(msg)

        trigger_type_refs = sensor['trigger_types'] or []
        trigger_type_refs = ','.join(trigger_type_refs)

        # Arguments of this (parent) process are forwarded to the wrapper as a JSON list
        parent_args = json.dumps(sys.argv[1:])

        args = [
            python_path,
            WRAPPER_SCRIPT_PATH,
            '--pack=%s' % (sensor['pack']),
            '--file-path=%s' % (sensor['file_path']),
            '--class-name=%s' % (sensor['class_name']),
            '--trigger-type-refs=%s' % (trigger_type_refs),
            '--parent-args=%s' % (parent_args)
        ]

        if sensor['poll_interval']:
            args.append('--poll-interval=%s' % (sensor['poll_interval']))

        env = os.environ.copy()
        env['PYTHONPATH'] = get_sandbox_python_path(inherit_from_parent=True,
                                                    inherit_parent_virtualenv=True)

        # Include full api URL and API token specific to that sensor
        ttl = (24 * 60 * 60)
        temporary_token = create_token(username='sensors_container', ttl=ttl)

        env[API_URL_ENV_VARIABLE_NAME] = get_full_public_api_url()
        env[AUTH_TOKEN_ENV_VARIABLE_NAME] = temporary_token.token

        # TODO 1: Purge temporary token when service stops or sensor process dies
        # TODO 2: Store metadata (wrapper process id) with the token and delete
        # tokens for old, dead processes on startup
        cmd = ' '.join(args)  # only used for logging and the trigger payload
        LOG.debug('Running sensor subprocess (cmd="%s")', cmd)

        # TODO: Intercept stdout and stderr for aggregated logging purposes
        try:
            process = subprocess.Popen(args=args, stdin=None, stdout=None,
                                       stderr=None, shell=False, env=env,
                                       preexec_fn=on_parent_exit('SIGTERM'))
        except Exception as e:
            message = ('Failed to spawn process for sensor %s ("%s"): %s' %
                       (sensor_id, cmd, str(e)))
            raise Exception(message)

        self._processes[sensor_id] = process
        self._sensors[sensor_id] = sensor
        self._sensor_start_times[sensor_id] = int(time.time())

        self._dispatch_trigger_for_sensor_spawn(sensor=sensor, process=process, cmd=cmd)

        return process

    def _stop_sensor_process(self, sensor_id, exit_timeout=PROCESS_EXIT_TIMEOUT):
        """
        Stop a sensor process for the provided sensor.

        :param sensor_id: Sensor ID.
        :type sensor_id: ``str``

        :param exit_timeout: How long to wait for process to exit after
                             sending SIGTERM signal. If the process doesn't
                             exit in this amount of seconds, SIGKILL signal
                             will be sent to the process.
        :type exit_timeout: ``int``
        """
        process = self._processes.get(sensor_id)

        # Delete sensor before terminating process so that it will not be
        # respawned during termination
        self._delete_sensor(sensor_id)

        if process is None:
            # Sensor is registered, but no process is tracked for it - nothing to terminate
            return

        # Terminate the process and wait for up to exit_timeout seconds for the
        # process to exit
        process.terminate()

        waited = 0
        sleep_delay = 1
        # Poll before waiting so status is always bound, even when exit_timeout is 0
        status = process.poll()

        while status is None and waited < exit_timeout:
            time.sleep(sleep_delay)
            waited += sleep_delay
            status = process.poll()

        if status is None:
            # Process hasn't exited yet, forcefully kill it
            process.kill()

    def _respawn_sensor(self, sensor_id, sensor, exit_code):
        """
        Method for respawning a sensor which died with a non-zero exit code.

        The respawn is delayed by ``SENSOR_RESPAWN_DELAY`` multiplied by the number of
        respawn attempts made so far for this sensor.
        """
        extra = {'sensor_id': sensor_id, 'sensor': sensor}

        if self._stopped:
            LOG.debug('Stopped, not respawning a dead sensor', extra=extra)
            return

        should_respawn = self._should_respawn_sensor(sensor_id=sensor_id, sensor=sensor,
                                                     exit_code=exit_code)

        if not should_respawn:
            LOG.debug('Not respawning a dead sensor', extra=extra)
            return

        LOG.debug('Respawning dead sensor', extra=extra)

        self._sensor_respawn_counts[sensor_id] += 1
        sleep_delay = (SENSOR_RESPAWN_DELAY * self._sensor_respawn_counts[sensor_id])
        eventlet.sleep(sleep_delay)

        if self._stopped:
            # Container has been stopped while we were waiting, don't start a new process
            LOG.debug('Stopped, not respawning a dead sensor', extra=extra)
            return

        try:
            self._spawn_sensor_process(sensor=sensor)
        except Exception as e:
            LOG.warning(self._get_exception_message(e), exc_info=True)

            # Disable sensor which we are unable to start. The sensor has usually already
            # been removed when the dead process was detected, so a missing entry is fine.
            self._delete_sensor(sensor_id)

    def _should_respawn_sensor(self, sensor_id, sensor, exit_code):
        """
        Return True if the provided sensor should be respawned, False otherwise.
        """
        if exit_code == 0:
            # We only try to respawn sensors which exited with non-zero status code
            return False

        respawn_count = self._sensor_respawn_counts[sensor_id]
        if respawn_count >= SENSOR_MAX_RESPAWN_COUNTS:
            LOG.debug('Sensor has already been respawned max times, giving up')
            return False

        return True

    def _get_sensor_id(self, sensor):
        """
        Return unique identifier for the provided sensor dict.

        :type sensor: ``dict``
        """
        sensor_id = sensor['ref']
        return sensor_id

    @staticmethod
    def _get_exception_message(exc):
        """
        Return a printable message for the provided exception.

        ``message`` attribute is used when it is available and non-empty, ``str(exc)``
        otherwise (the attribute doesn't exist on Python 3).
        """
        return getattr(exc, 'message', None) or str(exc)

    def _dispatch_trigger_for_sensor_spawn(self, sensor, process, cmd):
        """
        Dispatch the sensor spawn trigger for the provided sensor and process.
        """
        trigger = ResourceReference.to_string_reference(
            name=SENSOR_SPAWN_TRIGGER['name'],
            pack=SENSOR_SPAWN_TRIGGER['pack'])
        now = int(time.time())
        payload = {
            'id': sensor['class_name'],
            'timestamp': now,
            'pid': process.pid,
            'cmd': cmd
        }
        self._dispatcher.dispatch(trigger, payload=payload)

    def _dispatch_trigger_for_sensor_exit(self, sensor, exit_code):
        """
        Dispatch the sensor exit trigger for the provided sensor and exit code.
        """
        trigger = ResourceReference.to_string_reference(
            name=SENSOR_EXIT_TRIGGER['name'],
            pack=SENSOR_EXIT_TRIGGER['pack'])
        now = int(time.time())
        payload = {
            'id': sensor['class_name'],
            'timestamp': now,
            'exit_code': exit_code
        }
        self._dispatcher.dispatch(trigger, payload=payload)

    def _delete_sensor(self, sensor_id):
        """
        Delete / reset all the internal state about a particular sensor.

        Missing entries are ignored.
        """
        for var in self._internal_sensor_state_variables:
            var.pop(sensor_id, None)
| 451 |