Completed
Branch prom (d3fc9e)
by Kenny
01:21
created

BasicRender.process()   A

Complexity

Conditions 1

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 10
rs 9.4285
cc 1
1
# -*- coding: utf-8 -*-
2
"""
3
The Render class is used to "render" metrics into a specific format.
4
5
It is an abstract base class and must be sub-classed for each desired
6
metrics format.
7
8
eg. the graphite plaintext format is supported by GraphiteRenderPlain
9
which sub-classes Render and defines the process() and flush() methods.
10
11
.. moduleauthor:: Kenny Freeman <[email protected]>
12
13
"""
14
15
import collections
16
from plumd import Int, Float, String, Boolean, Render, Meta
17
from Queue import Queue
18
from abc import ABCMeta, abstractmethod
19
20
__author__ = 'Kenny Freeman'
21
__email__ = '[email protected]'
22
__version__ = '0.7'
23
__license__ = "ISCL"
24
__docformat__ = 'reStructuredText'
25
26
27
class BasicRender(Render):
28
    """A plugin that renders metrics and is also iterable.
29
30
    :param rconfig: config.Conf object passed from a writer plugin instance
31
    :type rconfig: :class:`config.Conf`
32
    """
33
34
    __metaclass__ = ABCMeta
35
36
    defaults = {
37
        'meta': {}
38
    }
39
40
    def __init__(self, log, config):
41
        """Create a Render instance.
42
43
        todo: fix me / complete this
44
45
        :param log: A logger
46
        :type log: logging.RootLogger
47
        :param config: a plumd.config.Conf configuration helper instance.
48
        :type config: plumd.config.Conf
49
        """
50
        super(BasicRender, self).__init__(log, config)
51
        # ensure default configuration is set
52
        config.defaults(BasicRender.defaults)
53
        # Use a double ended queue with a maximum size
54
        self.metrics = collections.deque()
55
        # define a metadata object and copy in any configured host metadata
56
        self.meta = Meta()
57
        clookup = {
58
            int: Int,
59
            float: Float,
60
            str: String,
61
            bool: Boolean
62
        }
63
        for entry in config.get('meta'):
64
            for key, val in entry.items():
65
                if val.__class__ not in clookup:
66
                    args = (key, val.__class__.__name__)
67
                    err = "host metadata: unsupported type: {0}={1}".format(*args)
68
                    raise ValueError(err)
69
                self.meta.add(clookup[val.__class__](key, val))
70
        self.buff = ""
71
72
    def __str__(self):
73
        """Return our current metrics buffer."""
74
        return self.buff
75
76
    def __repr__(self):
77
        """Return our current metrics buffer."""
78
        return self.buff
79
80
    def get_metrics(self):
81
        """Return our deque of metrics.
82
83
        :rtype: collections.deque
84
        """
85
        raise NotImplementedError("process must be defined by a subclass")
86
87
    def reset(self):
88
        """Clear our metrics buffer and deque."""
89
        self.metrics.clear()
90
        self.buff = str()
91
92
    @abstractmethod
93
    def process(self, rset):
94
        """Render a result set into our supported format and enqueue it.
95
96
        Subclasses for specific backends must define this method.
97
98
        :param rset: A :class:`ResultSet` object
99
        :type rset: :class:`ResultSet`
100
        """
101
        raise NotImplementedError("process must be defined by a subclass")
102
103
    @abstractmethod
104
    def process_many(self, rsets):
105
        """Render result sets into our supported format and enqueue.
106
107
        Subclasses for specific backends must define this method.
108
109
        :param rsets: An iterable of :class:`ResultSet` objects
110
        :type rsets: iterable(:class:`ResultSet`)
111
        """
112
        raise NotImplementedError("process must be defined by a subclass")
113
114
115
class ChunkedRender(Render):
116
    """A plugin that renders metrics and is also iterable.
117
118
    :param rconfig: config.Conf object passed from a writer plugin instance
119
    :type rconfig: :class:`config.Conf`
120
    """
121
122
    __metaclass__ = ABCMeta
123
124
    defaults = {
125
        # todo: move max items, bytes out of here into graphite render
126
        'max_queue': 2048,      # maxlen of deque used for metrics
127
        'max_items': 64,        # maximum number of items to send at once
128
        'max_bytes': 1400,      # maximum number of bytes to send at once
129
        'meta': {}              # metadata to add to each metric
130
    }
131
132
    def __init__(self, log, config):
133
        """Create a Render instance.
134
135
        :param log: A logger
136
        :type log: logging.RootLogger
137
        :param config: a plumd.config.Conf configuration helper instance.
138
        :type config: plumd.config.Conf
139
        """
140
        super(ChunkedRender, self).__init__(log, config)
141
        # ensure default configuration is set
142
        config.defaults(ChunkedRender.defaults)
143
        self.max_bytes = config.get('max_bytes')
144
        self.max_items = config.get('max_items')
145
        # Use a double ended queue with a maximum size
146
        self.metrics = collections.deque(maxlen=config.get('max_queue'))
147
        # define a metadata object and copy in any configured host metadata
148
        self.meta = Meta()
149
        clookup = {
150
            int: Int,
151
            float: Float,
152
            str: String,
153
            bool: Boolean
154
        }
155
        for entry in config.get('meta'):
156
            for key, val in entry.items():
157
                if val.__class__ not in clookup:
158
                    args = (key, val.__class__.__name__)
159
                    err = "host metadata: unsupported type: {0}={1}".format(*args)
160
                    raise ValueError(err)
161
                self.meta.add(clookup[val.__class__](key, val))
162
        self.queues = dict()
163
164
    def register(self, writer):
165
        """Register the writer plugin name to read from this renderer.
166
167
        This creates a queue internally for the writer and returns it.
168
        Writers read from this queue and it is used exclusively for a given
169
        writer.
170
171
        :param writer: The name of a writer plugin to register.
172
        :type writer: str
173
        :rtype: threading.Queue
174
        """
175
        if writer in self.queues:
176
            err = "duplicate writer registered: {0}"
177
            raise ValueError(err.format(writer))
178
        qobj = Queue(maxsize=self.max_queue)
179
        self.queues[writer] = qobj
180
        return qobj
181
182
    def deregister(self, writer):
183
        """De-Register the writer plugin.
184
185
        This removes the writers queue.
186
187
        :param writer: The name of a writer plugin to register.
188
        :type writer: str
189
        :rtype: threading.Queue
190
        """
191
        if writer not in self.queues:
192
            return
193
        del(self.queues[writer])
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after del.
Loading history...
194
195
    @abstractmethod
196
    def flush(self):
197
        """Return any partially rendered metrics.
198
199
        This is useful for eg. during shutdown when there may be
200
        partially rendered chunks of metrics to send.
201
202
        Subclasses for specific backends can define this method when
203
        appropriate.
204
        """
205
        raise NotImplementedError("process must be defined by a subclass")
206
207
    @abstractmethod
208
    def process(self, rset):
209
        """Render a result set into our supported format and enqueue it.
210
211
        Subclasses for specific backends must define this method.
212
213
        :param rset: A :class:`ResultSet` object
214
        :type rset: :class:`ResultSet`
215
        """
216
        raise NotImplementedError("process must be defined by a subclass")
217
218
    @abstractmethod
219
    def process_many(self, rsets):
220
        """Render result sets into our supported format and enqueue.
221
222
        Subclasses for specific backends must define this method.
223
224
        :param rsets: An iterable of :class:`ResultSet` objects
225
        :type rsets: iterable(:class:`ResultSet`)
226
        """
227
        raise NotImplementedError("process must be defined by a subclass")