Completed
Pull Request — master (#2304)
by Arma
07:07
created

st2reactor.container.SensorService   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 246
Duplicated Lines 0 %
Metric Value
wmc 25
dl 0
loc 246
rs 10
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import sys
18
import json
19
import atexit
20
import argparse
21
22
import eventlet
23
from oslo_config import cfg
24
from st2client.client import Client
25
26
from st2common import log as logging
27
from st2common.logging.misc import set_log_level_for_all_loggers
28
from st2common.models.api.trace import TraceContext
29
from st2common.persistence.db_init import db_setup_with_retry
30
from st2common.transport.reactor import TriggerDispatcher
31
from st2common.util import loader
32
from st2common.util.config_parser import ContentPackConfigParser
33
from st2common.services.triggerwatcher import TriggerWatcher
34
from st2reactor.sensor.base import Sensor, PollingSensor
35
from st2reactor.sensor import config
36
from st2common.constants.system import API_URL_ENV_VARIABLE_NAME
37
from st2common.constants.system import AUTH_TOKEN_ENV_VARIABLE_NAME
38
from st2client.models.keyvalue import KeyValuePair
39
40
__all__ = [
41
    'SensorWrapper'
42
]
43
44
eventlet.monkey_patch(
45
    os=True,
46
    select=True,
47
    socket=True,
48
    thread=False if '--use-debugger' in sys.argv else True,
49
    time=True)
50
51
52
class SensorService(object):
53
    """
54
    Instance of this class is passed to the sensor instance and exposes "public"
55
    methods which can be called by the sensor.
56
    """
57
58
    DATASTORE_NAME_SEPARATOR = ':'
59
60
    def __init__(self, sensor_wrapper):
61
        self._sensor_wrapper = sensor_wrapper
62
        self._logger = self._sensor_wrapper._logger
0 ignored issues
show
Coding Style Best Practice introduced by
It seems like _logger was declared protected and should not be accessed from this context.

Prefixing a member variable _ is usually regarded as the equivalent of declaring it with protected visibility that exists in other languages. Consequentially, such a member should only be accessed from the same class or a child class:

class MyParent:
    def __init__(self):
        self._x = 1;
        self.y = 2;

class MyChild(MyParent):
    def some_method(self):
        return self._x    # Ok, since accessed from a child class

class AnotherClass:
    def some_method(self, instance_of_my_child):
        return instance_of_my_child._x   # Would be flagged as AnotherClass is not
                                         # a child class of MyParent
Loading history...
63
        self._dispatcher = TriggerDispatcher(self._logger)
64
65
        self._client = None
66
67
    def get_logger(self, name):
68
        """
69
        Retrieve an instance of a logger to be used by the sensor class.
70
        """
71
        logger_name = '%s.%s' % (self._sensor_wrapper._logger.name, name)
0 ignored issues
show
Coding Style Best Practice introduced by
It seems like _logger was declared protected and should not be accessed from this context.

Prefixing a member variable _ is usually regarded as the equivalent of declaring it with protected visibility that exists in other languages. Consequentially, such a member should only be accessed from the same class or a child class:

class MyParent:
    def __init__(self):
        self._x = 1;
        self.y = 2;

class MyChild(MyParent):
    def some_method(self):
        return self._x    # Ok, since accessed from a child class

class AnotherClass:
    def some_method(self, instance_of_my_child):
        return instance_of_my_child._x   # Would be flagged as AnotherClass is not
                                         # a child class of MyParent
Loading history...
72
        logger = logging.getLogger(logger_name)
73
        logger.propagate = True
74
75
        return logger
76
77
    def dispatch(self, trigger, payload=None, trace_tag=None):
78
        """
79
        Method which dispatches the trigger.
80
81
        :param trigger: Full name / reference of the trigger.
82
        :type trigger: ``str``
83
84
        :param payload: Trigger payload.
85
        :type payload: ``dict``
86
87
        :param trace_tag: Tracer to track the triggerinstance.
88
        :type trace_tags: ``str``
89
        """
90
        # empty strings
91
        trace_context = TraceContext(trace_tag=trace_tag) if trace_tag else None
92
        self.dispatch_with_context(trigger, payload=payload, trace_context=trace_context)
93
94
    def dispatch_with_context(self, trigger, payload=None, trace_context=None):
95
        """
96
        Method which dispatches the trigger.
97
98
        :param trigger: Full name / reference of the trigger.
99
        :type trigger: ``str``
100
101
        :param payload: Trigger payload.
102
        :type payload: ``dict``
103
104
        :param trace_context: Trace context to associate with Trigger.
105
        :type trace_context: ``st2common.api.models.api.trace.TraceContext``
106
        """
107
        self._dispatcher.dispatch(trigger, payload=payload, trace_context=trace_context)
108
109
    ##################################
110
    # Methods for datastore management
111
    ##################################
112
113
    def list_values(self, local=True, prefix=None):
114
        """
115
        Retrieve all the datastores items.
116
117
        :param local: List values from a namespace local to this sensor. Defaults to True.
118
        :type: local: ``bool``
119
120
        :param prefix: Optional key name prefix / startswith filter.
121
        :type prefix: ``str``
122
123
        :rtype: ``list`` of :class:`KeyValuePair`
124
        """
125
        client = self._get_api_client()
126
        self._logger.audit('Retrieving all the value from the datastore')
127
128
        key_prefix = self._get_full_key_prefix(local=local, prefix=prefix)
129
        kvps = client.keys.get_all(prefix=key_prefix)
130
        return kvps
131
132
    def get_value(self, name, local=True):
133
        """
134
        Retrieve a value from the datastore for the provided key.
135
136
        By default, value is retrieved from the namespace local to the sensor. If you want to
137
        retrieve a global value from a datastore, pass local=False to this method.
138
139
        :param name: Key name.
140
        :type name: ``str``
141
142
        :param local: Retrieve value from a namespace local to the sensor. Defaults to True.
143
        :type: local: ``bool``
144
145
        :rtype: ``str`` or ``None``
146
        """
147
        name = self._get_full_key_name(name=name, local=local)
148
149
        client = self._get_api_client()
150
        self._logger.audit('Retrieving value from the datastore (name=%s)', name)
151
152
        try:
153
            kvp = client.keys.get_by_id(id=name)
154
        except Exception:
155
            return None
156
157
        if kvp:
158
            return kvp.value
159
160
        return None
161
162
    def set_value(self, name, value, ttl=None, local=True):
163
        """
164
        Set a value for the provided key.
165
166
        By default, value is set in a namespace local to the sensor. If you want to
167
        set a global value, pass local=False to this method.
168
169
        :param name: Key name.
170
        :type name: ``str``
171
172
        :param value: Key value.
173
        :type value: ``str``
174
175
        :param ttl: Optional TTL (in seconds).
176
        :type ttl: ``int``
177
178
        :param local: Set value in a namespace local to the sensor. Defaults to True.
179
        :type: local: ``bool``
180
181
        :return: ``True`` on success, ``False`` otherwise.
182
        :rtype: ``bool``
183
        """
184
        name = self._get_full_key_name(name=name, local=local)
185
186
        value = str(value)
187
        client = self._get_api_client()
188
189
        self._logger.audit('Setting value in the datastore (name=%s)', name)
190
191
        instance = KeyValuePair()
192
        instance.id = name
193
        instance.name = name
194
        instance.value = value
195
196
        if ttl:
197
            instance.ttl = ttl
198
199
        client.keys.update(instance=instance)
200
        return True
201
202
    def delete_value(self, name, local=True):
203
        """
204
        Delete the provided key.
205
206
        By default, value is deleted from a namespace local to the sensor. If you want to
207
        delete a global value, pass local=False to this method.
208
209
        :param name: Name of the key to delete.
210
        :type name: ``str``
211
212
        :param local: Delete a value in a namespace local to the sensor. Defaults to True.
213
        :type: local: ``bool``
214
215
        :return: ``True`` on success, ``False`` otherwise.
216
        :rtype: ``bool``
217
        """
218
        name = self._get_full_key_name(name=name, local=local)
219
220
        client = self._get_api_client()
221
222
        instance = KeyValuePair()
223
        instance.id = name
224
        instance.name = name
225
226
        self._logger.audit('Deleting value from the datastore (name=%s)', name)
227
228
        try:
229
            client.keys.delete(instance=instance)
230
        except Exception:
231
            return False
232
233
        return True
234
235
    def _get_api_client(self):
236
        """
237
        Retrieve API client instance.
238
        """
239
        # TODO: API client is really unfriendly and needs to be re-designed and
240
        # improved
241
        api_url = os.environ.get(API_URL_ENV_VARIABLE_NAME, None)
242
        auth_token = os.environ.get(AUTH_TOKEN_ENV_VARIABLE_NAME, None)
243
244
        if not api_url or not auth_token:
245
            raise ValueError('%s and %s environment variable must be set' %
246
                             (API_URL_ENV_VARIABLE_NAME, AUTH_TOKEN_ENV_VARIABLE_NAME))
247
248
        if not self._client:
249
            self._client = Client(api_url=api_url)
250
251
        return self._client
252
253
    def _get_full_key_name(self, name, local):
254
        """
255
        Retrieve a full key name.
256
257
        :rtype: ``str``
258
        """
259
        if local:
260
            name = self._get_key_name_with_sensor_prefix(name=name)
261
262
        return name
263
264
    def _get_full_key_prefix(self, local, prefix=None):
265
        if local:
266
            key_prefix = self._get_sensor_local_key_name_prefix()
267
268
            if prefix:
269
                key_prefix += prefix
270
        else:
271
            key_prefix = prefix
272
273
        return key_prefix
274
275
    def _get_sensor_local_key_name_prefix(self):
276
        """
277
        Retrieve key prefix which is local to this sensor.
278
        """
279
        key_prefix = self._get_datastore_key_prefix() + self.DATASTORE_NAME_SEPARATOR
280
        return key_prefix
281
282
    def _get_key_name_with_sensor_prefix(self, name):
283
        """
284
        Retrieve a full key name which is local to the current sensor.
285
286
        :param name: Base datastore key name.
287
        :type name: ``str``
288
289
        :rtype: ``str``
290
        """
291
        prefix = self._get_datastore_key_prefix()
292
        full_name = prefix + self.DATASTORE_NAME_SEPARATOR + name
293
        return full_name
294
295
    def _get_datastore_key_prefix(self):
296
        prefix = '%s.%s' % (self._sensor_wrapper._pack, self._sensor_wrapper._class_name)
0 ignored issues
show
Coding Style Best Practice introduced by
It seems like _pack was declared protected and should not be accessed from this context.

Prefixing a member variable _ is usually regarded as the equivalent of declaring it with protected visibility that exists in other languages. Consequentially, such a member should only be accessed from the same class or a child class:

class MyParent:
    def __init__(self):
        self._x = 1;
        self.y = 2;

class MyChild(MyParent):
    def some_method(self):
        return self._x    # Ok, since accessed from a child class

class AnotherClass:
    def some_method(self, instance_of_my_child):
        return instance_of_my_child._x   # Would be flagged as AnotherClass is not
                                         # a child class of MyParent
Loading history...
Coding Style Best Practice introduced by
It seems like _class_name was declared protected and should not be accessed from this context.

Prefixing a member variable _ is usually regarded as the equivalent of declaring it with protected visibility that exists in other languages. Consequentially, such a member should only be accessed from the same class or a child class:

class MyParent:
    def __init__(self):
        self._x = 1;
        self.y = 2;

class MyChild(MyParent):
    def some_method(self):
        return self._x    # Ok, since accessed from a child class

class AnotherClass:
    def some_method(self, instance_of_my_child):
        return instance_of_my_child._x   # Would be flagged as AnotherClass is not
                                         # a child class of MyParent
Loading history...
297
        return prefix
298
299
300
class SensorWrapper(object):
301
    def __init__(self, pack, file_path, class_name, trigger_types,
0 ignored issues
show
Comprehensibility Bug introduced by
trigger_types is re-defining a name which is already available in the outer-scope (previously defined on line 497).

It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior:

param = 5

class Foo:
    def __init__(self, param):   # "param" would be flagged here
        self.param = param
Loading history...
302
                 poll_interval=None, parent_args=None):
0 ignored issues
show
Comprehensibility Bug introduced by
parent_args is re-defining a name which is already available in the outer-scope (previously defined on line 499).

It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior:

param = 5

class Foo:
    def __init__(self, param):   # "param" would be flagged here
        self.param = param
Loading history...
303
        """
304
        :param pack: Name of the pack this sensor belongs to.
305
        :type pack: ``str``
306
307
        :param file_path: Path to the sensor module file.
308
        :type file_path: ``str``
309
310
        :param class_name: Sensor class name.
311
        :type class_name: ``str``
312
313
        :param trigger_types: A list of references to trigger types which
314
                                  belong to this sensor.
315
        :type trigger_types: ``list`` of ``str``
316
317
        :param poll_interval: Sensor poll interval (in seconds).
318
        :type poll_interval: ``int`` or ``None``
319
320
        :param parent_args: Command line arguments passed to the parent process.
321
        :type parse_args: ``list``
322
        """
323
        self._pack = pack
324
        self._file_path = file_path
325
        self._class_name = class_name
326
        self._trigger_types = trigger_types or []
327
        self._poll_interval = poll_interval
328
        self._parent_args = parent_args or []
329
        self._trigger_names = {}
330
331
        # 1. Parse the config with inherited parent args
332
        try:
333
            config.parse_args(args=self._parent_args)
334
        except Exception:
335
            pass
336
337
        # 2. Establish DB connection
338
        username = cfg.CONF.database.username if hasattr(cfg.CONF.database, 'username') else None
339
        password = cfg.CONF.database.password if hasattr(cfg.CONF.database, 'password') else None
340
        db_setup_with_retry(cfg.CONF.database.db_name, cfg.CONF.database.host,
341
                            cfg.CONF.database.port, username=username, password=password)
342
343
        # 3. Instantiate the watcher
344
        self._trigger_watcher = TriggerWatcher(create_handler=self._handle_create_trigger,
345
                                               update_handler=self._handle_update_trigger,
346
                                               delete_handler=self._handle_delete_trigger,
347
                                               trigger_types=self._trigger_types,
348
                                               queue_suffix='sensorwrapper_%s_%s' %
349
                                               (self._pack, self._class_name),
350
                                               exclusive=True)
351
352
        # 4. Set up logging
353
        self._logger = logging.getLogger('SensorWrapper.%s' %
354
                                         (self._class_name))
355
        logging.setup(cfg.CONF.sensorcontainer.logging)
356
357
        if '--debug' in parent_args:
358
            set_log_level_for_all_loggers()
359
360
        self._sensor_instance = self._get_sensor_instance()
361
362
    def run(self):
363
        atexit.register(self.stop)
364
365
        self._trigger_watcher.start()
366
        self._logger.info('Watcher started')
367
368
        self._logger.info('Running sensor initialization code')
369
        self._sensor_instance.setup()
370
371
        if self._poll_interval:
372
            message = ('Running sensor in active mode (poll interval=%ss)' %
373
                       (self._poll_interval))
374
        else:
375
            message = 'Running sensor in passive mode'
376
377
        self._logger.info(message)
378
379
        try:
380
            self._sensor_instance.run()
381
        except Exception as e:
382
            # Include traceback
383
            msg = ('Sensor "%s" run method raised an exception: %s.' %
384
                   (self._class_name, str(e)))
385
            self._logger.warn(msg, exc_info=True)
386
            raise Exception(msg)
387
388
    def stop(self):
389
        # Stop watcher
390
        self._logger.info('Stopping trigger watcher')
391
        self._trigger_watcher.stop()
392
393
        # Run sensor cleanup code
394
        self._logger.info('Invoking cleanup on sensor')
395
        self._sensor_instance.cleanup()
396
397
    ##############################################
398
    # Event handler methods for the trigger events
399
    ##############################################
400
401
    def _handle_create_trigger(self, trigger):
402
        self._logger.debug('Calling sensor "add_trigger" method (trigger.type=%s)' %
403
                           (trigger.type))
404
        self._trigger_names[str(trigger.id)] = trigger
405
406
        trigger = self._sanitize_trigger(trigger=trigger)
407
        self._sensor_instance.add_trigger(trigger=trigger)
408
409
    def _handle_update_trigger(self, trigger):
410
        self._logger.debug('Calling sensor "update_trigger" method (trigger.type=%s)' %
411
                           (trigger.type))
412
        self._trigger_names[str(trigger.id)] = trigger
413
414
        trigger = self._sanitize_trigger(trigger=trigger)
415
        self._sensor_instance.update_trigger(trigger=trigger)
416
417
    def _handle_delete_trigger(self, trigger):
418
        trigger_id = str(trigger.id)
419
        if trigger_id not in self._trigger_names:
420
            return
421
422
        self._logger.debug('Calling sensor "remove_trigger" method (trigger.type=%s)' %
423
                           (trigger.type))
424
        del self._trigger_names[trigger_id]
425
426
        trigger = self._sanitize_trigger(trigger=trigger)
427
        self._sensor_instance.remove_trigger(trigger=trigger)
428
429
    def _get_sensor_instance(self):
430
        """
431
        Retrieve instance of a sensor class.
432
        """
433
        _, filename = os.path.split(self._file_path)
434
        module_name, _ = os.path.splitext(filename)
435
436
        sensor_class = loader.register_plugin_class(base_class=Sensor,
437
                                                    file_path=self._file_path,
438
                                                    class_name=self._class_name)
439
440
        if not sensor_class:
441
            raise ValueError('Sensor module is missing a class with name "%s"' %
442
                             (self._class_name))
443
444
        sensor_class_kwargs = {}
445
        sensor_class_kwargs['sensor_service'] = SensorService(sensor_wrapper=self)
446
447
        sensor_config = self._get_sensor_config()
448
        sensor_class_kwargs['config'] = sensor_config
449
450
        if self._poll_interval and issubclass(sensor_class, PollingSensor):
451
            sensor_class_kwargs['poll_interval'] = self._poll_interval
452
453
        try:
454
            sensor_instance = sensor_class(**sensor_class_kwargs)
455
        except Exception:
456
            self._logger.exception('Failed to instantiate "%s" sensor class' % (self._class_name))
457
            raise Exception('Failed to instantiate "%s" sensor class' % (self._class_name))
458
459
        return sensor_instance
460
461
    def _get_sensor_config(self):
462
        config_parser = ContentPackConfigParser(pack_name=self._pack)
463
        config = config_parser.get_sensor_config(sensor_file_path=self._file_path)
0 ignored issues
show
Comprehensibility Bug introduced by
config is re-defining a name which is already available in the outer-scope (previously defined on line 35).

It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior:

param = 5

class Foo:
    def __init__(self, param):   # "param" would be flagged here
        self.param = param
Loading history...
464
465
        if config:
466
            self._logger.info('Using config "%s" for sensor "%s"' % (config.file_path,
467
                                                                     self._class_name))
468
            return config.config
469
        else:
470
            self._logger.info('No config found for sensor "%s"' % (self._class_name))
471
            return {}
472
473
    def _sanitize_trigger(self, trigger):
474
        sanitized = trigger._data
0 ignored issues
show
Coding Style Best Practice introduced by
It seems like _data was declared protected and should not be accessed from this context.

Prefixing a member variable _ is usually regarded as the equivalent of declaring it with protected visibility that exists in other languages. Consequentially, such a member should only be accessed from the same class or a child class:

class MyParent:
    def __init__(self):
        self._x = 1;
        self.y = 2;

class MyChild(MyParent):
    def some_method(self):
        return self._x    # Ok, since accessed from a child class

class AnotherClass:
    def some_method(self, instance_of_my_child):
        return instance_of_my_child._x   # Would be flagged as AnotherClass is not
                                         # a child class of MyParent
Loading history...
475
        if 'id' in sanitized:
476
            # Friendly objectid rather than the MongoEngine representation.
477
            sanitized['id'] = str(sanitized['id'])
478
        return sanitized
479
480
481
if __name__ == '__main__':
482
    parser = argparse.ArgumentParser(description='Sensor runner wrapper')
483
    parser.add_argument('--pack', required=True,
484
                        help='Name of the pack this sensor belongs to')
485
    parser.add_argument('--file-path', required=True,
486
                        help='Path to the sensor module')
487
    parser.add_argument('--class-name', required=True,
488
                        help='Name of the sensor class')
489
    parser.add_argument('--trigger-type-refs', required=False,
490
                        help='Comma delimited string of trigger type references')
491
    parser.add_argument('--poll-interval', type=int, default=None, required=False,
492
                        help='Sensor poll interval')
493
    parser.add_argument('--parent-args', required=False,
494
                        help='Command line arguments passed to the parent process')
495
    args = parser.parse_args()
496
497
    trigger_types = args.trigger_type_refs
498
    trigger_types = trigger_types.split(',') if trigger_types else []
499
    parent_args = json.loads(args.parent_args) if args.parent_args else []
500
    assert isinstance(parent_args, list)
501
502
    obj = SensorWrapper(pack=args.pack,
503
                        file_path=args.file_path,
504
                        class_name=args.class_name,
505
                        trigger_types=trigger_types,
506
                        poll_interval=args.poll_interval,
507
                        parent_args=parent_args)
508
    obj.run()
509