Test Failed
Push — develop ( bdd66c...540682 )
by Nicolas
02:39
created

Export.normalize()   B

Complexity

Conditions 7

Size

Total Lines 15
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 12
nop 2
dl 0
loc 15
rs 8
c 0
b 0
f 0
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2025 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""TimescaleDB interface class."""
10
11
import sys
12
import time
13
from platform import node
14
15
import psycopg
16
17
from glances.exports.export import GlancesExport
18
from glances.logger import logger
19
20
# Map the name of a Python type (as given by type(value).__name__) to the
# PostgreSQL column type used when a TimescaleDB table is created.
# https://www.postgresql.org/docs/current/datatype.html
convert_types = {
    # Scalars
    'bool': 'BOOLEAN',
    'int': 'BIGINT',
    'float': 'DOUBLE PRECISION',
    'str': 'TEXT',
    # Tuples and lists are both stored as TEXT (comma-separated)
    **dict.fromkeys(('tuple', 'list'), 'TEXT'),
    # Use DOUBLE PRECISION for NoneType to avoid issues with NULL
    'NoneType': 'DOUBLE PRECISION',
}
31
32
33
class Export(GlancesExport):
34
    """This class manages the TimescaleDB export module."""
35
36 View Code Duplication
    def __init__(self, config=None, args=None):
        """Init the TimescaleDB export IF.

        Loads the ``timescaledb`` configuration section, resolves the hostname
        used to tag every exported row and opens the database connection.

        :param config: configuration object, forwarded to GlancesExport
        :param args: command line arguments, forwarded to GlancesExport
        :raises SystemExit: if the mandatory configuration keys are missing
        """
        super().__init__(config=config, args=args)

        # Mandatory configuration keys (additional to host and port)
        self.db = None

        # Optional configuration keys
        self.user = None
        self.password = None
        self.hostname = None

        # Load the configuration file
        # NOTE(review): load_conf() is presumably what fills the attributes
        # declared above (and host/port) - confirm against GlancesExport.
        self.export_enable = self.load_conf(
            'timescaledb', mandatories=['host', 'port', 'db'], options=['user', 'password', 'hostname']
        )
        if not self.export_enable:
            # Use sys.exit() rather than the exit() builtin: the latter is
            # injected by the site module for interactive use and is not
            # guaranteed to exist (python -S, frozen applications).
            sys.exit('Missing TimescaleDB config')

        # The hostname is always added as an identifier in the TimescaleDB table
        # so we can filter the stats by hostname.
        # Default: short name of the local machine.
        self.hostname = self.hostname or node().split(".")[0]

        # Init the TimescaleDB client
        self.client = self.init()
61
62
    def init(self):
        """Init the connection to the TimescaleDB server.

        :return: the psycopg connection, or None if the export is disabled
        :raises SystemExit: (status 2) if the connection cannot be established
        """
        if not self.export_enable:
            return None

        # Hand the parameters to psycopg as keywords instead of formatting a
        # conninfo string by hand: psycopg takes care of quoting values that
        # contain spaces or quotes.
        # See https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
        conn_params = {
            'host': self.host,
            'port': self.port,
            'dbname': self.db,
            'user': self.user,
            'password': self.password,
        }
        # Optional keys left unset must be omitted, otherwise the literal
        # string "None" would be sent as user name / password.
        conn_params = {name: value for name, value in conn_params.items() if value is not None}

        try:
            db = psycopg.connect(**conn_params)
        except Exception as e:
            logger.critical(f"Cannot connect to TimescaleDB server {self.host}:{self.port} ({e})")
            sys.exit(2)
        else:
            logger.info(f"Stats will be exported to TimescaleDB server: {self.host}:{self.port}")

        return db
78
79
    def normalize(self, value):
        """Normalize the value to be exportable to TimescaleDB.

        The returned string is spliced as-is into an INSERT statement, so it
        must be exactly one valid SQL literal.

        :param value: a stat value (None, bool, number, str, list or tuple)
        :return: the SQL literal, as a string
        """

        def quote(text):
            # SQL string literal: embedded single quotes are escaped by doubling them
            return "'" + text.replace("'", "''") + "'"

        if value is None:
            return 'NULL'
        # bool is tested before the numeric fallback because bool is a subclass of int
        if isinstance(value, bool):
            return str(value).upper()
        if isinstance(value, (list, tuple)):
            # Special case for list of one boolean
            if len(value) == 1 and isinstance(value[0], bool):
                return str(value[0]).upper()
            # Lists and tuples are stored in a single TEXT column: emit ONE
            # comma-separated literal. One literal per element (or none at all
            # for an empty sequence) would not match the column count.
            return quote(', '.join(str(v) for v in value))
        if isinstance(value, str):
            return quote(value)

        return f"{value}"
94
95
    def update(self, stats):
        """Update the TimescaleDB export module.

        For every exported plugin, merge its limits into its stats, derive the
        table columns and the rows to insert, then hand them to export().

        :param stats: the Glances stats object
        :return: False if the export is disabled, True otherwise
        """
        if not self.export_enable:
            return False

        # Get all the stats & limits
        # @TODO: Current limitation with sensors, fs and diskio plugins because fields list is not the same
        not_supported = ['sensors', 'fs', 'diskio']
        self._last_exported_list = [name for name in self.plugins_to_export(stats) if name not in not_supported]
        all_stats = stats.getAllExportsAsDict(plugin_list=self.last_exported_list())
        all_limits = stats.getAllLimitsAsDict(plugin_list=self.last_exported_list())

        for plugin in self.last_exported_list():
            plugin_stats = all_stats[plugin]
            single_row = isinstance(plugin_stats, dict)
            if not single_row and not isinstance(plugin_stats, list):
                # Neither a dict nor a list: nothing to export
                continue

            # Same clean-up for the dict itself or for every item of the list
            for entry in [plugin_stats] if single_row else plugin_stats:
                entry.update(all_limits[plugin])
                # Remove the <plugin>_disable field
                entry.pop(f"{plugin}_disable", None)
                # user is a special field that should not be exported:
                # rename it to user_<plugin>
                if 'user' in entry:
                    entry[f'user_{plugin}'] = entry.pop('user')

            # Fixed leading columns, segmentation columns and the entry used
            # as a template for the remaining columns
            if single_row:
                template = plugin_stats
                creation_list = ['time TIMESTAMPTZ NOT NULL', 'hostname_id TEXT NOT NULL']
                segmented_by = ['hostname_id']
            elif len(plugin_stats) > 0 and 'key' in plugin_stats[0]:
                # Only the first item is inspected to build the columns
                template = plugin_stats[0]
                creation_list = [
                    'time TIMESTAMPTZ NOT NULL',
                    'hostname_id TEXT NOT NULL',
                    'key_id TEXT NOT NULL',
                ]
                segmented_by = ['hostname_id', 'key_id']
            else:
                # Empty list, or list items without a 'key' field
                continue

            # One nullable column per field, typed after the Python type of its value
            creation_list.extend(
                f"{field} {convert_types[type(field_value).__name__]} NULL" for field, field_value in template.items()
            )

            # Rows to insert: always a list of lists, so that export() deals
            # with a single data model. Every row starts with the insertion
            # time and the hostname.
            if single_row:
                values_list = [
                    ['NOW()', f"'{self.hostname}'", *[self.normalize(v) for v in plugin_stats.values()]]
                ]
            else:
                values_list = []
                for plugin_item in plugin_stats:
                    # NOTE(review): key_id receives the content of the 'key'
                    # field itself - confirm this is the intended identifier.
                    row = ['NOW()', f"'{self.hostname}'", f"'{plugin_item.get('key')}'"]
                    row.extend(self.normalize(v) for v in plugin_item.values())
                    # NOTE(review): the last value is dropped, so the row has one
                    # value less than the table has columns - reason not visible here.
                    values_list.append(row[:-1])

            # Export stats to TimescaleDB
            self.export(plugin, creation_list, segmented_by, values_list)

        return True
171
172
    def export(self, plugin, creation_list, segmented_by, values_list):
173
        """Export the stats to the TimescaleDB server."""
174
        logger.debug(f"Export {plugin} stats to TimescaleDB")
175
176
        with self.client.cursor() as cur:
177
            # Is the table exists?
178
            cur.execute(f"select exists(select * from information_schema.tables where table_name='{plugin}')")
179
            if not cur.fetchone()[0]:
180
                # Create the table if it does not exist
181
                # https://github.com/timescale/timescaledb/blob/main/README.md#create-a-hypertable
182
                # Execute the create table query
183
                create_query = f"""
184
CREATE TABLE {plugin} (
185
    {', '.join(creation_list)}
186
)
187
WITH (
188
    timescaledb.hypertable,
189
    timescaledb.partition_column='time',
190
    timescaledb.segmentby = '{", ".join(segmented_by)}'
191
);"""
192
                logger.debug(f"Create table: {create_query}")
193
                try:
194
                    cur.execute(create_query)
195
                except Exception as e:
196
                    logger.error(f"Cannot create table {plugin}: {e}")
197
                    return
198
199
            # Insert the data
200
            # https://github.com/timescale/timescaledb/blob/main/README.md#insert-and-query-data
201
            insert_list = [f"({','.join(i)})" for i in values_list]
202
            insert_query = f"INSERT INTO {plugin} VALUES {','.join(insert_list)};"
203
            logger.debug(f"Insert data into table: {insert_query}")
204
            try:
205
                cur.execute(insert_query)
206
            except Exception as e:
207
                logger.error(f"Cannot insert data into table {plugin}: {e}")
208
                return
209
210
        # Commit the changes (for every plugin or to be done at the end ?)
211
        self.client.commit()
212
213
    def exit(self):
        """Close the TimescaleDB export module.

        Commits pending data, closes the connection and calls the parent
        exit(). The connection is closed and the parent is called even if the
        final commit fails; the error is then propagated to the caller.
        """
        try:
            # Force last write
            self.client.commit()
            time.sleep(3)  # Wait a bit to ensure all data is written
        finally:
            try:
                # Close the TimescaleDB client
                self.client.close()
            finally:
                # Call the father method
                super().exit()
224