Export.update()   F
last analyzed

Complexity

Conditions 15

Size

Total Lines 73
Code Lines 50

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 15
eloc 50
nop 2
dl 0
loc 73
rs 2.9998
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like glances.exports.glances_timescaledb.Export.update() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2025 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""TimescaleDB interface class."""
10
11
import sys
12
import time
13
from platform import node
14
15
import psycopg
16
17
from glances.exports.export import GlancesExport
18
from glances.logger import logger
19
20
# Define the type conversions for TimescaleDB
21
# https://www.postgresql.org/docs/current/datatype.html
22
convert_types = {
23
    'bool': 'BOOLEAN',
24
    'int': 'BIGINT',
25
    'float': 'DOUBLE PRECISION',
26
    'str': 'TEXT',
27
    'tuple': 'TEXT',  # Store tuples as TEXT (comma-separated)
28
    'list': 'TEXT',  # Store lists as TEXT (comma-separated)
29
    'NoneType': 'DOUBLE PRECISION',  # Use DOUBLE PRECISION for NoneType to avoid issues with NULL
30
}
31
32
33
class Export(GlancesExport):
34
    """This class manages the TimescaleDB export module."""
35
36
    def __init__(self, config=None, args=None):
37
        """Init the TimescaleDB export IF."""
38
        super().__init__(config=config, args=args)
39
40
        # Mandatory configuration keys (additional to host and port)
41
        self.db = None
42
43
        # Optional configuration keys
44
        self.user = None
45
        self.password = None
46
        self.hostname = None
47
48
        # Load the configuration file
49
        self.export_enable = self.load_conf(
50
            'timescaledb', mandatories=['host', 'port', 'db'], options=['user', 'password', 'hostname']
51
        )
52
        if not self.export_enable:
53
            exit('Missing TimescaleDB config')
54
55
        # The hostname is always add as an identifier in the TimescaleDB table
56
        # so we can filter the stats by hostname
57
        self.hostname = self.hostname or node().split(".")[0]
58
59
        # Init the TimescaleDB client
60
        self.client = self.init()
61
62
    def init(self):
63
        """Init the connection to the TimescaleDB server."""
64
        if not self.export_enable:
65
            return None
66
67
        try:
68
            # See https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
69
            conn_str = f"host={self.host} port={self.port} dbname={self.db} user={self.user} password={self.password}"
70
            db = psycopg.connect(conn_str)
71
        except Exception as e:
72
            logger.critical(f"Cannot connect to TimescaleDB server {self.host}:{self.port} ({e})")
73
            sys.exit(2)
74
        else:
75
            logger.info(f"Stats will be exported to TimescaleDB server: {self.host}:{self.port}")
76
77
        return db
78
79
    def normalize(self, value):
80
        """Normalize the value to be exportable to TimescaleDB."""
81
        if value is None:
82
            return 'NULL'
83
        if isinstance(value, bool):
84
            return str(value).upper()
85
        if isinstance(value, (list, tuple)):
86
            return ', '.join([f"'{v}'" for v in value])
87
        if isinstance(value, str):
88
            return f"'{value}'"
89
90
        return f"{value}"
91
92
    def update(self, stats):
93
        """Update the TimescaleDB export module."""
94
        if not self.export_enable:
95
            return False
96
97
        # Get all the stats & limits
98
        # Current limitation with sensors and fs plugins because fields list is not the same
99
        self._last_exported_list = [p for p in self.plugins_to_export(stats) if p not in ['sensors', 'fs']]
100
        all_stats = stats.getAllExportsAsDict(plugin_list=self.last_exported_list())
101
        all_limits = stats.getAllLimitsAsDict(plugin_list=self.last_exported_list())
102
103
        # Loop over plugins to export
104
        for plugin in self.last_exported_list():
105
            if isinstance(all_stats[plugin], dict):
106
                all_stats[plugin].update(all_limits[plugin])
107
                # Remove the <plugin>_disable field
108
                all_stats[plugin].pop(f"{plugin}_disable", None)
109
                # user is a special field that should not be exported
110
                # rename it to user_<plugin>
111
                if 'user' in all_stats[plugin]:
112
                    all_stats[plugin][f'user_{plugin}'] = all_stats[plugin].pop('user')
113
            elif isinstance(all_stats[plugin], list):
114
                for i in all_stats[plugin]:
115
                    i.update(all_limits[plugin])
116
                    # Remove the <plugin>_disable field
117
                    i.pop(f"{plugin}_disable", None)
118
                    # user is a special field that should not be exported
119
                    # rename it to user_<plugin>
120
                    if 'user' in i:
121
                        i[f'user_{plugin}'] = i.pop('user')
122
            else:
123
                continue
124
125
            plugin_stats = all_stats[plugin]
126
            creation_list = []  # List used to create the TimescaleDB table
127
            segmented_by = []  # List of columns used to segment the data
128
            values_list = []  # List of values to insert (list of lists, one list per row)
129
            if isinstance(plugin_stats, dict):
130
                # Stats is a dict
131
                # Create the list used to create the TimescaleDB table
132
                creation_list.append('time TIMESTAMPTZ NOT NULL')
133
                creation_list.append('hostname_id TEXT NOT NULL')
134
                segmented_by.extend(['hostname_id'])  # Segment by hostname
135
                for key, value in plugin_stats.items():
136
                    creation_list.append(f"{key} {convert_types[type(value).__name__]} NULL")
137
                values_list.append('NOW()')  # Add the current time (insertion time)
138
                values_list.append(f"'{self.hostname}'")  # Add the hostname
139
                values_list.extend([self.normalize(value) for value in plugin_stats.values()])
140
                values_list = [values_list]
141
            elif isinstance(plugin_stats, list) and len(plugin_stats) > 0 and 'key' in plugin_stats[0]:
142
                # Stats is a list
143
                # Create the list used to create the TimescaleDB table
144
                creation_list.append('time TIMESTAMPTZ NOT NULL')
145
                creation_list.append('hostname_id TEXT NOT NULL')
146
                creation_list.append('key_id TEXT NOT NULL')
147
                segmented_by.extend(['hostname_id', 'key_id'])  # Segment by hostname and key
148
                for key, value in plugin_stats[0].items():
149
                    creation_list.append(f"{key} {convert_types[type(value).__name__]} NULL")
150
                # Create the values list (it is a list of list to have a single datamodel for all the plugins)
151
                for plugin_item in plugin_stats:
152
                    item_list = []
153
                    item_list.append('NOW()')  # Add the current time (insertion time)
154
                    item_list.append(f"'{self.hostname}'")  # Add the hostname
155
                    item_list.append(f"'{plugin_item.get('key')}'")
156
                    item_list.extend([self.normalize(value) for value in plugin_item.values()])
157
                    values_list.append(item_list[:-1])
158
            else:
159
                continue
160
161
            # Export stats to TimescaleDB
162
            self.export(plugin, creation_list, segmented_by, values_list)
163
164
        return True
165
166
    def export(self, plugin, creation_list, segmented_by, values_list):
167
        """Export the stats to the TimescaleDB server."""
168
        logger.debug(f"Export {plugin} stats to TimescaleDB")
169
170
        with self.client.cursor() as cur:
171
            # Is the table exists?
172
            cur.execute(f"select exists(select * from information_schema.tables where table_name='{plugin}')")
173
            if not cur.fetchone()[0]:
174
                # Create the table if it does not exist
175
                # https://github.com/timescale/timescaledb/blob/main/README.md#create-a-hypertable
176
                # Execute the create table query
177
                create_query = f"""
178
CREATE TABLE {plugin} (
179
    {', '.join(creation_list)}
180
)
181
WITH (
182
    timescaledb.hypertable,
183
    timescaledb.partition_column='time',
184
    timescaledb.segmentby = '{", ".join(segmented_by)}'
185
);"""
186
                logger.debug(f"Create table: {create_query}")
187
                try:
188
                    cur.execute(create_query)
189
                except Exception as e:
190
                    logger.error(f"Cannot create table {plugin}: {e}")
191
                    return
192
193
            # Insert the data
194
            # https://github.com/timescale/timescaledb/blob/main/README.md#insert-and-query-data
195
            insert_list = [f"({','.join(i)})" for i in values_list]
196
            insert_query = f"INSERT INTO {plugin} VALUES {','.join(insert_list)};"
197
            logger.debug(f"Insert data into table: {insert_query}")
198
            try:
199
                cur.execute(insert_query)
200
            except Exception as e:
201
                logger.error(f"Cannot insert data into table {plugin}: {e}")
202
                return
203
204
        # Commit the changes (for every plugin or to be done at the end ?)
205
        self.client.commit()
206
207
    def exit(self):
208
        """Close the TimescaleDB export module."""
209
        # Force last write
210
        self.client.commit()
211
212
        # Close the TimescaleDB client
213
        time.sleep(3)  # Wait a bit to ensure all data is written
214
        self.client.close()
215
216
        # Call the father method
217
        super().exit()
218