Test Failed
Push — master ( ee826a...d9056e )
by Nicolas
03:09
created

glances.exports.glances_influxdb   A

Complexity

Total Complexity 26

Size/Duplication

Total Lines 167
Duplicated Lines 46.11 %

Importance

Changes 0
Metric Value
eloc 93
dl 77
loc 167
rs 10
c 0
b 0
f 0
wmc 26

4 Methods

Rating   Name   Duplication   Size   Complexity  
B Export.init() 0 33 6
D Export._normalize() 61 61 13
A Export.export() 16 16 5
A Export.__init__() 0 27 2

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""InfluxDB (up to InfluxDB 1.7.x) interface class."""
11
12
import sys
13
from platform import node
14
15
from glances.logger import logger
16
from glances.exports.export import GlancesExport
17
18
from influxdb import InfluxDBClient
19
from influxdb.client import InfluxDBClientError
20
21
# Field names that are exported as InfluxDB tags instead of fields
# (Export._normalize copies their value into the tag dict).
FIELD_TO_TAG = ['name', 'cmdline']
22
23
24
class Export(GlancesExport):
25
    """This class manages the InfluxDB export module."""
26
27
    def __init__(self, config=None, args=None):
        """Init the InfluxDB (version 1) export interface.

        Load the 'influxdb' configuration section, compute the hostname tag
        and open the connection to the server through init().

        :param config: configuration object, forwarded to GlancesExport.
        :param args: command line arguments, forwarded to GlancesExport.
        :raises SystemExit: if load_conf() returns a false value for the
            'influxdb' section.
        """
        super(Export, self).__init__(config=config, args=args)

        # Mandatory configuration keys (additional to host and port)
        self.user = None
        self.password = None
        self.db = None

        # Optional configuration keys
        self.protocol = 'http'
        self.prefix = None
        self.tags = None
        self.hostname = None

        # Load the InfluxDB configuration file
        self.export_enable = self.load_conf(
            'influxdb', mandatories=['host', 'port', 'user', 'password', 'db'], options=['protocol', 'prefix', 'tags']
        )
        if not self.export_enable:
            # Use sys.exit(): the bare exit() builtin is injected by the site
            # module for interactive use and is not guaranteed to exist.
            sys.exit('Missing INFLUXDB version 1 config')

        # The hostname is always added as a tag (short name, without the domain)
        self.hostname = node().split('.')[0]

        # Init the InfluxDB client
        self.client = self.init()
54
55
    def init(self):
        """Open the connection to the InfluxDB server.

        :return: the InfluxDBClient instance, or None when the export is
            not enabled.
        :raises SystemExit: (code 2) if the client raises an
            InfluxDBClientError or if the configured database is not in the
            list returned by the server.
        """
        if not self.export_enable:
            return None

        # Correct issue #1530: SSL is only enabled for the 'https' protocol
        use_ssl = self.protocol is not None and self.protocol.lower() == 'https'

        try:
            client = InfluxDBClient(
                host=self.host,
                port=self.port,
                ssl=use_ssl,
                verify_ssl=False,
                username=self.user,
                password=self.password,
                database=self.db,
            )
            known_databases = [entry['name'] for entry in client.get_list_database()]
        except InfluxDBClientError as e:
            logger.critical("Cannot connect to InfluxDB database '%s' (%s)" % (self.db, e))
            sys.exit(2)

        # The database is never created here: it should already exist
        if self.db not in known_databases:
            logger.critical("InfluxDB database '%s' did not exist. Please create it" % self.db)
            sys.exit(2)

        logger.info("Stats will be exported to InfluxDB server: {}".format(client._baseurl))
        return client
88
89 View Code Duplication
    def _normalize(self, name, columns, points):
        """Normalize data for the InfluxDB's data model.

        One measurement is built per '<item>.key' column found in columns
        (a single one if there is none). Values are converted to float when
        possible, to string otherwise.

        :param name: measurement name, used for every returned entry.
        :param columns: list of column names.
        :param points: list of values, in the same order as columns.
        :return: a list of measurements (dicts with the 'measurement',
            'tags' and 'fields' keys).
        """
        ret = []

        # Build initial dict by crossing columns and point
        data_dict = dict(zip(columns, points))

        # issue1871 - Check if a key exist. If a key exist, the value of
        # the key should be used as a tag to identify the measurement.
        keys_list = [k.split('.')[0] for k in columns if k.endswith('.key')]
        if not keys_list:
            keys_list = [None]

        for measurement in keys_list:
            # Manage field
            if measurement is not None:
                # Only strip the leading '<measurement>.' prefix: a plain
                # str.replace() would also remove later occurrences.
                prefix = '{}.'.format(measurement)
                fields = {k[len(prefix):]: v for k, v in data_dict.items() if k.startswith(prefix)}
            else:
                fields = data_dict
            # Transform to InfluxDB data model
            # https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/
            for k in fields:
                # Empty (None) values are left untouched (no conversion)
                if fields[k] is None:
                    continue
                # Convert numerical to float
                try:
                    fields[k] = float(fields[k])
                except (TypeError, ValueError):
                    # Convert others to string
                    try:
                        fields[k] = str(fields[k])
                    except (TypeError, ValueError):
                        pass
            # Manage tags
            tags = self.parse_tags(self.tags)
            if 'key' in fields and fields['key'] in fields:
                # Create a tag from the key
                # Tag should be a string (see InfluxDB data model)
                tags[fields['key']] = str(fields[fields['key']])
                # Remove it from the field list (can not be a field and a tag)
                fields.pop(fields['key'])
            # Add the hostname as a tag
            tags['hostname'] = self.hostname
            # Add name as a tag (example for the process list)
            for k in FIELD_TO_TAG:
                if k in fields:
                    # Remove it from the field list (can not be a field and a
                    # tag). The entry is popped by its key: popping by its
                    # value raised a KeyError and the stat was never exported.
                    tags[k] = str(fields.pop(k))
            # Add the measurement to the list
            ret.append({'measurement': name, 'tags': tags, 'fields': fields})
        return ret
150
151 View Code Duplication
    def export(self, name, columns, points):
        """Write the points to the InfluxDB server.

        :param name: name of the stat; prefixed with '<prefix>.' when a
            prefix is configured.
        :param columns: list of column names.
        :param points: list of values, in the same order as columns.

        Nothing is written when points is empty. Any error raised while
        normalizing or writing is logged and not propagated.
        """
        # Manage prefix
        full_name = name if self.prefix is None else self.prefix + '.' + name

        # Nothing to write
        if len(points) == 0:
            logger.debug("Cannot export empty {} stats to InfluxDB".format(full_name))
            return

        # Write input to the InfluxDB database
        try:
            self.client.write_points(self._normalize(full_name, columns, points), time_precision="s")
        except Exception as e:
            # Log level set to debug instead of error (see: issue #1561)
            logger.debug("Cannot export {} stats to InfluxDB ({})".format(full_name, e))
        else:
            logger.debug("Export {} stats to InfluxDB".format(full_name))
167