Issues (49)

glances/exports/glances_influxdb2/__init__.py (1 issue)

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""InfluxDB (from to InfluxDB 1.8+) interface class."""
10
11
import sys
12
from platform import node
13
14
from influxdb_client import InfluxDBClient, WriteOptions
15
16
from glances.exports.export import GlancesExport
17
from glances.logger import logger
18
19
FIELD_TO_TAG = ['name', 'cmdline', 'type']
20
21
22
class Export(GlancesExport):
23
    """This class manages the InfluxDB export module."""
24
25
    def __init__(self, config=None, args=None):
26
        """Init the InfluxDB export IF."""
27
        super().__init__(config=config, args=args)
28
29
        # Mandatory configuration keys (additional to host and port)
30
        self.org = None
31
        self.bucket = None
32
        self.token = None
33
34
        # Optional configuration keys
35
        self.protocol = 'http'
36
        self.prefix = None
37
        self.tags = None
38
        self.hostname = None
39
        self.interval = None
40
41
        # Load the InfluxDB configuration file
42
        self.export_enable = self.load_conf(
43
            'influxdb2',
44
            mandatories=['host', 'port', 'user', 'password', 'org', 'bucket', 'token'],
45
            options=['protocol', 'prefix', 'tags', 'interval'],
46
        )
47
        if not self.export_enable:
48
            exit('Missing influxdb2 config')
49
50
        # Interval between two exports (in seconds)
51
        if self.interval is None:
52
            self.interval = 0
53
        try:
54
            self.interval = int(self.interval)
55
        except ValueError:
56
            logger.warning("InfluxDB export interval is not an integer, use default value")
57
            self.interval = 0
58
        # and should be set to the Glances refresh time if the value is 0
59
        self.interval = self.interval if self.interval > 0 else self.args.time
60
        logger.debug(f"InfluxDB export interval is set to {self.interval} seconds")
61
62
        # The hostname is always add as a tag
63
        self.hostname = node().split('.')[0]
64
65
        # Init the InfluxDB client
66
        self.client = self.init()
67
68
    def init(self):
69
        """Init the connection to the InfluxDB server."""
70
        if not self.export_enable:
71
            return None
72
73
        url = f'{self.protocol}://{self.host}:{self.port}'
74
        try:
75
            # See docs: https://influxdb-client.readthedocs.io/en/stable/api.html#influxdbclient
76
            client = InfluxDBClient(url=url, enable_gzip=False, verify_ssl=False, org=self.org, token=self.token)
77
        except Exception as e:
78
            logger.critical(f"Cannot connect to InfluxDB server '{url}' ({e})")
79
            sys.exit(2)
80
        else:
81
            logger.info(f"Connected to InfluxDB server version {client.health().version} ({client.health().message})")
82
83
        # Create the write client
84
        return client.write_api(
85
            write_options=WriteOptions(
86
                batch_size=500,
87
                flush_interval=self.interval * 1000,
88
                jitter_interval=2000,
89
                retry_interval=5000,
90
                max_retries=5,
91
                max_retry_delay=30000,
92
                exponential_base=2,
93
            )
94
        )
95
96 View Code Duplication
    def _normalize(self, name, columns, points):
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
97
        """Normalize data for the InfluxDB's data model.
98
99
        :return: a list of measurements.
100
        """
101
        ret = []
102
103
        # Build initial dict by crossing columns and point
104
        data_dict = dict(zip(columns, points))
105
106
        # issue1871 - Check if a key exist. If a key exist, the value of
107
        # the key should be used as a tag to identify the measurement.
108
        keys_list = [k.split('.')[0] for k in columns if k.endswith('.key')]
109
        if len(keys_list) == 0:
110
            keys_list = [None]
111
112
        for measurement in keys_list:
113
            # Manage field
114
            if measurement is not None:
115
                fields = {
116
                    k.replace(f'{measurement}.', ''): data_dict[k] for k in data_dict if k.startswith(f'{measurement}.')
117
                }
118
            else:
119
                fields = data_dict
120
            # Transform to InfluxDB datamodel
121
            # https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/
122
            for k in fields:
123
                #  Do not export empty (None) value
124
                if fields[k] is None:
125
                    continue
126
                # Convert numerical to float
127
                try:
128
                    fields[k] = float(fields[k])
129
                except (TypeError, ValueError):
130
                    # Convert others to string
131
                    try:
132
                        fields[k] = str(fields[k])
133
                    except (TypeError, ValueError):
134
                        pass
135
            # Manage tags
136
            tags = self.parse_tags(self.tags)
137
            if 'key' in fields and fields['key'] in fields:
138
                # Create a tag from the key
139
                # Tag should be an string (see InfluxDB data model)
140
                tags[fields['key']] = str(fields[fields['key']])
141
                # Remove it from the field list (can not be a field and a tag)
142
                fields.pop(fields['key'])
143
            # Add the hostname as a tag
144
            tags['hostname'] = self.hostname
145
            # Add name as a tag (example for the process list)
146
            for k in FIELD_TO_TAG:
147
                if k in fields:
148
                    tags[k] = str(fields[k])
149
                    # Remove it from the field list (can not be a field and a tag)
150
                    if k in fields:
151
                        fields.pop(fields[k])
152
            # Add the measurement to the list
153
            ret.append({'measurement': name, 'tags': tags, 'fields': fields})
154
        return ret
155
156
    def export(self, name, columns, points):
157
        """Write the points to the InfluxDB server."""
158
        # Manage prefix
159
        if self.prefix is not None:
160
            name = self.prefix + '.' + name
161
        # Write input to the InfluxDB database
162
        if len(points) == 0:
163
            logger.debug(f"Cannot export empty {name} stats to InfluxDB")
164
        else:
165
            try:
166
                self.client.write(self.bucket, self.org, self._normalize(name, columns, points), time_precision="s")
167
            except Exception as e:
168
                # Log level set to debug instead of error (see: issue #1561)
169
                logger.debug(f"Cannot export {name} stats to InfluxDB ({e})")
170
            else:
171
                logger.debug(f"Export {name} stats to InfluxDB")
172