Test Failed
Push — master ( 7cfc0c...4c3161 )
by Nicolas
04:02
created

glances.exports.export   B

Complexity

Total Complexity 50

Size/Duplication

Total Lines 302
Duplicated Lines 4.3 %

Importance

Changes 0
Metric Value
eloc 164
dl 13
loc 302
rs 8.4
c 0
b 0
f 0
wmc 50

13 Methods

Rating   Name   Duplication   Size   Complexity  
A GlancesExport.parse_tags() 0 16 3
A GlancesExport.__init__() 0 23 1
A GlancesExport.get_item_key() 0 10 3
A GlancesExport.exit() 0 3 1
A GlancesExport._log_result_decorator() 13 13 1
B GlancesExport.load_conf() 0 36 7
A GlancesExport.init_fields() 0 8 2
D GlancesExport.normalize_for_influxdb() 0 62 13
A GlancesExport.plugins_to_export() 0 7 1
A GlancesExport.last_exported_list() 0 3 1
B GlancesExport.update() 0 33 6
A GlancesExport.export() 0 3 1
C GlancesExport.build_export() 0 41 10

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like glances.exports.export often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""
10
I am your father...
11
...for all Glances exports IF.
12
"""
13
14
from glances.globals import NoOptionError, NoSectionError, json_dumps
15
from glances.logger import logger
16
from glances.timer import Counter
17
18
19
class GlancesExport:
20
    """Main class for Glances export IF."""
21
22
    # List of non exportable internal plugins
23
    non_exportable_plugins = [
24
        "alert",
25
        "help",
26
        "plugin",
27
        "psutilversion",
28
        "quicklook",
29
        "version",
30
    ]
31
32
    def __init__(self, config=None, args=None):
33
        """Init the export class."""
34
        # Export name
35
        self.export_name = self.__class__.__module__
36
        logger.debug(f"Init export module {self.export_name}")
37
38
        # Init the config & args
39
        self.config = config
40
        self.args = args
41
42
        # By default export is disabled
43
        # Needs to be set to True in the __init__ class of child
44
        self.export_enable = False
45
46
        # Mandatory for (most of) the export module
47
        self.host = None
48
        self.port = None
49
50
        # Save last export list
51
        self._last_exported_list = None
52
53
        # Fields description
54
        self._fields_description = None
55
56 View Code Duplication
    def _log_result_decorator(fct):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
57
        """Log (DEBUG) the result of the function fct."""
58
59
        def wrapper(*args, **kw):
60
            counter = Counter()
61
            ret = fct(*args, **kw)
62
            duration = counter.get()
63
            class_name = args[0].__class__.__name__
64
            class_module = args[0].__class__.__module__
65
            logger.debug(f"{class_name} {class_module} {fct.__name__} return {ret} in {duration} seconds")
66
            return ret
67
68
        return wrapper
69
70
    def exit(self):
71
        """Close the export module."""
72
        logger.debug(f"Finalise export interface {self.export_name}")
73
74
    def load_conf(self, section, mandatories=["host", "port"], options=None):
75
        """Load the export <section> configuration in the Glances configuration file.
76
77
        :param section: name of the export section to load
78
        :param mandatories: a list of mandatory parameters to load
79
        :param options: a list of optional parameters to load
80
81
        :returns: Boolean -- True if section is found
82
        """
83
        options = options or []
84
85
        if self.config is None:
86
            return False
87
88
        # By default read the mandatory host:port items
89
        try:
90
            for opt in mandatories:
91
                setattr(self, opt, self.config.get_value(section, opt))
92
        except NoSectionError:
93
            logger.error(f"No {section} configuration found")
94
            return False
95
        except NoOptionError as e:
96
            logger.error(f"Error in the {section} configuration ({e})")
97
            return False
98
99
        # Load options
100
        for opt in options:
101
            try:
102
                setattr(self, opt, self.config.get_value(section, opt))
103
            except NoOptionError:
104
                pass
105
106
        logger.debug(f"Load {section} from the Glances configuration file")
107
        logger.debug(f"{section} parameters: { ({opt: getattr(self, opt) for opt in mandatories + options}) }")
108
109
        return True
110
111
    def get_item_key(self, item):
112
        """Return the value of the item 'key'."""
113
        ret = None
114
        try:
115
            ret = item[item["key"]]
116
        except KeyError:
117
            logger.error(f"No 'key' available in {item}")
118
        if isinstance(ret, list):
119
            return ret[0]
120
        return ret
121
122
    def parse_tags(self, tags):
123
        """Parse tags into a dict.
124
125
        :param tags: a comma-separated list of 'key:value' pairs. Example: foo:bar,spam:eggs
126
        :return: a dict of tags. Example: {'foo': 'bar', 'spam': 'eggs'}
127
        """
128
        d_tags = {}
129
        if tags:
130
            try:
131
                d_tags = dict([x.split(":") for x in tags.split(",")])
132
            except ValueError:
133
                # one of the 'key:value' pairs was missing
134
                logger.info("Invalid tags passed: %s", tags)
135
                d_tags = {}
136
137
        return d_tags
138
139
    def normalize_for_influxdb(self, name, columns, points):
140
        """Normalize data for the InfluxDB's data model.
141
142
        :return: a list of measurements.
143
        """
144
        FIELD_TO_TAG = ["name", "cmdline", "type"]
145
        ret = []
146
147
        # Build initial dict by crossing columns and point
148
        data_dict = dict(zip(columns, points))
149
150
        # issue1871 - Check if a key exist. If a key exist, the value of
151
        # the key should be used as a tag to identify the measurement.
152
        keys_list = [k.split(".")[0] for k in columns if k.endswith(".key")]
153
        if not keys_list:
154
            keys_list = [None]
155
156
        for measurement in keys_list:
157
            # Manage field
158
            if measurement is not None:
159
                fields = {
160
                    k.replace(f"{measurement}.", ""): data_dict[k] for k in data_dict if k.startswith(f"{measurement}.")
161
                }
162
            else:
163
                fields = data_dict
164
            # Transform to InfluxDB data model
165
            # https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/
166
            for k in fields:
167
                #  Do not export empty (None) value
168
                if fields[k] is None:
169
                    continue
170
                # Convert numerical to float
171
                try:
172
                    fields[k] = float(fields[k])
173
                except (TypeError, ValueError):
174
                    # Convert others to string
175
                    try:
176
                        fields[k] = str(fields[k])
177
                    except (TypeError, ValueError):
178
                        pass
179
            # Manage tags
180
            tags = self.parse_tags(self.tags)
181
            # Add the hostname as a tag
182
            tags["hostname"] = self.hostname
183
            if "hostname" in fields:
184
                fields.pop("hostname")
185
            # Others tags...
186
            if "key" in fields and fields["key"] in fields:
187
                # Create a tag from the key
188
                # Tag should be an string (see InfluxDB data model)
189
                tags[fields["key"]] = str(fields[fields["key"]])
190
                # Remove it from the field list (can not be a field and a tag)
191
                fields.pop(fields["key"])
192
            # Add name as a tag (example for the process list)
193
            for k in FIELD_TO_TAG:
194
                if k in fields:
195
                    tags[k] = str(fields[k])
196
                    # Remove it from the field list (can not be a field and a tag)
197
                    fields.pop(k)
198
            # Add the measurement to the list
199
            ret.append({"measurement": name, "tags": tags, "fields": fields})
200
        return ret
201
202
    def plugins_to_export(self, stats):
203
        """Return the list of plugins to export.
204
205
        :param stats: the stats object
206
        :return: a list of plugins to export
207
        """
208
        return [p for p in stats.getPluginsList() if p not in self.non_exportable_plugins]
209
210
    def last_exported_list(self):
211
        """Return the list of plugins last exported."""
212
        return self._last_exported_list
213
214
    def init_fields(self, stats):
215
        """Return fields description in order to init stats in a server."""
216
        if not self.export_enable:
217
            return False
218
219
        self._last_exported_list = self.plugins_to_export(stats)
220
        self._fields_description = stats.getAllFieldsDescriptionAsDict(plugin_list=self.last_exported_list())
221
        return self._fields_description
222
223
    def update(self, stats):
224
        """Update stats to a server.
225
226
        The method builds two lists: names and values and calls the export method to export the stats.
227
228
        Note: if needed this class can be overwritten.
229
        """
230
        if not self.export_enable:
231
            return False
232
233
        # Get all the stats & limits
234
        self._last_exported_list = self.plugins_to_export(stats)
235
        all_stats = stats.getAllExportsAsDict(plugin_list=self.last_exported_list())
236
        all_limits = stats.getAllLimitsAsDict(plugin_list=self.last_exported_list())
237
238
        # Loop over plugins to export
239
        for plugin in self.last_exported_list():
240
            if isinstance(all_stats[plugin], dict):
241
                all_stats[plugin].update(all_limits[plugin])
242
                # Remove the <plugin>_disable field
243
                all_stats[plugin].pop(f"{plugin}_disable", None)
244
            elif isinstance(all_stats[plugin], list):
245
                # TypeError: string indices must be integers (Network plugin) #1054
246
                for i in all_stats[plugin]:
247
                    i.update(all_limits[plugin])
248
                    # Remove the <plugin>_disable field
249
                    i.pop(f"{plugin}_disable", None)
250
            else:
251
                continue
252
            export_names, export_values = self.build_export(all_stats[plugin])
253
            self.export(plugin, export_names, export_values)
254
255
        return True
256
257
    def build_export(self, stats):
258
        """Build the export lists.
259
        This method builds two lists: names and values.
260
        """
261
262
        # Initialize export lists
263
        export_names = []
264
        export_values = []
265
266
        if isinstance(stats, dict):
267
            # Stats is a dict
268
            # Is there a key ?
269
            if "key" in stats.keys() and stats["key"] in stats.keys():
270
                pre_key = "{}.".format(stats[stats["key"]])
271
            else:
272
                pre_key = ""
273
            # Walk through the dict
274
            for key, value in sorted(stats.items()):
275
                if isinstance(value, bool):
276
                    value = json_dumps(value).decode()
277
278
                if isinstance(value, list):
279
                    value = " ".join([str(v) for v in value])
280
281
                if isinstance(value, dict):
282
                    item_names, item_values = self.build_export(value)
283
                    item_names = [pre_key + key.lower() + str(i) for i in item_names]
284
                    export_names += item_names
285
                    export_values += item_values
286
                else:
287
                    # We are on a simple value
288
                    export_names.append(pre_key + key.lower())
289
                    export_values.append(value)
290
        elif isinstance(stats, list):
291
            # Stats is a list (of dict)
292
            # Recursive loop through the list
293
            for item in stats:
294
                item_names, item_values = self.build_export(item)
295
                export_names += item_names
296
                export_values += item_values
297
        return export_names, export_values
298
299
    def export(self, name, columns, points):
300
        # This method should be implemented by each exporter
301
        pass
302