Test Failed
Push — master ( 69b639...e7fa0a )
by Nicolas
04:05 queued 01:05
created

glances.exports.glances_influxdb.Export.init()   B

Complexity

Conditions 6

Size

Total Lines 32
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 24
nop 1
dl 0
loc 32
rs 8.3706
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# Copyright (C) 2021 Nicolargo <[email protected]>
6
#
7
# Glances is free software; you can redistribute it and/or modify
8
# it under the terms of the GNU Lesser General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
11
#
12
# Glances is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
# GNU Lesser General Public License for more details.
16
#
17
# You should have received a copy of the GNU Lesser General Public License
18
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
"""InfluxDB (up to InfluxDB 1.7.x) interface class."""
21
22
import sys
23
from platform import node
24
25
from glances.logger import logger
26
from glances.exports.glances_export import GlancesExport
27
28
from influxdb import InfluxDBClient
29
from influxdb.client import InfluxDBClientError
30
31
32
class Export(GlancesExport):
33
    """This class manages the InfluxDB export module."""
34
35
    def __init__(self, config=None, args=None):
36
        """Init the InfluxDB export IF."""
37
        super(Export, self).__init__(config=config, args=args)
38
39
        # Mandatories configuration keys (additional to host and port)
40
        self.user = None
41
        self.password = None
42
        self.db = None
43
44
        # Optionals configuration keys
45
        self.protocol = 'http'
46
        self.prefix = None
47
        self.tags = None
48
49
        # Load the InfluxDB configuration file
50
        self.export_enable = self.load_conf('influxdb',
51
                                            mandatories=['host', 'port',
52
                                                         'user', 'password',
53
                                                         'db'],
54
                                            options=['protocol',
55
                                                     'prefix',
56
                                                     'tags'])
57
        if not self.export_enable:
58
            sys.exit(2)
59
60
        # The hostname is always add as a tag
61
        self.hostname = node().split('.')[0]
62
63
        # Init the InfluxDB client
64
        self.client = self.init()
65
66
    def init(self):
67
        """Init the connection to the InfluxDB server."""
68
        if not self.export_enable:
69
            return None
70
71
        # Correct issue #1530
72
        if self.protocol is not None and (self.protocol.lower() == 'https'):
73
            ssl = True
74
        else:
75
            ssl = False
76
77
        try:
78
            db = InfluxDBClient(host=self.host,
79
                                port=self.port,
80
                                ssl=ssl,
81
                                verify_ssl=False,
82
                                username=self.user,
83
                                password=self.password,
84
                                database=self.db)
85
            get_all_db = [i['name'] for i in db.get_list_database()]
86
        except InfluxDBClientError as e:
87
            logger.critical("Cannot connect to InfluxDB database '%s' (%s)" % (self.db, e))
88
            sys.exit(2)
89
90
        if self.db in get_all_db:
0 ignored issues
show
introduced by
The variable get_all_db does not seem to be defined for all execution paths.
Loading history...
91
            logger.info(
92
                "Stats will be exported to InfluxDB server: {}".format(db._baseurl))
93
        else:
94
            logger.critical("InfluxDB database '%s' did not exist. Please create it" % self.db)
95
            sys.exit(2)
96
97
        return db
98
99 View Code Duplication
    def _normalize(self, name, columns, points):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
100
        """Normalize data for the InfluxDB's data model.
101
        Output is a list of measurements."""
102
        ret = []
103
104
        # Build initial dict by crossing columns and point
105
        data_dict = dict(zip(columns, points))
106
107
        # issue1871 - Check if a key exist. If a key exist, the value of
108
        # the key should be used as a tag to identify the measurement.
109
        keys_list = [k.split('.')[0] for k in columns if k.endswith('.key')]
110
        if len(keys_list) == 0:
111
            keys_list = [None]
112
113
        for measurement in keys_list:
114
            # Manage field
115
            if measurement is not None:
116
                fields = {k.replace('{}.'.format(measurement), ''): data_dict[k]
117
                          for k in data_dict
118
                          if k.startswith('{}.'.format(measurement))}
119
            else:
120
                fields = data_dict
121
            # Transform to InfluxDB datamodel
122
            # https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/
123
            for k in fields:
124
                #  Do not export empty (None) value
125
                if fields[k] is None:
126
                    fields.pop(k)
127
                # Convert numerical to float
128
                try:
129
                    fields[k] = float(fields[k])
130
                except (TypeError, ValueError):
131
                    # Convert others to string
132
                    try:
133
                        fields[k] = str(fields[k])
134
                    except (TypeError, ValueError):
135
                        pass
136
            # Manage tags
137
            tags = self.parse_tags(self.tags)
138
            if 'key' in fields and fields['key'] in fields:
139
                # Create a tag from the key
140
                # Tag should be an string (see InfluxDB data model)
141
                tags[fields['key']] = str(fields[fields['key']])
142
                # Remove it from the field list (can not be a field and a tag)
143
                fields.pop(fields['key'])
144
            # Add the hostname as a tag
145
            tags['hostname'] = self.hostname
146
            # Add the measurement to the list
147
            ret.append({'measurement': name,
148
                        'tags': tags,
149
                        'fields': fields})
150
        return ret
151
152 View Code Duplication
    def export(self, name, columns, points):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
153
        """Write the points to the InfluxDB server."""
154
        # Manage prefix
155
        if self.prefix is not None:
156
            name = self.prefix + '.' + name
157
        # Write input to the InfluxDB database
158
        if len(points) == 0:
159
            logger.debug("Cannot export empty {} stats to InfluxDB".format(name))
160
        else:
161
            try:
162
                self.client.write_points(self._normalize(name, columns, points),
163
                                         time_precision="s")
164
            except Exception as e:
165
                # Log level set to debug instead of error (see: issue #1561)
166
                logger.debug("Cannot export {} stats to InfluxDB ({})".format(name, e))
167
            else:
168
                logger.debug("Export {} stats to InfluxDB".format(name))
169