1
|
|
|
# |
2
|
|
|
# This file is part of Glances. |
3
|
|
|
# |
4
|
|
|
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]> |
5
|
|
|
# |
6
|
|
|
# SPDX-License-Identifier: LGPL-3.0-only |
7
|
|
|
# |
8
|
|
|
|
9
|
|
|
"""Cassandra/Scylla interface class.""" |
10
|
|
|
|
11
|
|
|
import sys |
12
|
|
|
from datetime import datetime |
13
|
|
|
from numbers import Number |
14
|
|
|
|
15
|
|
|
from cassandra import InvalidRequest |
16
|
|
|
from cassandra.auth import PlainTextAuthProvider |
17
|
|
|
from cassandra.cluster import Cluster |
18
|
|
|
from cassandra.util import uuid_from_time |
19
|
|
|
|
20
|
|
|
from glances.exports.export import GlancesExport |
21
|
|
|
from glances.logger import logger |
22
|
|
|
|
23
|
|
|
|
24
|
|
|
class Export(GlancesExport):
    """This class manages the Cassandra/Scylla export module.

    Configuration is read from the 'cassandra' section through
    ``load_conf``. Mandatory keys: ``host``, ``port``, ``keyspace``.
    Optional keys (with the defaults set in ``__init__``):
    ``protocol_version`` (3), ``replication_factor`` (2), ``table``,
    ``username`` and ``password``.
    """

    def __init__(self, config=None, args=None):
        """Init the Cassandra export IF.

        Loads the 'cassandra' configuration section and opens the
        connection to the cluster. The process exits with status 2 when
        the configuration cannot be loaded.
        """
        super().__init__(config=config, args=args)

        # Mandatory configuration keys (additional to host and port)
        self.keyspace = None

        # Optional configuration keys
        self.protocol_version = 3
        self.replication_factor = 2
        self.table = None
        self.username = None
        self.password = None

        # Load the Cassandra configuration file section
        # (presumably load_conf fills the attributes named above, plus
        # host and port - verify against GlancesExport.load_conf)
        self.export_enable = self.load_conf(
            'cassandra',
            mandatories=['host', 'port', 'keyspace'],
            options=['protocol_version', 'replication_factor', 'table', 'username', 'password'],
        )
        if not self.export_enable:
            sys.exit(2)

        # Init the Cassandra client
        self.cluster, self.session = self.init()

    def init(self):
        """Init the connection to the Cassandra server.

        Connects to the cluster, selects the configured keyspace
        (creating it when the server rejects it) and tries to create
        the export table.

        Returns a ``(cluster, session)`` tuple, or ``(None, None)`` when
        the export is disabled. The process exits with status 2 when the
        connection to the cluster fails.
        """
        if not self.export_enable:
            # Always return a pair: the caller unpacks two values, so a bare
            # None would raise TypeError.
            return None, None

        # if username and/or password are not set the connection will try to connect with no auth
        auth_provider = PlainTextAuthProvider(username=self.username, password=self.password)

        # Cluster
        try:
            cluster = Cluster(
                [self.host],
                port=int(self.port),
                protocol_version=int(self.protocol_version),
                auth_provider=auth_provider,
            )
            session = cluster.connect()
        except Exception as e:
            logger.critical(f"Cannot connect to Cassandra cluster '{self.host}:{self.port}' ({e})")
            sys.exit(2)

        # Keyspace
        try:
            session.set_keyspace(self.keyspace)
        except InvalidRequest:
            # The keyspace was rejected by the server: create it, then select it
            logger.info(f"Create keyspace {self.keyspace} on the Cassandra cluster")
            c = (
                f"CREATE KEYSPACE {self.keyspace} WITH "
                f"replication = {{ 'class': 'SimpleStrategy', 'replication_factor': '{self.replication_factor}' }}"
            )
            session.execute(c)
            session.set_keyspace(self.keyspace)

        logger.info(
            f"Stats will be exported to Cassandra cluster {cluster.metadata.cluster_name} "
            f"({cluster.metadata.all_hosts()}) in keyspace {self.keyspace}"
        )

        # Table
        try:
            session.execute(
                f"CREATE TABLE {self.table} "
                f"(plugin text, time timeuuid, stat map<text,float>, PRIMARY KEY (plugin, time)) "
                f"WITH CLUSTERING ORDER BY (time DESC)"
            )
        except Exception:
            # NOTE(review): every failure is reported as "already exist",
            # including errors unrelated to an existing table.
            logger.debug(f"Cassandra table {self.table} already exist")

        return cluster, session

    def export(self, name, columns, points):
        """Write the points to the Cassandra cluster.

        ``name`` is stored in the ``plugin`` column. ``columns`` and
        ``points`` are paired positionally; only values that are
        ``Number`` instances are kept, converted to float, and stored in
        the ``stat`` map. Errors raised while inserting are logged, not
        propagated.
        """
        logger.debug(f"Export {name} stats to Cassandra")

        # Remove non number stats and convert all to float (for Boolean)
        # (dict.iteritems() does not exist in Python 3: use items())
        data = {k: float(v) for (k, v) in dict(zip(columns, points)).items() if isinstance(v, Number)}

        # Write input to the Cassandra table
        try:
            stmt = f"INSERT INTO {self.table} (plugin, time, stat) VALUES (?, ?, ?)"
            query = self.session.prepare(stmt)
            # NOTE(review): datetime.now() is a naive local time - confirm this
            # is the timestamp expected by uuid_from_time.
            self.session.execute(query, (name, uuid_from_time(datetime.now()), data))
        except Exception as e:
            logger.error(f"Cannot export {name} stats to Cassandra ({e})")

    def exit(self):
        """Close the Cassandra export module."""
        # To ensure all connections are properly closed
        self.session.shutdown()
        self.cluster.shutdown()
        # Call the father method
        super().exit()
125
|
|
|
|