Passed
Push — master ( cf341e...dcdf4c )
by
unknown
03:07
created

ProcessSensorContainer   B

Complexity

Total Complexity 49

Size/Duplication

Total Lines 396
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 396
rs 8.5454
wmc 49

17 Methods

Rating   Name   Duplication   Size   Complexity  
A _dispatch_trigger_for_sensor_exit() 0 11 1
A running() 0 2 1
C _spawn_sensor_process() 0 85 7
B _stop_sensor_process() 0 38 4
A _get_sensor_id() 0 8 1
A _delete_sensor() 0 7 3
A add_sensor() 0 16 2
A remove_sensor() 0 15 2
A _run_all_sensors() 0 17 3
A shutdown() 0 17 3
B run() 0 29 5
A _should_respawn_sensor() 0 14 3
A _dispatch_trigger_for_sensor_spawn() 0 12 1
B _poll_sensors_for_results() 0 33 5
A stopped() 0 2 1
B _respawn_sensor() 0 30 4
B __init__() 0 39 3

How to fix   Complexity   

Complex Class

Complex classes like ProcessSensorContainer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import sys
18
import time
19
import json
20
import subprocess
21
22
from collections import defaultdict
23
24
import eventlet
25
from eventlet.support import greenlets as greenlet
26
from oslo_config import cfg
27
28
from st2common import log as logging
29
from st2common.constants.error_messages import PACK_VIRTUALENV_DOESNT_EXIST
30
from st2common.constants.system import API_URL_ENV_VARIABLE_NAME
31
from st2common.constants.system import AUTH_TOKEN_ENV_VARIABLE_NAME
32
from st2common.constants.triggers import (SENSOR_SPAWN_TRIGGER, SENSOR_EXIT_TRIGGER)
33
from st2common.constants.exit_codes import SUCCESS_EXIT_CODE
34
from st2common.constants.exit_codes import FAILURE_EXIT_CODE
35
from st2common.models.system.common import ResourceReference
36
from st2common.persistence.pack import Pack
37
from st2common.services.access import create_token
38
from st2common.transport.reactor import TriggerDispatcher
39
from st2common.util.api import get_full_public_api_url
40
from st2common.util.pack import get_pack_common_libs_path
41
from st2common.util.shell import on_parent_exit
42
from st2common.util.sandboxing import get_sandbox_python_path
43
from st2common.util.sandboxing import get_sandbox_python_binary_path
44
from st2common.util.sandboxing import get_sandbox_virtualenv_path
45
46
__all__ = ['ProcessSensorContainer']

LOG = logging.getLogger('st2reactor.process_sensor_container')

# Location of the wrapper script which is executed for every sensor process. The script
# lives in the same directory as this module.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
WRAPPER_SCRIPT_NAME = 'sensor_wrapper.py'
WRAPPER_SCRIPT_PATH = os.path.join(BASE_DIR, WRAPPER_SCRIPT_NAME)

# Maximum number of consecutive respawn attempts after a non-zero exit before giving up
SENSOR_MAX_RESPAWN_COUNTS = 2

# Number of seconds a sensor process needs to stay alive after it has been started before
# it is considered as started and running successfully
SENSOR_SUCCESSFUL_START_THRESHOLD = 10

# Base delay (in seconds) to wait before respawning a dead process
SENSOR_RESPAWN_DELAY = 2.5

# Number of seconds to wait for a process to exit after sending SIGTERM signal. If the
# process is still running after that, SIGKILL signal is sent to it.
PROCESS_EXIT_TIMEOUT = 5
69
70
# TODO: Allow multiple instances of the same sensor with different configuration
71
# options - we need to update sensors for that and add "get_id" or similar
72
# method to the sensor class
73
74
75
class ProcessSensorContainer(object):
76
    """
77
    Sensor container which runs sensors in a separate process.
78
    """
79
80
    def __init__(self, sensors, poll_interval=5, dispatcher=None):
        """
        Initialize the container state. No sensor process is spawned here.

        :param sensors: A list of sensor dicts.
        :type sensors: ``list`` of ``dict``

        :param poll_interval: How long to sleep between each poll for running / dead sensors.
        :type poll_interval: ``float``

        :param dispatcher: Object used to dispatch the sensor spawn / exit triggers. When a
                           falsy value is passed in, a new ``TriggerDispatcher`` is created.
        """
        self._poll_interval = poll_interval
        self._processes = {}  # maps sensor_id -> sensor process
        self._dispatcher = dispatcher if dispatcher else TriggerDispatcher(LOG)
        self._stopped = False

        # maps sensor_id -> sensor object
        self._sensors = {
            self._get_sensor_id(sensor=sensor_obj): sensor_obj
            for sensor_obj in (sensors or [])
        }

        # Bookkeeping needed for respawning dead sensors
        self._sensor_start_times = {}  # maps sensor_id -> sensor start time
        self._sensor_respawn_counts = defaultdict(int)  # maps sensor_id -> number of respawns

        # All the instance variables which hold internal state about a particular sensor.
        # Note: Respawn counts are intentionally not part of this list since they are tracked
        # through the whole life cycle of the container manager.
        self._internal_sensor_state_variables = [
            self._processes,
            self._sensors,
            self._sensor_start_times,
        ]

        self._enable_common_pack_libs = cfg.CONF.packs.enable_common_libs or False
119
120
    def run(self):
        """
        Spawn all the registered sensors and keep polling them until the container is stopped.

        :return: ``SUCCESS_EXIT_CODE`` when the loop ends because the container has been
                 stopped or the greenlet has been killed, ``FAILURE_EXIT_CODE`` when an
                 unexpected error occurs.
        """
        self._run_all_sensors()

        try:
            while not self._stopped:
                # Take a snapshot of the ids. Polling deletes dead sensors from
                # self._sensors and a live keys() view (Python 3) can't be mutated while it
                # is being iterated.
                sensor_ids = list(self._sensors.keys())

                if sensor_ids:
                    LOG.debug('%d active sensor(s)', len(sensor_ids))
                    self._poll_sensors_for_results(sensor_ids)
                else:
                    LOG.debug('No active sensors')

                eventlet.sleep(self._poll_interval)
        except greenlet.GreenletExit:
            # This exception is thrown when sensor container manager
            # kills the thread which runs process container. Not sure
            # if this is the best thing to do.
            self._stopped = True
            return SUCCESS_EXIT_CODE
        except Exception:
            # Note: SystemExit and KeyboardInterrupt are deliberately not caught here so
            # that interpreter shutdown requests are not swallowed.
            LOG.exception('Container failed to run sensors.')
            self._stopped = True
            return FAILURE_EXIT_CODE

        self._stopped = True
        LOG.error('Process container quit. It shouldn\'t.')
        return SUCCESS_EXIT_CODE
149
150
    def _poll_sensors_for_results(self, sensor_ids):
151
        """
152
        Main loop which polls sensor for results and detects dead sensors.
153
        """
154
        for sensor_id in sensor_ids:
155
            now = int(time.time())
156
157
            process = self._processes[sensor_id]
158
            status = process.poll()
159
160
            if status is not None:
161
                # Dead process detected
162
                LOG.info('Process for sensor %s has exited with code %s', sensor_id, status)
163
164
                sensor = self._sensors[sensor_id]
165
                self._delete_sensor(sensor_id)
166
167
                self._dispatch_trigger_for_sensor_exit(sensor=sensor,
168
                                                       exit_code=status)
169
170
                # Try to respawn a dead process (maybe it was a simple failure which can be
171
                # resolved with a restart)
172
                eventlet.spawn_n(self._respawn_sensor, sensor_id=sensor_id, sensor=sensor,
173
                                 exit_code=status)
174
            else:
175
                sensor_start_time = self._sensor_start_times[sensor_id]
176
                sensor_respawn_count = self._sensor_respawn_counts[sensor_id]
177
                successfuly_started = (now - sensor_start_time) >= SENSOR_SUCCESSFUL_START_THRESHOLD
178
179
                if successfuly_started and sensor_respawn_count >= 1:
180
                    # Sensor has been successfully running more than threshold seconds, clear the
181
                    # respawn counter so we can try to restart the sensor if it dies later on
182
                    self._sensor_respawn_counts[sensor_id] = 0
183
184
    def running(self):
185
        return len(self._processes)
186
187
    def stopped(self):
188
        return self._stopped
189
190
    def shutdown(self, force=False):
        """
        Mark the container as stopped and stop all the sensor processes.

        :param force: When True, sensor processes are stopped with an exit timeout of 0
                      instead of ``PROCESS_EXIT_TIMEOUT``.
        :type force: ``bool``
        """
        LOG.info('Container shutting down. Invoking cleanup on sensors.')
        self._stopped = True

        exit_timeout = 0 if force else PROCESS_EXIT_TIMEOUT

        # Iterate over a snapshot of the ids since stopping a sensor deletes it from
        # self._sensors (mutating a dict while iterating its keys fails on Python 3).
        for sensor_id in list(self._sensors.keys()):
            self._stop_sensor_process(sensor_id=sensor_id, exit_timeout=exit_timeout)

        LOG.info('All sensors are shut down.')

        # Clear the dictionaries in place instead of rebinding the attributes so that
        # self._internal_sensor_state_variables keeps referencing the live objects.
        self._sensors.clear()
        self._processes.clear()
207
208
    def add_sensor(self, sensor):
        """
        Register a new sensor with the container and spawn a process for it.

        :type sensor: ``dict``

        :return: ``True`` if the sensor has been started, ``False`` if a sensor with the same
                 id is already registered.
        :rtype: ``bool``
        """
        sensor_id = self._get_sensor_id(sensor=sensor)
        already_registered = sensor_id in self._sensors

        if not already_registered:
            self._spawn_sensor_process(sensor=sensor)
            LOG.debug('Sensor %s started.', sensor_id)
            self._sensors[sensor_id] = sensor
            return True

        LOG.warning('Sensor %s already exists and running.', sensor_id)
        return False
224
225
    def remove_sensor(self, sensor):
        """
        Stop the process of an existing sensor and drop the sensor from the container.

        :type sensor: ``dict``

        :return: ``True`` if the sensor has been stopped, ``False`` if the sensor is not
                 registered with this container.
        :rtype: ``bool``
        """
        sensor_id = self._get_sensor_id(sensor=sensor)
        is_registered = sensor_id in self._sensors

        if is_registered:
            self._stop_sensor_process(sensor_id=sensor_id)
            LOG.debug('Sensor %s stopped.', sensor_id)
            return True

        LOG.warning('Sensor %s isn\'t running in this container.', sensor_id)
        return False
240
241
    def _run_all_sensors(self):
        """
        Spawn a process for every registered sensor.

        A sensor whose process can't be spawned is logged and removed from the container.
        """
        # Iterate over a snapshot since sensors which fail to start are removed from
        # self._sensors inside the loop.
        for sensor_id, sensor_obj in list(self._sensors.items()):
            LOG.info('Running sensor %s', sensor_id)

            try:
                self._spawn_sensor_process(sensor=sensor_obj)
            except Exception as e:
                # Exceptions don't have a "message" attribute on Python 3, use str() instead
                LOG.warning(str(e), exc_info=True)

                # Disable sensor which we are unable to start
                self._sensors.pop(sensor_id, None)
                continue

            LOG.info('Sensor %s started', sensor_id)
258
259
    def _spawn_sensor_process(self, sensor):
        """
        Start a wrapper process for the provided sensor and register it with the container.

        The process is started with the Python binary returned by
        ``get_sandbox_python_binary_path`` for the sensor pack and it runs the sensor wrapper
        script. On success, the process, the sensor and the start time are stored on the
        container and the sensor spawn trigger is dispatched.

        :param sensor: Sensor dict.
        :type sensor: ``dict``

        :return: Spawned process object.

        :raises Exception: If the pack virtualenv directory doesn't exist or if the process
                           can't be spawned.
        """
        sensor_id = self._get_sensor_id(sensor=sensor)
        pack_ref = sensor['pack']
        pack_db = Pack.get_by_ref(pack_ref)

        virtualenv_path = get_sandbox_virtualenv_path(pack=pack_ref)
        python_path = get_sandbox_python_binary_path(pack=pack_ref)

        virtualenv_missing = bool(virtualenv_path) and not os.path.isdir(virtualenv_path)
        if virtualenv_missing:
            raise Exception(PACK_VIRTUALENV_DOESNT_EXIST % {
                'pack': sensor['pack'],
                'virtualenv_path': virtualenv_path
            })

        # Command line arguments for the wrapper script
        trigger_type_refs = ','.join(sensor['trigger_types'] or [])
        parent_args = json.dumps(sys.argv[1:])

        args = [python_path, WRAPPER_SCRIPT_PATH]
        args.extend([
            '--pack=%s' % (sensor['pack']),
            '--file-path=%s' % (sensor['file_path']),
            '--class-name=%s' % (sensor['class_name']),
            '--trigger-type-refs=%s' % (trigger_type_refs),
            '--parent-args=%s' % (parent_args)
        ])

        if sensor['poll_interval']:
            args.append('--poll-interval=%s' % (sensor['poll_interval']))

        # Environment for the child process
        sandbox_python_path = get_sandbox_python_path(inherit_from_parent=True,
                                                      inherit_parent_virtualenv=True)
        pack_common_libs_path = get_pack_common_libs_path(pack_db=pack_db)

        env = os.environ.copy()
        use_common_libs = self._enable_common_pack_libs and pack_common_libs_path
        if use_common_libs:
            # Pack common libs directory goes first on the path
            env['PYTHONPATH'] = pack_common_libs_path + ':' + sandbox_python_path
        else:
            env['PYTHONPATH'] = sandbox_python_path

        # Include full api URL and API token specific to that sensor
        token_metadata = {
            'service': 'sensors_container',
            'sensor_path': sensor['file_path'],
            'sensor_class': sensor['class_name']
        }
        temporary_token = create_token(username='sensors_container',
                                       ttl=cfg.CONF.auth.service_token_ttl,
                                       metadata=token_metadata,
                                       service=True)

        env[API_URL_ENV_VARIABLE_NAME] = get_full_public_api_url()
        env[AUTH_TOKEN_ENV_VARIABLE_NAME] = temporary_token.token

        # TODO 1: Purge temporary token when service stops or sensor process dies
        # TODO 2: Store metadata (wrapper process id) with the token and delete
        # tokens for old, dead processes on startup
        cmd = ' '.join(args)
        LOG.debug('Running sensor subprocess (cmd="%s")', cmd)

        # TODO: Intercept stdout and stderr for aggregated logging purposes
        try:
            process = subprocess.Popen(args=args, stdin=None, stdout=None,
                                       stderr=None, shell=False, env=env,
                                       preexec_fn=on_parent_exit('SIGTERM'))
        except Exception as e:
            raise Exception('Failed to spawn process for sensor %s ("%s"): %s' %
                            (sensor_id, cmd, str(e)))

        # Register the new process with the container
        self._processes[sensor_id] = process
        self._sensors[sensor_id] = sensor
        self._sensor_start_times[sensor_id] = int(time.time())

        self._dispatch_trigger_for_sensor_spawn(sensor=sensor, process=process, cmd=cmd)

        return process
344
345
    def _stop_sensor_process(self, sensor_id, exit_timeout=PROCESS_EXIT_TIMEOUT):
346
        """
347
        Stop a sensor process for the provided sensor.
348
349
        :param sensor_id: Sensor ID.
350
        :type sensor_id: ``str``
351
352
        :param exit_timeout: How long to wait for process to exit after
353
                             sending SIGTERM signal. If the process doesn't
354
                             exit in this amount of seconds, SIGKILL signal
355
                             will be sent to the process.
356
        :type exit__timeout: ``int``
357
        """
358
        process = self._processes[sensor_id]
359
360
        # Delete sensor before terminating process so that it will not be
361
        # respawned during termination
362
        self._delete_sensor(sensor_id)
363
364
        # Terminate the process and wait for up to stop_timeout seconds for the
365
        # process to exit
366
        process.terminate()
367
368
        timeout = 0
369
        sleep_delay = 1
370
        while timeout < exit_timeout:
371
            status = process.poll()
372
373
            if status is not None:
374
                # Process has exited
375
                break
376
377
            timeout += sleep_delay
378
            time.sleep(sleep_delay)
379
380
        if status is None:
381
            # Process hasn't exited yet, forcefully kill it
382
            process.kill()
383
384
    def _respawn_sensor(self, sensor_id, sensor, exit_code):
        """
        Respawn a dead sensor after a delay.

        Nothing is done if the container is stopped or if ``_should_respawn_sensor`` returns
        False. The delay grows with the number of respawns recorded for the sensor.

        :param sensor_id: Sensor ID.
        :param sensor: Sensor dict.
        :type sensor: ``dict``
        :param exit_code: Exit code of the dead sensor process.
        """
        extra = {'sensor_id': sensor_id, 'sensor': sensor}

        if self._stopped:
            LOG.debug('Stopped, not respawning a dead sensor', extra=extra)
            return

        should_respawn = self._should_respawn_sensor(sensor_id=sensor_id, sensor=sensor,
                                                     exit_code=exit_code)

        if not should_respawn:
            LOG.debug('Not respawning a dead sensor', extra=extra)
            return

        LOG.debug('Respawning dead sensor', extra=extra)

        self._sensor_respawn_counts[sensor_id] += 1
        sleep_delay = (SENSOR_RESPAWN_DELAY * self._sensor_respawn_counts[sensor_id])
        eventlet.sleep(sleep_delay)

        if self._stopped:
            # Container has been stopped while we were waiting, don't start a new process
            LOG.debug('Stopped, not respawning a dead sensor', extra=extra)
            return

        try:
            self._spawn_sensor_process(sensor=sensor)
        except Exception as e:
            # Exceptions don't have a "message" attribute on Python 3, use str() instead
            LOG.warning(str(e), exc_info=True)

            # Disable sensor which we are unable to start. The sensor may not be registered
            # at this point (its state is deleted when the dead process is detected), so a
            # missing key must not raise.
            self._sensors.pop(sensor_id, None)
414
415
    def _should_respawn_sensor(self, sensor_id, sensor, exit_code):
416
        """
417
        Return True if the provided sensor should be respawned, False otherwise.
418
        """
419
        if exit_code == 0:
420
            # We only try to respawn sensors which exited with non-zero status code
421
            return False
422
423
        respawn_count = self._sensor_respawn_counts[sensor_id]
424
        if respawn_count >= SENSOR_MAX_RESPAWN_COUNTS:
425
            LOG.debug('Sensor has already been respawned max times, giving up')
426
            return False
427
428
        return True
429
430
    def _get_sensor_id(self, sensor):
431
        """
432
        Return unique identifier for the provider sensor dict.
433
434
        :type sensor: ``dict``
435
        """
436
        sensor_id = sensor['ref']
437
        return sensor_id
438
439
    def _dispatch_trigger_for_sensor_spawn(self, sensor, process, cmd):
        """
        Dispatch the sensor spawn trigger for a newly started sensor process.

        The payload carries the sensor class name, the current timestamp, the process id
        and the command which was used to start the process.
        """
        trigger_ref = ResourceReference.to_string_reference(
            name=SENSOR_SPAWN_TRIGGER['name'],
            pack=SENSOR_SPAWN_TRIGGER['pack'])
        timestamp = int(time.time())

        payload = {}
        payload['id'] = sensor['class_name']
        payload['timestamp'] = timestamp
        payload['pid'] = process.pid
        payload['cmd'] = cmd

        self._dispatcher.dispatch(trigger_ref, payload=payload)
451
452
    def _dispatch_trigger_for_sensor_exit(self, sensor, exit_code):
        """
        Dispatch the sensor exit trigger for a sensor process which has exited.

        The payload carries the sensor class name, the current timestamp and the exit code
        of the process.
        """
        trigger_ref = ResourceReference.to_string_reference(
            name=SENSOR_EXIT_TRIGGER['name'],
            pack=SENSOR_EXIT_TRIGGER['pack'])
        timestamp = int(time.time())

        payload = {}
        payload['id'] = sensor['class_name']
        payload['timestamp'] = timestamp
        payload['exit_code'] = exit_code

        self._dispatcher.dispatch(trigger_ref, payload=payload)
463
464
    def _delete_sensor(self, sensor_id):
465
        """
466
        Delete / reset all the internal state about a particular sensor.
467
        """
468
        for var in self._internal_sensor_state_variables:
469
            if sensor_id in var:
470
                del var[sensor_id]
471