Issues (46)

glances/exports/glances_influxdb2.py (2 issues)

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""InfluxDB (from to InfluxDB 1.8+) interface class."""
11
12
import sys
13
from platform import node
14
15
from glances.logger import logger
16
from glances.exports.glances_export import GlancesExport
17
18
from influxdb_client import InfluxDBClient, WriteOptions
19
20
21
class Export(GlancesExport):
22
    """This class manages the InfluxDB export module."""
23
24
    def __init__(self, config=None, args=None):
25
        """Init the InfluxDB export IF."""
26
        super(Export, self).__init__(config=config, args=args)
27
28
        # Mandatory configuration keys (additional to host and port)
29
        self.org = None
30
        self.bucket = None
31
        self.token = None
32
33
        # Optional configuration keys
34
        self.protocol = 'http'
35
        self.prefix = None
36
        self.tags = None
37
        self.hostname = None
38
        self.interval = None
39
40
        # Load the InfluxDB configuration file
41
        self.export_enable = self.load_conf(
42
            'influxdb2',
43
            mandatories=['host', 'port', 'user', 'password', 'org', 'bucket', 'token'],
44
            options=['protocol', 'prefix', 'tags', 'interval'],
45
        )
46
        if not self.export_enable:
47
            exit('Missing influxdb2 config')
48
49
        # Interval between two exports (in seconds)
50
        if self.interval is None:
51
            self.interval = 0
52
        try:
53
            self.interval = int(self.interval)
54
        except ValueError:
55
            logger.warning("InfluxDB export interval is not an integer, use default value")
56
            self.interval = 0
57
        # and should be set to the Glances refresh time if the value is 0
58
        self.interval = self.interval if self.interval > 0 else self.args.time
59
        logger.debug("InfluxDB export interval is set to {} seconds".format(self.interval))
60
61
        # The hostname is always add as a tag
62
        self.hostname = node().split('.')[0]
63
64
        # Init the InfluxDB client
65
        self.client = self.init()
66
67
    def init(self):
68
        """Init the connection to the InfluxDB server."""
69
        if not self.export_enable:
70
            return None
71
72
        url = '{}://{}:{}'.format(self.protocol, self.host, self.port)
73
        try:
74
            # See docs: https://influxdb-client.readthedocs.io/en/stable/api.html#influxdbclient
75
            client = InfluxDBClient(url=url, enable_gzip=False, verify_ssl=False, org=self.org, token=self.token)
76
        except Exception as e:
77
            logger.critical("Cannot connect to InfluxDB server '%s' (%s)" % (url, e))
78
            sys.exit(2)
79
        else:
80
            logger.info(
81
                "Connected to InfluxDB server version {} ({})".format(client.health().version, client.health().message)
82
            )
83
84
        # Create the write client
85
        write_client = client.write_api(
86
            write_options=WriteOptions(
87
                batch_size=500,
88
                flush_interval=self.interval * 1000,
89
                jitter_interval=2000,
90
                retry_interval=5000,
91
                max_retries=5,
92
                max_retry_delay=30000,
93
                exponential_base=2,
94
            )
95
        )
96
        return write_client
97
98 View Code Duplication
    def _normalize(self, name, columns, points):
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
99
        """Normalize data for the InfluxDB's data model.
100
101
        :return: a list of measurements.
102
        """
103
        ret = []
104
105
        # Build initial dict by crossing columns and point
106
        data_dict = dict(zip(columns, points))
107
108
        # issue1871 - Check if a key exist. If a key exist, the value of
109
        # the key should be used as a tag to identify the measurement.
110
        keys_list = [k.split('.')[0] for k in columns if k.endswith('.key')]
111
        if len(keys_list) == 0:
112
            keys_list = [None]
113
114
        for measurement in keys_list:
115
            # Manage field
116
            if measurement is not None:
117
                fields = {
118
                    k.replace('{}.'.format(measurement), ''): data_dict[k]
119
                    for k in data_dict
120
                    if k.startswith('{}.'.format(measurement))
121
                }
122
            else:
123
                fields = data_dict
124
            # Transform to InfluxDB datamodel
125
            # https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/
126
            for k in fields:
127
                #  Do not export empty (None) value
128
                if fields[k] is None:
129
                    continue
130
                # Convert numerical to float
131
                try:
132
                    fields[k] = float(fields[k])
133
                except (TypeError, ValueError):
134
                    # Convert others to string
135
                    try:
136
                        fields[k] = str(fields[k])
137
                    except (TypeError, ValueError):
138
                        pass
139
            # Manage tags
140
            tags = self.parse_tags(self.tags)
141
            if 'key' in fields and fields['key'] in fields:
142
                # Create a tag from the key
143
                # Tag should be an string (see InfluxDB data model)
144
                tags[fields['key']] = str(fields[fields['key']])
145
                # Remove it from the field list (can not be a field and a tag)
146
                fields.pop(fields['key'])
147
            # Add the hostname as a tag
148
            tags['hostname'] = self.hostname
149
            # Add the measurement to the list
150
            ret.append({'measurement': name, 'tags': tags, 'fields': fields})
151
        return ret
152
153 View Code Duplication
    def export(self, name, columns, points):
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
154
        """Write the points to the InfluxDB server."""
155
        # Manage prefix
156
        if self.prefix is not None:
157
            name = self.prefix + '.' + name
158
        # Write input to the InfluxDB database
159
        if len(points) == 0:
160
            logger.debug("Cannot export empty {} stats to InfluxDB".format(name))
161
        else:
162
            try:
163
                self.client.write(self.bucket, self.org, self._normalize(name, columns, points), time_precision="s")
164
            except Exception as e:
165
                # Log level set to debug instead of error (see: issue #1561)
166
                logger.debug("Cannot export {} stats to InfluxDB ({})".format(name, e))
167
            else:
168
                logger.debug("Export {} stats to InfluxDB".format(name))
169