Test Failed
Pull Request — master (#3658)
by Lakshmi
06:41
created

ProcessSensorContainer._spawn_sensor_process()   C

Complexity

Conditions 7

Size

Total Lines 85

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
dl 0
loc 85
rs 5.3574
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

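Commonly applied refactorings include Extract Method and, if many parameters or temporary variables are present, Replace Method with Method Object.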

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import sys
18
import time
19
import json
20
import subprocess
21
22
from collections import defaultdict
23
24
import eventlet
25
from eventlet.support import greenlets as greenlet
26
from oslo_config import cfg
27
28
from st2common import log as logging
29
from st2common.constants.error_messages import PACK_VIRTUALENV_DOESNT_EXIST
30
from st2common.constants.system import API_URL_ENV_VARIABLE_NAME
31
from st2common.constants.system import AUTH_TOKEN_ENV_VARIABLE_NAME
32
from st2common.constants.triggers import (SENSOR_SPAWN_TRIGGER, SENSOR_EXIT_TRIGGER)
33
from st2common.constants.exit_codes import SUCCESS_EXIT_CODE
34
from st2common.constants.exit_codes import FAILURE_EXIT_CODE
35
from st2common.models.system.common import ResourceReference
36
from st2common.persistence.pack import Pack
37
from st2common.services.access import create_token
38
from st2common.transport.reactor import TriggerDispatcher
39
from st2common.util.api import get_full_public_api_url
40
from st2common.util.pack import get_pack_common_libs_path
41
from st2common.util.shell import on_parent_exit
42
from st2common.util.sandboxing import get_sandbox_python_path
43
from st2common.util.sandboxing import get_sandbox_python_binary_path
44
from st2common.util.sandboxing import get_sandbox_virtualenv_path
45
46
__all__ = [
    'ProcessSensorContainer'
]

LOG = logging.getLogger('st2reactor.process_sensor_container')

# Absolute path to the sensor wrapper script which lives next to this module. Every sensor
# process is started by running this script with the pack's sandboxed Python binary.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
WRAPPER_SCRIPT_NAME = 'sensor_wrapper.py'
WRAPPER_SCRIPT_PATH = os.path.join(BASE_DIR, WRAPPER_SCRIPT_NAME)

# How many times to try to subsequently respawn a sensor after a non-zero exit before giving up
SENSOR_MAX_RESPAWN_COUNTS = 2

# How many seconds after the sensor has been started we should wait before considering sensor as
# being started and running successfully (its respawn counter is reset after this point)
SENSOR_SUCCESSFUL_START_THRESHOLD = 10

# How long to wait (in seconds) before respawning a dead process. The actual delay is multiplied
# by the number of respawn attempts made so far.
SENSOR_RESPAWN_DELAY = 2.5

# How long to wait for process to exit after sending SIGTERM signal. If the process doesn't
# exit in this amount of seconds, SIGKILL signal will be sent to the process.
PROCESS_EXIT_TIMEOUT = 5

# TODO: Allow multiple instances of the same sensor with different configuration
# options - we need to update sensors for that and add "get_id" or similar
# method to the sensor class
73
74
75
class ProcessSensorContainer(object):
    """
    Sensor container which runs sensors in a separate process.

    Each sensor runs in its own subprocess, started from the pack's sandboxed Python binary.
    The container polls those processes, dispatches spawn / exit triggers, and respawns sensors
    which exit with a non-zero code (up to SENSOR_MAX_RESPAWN_COUNTS consecutive times).
    """

    def __init__(self, sensors, poll_interval=5, dispatcher=None):
        """
        :param sensors: A list of sensor dicts.
        :type sensors: ``list`` of ``dict``

        :param poll_interval: How long to sleep between each poll for running / dead sensors.
        :type poll_interval: ``float``

        :param dispatcher: Dispatcher used to emit sensor spawn / exit triggers. When not
                           provided, a new ``TriggerDispatcher`` is created.
        :type dispatcher: ``TriggerDispatcher``
        """
        self._poll_interval = poll_interval

        self._sensors = {}  # maps sensor_id -> sensor object
        self._processes = {}  # maps sensor_id -> sensor process

        if not dispatcher:
            dispatcher = TriggerDispatcher(LOG)
        self._dispatcher = dispatcher

        self._stopped = False

        sensors = sensors or []
        for sensor_obj in sensors:
            sensor_id = self._get_sensor_id(sensor=sensor_obj)
            self._sensors[sensor_id] = sensor_obj

        # Stores information needed for respawning dead sensors
        self._sensor_start_times = {}  # maps sensor_id -> sensor start time
        self._sensor_respawn_counts = defaultdict(int)  # maps sensor_id -> number of respawns

        # A list of all the instance variables which hold internal state information about a
        # particular_sensor.
        # Note: We don't clear respawn counts since we want to track this through the whole life
        # cycle of the container manager.
        # Note: These dicts must only ever be mutated in place (never rebound), otherwise this
        # list would keep referencing stale objects.
        self._internal_sensor_state_variables = [
            self._processes,
            self._sensors,
            self._sensor_start_times,
        ]

        self._enable_common_pack_libs = cfg.CONF.packs.enable_common_libs or False

    def run(self):
        """
        Start all the sensors and keep polling them until the container is stopped.

        :return: SUCCESS_EXIT_CODE when the loop ends / the greenthread is killed,
                 FAILURE_EXIT_CODE when an unexpected error occurs.
        :rtype: ``int``
        """
        self._run_all_sensors()

        try:
            while not self._stopped:
                # Poll for all running processes. Take a snapshot of the ids since polling
                # removes dead sensors from self._sensors while we iterate.
                sensor_ids = list(self._sensors.keys())

                if sensor_ids:
                    LOG.debug('%d active sensor(s)', len(sensor_ids))
                    self._poll_sensors_for_results(sensor_ids)
                else:
                    LOG.debug('No active sensors')

                eventlet.sleep(self._poll_interval)
        except greenlet.GreenletExit:
            # This exception is thrown when sensor container manager
            # kills the thread which runs process container. Not sure
            # if this is the best thing to do.
            self._stopped = True
            return SUCCESS_EXIT_CODE
        except:
            # Top-level boundary of the container loop: log anything unexpected and report
            # failure instead of letting the exception escape the greenthread.
            LOG.exception('Container failed to run sensors.')
            self._stopped = True
            return FAILURE_EXIT_CODE

        self._stopped = True
        LOG.error('Process container quit. It shouldn\'t.')
        return SUCCESS_EXIT_CODE

    def _poll_sensors_for_results(self, sensor_ids):
        """
        Main loop which polls sensor for results and detects dead sensors.

        Dead sensors are removed from the internal state, an exit trigger is dispatched and a
        respawn attempt is scheduled in a new greenthread. For sensors which have been running
        for at least SENSOR_SUCCESSFUL_START_THRESHOLD seconds the respawn counter is reset.

        :param sensor_ids: Snapshot of sensor ids to poll.
        :type sensor_ids: ``list`` of ``str``
        """
        for sensor_id in sensor_ids:
            now = int(time.time())

            process = self._processes.get(sensor_id, None)
            if process is None or sensor_id not in self._sensors:
                # Sensor is no longer tracked - presumably removed (e.g. via remove_sensor())
                # by another greenthread after the ids were collected. Skip it instead of
                # crashing the whole container loop.
                continue

            status = process.poll()

            if status is not None:
                # Dead process detected
                LOG.info('Process for sensor %s has exited with code %s', sensor_id, status)

                sensor = self._sensors[sensor_id]
                self._delete_sensor(sensor_id)

                self._dispatch_trigger_for_sensor_exit(sensor=sensor,
                                                       exit_code=status)

                # Try to respawn a dead process (maybe it was a simple failure which can be
                # resolved with a restart)
                eventlet.spawn_n(self._respawn_sensor, sensor_id=sensor_id, sensor=sensor,
                                 exit_code=status)
            else:
                sensor_start_time = self._sensor_start_times[sensor_id]
                sensor_respawn_count = self._sensor_respawn_counts[sensor_id]
                successfully_started = (now - sensor_start_time) >= SENSOR_SUCCESSFUL_START_THRESHOLD

                if successfully_started and sensor_respawn_count >= 1:
                    # Sensor has been successfully running more than threshold seconds, clear the
                    # respawn counter so we can try to restart the sensor if it dies later on
                    self._sensor_respawn_counts[sensor_id] = 0

    def running(self):
        """
        Return the number of currently tracked sensor processes.
        """
        return len(self._processes)

    def stopped(self):
        """
        Return True if the container has been stopped.
        """
        return self._stopped

    def shutdown(self, force=False):
        """
        Stop all the sensor processes.

        :param force: When True, don't wait for processes to exit after SIGTERM and send
                      SIGKILL to any process which hasn't exited yet right away.
        :type force: ``bool``
        """
        LOG.info('Container shutting down. Invoking cleanup on sensors.')
        self._stopped = True

        exit_timeout = 0 if force else PROCESS_EXIT_TIMEOUT

        # Snapshot the ids - _stop_sensor_process() removes entries from self._sensors
        for sensor_id in list(self._sensors.keys()):
            self._stop_sensor_process(sensor_id=sensor_id, exit_timeout=exit_timeout)

        LOG.info('All sensors are shut down.')

        # Clear in place (instead of rebinding) so self._internal_sensor_state_variables keeps
        # referencing the live dicts used by _delete_sensor()
        self._sensors.clear()
        self._processes.clear()

    def add_sensor(self, sensor):
        """
        Add a new sensor to the container.

        :type sensor: ``dict``

        :return: True if the sensor was started, False if it already exists.
        :rtype: ``bool``
        """
        sensor_id = self._get_sensor_id(sensor=sensor)

        if sensor_id in self._sensors:
            LOG.warning('Sensor %s already exists and running.', sensor_id)
            return False

        self._spawn_sensor_process(sensor=sensor)
        LOG.debug('Sensor %s started.', sensor_id)
        self._sensors[sensor_id] = sensor
        return True

    def remove_sensor(self, sensor):
        """
        Remove an existing sensor from the container.

        :type sensor: ``dict``

        :return: True if the sensor was stopped, False if it isn't running in this container.
        :rtype: ``bool``
        """
        sensor_id = self._get_sensor_id(sensor=sensor)

        if sensor_id not in self._sensors:
            LOG.warning('Sensor %s isn\'t running in this container.', sensor_id)
            return False

        self._stop_sensor_process(sensor_id=sensor_id)
        LOG.debug('Sensor %s stopped.', sensor_id)
        return True

    def _run_all_sensors(self):
        """
        Spawn a process for every registered sensor. Sensors which fail to start are disabled
        (removed from the container).
        """
        # Snapshot the ids - sensors which fail to start are removed from self._sensors below
        for sensor_id in list(self._sensors.keys()):
            sensor_obj = self._sensors[sensor_id]
            LOG.info('Running sensor %s', sensor_id)

            try:
                self._spawn_sensor_process(sensor=sensor_obj)
            except Exception as e:
                LOG.warning(str(e), exc_info=True)

                # Disable sensor which we are unable to start
                self._sensors.pop(sensor_id, None)
                continue

            LOG.info('Sensor %s started', sensor_id)

    def _spawn_sensor_process(self, sensor):
        """
        Spawn a new process for the provided sensor.

        New process uses isolated Python binary from a virtual environment
        belonging to the sensor pack.

        :param sensor: Sensor dict. Reads the "ref", "pack", "file_path", "class_name",
                       "trigger_types" and "poll_interval" keys.
        :type sensor: ``dict``

        :return: Spawned process.
        :rtype: ``subprocess.Popen``

        :raises Exception: If the pack virtualenv doesn't exist or the process can't be spawned.
        """
        sensor_id = self._get_sensor_id(sensor=sensor)

        python_path = self._get_sensor_python_binary_path(pack_ref=sensor['pack'])
        args = self._get_sensor_process_args(sensor=sensor, python_path=python_path)
        env = self._get_sensor_process_env(sensor=sensor)

        cmd = ' '.join(args)
        LOG.debug('Running sensor subprocess (cmd="%s")', cmd)

        # TODO: Intercept stdout and stderr for aggregated logging purposes
        try:
            process = subprocess.Popen(args=args, stdin=None, stdout=None,
                                       stderr=None, shell=False, env=env,
                                       preexec_fn=on_parent_exit('SIGTERM'))
        except Exception as e:
            message = ('Failed to spawn process for sensor %s ("%s"): %s' %
                       (sensor_id, cmd, str(e)))
            raise Exception(message)

        self._processes[sensor_id] = process
        self._sensors[sensor_id] = sensor
        self._sensor_start_times[sensor_id] = int(time.time())

        self._dispatch_trigger_for_sensor_spawn(sensor=sensor, process=process, cmd=cmd)

        return process

    def _get_sensor_python_binary_path(self, pack_ref):
        """
        Return the sandboxed Python binary used to run sensors from the provided pack.

        :raises Exception: If a virtualenv path is configured for the pack but the directory
                           doesn't exist.
        """
        virtualenv_path = get_sandbox_virtualenv_path(pack=pack_ref)
        python_path = get_sandbox_python_binary_path(pack=pack_ref)

        if virtualenv_path and not os.path.isdir(virtualenv_path):
            format_values = {'pack': pack_ref, 'virtualenv_path': virtualenv_path}
            msg = PACK_VIRTUALENV_DOESNT_EXIST % format_values
            raise Exception(msg)

        return python_path

    def _get_sensor_process_args(self, sensor, python_path):
        """
        Return the argv list which runs the sensor wrapper script for the provided sensor.
        """
        trigger_type_refs = ','.join(sensor['trigger_types'] or [])

        # Forward the container's own command line arguments to the wrapper process
        parent_args = json.dumps(sys.argv[1:])

        args = [
            python_path,
            WRAPPER_SCRIPT_PATH,
            '--pack=%s' % (sensor['pack']),
            '--file-path=%s' % (sensor['file_path']),
            '--class-name=%s' % (sensor['class_name']),
            '--trigger-type-refs=%s' % (trigger_type_refs),
            '--parent-args=%s' % (parent_args)
        ]

        if sensor['poll_interval']:
            args.append('--poll-interval=%s' % (sensor['poll_interval']))

        return args

    def _get_sensor_pythonpath(self, pack_ref):
        """
        Return the PYTHONPATH value for a sensor process from the provided pack.

        The pack's common libs directory is prepended when common pack libs are enabled and
        the pack has one.
        """
        pack_db = Pack.get_by_ref(pack_ref)

        sandbox_python_path = get_sandbox_python_path(inherit_from_parent=True,
                                                      inherit_parent_virtualenv=True)
        pack_common_libs_path = get_pack_common_libs_path(pack_db=pack_db)

        if self._enable_common_pack_libs and pack_common_libs_path:
            return pack_common_libs_path + ':' + sandbox_python_path

        return sandbox_python_path

    def _create_sensor_auth_token(self, sensor):
        """
        Create a temporary service auth token for the provided sensor process.
        """
        ttl = cfg.CONF.auth.service_token_ttl
        metadata = {
            'service': 'sensors_container',
            'sensor_path': sensor['file_path'],
            'sensor_class': sensor['class_name']
        }

        # TODO 1: Purge temporary token when service stops or sensor process dies
        # TODO 2: Store metadata (wrapper process id) with the token and delete
        # tokens for old, dead processes on startup
        return create_token(username='sensors_container', ttl=ttl, metadata=metadata,
                            service=True)

    def _get_sensor_process_env(self, sensor):
        """
        Return the environment for the sensor process.

        It is a copy of the current environment with PYTHONPATH set for the pack sandbox. The
        full public API URL and an API token specific to that sensor are added.
        """
        env = os.environ.copy()
        env['PYTHONPATH'] = self._get_sensor_pythonpath(pack_ref=sensor['pack'])

        temporary_token = self._create_sensor_auth_token(sensor=sensor)
        env[API_URL_ENV_VARIABLE_NAME] = get_full_public_api_url()
        env[AUTH_TOKEN_ENV_VARIABLE_NAME] = temporary_token.token

        return env

    def _stop_sensor_process(self, sensor_id, exit_timeout=PROCESS_EXIT_TIMEOUT):
        """
        Stop a sensor process for the provided sensor.

        :param sensor_id: Sensor ID.
        :type sensor_id: ``str``

        :param exit_timeout: How long to wait for process to exit after
                             sending SIGTERM signal. If the process doesn't
                             exit in this amount of seconds, SIGKILL signal
                             will be sent to the process. 0 means don't wait.
        :type exit_timeout: ``int``
        """
        process = self._processes.get(sensor_id, None)

        # Delete sensor before terminating process so that it will not be
        # respawned during termination
        self._delete_sensor(sensor_id)

        if process is None:
            # Sensor is registered, but no process was spawned for it (e.g. shutdown() before
            # run()). There is nothing to terminate.
            return

        # Terminate the process and wait for up to exit_timeout seconds for the
        # process to exit
        process.terminate()

        # Poll once up front so "status" is always bound, even when exit_timeout is 0
        # (forced shutdown) and the wait loop never runs
        status = process.poll()
        waited = 0
        sleep_delay = 1
        while status is None and waited < exit_timeout:
            waited += sleep_delay
            time.sleep(sleep_delay)
            status = process.poll()

        if status is None:
            # Process hasn't exited yet, forcefully kill it
            process.kill()

    def _respawn_sensor(self, sensor_id, sensor, exit_code):
        """
        Method for respawning a sensor which died with a non-zero exit code.

        Waits SENSOR_RESPAWN_DELAY multiplied by the number of respawn attempts before
        respawning. A sensor which can't be respawned is disabled.
        """
        extra = {'sensor_id': sensor_id, 'sensor': sensor}

        if self._stopped:
            LOG.debug('Stopped, not respawning a dead sensor', extra=extra)
            return

        should_respawn = self._should_respawn_sensor(sensor_id=sensor_id, sensor=sensor,
                                                     exit_code=exit_code)

        if not should_respawn:
            LOG.debug('Not respawning a dead sensor', extra=extra)
            return

        LOG.debug('Respawning dead sensor', extra=extra)

        self._sensor_respawn_counts[sensor_id] += 1
        sleep_delay = (SENSOR_RESPAWN_DELAY * self._sensor_respawn_counts[sensor_id])
        eventlet.sleep(sleep_delay)

        if self._stopped:
            # Container was shut down while we were waiting - don't start a new process
            LOG.debug('Stopped, not respawning a dead sensor', extra=extra)
            return

        try:
            self._spawn_sensor_process(sensor=sensor)
        except Exception as e:
            LOG.warning(str(e), exc_info=True)

            # Disable sensor which we are unable to start. The dead sensor has already been
            # removed by _delete_sensor(), so the key may legitimately be missing.
            self._sensors.pop(sensor_id, None)

    def _should_respawn_sensor(self, sensor_id, sensor, exit_code):
        """
        Return True if the provided sensor should be respawned, False otherwise.

        A sensor is only respawned when it exited with a non-zero code and hasn't already
        been respawned SENSOR_MAX_RESPAWN_COUNTS times in a row.
        """
        if exit_code == 0:
            # We only try to respawn sensors which exited with non-zero status code
            return False

        respawn_count = self._sensor_respawn_counts[sensor_id]
        if respawn_count >= SENSOR_MAX_RESPAWN_COUNTS:
            LOG.debug('Sensor has already been respawned max times, giving up')
            return False

        return True

    def _get_sensor_id(self, sensor):
        """
        Return unique identifier for the provided sensor dict (its "ref" value).

        :type sensor: ``dict``
        """
        sensor_id = sensor['ref']
        return sensor_id

    def _dispatch_trigger_for_sensor_spawn(self, sensor, process, cmd):
        """
        Dispatch the sensor spawn trigger with the process pid and command line.
        """
        trigger = ResourceReference.to_string_reference(
            name=SENSOR_SPAWN_TRIGGER['name'],
            pack=SENSOR_SPAWN_TRIGGER['pack'])
        now = int(time.time())
        payload = {
            'id': sensor['class_name'],
            'timestamp': now,
            'pid': process.pid,
            'cmd': cmd
        }
        self._dispatcher.dispatch(trigger, payload=payload)

    def _dispatch_trigger_for_sensor_exit(self, sensor, exit_code):
        """
        Dispatch the sensor exit trigger with the process exit code.
        """
        trigger = ResourceReference.to_string_reference(
            name=SENSOR_EXIT_TRIGGER['name'],
            pack=SENSOR_EXIT_TRIGGER['pack'])
        now = int(time.time())
        payload = {
            'id': sensor['class_name'],
            'timestamp': now,
            'exit_code': exit_code
        }
        self._dispatcher.dispatch(trigger, payload=payload)

    def _delete_sensor(self, sensor_id):
        """
        Delete / reset all the internal state about a particular sensor.
        """
        for var in self._internal_sensor_state_variables:
            if sensor_id in var:
                del var[sensor_id]
472