Issues (49)

glances/exports/glances_cassandra/__init__.py (1 issue)

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""Cassandra/Scylla interface class."""
10
11
import sys
12
from datetime import datetime
13
from numbers import Number
14
15
from cassandra import InvalidRequest
16
from cassandra.auth import PlainTextAuthProvider
17
from cassandra.cluster import Cluster
18
from cassandra.util import uuid_from_time
19
20
from glances.exports.export import GlancesExport
21
from glances.logger import logger
22
23
24
class Export(GlancesExport):
    """This class manages the Cassandra/Scylla export module.

    On construction the 'cassandra' configuration section is loaded, a
    connection to the cluster is opened, and the target keyspace/table are
    created if needed. Each call to export() then inserts one row holding the
    numeric stats of a plugin.
    """

    def __init__(self, config=None, args=None):
        """Init the Cassandra export IF.

        The process exits with status 2 when the 'cassandra' section cannot be
        loaded or when the connection to the cluster fails.
        """
        super().__init__(config=config, args=args)

        # Mandatory configuration keys (additional to host and port)
        self.keyspace = None

        # Optional configuration keys (values below are the defaults)
        self.protocol_version = 3
        self.replication_factor = 2
        self.table = None
        self.username = None
        self.password = None

        # Load the Cassandra configuration file section
        self.export_enable = self.load_conf(
            'cassandra',
            mandatories=['host', 'port', 'keyspace'],
            options=['protocol_version', 'replication_factor', 'table', 'username', 'password'],
        )
        if not self.export_enable:
            sys.exit(2)

        # Init the Cassandra client
        self.cluster, self.session = self.init()

    def init(self):
        """Init the connection to the Cassandra server.

        Returns a (cluster, session) tuple. When the export is disabled the
        tuple is (None, None) so that callers can always unpack two values.
        Exits the process with status 2 if the cluster cannot be reached.
        """
        if not self.export_enable:
            # Keep the return shape consistent with the nominal path: the
            # caller unpacks two values.
            return None, None

        # if username and/or password are not set the connection will try to connect with no auth
        auth_provider = PlainTextAuthProvider(username=self.username, password=self.password)

        # Cluster
        try:
            cluster = Cluster(
                [self.host],
                port=int(self.port),
                protocol_version=int(self.protocol_version),
                auth_provider=auth_provider,
            )
            session = cluster.connect()
        except Exception as e:
            logger.critical(f"Cannot connect to Cassandra cluster '{self.host}:{self.port}' ({e})")
            # sys.exit() raises SystemExit, so 'cluster' and 'session' are
            # always bound when the code below is reached.
            sys.exit(2)

        # Keyspace: select it, creating it first if the server rejects it
        try:
            session.set_keyspace(self.keyspace)
        except InvalidRequest:
            logger.info(f"Create keyspace {self.keyspace} on the Cassandra cluster")
            c = (
                f"CREATE KEYSPACE {self.keyspace} WITH "
                f"replication = {{ 'class': 'SimpleStrategy', 'replication_factor': '{self.replication_factor}' }}"
            )
            session.execute(c)
            session.set_keyspace(self.keyspace)

        logger.info(
            f"Stats will be exported to Cassandra cluster {cluster.metadata.cluster_name} "
            f"({cluster.metadata.all_hosts()}) in keyspace {self.keyspace}"
        )

        # Table: best-effort creation, any failure is treated as
        # "table already exists" and only logged at debug level.
        try:
            session.execute(
                f"CREATE TABLE {self.table} "
                f"(plugin text, time timeuuid, stat map<text,float>, PRIMARY KEY (plugin, time)) "
                f"WITH CLUSTERING ORDER BY (time DESC)"
            )
        except Exception:
            logger.debug(f"Cassandra table {self.table} already exist")

        return cluster, session

    def export(self, name, columns, points):
        """Write the points to the Cassandra cluster.

        name: value stored in the 'plugin' column.
        columns: iterable of stat names, paired positionally with points.
        points: iterable of stat values; non-numeric values are dropped.

        Errors raised while preparing or executing the INSERT are logged and
        not propagated.
        """
        logger.debug(f"Export {name} stats to Cassandra")

        # Remove non number stats and convert all to float (for Boolean).
        # dict.items() is used here: dict.iteritems() only exists on Python 2
        # and raises AttributeError on Python 3.
        data = {k: float(v) for (k, v) in dict(zip(columns, points)).items() if isinstance(v, Number)}

        # Write input to the Cassandra table
        try:
            stmt = f"INSERT INTO {self.table} (plugin, time, stat) VALUES (?, ?, ?)"
            query = self.session.prepare(stmt)
            # NOTE(review): datetime.now() is a naive local time; confirm how
            # uuid_from_time() interprets it on hosts that are not in UTC.
            self.session.execute(query, (name, uuid_from_time(datetime.now()), data))
        except Exception as e:
            logger.error(f"Cannot export {name} stats to Cassandra ({e})")

    def exit(self):
        """Close the Cassandra export module."""
        # To ensure all connections are properly closed (skip handles that
        # were never opened, i.e. init() returned None for them)
        if self.session is not None:
            self.session.shutdown()
        if self.cluster is not None:
            self.cluster.shutdown()
        # Call the father method
        super().exit()
125