Test Failed
Push — master ( 69b639...e7fa0a )
by Nicolas
04:05 queued 01:05
created

glances.exports.glances_influxdb2   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 160
Duplicated Lines 43.75 %

Importance

Changes 0
Metric Value
eloc 90
dl 70
loc 160
rs 10
c 0
b 0
f 0
wmc 21

4 Methods

Rating   Name   Duplication   Size   Complexity  
A Export.__init__() 0 27 2
C Export._normalize() 51 51 10
A Export.export() 19 19 5
A Export.init() 0 27 4

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# Copyright (C) 2021 Nicolargo <[email protected]>
6
#
7
# Glances is free software; you can redistribute it and/or modify
8
# it under the terms of the GNU Lesser General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
11
#
12
# Glances is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
# GNU Lesser General Public License for more details.
16
#
17
# You should have received a copy of the GNU Lesser General Public License
18
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
"""InfluxDB (from to InfluxDB 1.8+) interface class."""
21
22
import sys
23
24
from glances.logger import logger
25
from glances.exports.glances_export import GlancesExport
26
27
from influxdb_client import InfluxDBClient, WriteOptions
28
29
30
class Export(GlancesExport):
31
    """This class manages the InfluxDB export module."""
32
33
    def __init__(self, config=None, args=None):
34
        """Init the InfluxDB export IF."""
35
        super(Export, self).__init__(config=config, args=args)
36
37
        # Mandatories configuration keys (additional to host and port)
38
        self.org = None
39
        self.bucket = None
40
        self.token = None
41
42
        # Optionals configuration keys
43
        self.protocol = 'http'
44
        self.prefix = None
45
        self.tags = None
46
47
        # Load the InfluxDB configuration file
48
        self.export_enable = self.load_conf('influxdb2',
49
                                            mandatories=['host', 'port',
50
                                                         'user', 'password',
51
                                                         'org', 'bucket', 'token'],
52
                                            options=['protocol',
53
                                                     'prefix',
54
                                                     'tags'])
55
        if not self.export_enable:
56
            sys.exit(2)
57
58
        # Init the InfluxDB client
59
        self.client = self.init()
60
61
    def init(self):
62
        """Init the connection to the InfluxDB server."""
63
        if not self.export_enable:
64
            return None
65
66
        url = '{}://{}:{}'.format(self.protocol, self.host, self.port)
67
        try:
68
            client = InfluxDBClient(url=url,
69
                                    enable_gzip=False,
70
                                    org=self.org,
71
                                    token=self.token)
72
        except Exception as e:
73
            logger.critical("Cannot connect to InfluxDB server '%s' (%s)" % (url, e))
74
            sys.exit(2)
75
        else:
76
            logger.info("Connected to InfluxDB server version {} ({})".format(client.health().version,
77
                                                                              client.health().message))
78
79
        # Create the write client
80
        write_client = client.write_api(write_options=WriteOptions(batch_size=500,
81
                                                                   flush_interval=10000,
82
                                                                   jitter_interval=2000,
83
                                                                   retry_interval=5000,
84
                                                                   max_retries=5,
85
                                                                   max_retry_delay=30000,
86
                                                                   exponential_base=2))
87
        return write_client
88
89 View Code Duplication
    def _normalize(self, name, columns, points):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
90
        """Normalize data for the InfluxDB's data model."""
91
        ret = []
92
93
        # Build initial dict by crossing columns and point
94
        data_dict = dict(zip(columns, points))
95
96
        # issue1871 - Check if a key exist. If a key exist, the value of
97
        # the key should be used as a tag to identify the measurement.
98
        keys_list = [k.split('.')[0] for k in columns if k.endswith('.key')]
99
        if len(keys_list) == 0:
100
            keys_list = [None]
101
102
        for measurement in keys_list:
103
            # Manage field
104
            if measurement is not None:
105
                fields = {k.replace('{}.'.format(measurement), ''): data_dict[k]
106
                          for k in data_dict
107
                          if k.startswith('{}.'.format(measurement))}
108
            else:
109
                fields = data_dict
110
            # Transform to InfluxDB datamodel
111
            # https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/
112
            for k in fields:
113
                #  Do not export empty (None) value
114
                if fields[k] is None:
115
                    fields.pop(k)
116
                # Convert numerical to float
117
                try:
118
                    fields[k] = float(fields[k])
119
                except (TypeError, ValueError):
120
                    # Convert others to string
121
                    try:
122
                        fields[k] = str(fields[k])
123
                    except (TypeError, ValueError):
124
                        pass
125
            # Manage tags
126
            tags = self.parse_tags(self.tags)
127
            if 'key' in fields and fields['key'] in fields:
128
                # Create a tag from the key
129
                # Tag should be an string (see InfluxDB data model)
130
                tags[fields['key']] = str(fields[fields['key']])
131
                # Remove it from the field list (can not be a field and a tag)
132
                fields.pop(fields['key'])
133
            # Add the hostname as a tag
134
            tags['hostname'] = self.hostname
135
            # Add the measurement to the list
136
            ret.append({'measurement': name,
137
                        'tags': tags,
138
                        'fields': fields})
139
        return ret
140
141 View Code Duplication
    def export(self, name, columns, points):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
142
        """Write the points to the InfluxDB server."""
143
        # Manage prefix
144
        if self.prefix is not None:
145
            name = self.prefix + '.' + name
146
        # Write input to the InfluxDB database
147
        if len(points) == 0:
148
            logger.debug("Cannot export empty {} stats to InfluxDB".format(name))
149
        else:
150
            try:
151
                self.client.write(self.bucket,
152
                                  self.org,
153
                                  self._normalize(name, columns, points),
154
                                  time_precision="s")
155
            except Exception as e:
156
                # Log level set to debug instead of error (see: issue #1561)
157
                logger.debug("Cannot export {} stats to InfluxDB ({})".format(name, e))
158
            else:
159
                logger.debug("Export {} stats to InfluxDB".format(name))
160