glances.exports.export   C

Complexity

Total Complexity 55

Size/Duplication

Total Lines 332
Duplicated Lines 3.92 %

Importance

Changes 0
Metric Value
eloc 180
dl 13
loc 332
rs 6
c 0
b 0
f 0
wmc 55

15 Methods

Rating   Name   Duplication   Size   Complexity  
A GlancesExport.exit() 0 3 1
A GlancesExport._log_result_decorator() 13 13 1
B GlancesExport.update() 0 33 6
A GlancesExport.init_fields() 0 8 2
A GlancesExport.parse_tags() 0 16 3
A GlancesExport.is_excluded() 0 3 1
A GlancesExport.__init__() 0 27 2
A GlancesExport.get_item_key() 0 10 3
D GlancesExport.normalize_for_influxdb() 0 62 13
A GlancesExport.export() 0 3 1
A GlancesExport.plugins_to_export() 0 7 1
B GlancesExport.load_conf() 0 36 7
A GlancesExport.load_common_conf() 0 17 2
A GlancesExport.last_exported_list() 0 3 1
C GlancesExport.build_export() 0 43 11

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
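One such problem, flagged in this report for `GlancesExport._log_result_decorator()` (13 duplicated lines), is a helper that is copied into several classes; a usual remedy is to move it into one shared function and import it where needed. The following is only a minimal sketch of that idea: the module-level `log_result` decorator and the `DummyExport` class are hypothetical names, and the standard `logging` and `time` modules stand in for Glances' own `logger` and `Counter`.

```python
import functools
import logging
import time

# Hypothetical logger name; Glances uses its own logger object
logger = logging.getLogger("export_sketch")


def log_result(fct):
    """Log (DEBUG) the result and duration of the wrapped method."""

    @functools.wraps(fct)  # keep the name and docstring of the wrapped method
    def wrapper(*args, **kw):
        start = time.perf_counter()
        ret = fct(*args, **kw)
        duration = time.perf_counter() - start
        cls = args[0].__class__  # args[0] is `self` for a method
        logger.debug(
            "%s %s %s return %s in %s seconds",
            cls.__name__, cls.__module__, fct.__name__, ret, duration,
        )
        return ret

    return wrapper


class DummyExport:
    """Hypothetical exporter, used only to show the shared decorator."""

    @log_result
    def export(self, name, columns, points):
        return len(columns) == len(points)
```

Each class that previously carried its own copy would then import the single `log_result` helper instead.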

Complexity

Tip: Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like GlancesExport in glances.exports.export often do many different things. To break such a class down, first identify a cohesive component within it. A common way to find one is to look for fields and methods that share the same prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
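As an illustration of Extract Class, the methods sharing the `load_` prefix (`load_common_conf()` and `load_conf()`) could be moved into a small collaborator. This is a rough sketch under stated assumptions, not the Glances design: `ExportConfigLoader` and `SlimExport` are hypothetical names, and the configuration is assumed to be a plain dict of sections rather than a Glances config object.

```python
class ExportConfigLoader:
    """Hypothetical extracted class: reads export settings from a config mapping."""

    def __init__(self, config):
        # Assumption: `config` is a dict of {section: {option: value}}
        self.config = config or {}

    def load(self, section, mandatories=("host", "port"), options=()):
        """Return a dict of settings, or None if the section or a mandatory option is missing."""
        values = self.config.get(section)
        if values is None:
            return None
        if any(opt not in values for opt in mandatories):
            return None
        settings = {opt: values[opt] for opt in mandatories}
        # Optional settings are copied only when present
        settings.update({opt: values[opt] for opt in options if opt in values})
        return settings


class SlimExport:
    """Hypothetical exporter that delegates configuration to the extracted class."""

    def __init__(self, config=None):
        self.conf_loader = ExportConfigLoader(config)
        self.export_enable = False

    def load_conf(self, section, mandatories=("host", "port"), options=()):
        settings = self.conf_loader.load(section, mandatories, options)
        if settings is None:
            return False
        for key, value in settings.items():
            setattr(self, key, value)
        return True
```

With this split, the exporter keeps its public `load_conf()` entry point while the lookup rules live in one place that can be tested on their own.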

#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
#
# SPDX-License-Identifier: LGPL-3.0-only
#

"""
I am your father...
...for all Glances exports IF.
"""

import re

from glances.globals import NoOptionError, NoSectionError, json_dumps
from glances.logger import logger
from glances.timer import Counter


class GlancesExport:
    """Main class for Glances export IF."""

    # List of non exportable internal plugins
    non_exportable_plugins = [
        "alert",
        "help",
        "plugin",
        "psutilversion",
        "quicklook",
        "version",
    ]

    def __init__(self, config=None, args=None):
        """Init the export class."""
        # Export name
        self.export_name = self.__class__.__module__
        logger.debug(f"Init export module {self.export_name}")

        # Init the config & args
        self.config = config
        self.args = args

        # By default export is disabled
        # Must be set to True in the __init__ method of the child class
        self.export_enable = False

        # Mandatory for (most of) the export module
        self.host = None
        self.port = None

        # Save last export list
        self._last_exported_list = None

        # Fields description
        self._fields_description = None

        # Load the default common export configuration
        if self.config is not None:
            self.load_common_conf()

    def _log_result_decorator(fct):
        """Log (DEBUG) the result of the function fct."""

        def wrapper(*args, **kw):
            counter = Counter()
            ret = fct(*args, **kw)
            duration = counter.get()
            class_name = args[0].__class__.__name__
            class_module = args[0].__class__.__module__
            logger.debug(f"{class_name} {class_module} {fct.__name__} return {ret} in {duration} seconds")
            return ret

        return wrapper

    def exit(self):
        """Close the export module."""
        logger.debug(f"Finalise export interface {self.export_name}")

    def load_common_conf(self):
        """Load the common export configuration in the Glances configuration file.

        :returns: Boolean -- True if section is found
        """
        # Read the common [export] section
        section = "export"

        opt = "exclude_fields"
        try:
            setattr(self, opt, self.config.get_list_value(section, opt))
        except NoOptionError:
            logger.debug(f"{opt} option not found in the {section} configuration section")

        logger.debug(f"Load common {section} from the Glances configuration file")

        return True

    def load_conf(self, section, mandatories=None, options=None):
        """Load the export <section> configuration in the Glances configuration file.

        :param section: name of the export section to load
        :param mandatories: a list of mandatory parameters to load (default: host and port)
        :param options: a list of optional parameters to load

        :returns: Boolean -- True if section is found
        """
        # Avoid a mutable default argument; fall back to the host:port items
        mandatories = ["host", "port"] if mandatories is None else mandatories
        options = options or []

        if self.config is None:
            return False

        # By default read the mandatory host:port items
        try:
            for opt in mandatories:
                setattr(self, opt, self.config.get_value(section, opt))
        except NoSectionError:
            logger.error(f"No {section} configuration found")
            return False
        except NoOptionError as e:
            logger.error(f"Error in the {section} configuration ({e})")
            return False

        # Load options
        for opt in options:
            try:
                setattr(self, opt, self.config.get_value(section, opt))
            except NoOptionError:
                logger.debug(f"{opt} option not found in the {section} configuration section")

        logger.debug(f"Load {section} from the Glances configuration file")
        # getattr default: an optional parameter that was not found may never have been set
        logger.debug(f"{section} parameters: { ({opt: getattr(self, opt, None) for opt in mandatories + options}) }")

        return True

    def get_item_key(self, item):
        """Return the value of the item 'key'."""
        ret = None
        try:
            ret = item[item["key"]]
        except KeyError:
            logger.error(f"No 'key' available in {item}")
        if isinstance(ret, list):
            return ret[0]
        return ret

    def parse_tags(self, tags):
        """Parse tags into a dict.

        :param tags: a comma-separated list of 'key:value' pairs. Example: foo:bar,spam:eggs
        :return: a dict of tags. Example: {'foo': 'bar', 'spam': 'eggs'}
        """
        d_tags = {}
        if tags:
            try:
                d_tags = dict([x.split(":") for x in tags.split(",")])
            except ValueError:
                # one of the 'key:value' pairs was missing
                logger.info("Invalid tags passed: %s", tags)
                d_tags = {}

        return d_tags

    def normalize_for_influxdb(self, name, columns, points):
        """Normalize data for the InfluxDB data model.

        :return: a list of measurements.
        """
        FIELD_TO_TAG = ["name", "cmdline", "type"]
        ret = []

        # Build the initial dict by crossing columns and points
        data_dict = dict(zip(columns, points))

        # issue1871 - Check if a key exists. If a key exists, the value of
        # the key should be used as a tag to identify the measurement.
        keys_list = [k.split(".")[0] for k in columns if k.endswith(".key")]
        if not keys_list:
            keys_list = [None]

        for measurement in keys_list:
            # Manage fields
            if measurement is not None:
                fields = {
                    k.replace(f"{measurement}.", ""): data_dict[k] for k in data_dict if k.startswith(f"{measurement}.")
                }
            else:
                fields = data_dict
            # Transform to InfluxDB data model
            # https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/
            for k in fields:
                # Do not export empty (None) values
                if fields[k] is None:
                    continue
                # Convert numerical values to float
                try:
                    fields[k] = float(fields[k])
                except (TypeError, ValueError):
                    # Convert others to string
                    try:
                        fields[k] = str(fields[k])
                    except (TypeError, ValueError):
                        pass
            # Manage tags
            tags = self.parse_tags(self.tags)
            # Add the hostname as a tag
            tags["hostname"] = self.hostname
            if "hostname" in fields:
                fields.pop("hostname")
            # Other tags...
            if "key" in fields and fields["key"] in fields:
                # Create a tag from the key
                # A tag must be a string (see InfluxDB data model)
                tags[fields["key"]] = str(fields[fields["key"]])
                # Remove it from the field list (cannot be both a field and a tag)
                fields.pop(fields["key"])
            # Add name as a tag (example for the process list)
            for k in FIELD_TO_TAG:
                if k in fields:
                    tags[k] = str(fields[k])
                    # Remove it from the field list (cannot be both a field and a tag)
                    fields.pop(k)
            # Add the measurement to the list
            ret.append({"measurement": name, "tags": tags, "fields": fields})
        return ret

    def is_excluded(self, field):
        """Return true if the field is excluded."""
        return hasattr(self, 'exclude_fields') and any(re.fullmatch(i, field, re.I) for i in self.exclude_fields)

    def plugins_to_export(self, stats):
        """Return the list of plugins to export.

        :param stats: the stats object
        :return: a list of plugins to export
        """
        return [p for p in stats.getPluginsList() if p not in self.non_exportable_plugins]

    def last_exported_list(self):
        """Return the list of plugins last exported."""
        return self._last_exported_list

    def init_fields(self, stats):
        """Return fields description in order to init stats in a server."""
        if not self.export_enable:
            return False

        self._last_exported_list = self.plugins_to_export(stats)
        self._fields_description = stats.getAllFieldsDescriptionAsDict(plugin_list=self.last_exported_list())
        return self._fields_description

    def update(self, stats):
        """Update stats to a server.

        The method builds two lists, names and values, and calls the export method to export the stats.

        Note: if needed, this method can be overridden.
        """
        if not self.export_enable:
            return False

        # Get all the stats & limits
        self._last_exported_list = self.plugins_to_export(stats)
        all_stats = stats.getAllExportsAsDict(plugin_list=self.last_exported_list())
        all_limits = stats.getAllLimitsAsDict(plugin_list=self.last_exported_list())

        # Loop over plugins to export
        for plugin in self.last_exported_list():
            if isinstance(all_stats[plugin], dict):
                all_stats[plugin].update(all_limits[plugin])
                # Remove the <plugin>_disable field
                all_stats[plugin].pop(f"{plugin}_disable", None)
            elif isinstance(all_stats[plugin], list):
                # TypeError: string indices must be integers (Network plugin) #1054
                for i in all_stats[plugin]:
                    i.update(all_limits[plugin])
                    # Remove the <plugin>_disable field
                    i.pop(f"{plugin}_disable", None)
            else:
                continue
            export_names, export_values = self.build_export(all_stats[plugin])
            self.export(plugin, export_names, export_values)

        return True

    def build_export(self, stats):
        """Build the export lists.

        This method builds two lists: names and values.
        """

        # Initialize export lists
        export_names = []
        export_values = []

        if isinstance(stats, dict):
            # Stats is a dict
            # Is there a key ?
            if "key" in stats and stats["key"] in stats:
                pre_key = f"{stats[stats['key']]}."
            else:
                pre_key = ""
            # Walk through the dict
            for key, value in sorted(stats.items()):
                if isinstance(value, bool):
                    value = json_dumps(value).decode()

                if isinstance(value, list):
                    value = " ".join([str(v) for v in value])

                if isinstance(value, dict):
                    item_names, item_values = self.build_export(value)
                    item_names = [pre_key + key.lower() + str(i) for i in item_names]
                    export_names += item_names
                    export_values += item_values
                else:
                    # We are on a simple value
                    if self.is_excluded(pre_key + key.lower()):
                        continue
                    export_names.append(pre_key + key.lower())
                    export_values.append(value)
        elif isinstance(stats, list):
            # Stats is a list (of dict)
            # Recursive loop through the list
            for item in stats:
                item_names, item_values = self.build_export(item)
                export_names += item_names
                export_values += item_values
        return export_names, export_values

    def export(self, name, columns, points):
        # This method should be implemented by each exporter
        pass
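To make the output of `build_export()` easier to picture, here is a standalone sketch of the same flattening idea. It is an approximation under stated assumptions, not the Glances code: `flatten_stats` is a hypothetical function name, field exclusion is left out, and the standard `json.dumps` (which returns a `str`) replaces Glances' `json_dumps`.

```python
import json


def flatten_stats(stats):
    """Flatten nested stats into two parallel lists: names and values."""
    names, values = [], []
    if isinstance(stats, dict):
        # Prefix every name with the value of the item's key field, if any
        if "key" in stats and stats["key"] in stats:
            prefix = f"{stats[stats['key']]}."
        else:
            prefix = ""
        for key, value in sorted(stats.items()):
            if isinstance(value, bool):
                value = json.dumps(value)  # booleans become "true" / "false"
            if isinstance(value, list):
                value = " ".join(str(v) for v in value)
            if isinstance(value, dict):
                # Recurse into nested dicts and prepend the parent name
                sub_names, sub_values = flatten_stats(value)
                names += [prefix + key.lower() + n for n in sub_names]
                values += sub_values
            else:
                names.append(prefix + key.lower())
                values.append(value)
    elif isinstance(stats, list):
        # A list of dicts: flatten each item in turn
        for item in stats:
            sub_names, sub_values = flatten_stats(item)
            names += sub_names
            values += sub_values
    return names, values


# Hypothetical sample resembling one item of a network-like plugin
sample = [{"key": "interface_name", "interface_name": "eth0", "rx": 10, "is_up": True}]
names, values = flatten_stats(sample)
```

For this sample, every name is prefixed with `eth0.` because `interface_name` is the item's key, and the two lists stay aligned index by index, which is the shape an exporter's `export(name, columns, points)` method receives.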