Total Complexity | 47 |
Total Lines | 378 |
Duplicated Lines | 0 % |
Complex classes like st2reactor.container.ProcessSensorContainer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
class ProcessSensorContainer(object):
    """
    Sensor container which runs sensors in a separate process.

    Each sensor runs in its own subprocess (see ``_spawn_sensor_process``). The ``run`` loop
    periodically polls those subprocesses, dispatches a trigger when one of them exits and
    schedules a bounded respawn (see ``_should_respawn_sensor``) for sensors which exited
    with a non-zero code.
    """

    def __init__(self, sensors, poll_interval=5, dispatcher=None):
        """
        :param sensors: A list of sensor dicts.
        :type sensors: ``list`` of ``dict``

        :param poll_interval: How long to sleep between each poll for running / dead sensors.
        :type poll_interval: ``float``

        :param dispatcher: Dispatcher used to emit the sensor spawn / exit triggers. If not
                           provided, a new ``TriggerDispatcher`` is created.
        :type dispatcher: ``TriggerDispatcher``
        """
        self._poll_interval = poll_interval

        self._sensors = {}  # maps sensor_id -> sensor object
        self._processes = {}  # maps sensor_id -> sensor process

        if not dispatcher:
            dispatcher = TriggerDispatcher(LOG)
        self._dispatcher = dispatcher

        self._stopped = False

        sensors = sensors or []
        for sensor_obj in sensors:
            sensor_id = self._get_sensor_id(sensor=sensor_obj)
            self._sensors[sensor_id] = sensor_obj

        # Stores information needed for respawning dead sensors
        self._sensor_start_times = {}  # maps sensor_id -> sensor start time
        self._sensor_respawn_counts = defaultdict(int)  # maps sensor_id -> number of respawns

        # A list of all the instance variables which hold internal state information about a
        # particular sensor. These dicts must only be mutated in place (never re-bound),
        # otherwise this list would keep pointing at stale objects.
        # Note: We don't clear respawn counts since we want to track this through the whole life
        # cycle of the container manager
        self._internal_sensor_state_variables = [
            self._processes,
            self._sensors,
            self._sensor_start_times,
        ]

    def run(self):
        """
        Spawn all the sensors and poll their processes until the container is stopped.

        :return: ``SUCCESS_EXIT_CODE`` when the loop ends or the green thread is killed,
                 ``FAILURE_EXIT_CODE`` when an unexpected error occurs.
        """
        self._run_all_sensors()

        try:
            while not self._stopped:
                # Poll for all running processes. Take a snapshot of the ids because polling
                # removes dead sensors from self._sensors while we iterate.
                sensor_ids = list(self._sensors.keys())

                if sensor_ids:
                    LOG.debug('%d active sensor(s)', len(sensor_ids))
                    self._poll_sensors_for_results(sensor_ids)
                else:
                    LOG.debug('No active sensors')

                eventlet.sleep(self._poll_interval)
        except greenlet.GreenletExit:
            # This exception is thrown when sensor container manager
            # kills the thread which runs process container. Not sure
            # if this is the best thing to do.
            self._stopped = True
            return SUCCESS_EXIT_CODE
        except Exception:
            LOG.exception('Container failed to run sensors.')
            self._stopped = True
            return FAILURE_EXIT_CODE

        self._stopped = True
        LOG.error('Process container quit. It shouldn\'t.')
        return SUCCESS_EXIT_CODE

    def _poll_sensors_for_results(self, sensor_ids):
        """
        Main loop which polls sensor for results and detects dead sensors.

        For every dead process the sensor state is removed, an exit trigger is dispatched and a
        respawn attempt is scheduled in a separate green thread. For every live process which
        has been running for at least ``SENSOR_SUCCESSFUL_START_THRESHOLD`` seconds the respawn
        counter is reset.

        :param sensor_ids: IDs of the sensors to poll.
        :type sensor_ids: ``list`` of ``str``
        """
        for sensor_id in sensor_ids:
            now = int(time.time())

            # The sensor could have been removed (e.g. via remove_sensor) after the ids were
            # collected; there is nothing left to poll in that case.
            process = self._processes.get(sensor_id, None)
            if process is None:
                continue

            status = process.poll()

            if status is not None:
                # Dead process detected
                LOG.info('Process for sensor %s has exited with code %s', sensor_id, status)

                sensor = self._sensors[sensor_id]
                self._delete_sensor(sensor_id)

                self._dispatch_trigger_for_sensor_exit(sensor=sensor,
                                                       exit_code=status)

                # Try to respawn a dead process (maybe it was a simple failure which can be
                # resolved with a restart)
                eventlet.spawn_n(self._respawn_sensor, sensor_id=sensor_id, sensor=sensor,
                                 exit_code=status)
            else:
                sensor_start_time = self._sensor_start_times[sensor_id]
                sensor_respawn_count = self._sensor_respawn_counts[sensor_id]
                successfully_started = ((now - sensor_start_time) >=
                                        SENSOR_SUCCESSFUL_START_THRESHOLD)

                if successfully_started and sensor_respawn_count >= 1:
                    # Sensor has been successfully running more than threshold seconds, clear the
                    # respawn counter so we can try to restart the sensor if it dies later on
                    self._sensor_respawn_counts[sensor_id] = 0

    def running(self):
        """
        Return the number of sensor processes currently tracked by this container.
        """
        return len(self._processes)

    def stopped(self):
        """
        Return True if the container has been stopped.
        """
        return self._stopped

    def shutdown(self, force=False):
        """
        Stop all the sensor processes and mark the container as stopped.

        :param force: If True, processes which don't exit right after SIGTERM are killed
                      immediately instead of waiting up to ``PROCESS_EXIT_TIMEOUT`` seconds.
        :type force: ``bool``
        """
        LOG.info('Container shutting down. Invoking cleanup on sensors.')
        self._stopped = True

        exit_timeout = 0 if force else PROCESS_EXIT_TIMEOUT

        # Snapshot the ids because _stop_sensor_process removes entries from self._sensors
        for sensor_id in list(self._sensors.keys()):
            self._stop_sensor_process(sensor_id=sensor_id, exit_timeout=exit_timeout)

        LOG.info('All sensors are shut down.')

        # Clear in place (instead of re-binding to new dicts) so that
        # _internal_sensor_state_variables keeps referencing the live dicts.
        for state in self._internal_sensor_state_variables:
            state.clear()

    def add_sensor(self, sensor):
        """
        Add a new sensor to the container.

        :type sensor: ``dict``

        :return: True if the sensor was started, False if it is already running.
        :rtype: ``bool``
        """
        sensor_id = self._get_sensor_id(sensor=sensor)

        if sensor_id in self._sensors:
            LOG.warning('Sensor %s already exists and running.', sensor_id)
            return False

        self._spawn_sensor_process(sensor=sensor)
        LOG.debug('Sensor %s started.', sensor_id)
        self._sensors[sensor_id] = sensor
        return True

    def remove_sensor(self, sensor):
        """
        Remove an existing sensor from the container.

        :type sensor: ``dict``

        :return: True if the sensor was stopped, False if it isn't running in this container.
        :rtype: ``bool``
        """
        sensor_id = self._get_sensor_id(sensor=sensor)

        if sensor_id not in self._sensors:
            LOG.warning('Sensor %s isn\'t running in this container.', sensor_id)
            return False

        self._stop_sensor_process(sensor_id=sensor_id)
        LOG.debug('Sensor %s stopped.', sensor_id)
        return True

    def _run_all_sensors(self):
        """
        Spawn a process for every registered sensor.

        Sensors whose process can't be spawned are logged and removed from the container.
        """
        # Snapshot the ids because sensors which fail to start are removed while iterating
        for sensor_id in list(self._sensors.keys()):
            sensor_obj = self._sensors[sensor_id]
            LOG.info('Running sensor %s', sensor_id)

            try:
                self._spawn_sensor_process(sensor=sensor_obj)
            except Exception as e:
                LOG.warning(str(e), exc_info=True)

                # Disable sensor which we are unable to start
                self._delete_sensor(sensor_id)
                continue

            LOG.info('Sensor %s started', sensor_id)

    def _spawn_sensor_process(self, sensor):
        """
        Spawn a new process for the provided sensor.

        New process uses isolated Python binary from a virtual environment
        belonging to the sensor pack.

        :param sensor: Sensor dict (reads ``pack``, ``file_path``, ``class_name``,
                       ``trigger_types`` and ``poll_interval`` keys).
        :type sensor: ``dict``

        :return: The spawned process.
        :rtype: ``subprocess.Popen``

        :raises Exception: If the pack virtualenv doesn't exist or the process can't be spawned.
        """
        sensor_id = self._get_sensor_id(sensor=sensor)
        virtualenv_path = get_sandbox_virtualenv_path(pack=sensor['pack'])
        python_path = get_sandbox_python_binary_path(pack=sensor['pack'])

        if virtualenv_path and not os.path.isdir(virtualenv_path):
            format_values = {'pack': sensor['pack'], 'virtualenv_path': virtualenv_path}
            msg = PACK_VIRTUALENV_DOESNT_EXIST % format_values
            raise Exception(msg)

        trigger_type_refs = sensor['trigger_types'] or []
        trigger_type_refs = ','.join(trigger_type_refs)

        parent_args = json.dumps(sys.argv[1:])

        args = [
            python_path,
            WRAPPER_SCRIPT_PATH,
            '--pack=%s' % (sensor['pack']),
            '--file-path=%s' % (sensor['file_path']),
            '--class-name=%s' % (sensor['class_name']),
            '--trigger-type-refs=%s' % (trigger_type_refs),
            '--parent-args=%s' % (parent_args)
        ]

        if sensor['poll_interval']:
            args.append('--poll-interval=%s' % (sensor['poll_interval']))

        env = os.environ.copy()
        env['PYTHONPATH'] = get_sandbox_python_path(inherit_from_parent=True,
                                                    inherit_parent_virtualenv=True)

        # Include full api URL and API token specific to that sensor
        ttl = (24 * 60 * 60)
        temporary_token = create_token(username='sensors_container', ttl=ttl)

        env[API_URL_ENV_VARIABLE_NAME] = get_full_public_api_url()
        env[AUTH_TOKEN_ENV_VARIABLE_NAME] = temporary_token.token

        # TODO 1: Purge temporary token when service stops or sensor process dies
        # TODO 2: Store metadata (wrapper process id) with the token and delete
        # tokens for old, dead processes on startup
        cmd = ' '.join(args)
        LOG.debug('Running sensor subprocess (cmd="%s")', cmd)

        # TODO: Intercept stdout and stderr for aggregated logging purposes
        try:
            process = subprocess.Popen(args=args, stdin=None, stdout=None,
                                       stderr=None, shell=False, env=env,
                                       preexec_fn=on_parent_exit('SIGTERM'))
        except Exception as e:
            message = ('Failed to spawn process for sensor %s ("%s"): %s' %
                       (sensor_id, cmd, str(e)))
            raise Exception(message)

        self._processes[sensor_id] = process
        self._sensors[sensor_id] = sensor
        self._sensor_start_times[sensor_id] = int(time.time())

        self._dispatch_trigger_for_sensor_spawn(sensor=sensor, process=process, cmd=cmd)

        return process

    def _stop_sensor_process(self, sensor_id, exit_timeout=PROCESS_EXIT_TIMEOUT):
        """
        Stop a sensor process for the provided sensor.

        :param sensor_id: Sensor ID.
        :type sensor_id: ``str``

        :param exit_timeout: How long to wait for process to exit after
                             sending SIGTERM signal. If the process doesn't
                             exit in this amount of seconds, SIGKILL signal
                             will be sent to the process.
        :type exit_timeout: ``int``
        """
        process = self._processes.get(sensor_id, None)

        # Delete sensor before terminating process so that it will not be
        # respawned during termination
        self._delete_sensor(sensor_id)

        if process is None:
            # No process has been spawned for this sensor (e.g. shutdown before run)
            return

        # Terminate the process and wait for up to exit_timeout seconds for the
        # process to exit
        process.terminate()

        # Poll once up front so "status" is always bound - with exit_timeout == 0 (forced
        # shutdown) the wait loop below doesn't run at all. Polling after every sleep also
        # means a process which exits during the last sleep is not needlessly killed.
        status = process.poll()
        timeout = 0
        sleep_delay = 1
        while status is None and timeout < exit_timeout:
            timeout += sleep_delay
            time.sleep(sleep_delay)
            status = process.poll()

        if status is None:
            # Process hasn't exited yet, forcefully kill it
            process.kill()

    def _respawn_sensor(self, sensor_id, sensor, exit_code):
        """
        Method for respawning a sensor which died with a non-zero exit code.

        Waits ``SENSOR_RESPAWN_DELAY`` multiplied by the current respawn count before spawning
        the process again. If spawning fails, the sensor is disabled.
        """
        extra = {'sensor_id': sensor_id, 'sensor': sensor}

        if self._stopped:
            LOG.debug('Stopped, not respawning a dead sensor', extra=extra)
            return

        should_respawn = self._should_respawn_sensor(sensor_id=sensor_id, sensor=sensor,
                                                     exit_code=exit_code)

        if not should_respawn:
            LOG.debug('Not respawning a dead sensor', extra=extra)
            return

        LOG.debug('Respawning dead sensor', extra=extra)

        self._sensor_respawn_counts[sensor_id] += 1
        sleep_delay = (SENSOR_RESPAWN_DELAY * self._sensor_respawn_counts[sensor_id])
        eventlet.sleep(sleep_delay)

        # The container could have been shut down while we were sleeping; don't spawn an
        # orphan process in that case.
        if self._stopped:
            LOG.debug('Stopped, not respawning a dead sensor', extra=extra)
            return

        try:
            self._spawn_sensor_process(sensor=sensor)
        except Exception as e:
            LOG.warning(str(e), exc_info=True)

            # Disable sensor which we are unable to start. The sensor state has normally
            # already been removed when the process died, so this must tolerate missing keys.
            self._delete_sensor(sensor_id)

    def _should_respawn_sensor(self, sensor_id, sensor, exit_code):
        """
        Return True if the provided sensor should be respawned, False otherwise.

        Only sensors which exited with a non-zero code and haven't yet reached
        ``SENSOR_MAX_RESPAWN_COUNTS`` respawns are respawned.
        """
        if exit_code == 0:
            # We only try to respawn sensors which exited with non-zero status code
            return False

        respawn_count = self._sensor_respawn_counts[sensor_id]
        if respawn_count >= SENSOR_MAX_RESPAWN_COUNTS:
            LOG.debug('Sensor has already been respawned max times, giving up')
            return False

        return True

    def _get_sensor_id(self, sensor):
        """
        Return unique identifier for the provided sensor dict.

        :type sensor: ``dict``

        :rtype: ``str``
        """
        sensor_id = sensor['ref']
        return sensor_id

    def _dispatch_trigger_for_sensor_spawn(self, sensor, process, cmd):
        """
        Dispatch the sensor spawn trigger with the sensor class name, pid and command.
        """
        trigger = ResourceReference.to_string_reference(
            name=SENSOR_SPAWN_TRIGGER['name'],
            pack=SENSOR_SPAWN_TRIGGER['pack'])
        now = int(time.time())
        payload = {
            'id': sensor['class_name'],
            'timestamp': now,
            'pid': process.pid,
            'cmd': cmd
        }
        self._dispatcher.dispatch(trigger, payload=payload)

    def _dispatch_trigger_for_sensor_exit(self, sensor, exit_code):
        """
        Dispatch the sensor exit trigger with the sensor class name and exit code.
        """
        trigger = ResourceReference.to_string_reference(
            name=SENSOR_EXIT_TRIGGER['name'],
            pack=SENSOR_EXIT_TRIGGER['pack'])
        now = int(time.time())
        payload = {
            'id': sensor['class_name'],
            'timestamp': now,
            'exit_code': exit_code
        }
        self._dispatcher.dispatch(trigger, payload=payload)

    def _delete_sensor(self, sensor_id):
        """
        Delete / reset all the internal state about a particular sensor.

        Missing keys are ignored, so this is safe to call multiple times for the same sensor.
        """
        for var in self._internal_sensor_state_variables:
            if sensor_id in var:
                del var[sensor_id]