Issues (49)

glances/exports/glances_influxdb/__init__.py (1 issue)

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""InfluxDB (up to InfluxDB 1.7.x) interface class."""
10
11
import sys
12
from platform import node
13
14
from influxdb import InfluxDBClient
15
from influxdb.client import InfluxDBClientError
16
17
from glances.exports.export import GlancesExport
18
from glances.logger import logger
19
20
FIELD_TO_TAG = ['name', 'cmdline', 'type']
21
22
23
class Export(GlancesExport):
24
    """This class manages the InfluxDB export module."""
25
26
    def __init__(self, config=None, args=None):
27
        """Init the InfluxDB export IF."""
28
        super().__init__(config=config, args=args)
29
30
        # Mandatory configuration keys (additional to host and port)
31
        self.user = None
32
        self.password = None
33
        self.db = None
34
35
        # Optional configuration keys
36
        self.protocol = 'http'
37
        self.prefix = None
38
        self.tags = None
39
        self.hostname = None
40
41
        # Load the InfluxDB configuration file
42
        self.export_enable = self.load_conf(
43
            'influxdb', mandatories=['host', 'port', 'user', 'password', 'db'], options=['protocol', 'prefix', 'tags']
44
        )
45
        if not self.export_enable:
46
            exit('Missing INFLUXDB version 1 config')
47
48
        # The hostname is always add as a tag
49
        self.hostname = node().split('.')[0]
50
51
        # Init the InfluxDB client
52
        self.client = self.init()
53
54
    def init(self):
55
        """Init the connection to the InfluxDB server."""
56
        if not self.export_enable:
57
            return None
58
59
        # Correct issue #1530
60
        if self.protocol is not None and (self.protocol.lower() == 'https'):
61
            ssl = True
62
        else:
63
            ssl = False
64
65
        try:
66
            db = InfluxDBClient(
67
                host=self.host,
68
                port=self.port,
69
                ssl=ssl,
70
                verify_ssl=False,
71
                username=self.user,
72
                password=self.password,
73
                database=self.db,
74
            )
75
            get_all_db = [i['name'] for i in db.get_list_database()]
76
        except InfluxDBClientError as e:
77
            logger.critical(f"Cannot connect to InfluxDB database '{self.db}' ({e})")
78
            sys.exit(2)
79
80
        if self.db in get_all_db:
0 ignored issues
show
The variable get_all_db does not seem to be defined for all execution paths.
Loading history...
81
            logger.info(f"Stats will be exported to InfluxDB server: {db._baseurl}")
82
        else:
83
            logger.critical(f"InfluxDB database '{self.db}' did not exist. Please create it")
84
            sys.exit(2)
85
86
        return db
87
88 View Code Duplication
    def _normalize(self, name, columns, points):
89
        """Normalize data for the InfluxDB's data model.
90
91
        :return: a list of measurements.
92
        """
93
        ret = []
94
95
        # Build initial dict by crossing columns and point
96
        data_dict = dict(zip(columns, points))
97
98
        # issue1871 - Check if a key exist. If a key exist, the value of
99
        # the key should be used as a tag to identify the measurement.
100
        keys_list = [k.split('.')[0] for k in columns if k.endswith('.key')]
101
        if len(keys_list) == 0:
102
            keys_list = [None]
103
104
        for measurement in keys_list:
105
            # Manage field
106
            if measurement is not None:
107
                fields = {
108
                    k.replace(f'{measurement}.', ''): data_dict[k] for k in data_dict if k.startswith(f'{measurement}.')
109
                }
110
            else:
111
                fields = data_dict
112
            # Transform to InfluxDB data model
113
            # https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_reference/
114
            for k in fields:
115
                #  Do not export empty (None) value
116
                if fields[k] is None:
117
                    continue
118
                # Convert numerical to float
119
                try:
120
                    fields[k] = float(fields[k])
121
                except (TypeError, ValueError):
122
                    # Convert others to string
123
                    try:
124
                        fields[k] = str(fields[k])
125
                    except (TypeError, ValueError):
126
                        pass
127
            # Manage tags
128
            tags = self.parse_tags(self.tags)
129
            if 'key' in fields and fields['key'] in fields:
130
                # Create a tag from the key
131
                # Tag should be an string (see InfluxDB data model)
132
                tags[fields['key']] = str(fields[fields['key']])
133
                # Remove it from the field list (can not be a field and a tag)
134
                fields.pop(fields['key'])
135
            # Add the hostname as a tag
136
            tags['hostname'] = self.hostname
137
            # Add name as a tag (example for the process list)
138
            for k in FIELD_TO_TAG:
139
                if k in fields:
140
                    tags[k] = str(fields[k])
141
                    # Remove it from the field list (can not be a field and a tag)
142
                    if k in fields:
143
                        fields.pop(fields[k])
144
            # Add the measurement to the list
145
            ret.append({'measurement': name, 'tags': tags, 'fields': fields})
146
        return ret
147
148
    def export(self, name, columns, points):
149
        """Write the points to the InfluxDB server."""
150
        # Manage prefix
151
        if self.prefix is not None:
152
            name = self.prefix + '.' + name
153
        # Write input to the InfluxDB database
154
        if len(points) == 0:
155
            logger.debug(f"Cannot export empty {name} stats to InfluxDB")
156
        else:
157
            try:
158
                self.client.write_points(self._normalize(name, columns, points), time_precision="s")
159
            except Exception as e:
160
                # Log level set to debug instead of error (see: issue #1561)
161
                logger.debug(f"Cannot export {name} stats to InfluxDB ({e})")
162
            else:
163
                logger.debug(f"Export {name} stats to InfluxDB")
164