Completed
Push — master ( d1f0a7...3b2ece )
by Edward
21:04 queued 05:38
created

st2reactor.container.ProcessSensorContainer   B

Complexity

Total Complexity 47

Size/Duplication

Total Lines 378
Duplicated Lines 0 %
Metric Value
wmc 47
dl 0
loc 378
rs 8.439

17 Methods

Rating   Name   Duplication   Size   Complexity  
B _spawn_sensor_process() 0 69 5
A _dispatch_trigger_for_sensor_spawn() 0 12 1
A running() 0 2 1
A _dispatch_trigger_for_sensor_exit() 0 11 1
A stopped() 0 2 1
B __init__() 0 36 3
A _should_respawn_sensor() 0 14 3
A shutdown() 0 17 3
B _respawn_sensor() 0 30 4
B run() 0 29 5
A _run_all_sensors() 0 17 3
A add_sensor() 0 16 2
A _delete_sensor() 0 7 3
B _stop_sensor_process() 0 38 4
A remove_sensor() 0 15 2
A _get_sensor_id() 0 8 1
B _poll_sensors_for_results() 0 33 5

How to fix   Complexity   

Complex Class

Complex classes like st2reactor.container.ProcessSensorContainer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import sys
18
import time
19
import json
20
import subprocess
21
22
from collections import defaultdict
23
24
import eventlet
25
from eventlet.support import greenlets as greenlet
26
27
from st2common import log as logging
28
from st2common.constants.error_messages import PACK_VIRTUALENV_DOESNT_EXIST
29
from st2common.constants.system import API_URL_ENV_VARIABLE_NAME
30
from st2common.constants.system import AUTH_TOKEN_ENV_VARIABLE_NAME
31
from st2common.constants.triggers import (SENSOR_SPAWN_TRIGGER, SENSOR_EXIT_TRIGGER)
32
from st2common.models.system.common import ResourceReference
33
from st2common.services.access import create_token
34
from st2common.transport.reactor import TriggerDispatcher
35
from st2common.util.api import get_full_public_api_url
36
from st2common.util.shell import on_parent_exit
37
from st2common.util.sandboxing import get_sandbox_python_path
38
from st2common.util.sandboxing import get_sandbox_python_binary_path
39
from st2common.util.sandboxing import get_sandbox_virtualenv_path
40
41
__all__ = ['ProcessSensorContainer']

LOG = logging.getLogger('st2reactor.process_sensor_container')

# Exit codes returned by ProcessSensorContainer.run()
SUCCESS_EXIT_CODE = 0
FAILURE_EXIT_CODE = 1

# Wrapper script which is passed to the Python binary when a sensor process is spawned. It
# lives in the same directory as this module.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
WRAPPER_SCRIPT_NAME = 'sensor_wrapper.py'
WRAPPER_SCRIPT_PATH = os.path.join(BASE_DIR, WRAPPER_SCRIPT_NAME)

# Maximum number of subsequent respawn attempts for a sensor which exited with a non-zero
# code. Once this number is reached, the sensor is not respawned anymore.
SENSOR_MAX_RESPAWN_COUNTS = 2

# Number of seconds a sensor process needs to stay alive after being started before the
# sensor is considered as started and running successfully
SENSOR_SUCCESSFUL_START_THRESHOLD = 10

# Base delay (in seconds) to wait before respawning a dead process
SENSOR_RESPAWN_DELAY = 2.5

# Number of seconds to wait for a process to exit after sending it SIGTERM signal. If the
# process is still alive after that, SIGKILL signal is sent to it.
PROCESS_EXIT_TIMEOUT = 5
67
68
# TODO: Allow multiple instances of the same sensor with different configuration
69
# options - we need to update sensors for that and add "get_id" or similar
70
# method to the sensor class
71
72
73
class ProcessSensorContainer(object):
74
    """
75
    Sensor container which runs sensors in a separate process.
76
    """
77
78
    def __init__(self, sensors, poll_interval=5, dispatcher=None):
79
        """
80
        :param sensors: A list of sensor dicts.
81
        :type sensors: ``list`` of ``dict``
82
83
        :param poll_interval: How long to sleep between each poll for running / dead sensors.
84
        :type poll_interval: ``float``
85
        """
86
        self._poll_interval = poll_interval
87
88
        self._sensors = {}  # maps sensor_id -> sensor object
89
        self._processes = {}  # maps sensor_id -> sensor process
90
91
        if not dispatcher:
92
            dispatcher = TriggerDispatcher(LOG)
93
        self._dispatcher = dispatcher
94
95
        self._stopped = False
96
97
        sensors = sensors or []
98
        for sensor_obj in sensors:
99
            sensor_id = self._get_sensor_id(sensor=sensor_obj)
100
            self._sensors[sensor_id] = sensor_obj
101
102
        # Stores information needed for respawning dead sensors
103
        self._sensor_start_times = {}  # maps sensor_id -> sensor start time
104
        self._sensor_respawn_counts = defaultdict(int)  # maps sensor_id -> number of respawns
105
106
        # A list of all the instance variables which hold internal state information about a
107
        # particular_sensor
108
        # Note: We don't clear respawn counts since we want to track this through the whole life
109
        # cycle of the container manager
110
        self._internal_sensor_state_variables = [
111
            self._processes,
112
            self._sensors,
113
            self._sensor_start_times,
114
        ]
115
116
    def run(self):
        """
        Start all the registered sensors and keep polling them until the container is stopped.

        :return: ``SUCCESS_EXIT_CODE`` if the polling loop has exited or the greenlet running
                 it has been killed, ``FAILURE_EXIT_CODE`` if an unexpected error occurred.
        :rtype: ``int``
        """
        self._run_all_sensors()

        try:
            while not self._stopped:
                # Poll for all running processes
                # Note: We take a snapshot of the IDs because polling removes dead sensors
                # from self._sensors and a live dictionary view can't be iterated over while
                # the dictionary is being modified.
                sensor_ids = list(self._sensors.keys())

                if sensor_ids:
                    LOG.debug('%d active sensor(s)', len(sensor_ids))
                    self._poll_sensors_for_results(sensor_ids)
                else:
                    LOG.debug('No active sensors')

                eventlet.sleep(self._poll_interval)
        except greenlet.GreenletExit:
            # This exception is thrown when sensor container manager
            # kills the thread which runs process container. Not sure
            # if this is the best thing to do.
            self._stopped = True
            return SUCCESS_EXIT_CODE
        except:
            # Intentionally a catch-all: any failure is logged and reported to the caller via
            # the exit code instead of being propagated.
            LOG.exception('Container failed to run sensors.')
            self._stopped = True
            return FAILURE_EXIT_CODE

        self._stopped = True
        LOG.error('Process container quit. It shouldn\'t.')
        return SUCCESS_EXIT_CODE
145
146
    def _poll_sensors_for_results(self, sensor_ids):
147
        """
148
        Main loop which polls sensor for results and detects dead sensors.
149
        """
150
        for sensor_id in sensor_ids:
151
            now = int(time.time())
152
153
            process = self._processes[sensor_id]
154
            status = process.poll()
155
156
            if status is not None:
157
                # Dead process detected
158
                LOG.info('Process for sensor %s has exited with code %s', sensor_id, status)
159
160
                sensor = self._sensors[sensor_id]
161
                self._delete_sensor(sensor_id)
162
163
                self._dispatch_trigger_for_sensor_exit(sensor=sensor,
164
                                                       exit_code=status)
165
166
                # Try to respawn a dead process (maybe it was a simple failure which can be
167
                # resolved with a restart)
168
                eventlet.spawn_n(self._respawn_sensor, sensor_id=sensor_id, sensor=sensor,
169
                                 exit_code=status)
170
            else:
171
                sensor_start_time = self._sensor_start_times[sensor_id]
172
                sensor_respawn_count = self._sensor_respawn_counts[sensor_id]
173
                successfuly_started = (now - sensor_start_time) >= SENSOR_SUCCESSFUL_START_THRESHOLD
174
175
                if successfuly_started and sensor_respawn_count >= 1:
176
                    # Sensor has been successfully running more than threshold seconds, clear the
177
                    # respawn counter so we can try to restart the sensor if it dies later on
178
                    self._sensor_respawn_counts[sensor_id] = 0
179
180
    def running(self):
181
        return len(self._processes)
182
183
    def stopped(self):
184
        return self._stopped
185
186
    def shutdown(self, force=False):
        """
        Mark the container as stopped and stop all the sensor processes.

        :param force: True to use an exit timeout of 0 when stopping the processes instead of
                      ``PROCESS_EXIT_TIMEOUT``.
        :type force: ``bool``
        """
        LOG.info('Container shutting down. Invoking cleanup on sensors.')
        self._stopped = True

        exit_timeout = 0 if force else PROCESS_EXIT_TIMEOUT

        # Iterate over a snapshot of the IDs since stopping a sensor process removes the sensor
        # from self._sensors
        for sensor_id in list(self._sensors.keys()):
            self._stop_sensor_process(sensor_id=sensor_id, exit_timeout=exit_timeout)

        LOG.info('All sensors are shut down.')

        # Clear the dictionaries in place instead of re-binding the attributes.
        # self._internal_sensor_state_variables holds references to those dictionaries and
        # re-binding would leave it pointing to the old, stale objects.
        self._sensors.clear()
        self._processes.clear()
203
204
    def add_sensor(self, sensor):
        """
        Register a new sensor with the container and spawn a process for it.

        :param sensor: Sensor to add.
        :type sensor: ``dict``

        :return: ``True`` if the sensor has been started, ``False`` if a sensor with the same
                 ID is already registered.
        :rtype: ``bool``
        """
        new_sensor_id = self._get_sensor_id(sensor=sensor)
        already_registered = new_sensor_id in self._sensors

        if not already_registered:
            self._spawn_sensor_process(sensor=sensor)
            LOG.debug('Sensor %s started.', new_sensor_id)
            self._sensors[new_sensor_id] = sensor
            return True

        LOG.warning('Sensor %s already exists and running.', new_sensor_id)
        return False
220
221
    def remove_sensor(self, sensor):
        """
        Stop the process of a registered sensor and remove the sensor from the container.

        :param sensor: Sensor to remove.
        :type sensor: ``dict``

        :return: ``True`` if the sensor has been stopped, ``False`` if it's not registered
                 with this container.
        :rtype: ``bool``
        """
        target_id = self._get_sensor_id(sensor=sensor)
        is_registered = target_id in self._sensors

        if is_registered:
            self._stop_sensor_process(sensor_id=target_id)
            LOG.debug('Sensor %s stopped.', target_id)
            return True

        LOG.warning('Sensor %s isn\'t running in this container.', target_id)
        return False
236
237
    def _run_all_sensors(self):
238
        sensor_ids = self._sensors.keys()
239
240
        for sensor_id in sensor_ids:
241
            sensor_obj = self._sensors[sensor_id]
242
            LOG.info('Running sensor %s', sensor_id)
243
244
            try:
245
                self._spawn_sensor_process(sensor=sensor_obj)
246
            except Exception as e:
247
                LOG.warning(e.message, exc_info=True)
248
249
                # Disable sensor which we are unable to start
250
                del self._sensors[sensor_id]
251
                continue
252
253
            LOG.info('Sensor %s started' % sensor_id)
254
255
    def _spawn_sensor_process(self, sensor):
        """
        Start a wrapper subprocess for the provided sensor and start tracking it.

        The process is started using the isolated Python binary from the virtual environment
        which belongs to the sensor pack. API URL and a temporary auth token are passed to the
        process using environment variables.

        :param sensor: Sensor to spawn the process for.
        :type sensor: ``dict``

        :return: Spawned process.
        :rtype: ``subprocess.Popen``

        :raises Exception: If the pack virtual environment directory doesn't exist or if the
                           process can't be spawned.
        """
        sensor_id = self._get_sensor_id(sensor=sensor)
        pack = sensor['pack']
        virtualenv_path = get_sandbox_virtualenv_path(pack=pack)
        python_binary = get_sandbox_python_binary_path(pack=pack)

        virtualenv_missing = bool(virtualenv_path) and not os.path.isdir(virtualenv_path)
        if virtualenv_missing:
            raise Exception(PACK_VIRTUALENV_DOESNT_EXIST % {
                'pack': pack,
                'virtualenv_path': virtualenv_path
            })

        trigger_type_refs = ','.join(sensor['trigger_types'] or [])

        # Arguments of the parent process are serialized and passed through to the wrapper
        parent_args = json.dumps(sys.argv[1:])

        args = [
            python_binary,
            WRAPPER_SCRIPT_PATH,
            '--pack=%s' % (pack),
            '--file-path=%s' % (sensor['file_path']),
            '--class-name=%s' % (sensor['class_name']),
            '--trigger-type-refs=%s' % (trigger_type_refs),
            '--parent-args=%s' % (parent_args)
        ]

        poll_interval = sensor['poll_interval']
        if poll_interval:
            args += ['--poll-interval=%s' % (poll_interval)]

        env = os.environ.copy()
        env['PYTHONPATH'] = get_sandbox_python_path(inherit_from_parent=True,
                                                    inherit_parent_virtualenv=True)

        # Include full api URL and API token specific to that sensor
        # TODO 1: Purge temporary token when service stops or sensor process dies
        # TODO 2: Store metadata (wrapper process id) with the token and delete
        # tokens for old, dead processes on startup
        token_ttl = (24 * 60 * 60)
        temporary_token = create_token(username='sensors_container', ttl=token_ttl)

        env[API_URL_ENV_VARIABLE_NAME] = get_full_public_api_url()
        env[AUTH_TOKEN_ENV_VARIABLE_NAME] = temporary_token.token

        # String version of the command is only used for logging and for the trigger payload
        cmd = ' '.join(args)
        LOG.debug('Running sensor subprocess (cmd="%s")', cmd)

        # TODO: Intercept stdout and stderr for aggregated logging purposes
        try:
            process = subprocess.Popen(args=args, stdin=None, stdout=None, stderr=None,
                                       shell=False, env=env,
                                       preexec_fn=on_parent_exit('SIGTERM'))
        except Exception as e:
            raise Exception('Failed to spawn process for sensor %s ("%s"): %s' %
                            (sensor_id, cmd, str(e)))

        # Process has been started, record the state which is needed for polling and respawn
        self._processes[sensor_id] = process
        self._sensors[sensor_id] = sensor
        self._sensor_start_times[sensor_id] = int(time.time())

        self._dispatch_trigger_for_sensor_spawn(sensor=sensor, process=process, cmd=cmd)

        return process
324
325
    def _stop_sensor_process(self, sensor_id, exit_timeout=PROCESS_EXIT_TIMEOUT):
        """
        Stop a sensor process for the provided sensor.

        The internal state of the sensor is deleted, SIGTERM is sent to the process and, if
        the process is still alive once the timeout has elapsed, the process is killed.

        :param sensor_id: Sensor ID.
        :type sensor_id: ``str``

        :param exit_timeout: How long to wait for process to exit after
                             sending SIGTERM signal. If the process doesn't
                             exit in this amount of seconds, SIGKILL signal
                             will be sent to the process.
        :type exit_timeout: ``int``
        """
        process = self._processes.get(sensor_id, None)

        # Delete sensor before terminating process so that it will not be
        # respawned during termination
        self._delete_sensor(sensor_id)

        if process is None:
            # No process has been spawned for this sensor (e.g. the container is shut down
            # before the sensors have been started) so there is nothing to terminate
            return

        # Terminate the process and wait for up to exit_timeout seconds for the
        # process to exit
        process.terminate()

        waited = 0
        sleep_delay = 1

        # Note: Status is retrieved before entering the loop so it's always defined, even when
        # exit_timeout is 0, and it's refreshed after every sleep so the decision below is
        # based on the most recent process state.
        status = process.poll()
        while status is None and waited < exit_timeout:
            time.sleep(sleep_delay)
            waited += sleep_delay
            status = process.poll()

        if status is None:
            # Process hasn't exited yet, forcefully kill it
            process.kill()
363
364
    def _respawn_sensor(self, sensor_id, sensor, exit_code):
        """
        Method for respawning a sensor which died with a non-zero exit code.

        The respawn is delayed for ``SENSOR_RESPAWN_DELAY`` seconds multiplied by the number of
        respawn attempts recorded for the sensor. Nothing is respawned if the container is
        stopped or if ``_should_respawn_sensor`` returns False.

        :param sensor_id: Sensor ID.
        :type sensor_id: ``str``

        :param sensor: Sensor to respawn.
        :type sensor: ``dict``

        :param exit_code: Exit code of the dead sensor process.
        :type exit_code: ``int``
        """
        extra = {'sensor_id': sensor_id, 'sensor': sensor}

        if self._stopped:
            LOG.debug('Stopped, not respawning a dead sensor', extra=extra)
            return

        should_respawn = self._should_respawn_sensor(sensor_id=sensor_id, sensor=sensor,
                                                     exit_code=exit_code)

        if not should_respawn:
            LOG.debug('Not respawning a dead sensor', extra=extra)
            return

        LOG.debug('Respawning dead sensor', extra=extra)

        self._sensor_respawn_counts[sensor_id] += 1
        sleep_delay = (SENSOR_RESPAWN_DELAY * self._sensor_respawn_counts[sensor_id])
        eventlet.sleep(sleep_delay)

        if self._stopped:
            # Container has been stopped while we were waiting, spawning a process now would
            # leave it running with nothing tracking it
            LOG.debug('Stopped, not respawning a dead sensor', extra=extra)
            return

        try:
            self._spawn_sensor_process(sensor=sensor)
        except Exception as e:
            # Note: str(e) is used since the "message" attribute is not available on
            # exception instances under Python 3
            LOG.warning(str(e), exc_info=True)

            # Disable sensor which we are unable to start.
            # Note: _poll_sensors_for_results deletes the sensor state before scheduling the
            # respawn, so the entry is usually already gone and a plain "del" would raise
            # KeyError here.
            self._sensors.pop(sensor_id, None)
394
395
    def _should_respawn_sensor(self, sensor_id, sensor, exit_code):
396
        """
397
        Return True if the provided sensor should be respawned, False otherwise.
398
        """
399
        if exit_code == 0:
400
            # We only try to respawn sensors which exited with non-zero status code
401
            return False
402
403
        respawn_count = self._sensor_respawn_counts[sensor_id]
404
        if respawn_count >= SENSOR_MAX_RESPAWN_COUNTS:
405
            LOG.debug('Sensor has already been respawned max times, giving up')
406
            return False
407
408
        return True
409
410
    def _get_sensor_id(self, sensor):
411
        """
412
        Return unique identifier for the provider sensor dict.
413
414
        :type sensor: ``dict``
415
        """
416
        sensor_id = sensor['ref']
417
        return sensor_id
418
419
    def _dispatch_trigger_for_sensor_spawn(self, sensor, process, cmd):
        """
        Dispatch a trigger which signals that a process for the provided sensor has been
        spawned.

        :param sensor: Sensor the process has been spawned for.
        :type sensor: ``dict``

        :param process: Spawned process, its pid is included in the payload.

        :param cmd: Command which was used to spawn the process.
        :type cmd: ``str``
        """
        trigger_ref = ResourceReference.to_string_reference(name=SENSOR_SPAWN_TRIGGER['name'],
                                                            pack=SENSOR_SPAWN_TRIGGER['pack'])
        timestamp = int(time.time())

        payload = {}
        payload['id'] = sensor['class_name']
        payload['timestamp'] = timestamp
        payload['pid'] = process.pid
        payload['cmd'] = cmd

        self._dispatcher.dispatch(trigger_ref, payload=payload)
431
432
    def _dispatch_trigger_for_sensor_exit(self, sensor, exit_code):
        """
        Dispatch a trigger which signals that the process of the provided sensor has exited.

        :param sensor: Sensor whose process has exited.
        :type sensor: ``dict``

        :param exit_code: Exit code of the sensor process.
        :type exit_code: ``int``
        """
        trigger_ref = ResourceReference.to_string_reference(name=SENSOR_EXIT_TRIGGER['name'],
                                                            pack=SENSOR_EXIT_TRIGGER['pack'])
        timestamp = int(time.time())

        payload = {}
        payload['id'] = sensor['class_name']
        payload['timestamp'] = timestamp
        payload['exit_code'] = exit_code

        self._dispatcher.dispatch(trigger_ref, payload=payload)
443
444
    def _delete_sensor(self, sensor_id):
445
        """
446
        Delete / reset all the internal state about a particular sensor.
447
        """
448
        for var in self._internal_sensor_state_variables:
449
            if sensor_id in var:
450
                del var[sensor_id]
451