config_plugin_writers()   D
last analyzed

Complexity

Conditions 8

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 1 Features 0
Metric Value
c 2
b 1
f 0
dl 0
loc 29
rs 4
cc 8
1
# -*- coding: utf-8 -*-
2
"""Various helper classes.
3
4
.. moduleauthor:: Kenny Freeman <[email protected]>
5
6
"""
7
import imp
8
import sys
9
import os
10
import os.path
11
import time
12
import fcntl
13
import socket
14
import signal
15
import string
16
import inspect
17
import platform
18
import traceback
19
from collections import deque
20
21
import plumd
22
23
__author__ = 'Kenny Freeman'
24
__email__ = '[email protected]'
25
__license__ = "ISCL"
26
__docformat__ = 'reStructuredText'
27
28
PY3 = sys.version_info > (3,)
29
30
31
# MAX_CNT is the largest value a native counter is assumed to hold; it is
# used to detect and correct counters wrapping over.
# platform.architecture() returns eg. ('64bit', 'ELF')
arch = platform.architecture()[0]
if arch == '64bit':
    MAX_CNT = 18446744073709551615
elif arch == '32bit':
    MAX_CNT = 4294967295
else:
    # long() and sys.maxint were removed in Python 3. sys.maxsize exists on
    # both Python 2.6+ and Python 3, and ints promote automatically.
    MAX_CNT = sys.maxsize
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'long'
Loading history...
Bug introduced by
The Module sys does not seem to have a member named maxint.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

Loading history...
40
41
42
def get_hostname():
    """Return the short hostname of the system the code is running on.

    Everything from the first "." onwards (ie. the domain part of a fully
    qualified name) is dropped.

    :rtype: str
    """
    short_name, _, _ = socket.gethostname().partition(".")
    return short_name
49
50
51
def get_file_map(fname, col, skip):
    """Return a dict built from the columns of a file - small files only.

    Each line is split on whitespace. The field at index ``col`` becomes the
    key and the remaining fields (as strings, in order) become the value,
    stored in a :class:`collections.deque`.

    eg. file contents of:

    8       1 sda1 284 166 2673

    with a call of col=2, skip=0 returns:

    {"sda1": deque(["8", "1", "284", "166", "2673"])}

    Lines with too few fields to have a column ``col`` are ignored. If the
    file is not readable an empty dict is returned.

    :param fname: Full pathname to a file.
    :type fname: str
    :param col: Index of the column to use as the dict key.
    :type col: int
    :param skip: Skip this many lines from the start of the file.
    :type skip: int
    :rtype: dict
    """
    mapping = {}
    if not os.access(fname, os.R_OK):
        return mapping
    with open(fname, 'r') as handle:
        lines = handle.read().split("\n")
    for fields in (line.split() for line in lines[skip:]):
        if len(fields) > col:
            key = fields.pop(col)
            mapping[key] = deque(fields)
    return mapping
80
81 View Code Duplication
82
def get_file_map_list(fname, col, skip):
    """Return a dict built from the columns of a file - small files only.

    Behaves like a column lookup table: each line is split on whitespace,
    the field at index ``col`` is removed and used as the key, and the
    remaining fields (as strings, in order) are stored as a plain list.

    eg. file contents of:

    8       1 sda1 284 166 2673

    with a call of col=2, skip=0 returns:

    {"sda1": ["8", "1", "284", "166", "2673"]}

    Lines with too few fields to have a column ``col`` are ignored. If the
    file is not readable an empty dict is returned.

    :param fname: Full pathname to a file.
    :type fname: str
    :param col: Index of the column to use as the dict key.
    :type col: int
    :param skip: Skip this many lines from the start of the file.
    :type skip: int
    :rtype: dict
    """
    result = {}
    if not os.access(fname, os.R_OK):
        return result
    with open(fname, 'r') as handle:
        rows = handle.read().split("\n")[skip:]
    for row in rows:
        columns = row.split()
        if len(columns) > col:
            # pop() removes the key column, leaving only the values
            key = columns.pop(col)
            result[key] = columns
    return result
111
112 View Code Duplication
113
def get_file_list(fname):
    """Return the lines of a file as a deque - use on small files only.

    Leading and trailing whitespace of the whole file is stripped before it
    is split on newlines. If the file is not readable a deque holding a
    single empty string is returned.

    :param fname: Full pathname to a file.
    :type fname: str
    :rtype: collections.deque
    """
    if not os.access(fname, os.R_OK):
        return deque([''])
    with open(fname, 'r') as handle:
        content = handle.read()
    return deque(content.strip().split("\n"))
124
125
126
def get_file(fname):
    """Return the whole file as one string - use on small files only.

    If the file is not readable an empty string is returned.

    :param fname: Full pathname to a file.
    :type fname: str
    :rtype: str
    """
    if not os.access(fname, os.R_OK):
        return ''
    with open(fname, 'r') as handle:
        content = handle.read()
    return content
137
138
139
class Interval(object):
    """Context manager that pads a block of work out to ``ltime`` seconds.

    On entry the start time is recorded. On exit, whatever is left of the
    interval is spent either in ``evt.wait()`` (when an event was given, so
    the wait can be cut short by setting the event) or in ``time.sleep()``.
    If the work took longer than the interval, exit returns immediately.

    :param ltime: The target duration of a loop, in seconds.
    :type ltime: int or float
    :param evt: Optional event; setting it ends the wait early.
    :type evt: threading.Event
    :raises: ValueError if ltime is None or < 0

    :Example:

        >>> import threading
        >>> evt = threading.Event()
        >>> while loop:
        >>>     with Interval(111.1, evt) as timer:
        >>>         do_things_here()
    """

    def __init__(self, ltime, evt=None):
        """Validate and store the interval length and the optional event.

        :param ltime: The target duration of a loop, in seconds.
        :type ltime: int or float
        :param evt: Optional event; setting it ends the wait early.
        :type evt: threading.Event
        :raises: ValueError if ltime is None or < 0
        """
        if ltime is None or ltime < 0:
            raise ValueError("an interval must be defined")
        self.ltime = float(ltime)
        self.evt = evt
        # set by __enter__ (or lazily by __exit__); None means "not started"
        self.lstart = None

    @property
    def remaining(self):
        """Return the time left in the interval, 0.0 if not yet started.

        The value is negative once the interval has been exceeded.

        :rtype: float
        """
        if self.lstart is None:
            return 0.0
        return self.ltime - (time.time() - self.lstart)

    @property
    def elapsed(self):
        """Return the time since the interval started, 0.0 if not started.

        :rtype: float
        """
        if self.lstart is None:
            return 0.0
        return time.time() - self.lstart

    def _describe(self):
        """Build the text shared by __str__ and __repr__.

        :rtype: str
        """
        started = self.lstart
        # unlike the elapsed property, an unstarted interval shows None here
        elapsed = None if started is None else time.time() - started
        template = "Interval {0} seconds, {1} elapsed, {2} started"
        return template.format(self.ltime, elapsed, started)

    def __str__(self):
        """Return a human readable summary of the interval.

        :rtype: str
        """
        return self._describe()

    def __repr__(self):
        """Return the same human readable summary as __str__.

        :rtype: str
        """
        return self._describe()

    def __enter__(self):
        """Record the start time of the interval and return the timer.

        :rtype: Interval
        """
        self.lstart = time.time()
        return self

    def __exit__(self, *args):
        """Wait out whatever is left of the interval.

        Waits on self.evt when one was given, otherwise sleeps. Exception
        details passed in by the with statement are ignored and nothing is
        returned, so exceptions raised in the body still propagate.
        """
        if self.lstart is None:
            # __exit__ called without __enter__: start counting from now
            self.lstart = time.time()
        wtime = self.remaining
        if wtime > 0:
            if self.evt is None:
                time.sleep(wtime)
            else:
                self.evt.wait(wtime)
252
253
254
class SignalWaiter(object):
    """A helper class that makes waiting on signals very easy. Define the
    list of signals to wait for and to ignore and then call wait(). Instead
    of doing work in signal handler functions it uses the
    signal.set_wakeup_fd() function and blocks on read().

    See http://www.pocketnix.org/doc/Fighting_set__wakeup__fd/ and others for
    more details.

    :param sig_wait: :class:`list` signals to wait for
    :type sig_wait: list
    :param sig_ignore: :class:`list` signals to ignore
    :type sig_ignore: list
    """

    def __init__(self, sig_wait, sig_ignore):
        """Create a :class:`SignalWaiter` and install the signal handlers.

        Note this changes process-wide state: the wakeup fd and the handlers
        of every signal in sig_wait and sig_ignore are replaced.
        """
        self.sig_wait = sig_wait
        self.sig_ignore = sig_ignore

        # use a pipe to listen for signals
        self.rpipe, self.wpipe = os.pipe()

        # set non blocking mode on the write end of the pipe
        flags = fcntl.fcntl(self.wpipe, fcntl.F_GETFL, 0)
        fcntl.fcntl(self.wpipe, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        # set the write end as a signal wakeup fd - we read signals from it
        signal.set_wakeup_fd(self.wpipe)

        # install dummy handlers that do nothing: a handler must exist for
        # the signal to be delivered and written to the wakeup fd
        for signum in self.sig_wait:
            signal.signal(signum, lambda x, y: None)

        # install the ignore signal handler for sig_ignore signals
        for signum in self.sig_ignore:
            signal.signal(signum, signal.SIG_IGN)

        # now a call to wait() blocks the caller until a signal is received
        # only the signals we've registered handlers for will trigger

    def wait(self):
        """Block the calling thread until signal(s) are received.

        Returns the single byte read from the pipe, or the exception object
        (returned, not raised) if the read itself fails.

        :rtype: :class:`bytes` (``str`` on Python 2) or :class:`OSError`
        """
        try:
            return os.read(self.rpipe, 1)
        # only OS level read failures are expected here; anything else is a
        # programming error and should not be hidden from the caller
        except (OSError, IOError) as exc:
            return exc
304
305
306
class Filter(object):
    """A helper class that efficiently filters characters from strings.

    Every character that is not in keep_chars is removed by process().

    :param keep_chars: A string containing all characters to keep in strings.
        Defaults to letters, digits, "_" and "-".
    :type keep_chars: str
    """

    class _KeepTable(dict):
        """str.translate() table that deletes every unlisted code point."""

        def __missing__(self, key):
            # translate() removes characters that map to None
            return None

    def __init__(self, keep_chars=None):
        """Create a :class:`Filter`"""
        if not keep_chars:
            # string.letters only exists on python2, ascii_letters on both
            letters = getattr(string, "letters", string.ascii_letters)
            keep_chars = letters + string.digits + "_-"
        # python2 byte strings: the explicit list of characters to delete
        self.filter_chars = "".join(char for char in
                                    [chr(i) for i in range(256)]
                                    if char not in keep_chars)
        # python3: map kept code points to themselves; anything else,
        # including code points above 255, falls through to __missing__
        # and is deleted
        self.table = self._KeepTable(
            (ord(char), ord(char)) for char in keep_chars)
        # translate is different in python3
        if sys.version_info[0] >= 3:
            self.process = self.process_py3
        else:
            self.process = self.process_py2

    def process_py3(self, filter_str):
        """Return filter_str with every character not in keep_chars removed.

        :param filter_str: The (unicode) string to filter.
        :type filter_str: str
        :rtype: str
        """
        return filter_str.translate(self.table)

    def process_py2(self, filter_str):
        """Return filter_str with all of self.filter_chars removed.

        Uses the python2 only two argument form of str.translate.

        :param filter_str: The byte string to filter.
        :type filter_str: str
        :rtype: str
        """
        return filter_str.translate(None, self.filter_chars)
343
344
345
def load_instance(log, cname, mod, cobj):
    """Instantiate the class named cname found in the module mod.

    The class is called as ``cls(log, cobj)``.

    :param log: A logger, passed to the class constructor
    :type log: logging.RootLogger
    :param cname: The class name to load
    :type cname: str
    :param mod: A module object from eg. imp.load_source or __import__
    :type mod: module
    :param cobj: A conf configuration helper instance to pass to the class
    :type cobj: conf
    :rtype: Object -- an instance of the class requested
    :raises: PluginLoadError: if the module has no class called cname or if
        the class constructor raised an exception
    """
    instance = None
    # all classes visible in the module, keyed by name
    classes = dict(inspect.getmembers(mod, inspect.isclass))
    klass = classes.get(cname)
    if klass is not None:
        try:
            instance = klass(log, cobj)
        # plugins may raise pretty much any exception
        except Exception as exc:
            trace = traceback.format_exc()
            template = "class {0}, module {1} raised: {2}, trace:{3}"
            raise plumd.PluginLoadError(
                template.format(cname, mod.__name__, exc, trace))
    # the module does not have the expected class in it
    if instance is None:
        raise plumd.PluginLoadError(
            "{0} does not define class {1}".format(mod.__name__, cname))
    return instance
378
379
380
def load_file_instance(log, cname, fname, cobj):
    """Import the class cname from the python file fname and return
    an instance of it.

    The file is loaded as a module registered under the name cname (not the
    file name). importlib is used where available because the imp module is
    deprecated and was removed in Python 3.12; imp.load_source is only used
    as a fallback on interpreters without importlib.util.module_from_spec.

    NOTE(review): the module level ``import imp`` of this file must also be
    guarded for the file itself to import on Python 3.12+.

    :param log: A logger
    :type log: logging.RootLogger
    :param cname: The name of the class to load
    :type cname: str
    :param fname: The full path to the python file to load from
    :type fname: str
    :param cobj: A conf configuration helper instance to pass to the class
    :type cobj: conf
    :rtype: Object -- an instance of the class requested
    :raises: PluginNotFoundError if fname is not an existing file
    :raises: PluginLoadError if the class is missing or fails to instantiate
    """
    if not os.path.isfile(fname):
        raise plumd.PluginNotFoundError(
            "file {0} does not exist".format(fname))

    try:
        import importlib.machinery
        import importlib.util
        modern = hasattr(importlib.util, "module_from_spec")
    except ImportError:
        modern = False

    if modern:
        # an explicit source loader accepts any file name, like
        # imp.load_source did, not only ones ending in .py
        loader = importlib.machinery.SourceFileLoader(cname, fname)
        spec = importlib.util.spec_from_file_location(
            cname, fname, loader=loader)
        mod = importlib.util.module_from_spec(spec)
        # register before executing, as imp.load_source did
        sys.modules[cname] = mod
        try:
            spec.loader.exec_module(mod)
        except BaseException:
            # do not leave a half initialised module behind
            sys.modules.pop(cname, None)
            raise
    else:
        import imp
        mod = imp.load_source(cname, fname)
    return load_instance(log, cname, mod, cobj)
406
407
408
def load_module_instance(log, cname, mname, cobj):
    """Import the class cname from the python module mname and return
    an instance of it.

    importlib.import_module is used so that a dotted name such as "a.b.c"
    yields the module "a.b.c" itself. The previous code obtained this by
    passing the module name string as ``fromlist`` to ``__import__``, which
    only worked because any non empty fromlist has that effect.

    :param log: A logger
    :type log: logging.RootLogger
    :param cname: The name of the class to load
    :type cname: str
    :param mname: The (optionally dotted) name of the module to load from
    :type mname: str
    :param cobj: A conf configuration helper instance to pass to the class
    :type cobj: conf
    :rtype: Object -- an instance of the class requested
    :raises: PluginLoadError if the import fails for any reason, or if the
        class is missing or fails to instantiate
    """
    import importlib

    # log messages are unchanged from the previous implementation
    if "." in mname:
        log.debug("importing {0} from {1}".format(mname, mname))
    else:
        log.debug("importing {0}".format(mname))
    try:
        mod = importlib.import_module(mname.strip())
    # importing runs arbitrary plugin code, which may raise anything
    except Exception as e:
        msg = "module {0} import failed: {1}".format(mname, e)
        raise plumd.PluginLoadError(msg)
    return load_instance(log, cname, mod, cobj)
439
440
441
def load_plugin(log, pconf):
    """Load the configured plugin and return an instance of it.

    The following keys are read from pconf:

    file: <full path to file to load from>  # the filename to load
    module: <name of module to load from>   # or, the module name to load
    class: <name of class to load>          # name of the class to load

    plus any plugin specific configuration, since pconf itself is handed to
    the plugin class.

    If both file and module are defined in the configuration the file
    configuration is used and module is ignored. If neither is defined the
    load fails with PluginLoadError.

    :param log: A logger
    :type log: logging.RootLogger
    :param pconf: a configuration object that defines the plugin to load
    :type pconf: plumd.config.conf
    :rtype: object
    :raises: PluginLoadError if the module cannot be imported or no
        instance could be created; the file/module loaders may raise
        further plugin errors (eg. PluginNotFoundError)
    """
    src_file = pconf.get('file')                     # full path to file
    src_module = pconf.get('module')                 # module name
    # presumably exception=True makes a missing key raise - see Conf.get
    class_name = pconf.get('class', exception=True)  # class name

    if src_file:
        plugin = load_file_instance(log, class_name, src_file, pconf)
    elif src_module:
        # import up front so a missing module gets its own error message
        try:
            __import__(src_module.strip())
        except ImportError as exc:
            template = "module not found: {0} : {1} : {2}"
            raise plumd.PluginLoadError(
                template.format(src_module, pconf.path, exc))
        plugin = load_module_instance(log, class_name, src_module, pconf)
    else:
        plugin = None

    if plugin is None:
        raise plumd.PluginLoadError("load failed for: {0}".format(pconf.path))

    return plugin
489
490
491
def load_from_conf(log, config, pconf):
    """Load the plugin described by the configuration file pconf and return
    an instance of it, or None if the plugin is disabled.

    The 'poll.interval', 'delay.poll' and 'meta' values of the main
    configuration are handed to the plugin configuration as defaults.

    :param log: A logger
    :type log: logging.RootLogger
    :param config: a :class:`plumd.config.Conf` instance
    :type config: plumd.config.Conf
    :param pconf: Full path to a plugin configuration.
    :type pconf: str
    :rtype: object or None
    :raises: whatever the 'name' lookup and :func:`load_plugin` raise, eg.
        PluginLoadError
    """
    # select values passed on from the main config
    inherited = dict(
        (key, config.get(key))
        for key in ('poll.interval', 'delay.poll', 'meta'))
    pconfig = plumd.config.Conf(pconf).defaults(inherited)

    # plugins are enabled unless the configuration says otherwise
    if not pconfig.get('enabled', default=True):
        return None

    # presumably exception=True makes a missing name raise - see Conf.get
    pname = pconfig.get("name", exception=True)

    log.debug("loading {0} from {1}".format(pname, pconfig.path))
    return load_plugin(log, pconfig)
526
527
528
def get_plugins_dict(log, plugins, pclass):
    """Return a dict of the plugins that are of type pclass, where the dict
    keys are the configured plugin names and the values are the plugin
    objects. Plugins of other types are left out.

    :param log: A logger (currently unused)
    :type log: logging.RootLogger
    :param plugins: A list of plugin objects.
    :type plugins: list
    :param pclass: A class eg. :class:`plumd.Reader`, :class:`plumd.Writer`
    :type pclass: type
    :rtype: dict
    :raises: DuplicatePlugin if two selected plugins have the same 'name'
    """
    selected = {}
    for plugin in plugins:
        # skip plugins that do not have pclass among their base classes
        if pclass not in inspect.getmro(plugin.__class__):
            continue
        conf = plugin.config
        name = conf.get('name', exception=True)
        if name in selected:
            template = "duplicate plugin: {0} from {1}"
            raise plumd.DuplicatePlugin(template.format(name, conf.path))
        selected[name] = plugin
    return selected
558
559
560
def load_all_plugins(log, config):
    """Load every plugin configured under 'config.plugins' and return a
    tuple of reader and writer plugin dicts.

    Returns a tuple of: (readers, writers) where each is a dict of:
    {'name': <plugin_instance>}. Name is the configured plugin name and
    <plugin_instance> is the corresponding instance of the plugin. Plugins
    whose configuration disables them are skipped (and logged). Note plugins
    can be instantiated multiple times with different configurations.

    :param log: A logger
    :type log: logging.RootLogger
    :param config: a :class:`plumd.config.Conf` instance
    :type config: plumd.config.Conf
    :rtype: tuple
    :raises: anything raised while finding configurations or loading a
        plugin is passed upwards, eg. DuplicatePlugin, PluginLoadError
    """
    # find the yaml plugin configuration files
    # note the call to find can raise an exception which we pass upwards
    conf_dir = config.get('config.plugins')
    loaded = []
    for conf_path in plumd.config.find(conf_dir, 'yaml'):
        plugin = load_from_conf(log, config, conf_path)
        if plugin is not None:
            loaded.append(plugin)
            continue
        # None means the plugin has been disabled in configuration
        log.info("skipping disabled plugin: {0}".format(conf_path))

    class_names = [plugin.__class__.__name__ for plugin in loaded]
    log.debug("plugins: {0}".format(" ".join(class_names)))

    # the plugin list is scanned once per plugin type - it is small and
    # there are not many types so this is ok.
    readers = get_plugins_dict(log, loaded, plumd.Reader)
    writers = get_plugins_dict(log, loaded, plumd.Writer)
    log.info("readers: {0}".format(" ".join(readers)))
    log.info("writers: {0}".format(" ".join(writers)))

    return (readers, writers)
607
608
609
def config_plugin_writers(lobj):
    """Update the list of writers for each reader in the PluginLoader.

    Each reader gets a ``queues`` attribute: a list of
    (queue_evt, queue) tuples, one per writer it should write to. A reader
    without a 'writers' configuration writes to all writers (all such
    readers share the same list object). Configured writer names that are
    not loaded are ignored.

    If a reader has a 'writers' configuration but none of the names match a
    loaded writer, an error is logged and the process exits with status 1.

    todo: change lobj to a tuple of (readers,writers)

    :param lobj: a :class:`plumd.PluginLoader` object
    :type lobj: plumd.PluginLoader

    :raises: SystemExit if a reader has no valid writers configured
    """
    allw = [(wobj.queue_evt, wobj.queue) for wobj in lobj.writers.values()]
    # now set the PluginReaders writers
    for prname, probj in lobj.readers.items():
        wcfg = probj.config.get('writers')
        # if nothing is configured the reader writes to all
        if wcfg is None:
            probj.queues = allw
            continue

        # get list of writer names that are actually loaded
        wnames = [w for w in wcfg if w in lobj.writers]
        if wnames:
            probj.queues = [(lobj.writers[w].queue_evt, lobj.writers[w].queue)
                            for w in wnames]
            continue

        msg = "reader {0} from {1} has no valid writers configured"
        # the plugin configuration lives on .config (there is no .conf), so
        # the old probj.conf.path raised AttributeError instead of logging
        lobj.log.error(msg.format(prname, probj.config.path))
        sys.exit(1)
638
639
640
class Differential(object):
    """Compute the rate of change of a set of named metrics."""

    def __init__(self):
        """Start with no recorded metrics."""
        # metric name -> (last value, last timestamp)
        self.metrics = {}

    def per_second(self, name, val, timestamp):
        """Record a sample and return the rate of change in units/second.

        The rate is measured against the previously recorded sample of the
        same metric. The first sample of a metric returns a zero of the same
        type as val; a sample whose timestamp is not later than the previous
        one returns 0.

        A value smaller than the previous one is treated as a counter that
        wrapped over at MAX_CNT and is corrected accordingly.

        :param name: The name of a metric
        :type name: str
        :param val: The value of a metric
        :type val: int or float
        :param timestamp: The time the value was sampled at, in seconds
        :type timestamp: int or float
        :rtype: int or float
        """
        zero = type(val)(0)
        previous = self.metrics.get(name)
        # always remember the newest sample for the next call
        self.metrics[name] = (val, timestamp)
        if previous is None:
            return zero

        last_val, last_time = previous
        # check for counter wrap
        if val < last_val:
            last_val = last_val - MAX_CNT
        delta = val - last_val
        seconds = timestamp - last_time
        if seconds > 0:
            return float(delta) / float(seconds)
        return 0
677