Issues (50)

glances/exports/glances_influxdb2/__init__.py (2 issues)

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""InfluxDB (from to InfluxDB 1.8+) interface class."""
11
12
import sys
13
from platform import node
14
15
from glances.logger import logger
16
from glances.exports.export import GlancesExport
17
18
from influxdb_client import InfluxDBClient, WriteOptions
19
20
FIELD_TO_TAG = ['name', 'cmdline']
21
22
23
class Export(GlancesExport):
24
    """This class manages the InfluxDB export module."""
25
26
    def __init__(self, config=None, args=None):
27
        """Init the InfluxDB export IF."""
28
        super(Export, self).__init__(config=config, args=args)
29
30
        # Mandatory configuration keys (additional to host and port)
31
        self.org = None
32
        self.bucket = None
33
        self.token = None
34
35
        # Optional configuration keys
36
        self.protocol = 'http'
37
        self.prefix = None
38
        self.tags = None
39
        self.hostname = None
40
        self.interval = None
41
42
        # Load the InfluxDB configuration file
43
        self.export_enable = self.load_conf(
44
            'influxdb2',
45
            mandatories=['host', 'port', 'user', 'password', 'org', 'bucket', 'token'],
46
            options=['protocol', 'prefix', 'tags', 'interval'],
47
        )
48
        if not self.export_enable:
49
            exit('Missing influxdb2 config')
50
51
        # Interval between two exports (in seconds)
52
        if self.interval is None:
53
            self.interval = 0
54
        try:
55
            self.interval = int(self.interval)
56
        except ValueError:
57
            logger.warning("InfluxDB export interval is not an integer, use default value")
58
            self.interval = 0
59
        # and should be set to the Glances refresh time if the value is 0
60
        self.interval = self.interval if self.interval > 0 else self.args.time
61
        logger.debug("InfluxDB export interval is set to {} seconds".format(self.interval))
62
63
        # The hostname is always add as a tag
64
        self.hostname = node().split('.')[0]
65
66
        # Init the InfluxDB client
67
        self.client = self.init()
68
69
    def init(self):
70
        """Init the connection to the InfluxDB server."""
71
        if not self.export_enable:
72
            return None
73
74
        url = '{}://{}:{}'.format(self.protocol, self.host, self.port)
75
        try:
76
            # See docs: https://influxdb-client.readthedocs.io/en/stable/api.html#influxdbclient
77
            client = InfluxDBClient(url=url, enable_gzip=False, verify_ssl=False, org=self.org, token=self.token)
78
        except Exception as e:
79
            logger.critical("Cannot connect to InfluxDB server '%s' (%s)" % (url, e))
80
            sys.exit(2)
81
        else:
82
            logger.info(
83
                "Connected to InfluxDB server version {} ({})".format(client.health().version, client.health().message)
84
            )
85
86
        # Create the write client
87
        write_client = client.write_api(
88
            write_options=WriteOptions(
89
                batch_size=500,
90
                flush_interval=self.interval * 1000,
91
                jitter_interval=2000,
92
                retry_interval=5000,
93
                max_retries=5,
94
                max_retry_delay=30000,
95
                exponential_base=2,
96
            )
97
        )
98
        return write_client
99
100 View Code Duplication
    def _normalize(self, name, columns, points):
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
101
        """Normalize data for the InfluxDB's data model.
102
103
        :return: a list of measurements.
104
        """
105
        ret = []
106
107
        # Build initial dict by crossing columns and point
108
        data_dict = dict(zip(columns, points))
109
110
        # issue1871 - Check if a key exist. If a key exist, the value of
111
        # the key should be used as a tag to identify the measurement.
112
        keys_list = [k.split('.')[0] for k in columns if k.endswith('.key')]
113
        if len(keys_list) == 0:
114
            keys_list = [None]
115
116
        for measurement in keys_list:
117
            # Manage field
118
            if measurement is not None:
119
                fields = {
120
                    k.replace('{}.'.format(measurement), ''): data_dict[k]
121
                    for k in data_dict
122
                    if k.startswith('{}.'.format(measurement))
123
                }
124
            else:
125
                fields = data_dict
126
            # Transform to InfluxDB datamodel
127
            # https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/
128
            for k in fields:
129
                #  Do not export empty (None) value
130
                if fields[k] is None:
131
                    continue
132
                # Convert numerical to float
133
                try:
134
                    fields[k] = float(fields[k])
135
                except (TypeError, ValueError):
136
                    # Convert others to string
137
                    try:
138
                        fields[k] = str(fields[k])
139
                    except (TypeError, ValueError):
140
                        pass
141
            # Manage tags
142
            tags = self.parse_tags(self.tags)
143
            if 'key' in fields and fields['key'] in fields:
144
                # Create a tag from the key
145
                # Tag should be an string (see InfluxDB data model)
146
                tags[fields['key']] = str(fields[fields['key']])
147
                # Remove it from the field list (can not be a field and a tag)
148
                fields.pop(fields['key'])
149
            # Add the hostname as a tag
150
            tags['hostname'] = self.hostname
151
            # Add name as a tag (example for the process list)
152
            for k in FIELD_TO_TAG:
153
                if k in fields:
154
                    tags[k] = str(fields[k])
155
                    # Remove it from the field list (can not be a field and a tag)
156
                    if k in fields:
157
                        fields.pop(fields[k])
158
            # Add the measurement to the list
159
            ret.append({'measurement': name, 'tags': tags, 'fields': fields})
160
        return ret
161
162 View Code Duplication
    def export(self, name, columns, points):
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
163
        """Write the points to the InfluxDB server."""
164
        # Manage prefix
165
        if self.prefix is not None:
166
            name = self.prefix + '.' + name
167
        # Write input to the InfluxDB database
168
        if len(points) == 0:
169
            logger.debug("Cannot export empty {} stats to InfluxDB".format(name))
170
        else:
171
            try:
172
                self.client.write(self.bucket, self.org, self._normalize(name, columns, points), time_precision="s")
173
            except Exception as e:
174
                # Log level set to debug instead of error (see: issue #1561)
175
                logger.debug("Cannot export {} stats to InfluxDB ({})".format(name, e))
176
            else:
177
                logger.debug("Export {} stats to InfluxDB".format(name))
178