Issues (45)

glances/exports/glances_cassandra/__init__.py (1 issue)

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Cassandra/Scylla interface class."""
11
12
import sys
13
from datetime import datetime
14
from numbers import Number
15
16
from glances.logger import logger
17
from glances.exports.export import GlancesExport
18
19
from cassandra.auth import PlainTextAuthProvider
20
from cassandra.cluster import Cluster
21
from cassandra.util import uuid_from_time
22
from cassandra import InvalidRequest
23
24
25
class Export(GlancesExport):
    """Manage the export of Glances stats to a Cassandra/Scylla cluster."""

    def __init__(self, config=None, args=None):
        """Load the 'cassandra' configuration section and open the connection.

        The process exits with status 2 when load_conf() returns a falsy value.
        """
        super(Export, self).__init__(config=config, args=args)

        # Mandatory configuration keys (additional to host and port)
        self.keyspace = None

        # Optional configuration keys
        self.protocol_version = 3
        self.replication_factor = 2
        self.table = None
        self.username = None
        self.password = None

        # Load the Cassandra configuration file section
        self.export_enable = self.load_conf(
            'cassandra',
            mandatories=['host', 'port', 'keyspace'],
            options=['protocol_version', 'replication_factor', 'table', 'username', 'password'],
        )
        if not self.export_enable:
            sys.exit(2)

        # Init the Cassandra client
        self.cluster, self.session = self.init()

    def init(self):
        """Init the connection to the Cassandra server.

        Connect to the cluster, select the keyspace (creating it when the
        server rejects it with InvalidRequest) and try to create the table.

        Returns a (cluster, session) tuple, or (None, None) when the export
        is disabled. Exits the process with status 2 if the connection fails.
        """
        if not self.export_enable:
            # Return a pair: __init__ unpacks the result into two attributes,
            # so a bare None would raise a TypeError there.
            return None, None

        # if username and/or password are not set the connection will try to connect with no auth
        auth_provider = PlainTextAuthProvider(username=self.username, password=self.password)

        # Cluster
        try:
            cluster = Cluster(
                [self.host],
                port=int(self.port),
                protocol_version=int(self.protocol_version),
                auth_provider=auth_provider,
            )
            session = cluster.connect()
        except Exception as e:
            logger.critical("Cannot connect to Cassandra cluster '%s:%s' (%s)" % (self.host, self.port, e))
            sys.exit(2)

        # NOTE: static analysers report that 'session' may be undefined from
        # here on. It is always bound: the except branch above exits the process.

        # Keyspace
        try:
            session.set_keyspace(self.keyspace)
        except InvalidRequest:
            # The keyspace could not be selected: create it, then select it
            logger.info("Create keyspace {} on the Cassandra cluster".format(self.keyspace))
            c = "CREATE KEYSPACE %s WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '%s' }" % (
                self.keyspace,
                self.replication_factor,
            )
            session.execute(c)
            session.set_keyspace(self.keyspace)

        logger.info(
            "Stats will be exported to Cassandra cluster {} ({}) in keyspace {}".format(
                cluster.metadata.cluster_name, cluster.metadata.all_hosts(), self.keyspace
            )
        )

        # Table
        try:
            session.execute(
                "CREATE TABLE %s (plugin text, time timeuuid, stat map<text,float>, PRIMARY KEY (plugin, time)) \
                    WITH CLUSTERING ORDER BY (time DESC)"
                % self.table
            )
        except Exception:
            # Any failure is treated as "table already exists" (best effort)
            logger.debug("Cassandra table %s already exist" % self.table)

        return cluster, session

    def export(self, name, columns, points):
        """Write the points to the Cassandra cluster.

        name: value stored in the 'plugin' column.
        columns: iterable of stat names, paired positionally with points.
        points: iterable of stat values; non numeric values are dropped.

        Errors raised while writing are logged, not propagated.
        """
        logger.debug("Export {} stats to Cassandra".format(name))

        # Remove non number stats and convert all to float (for Boolean).
        # Iterating the zip directly avoids dict.iteritems(), which does not
        # exist on Python 3; a duplicated column still keeps its last value.
        data = {k: float(v) for k, v in zip(columns, points) if isinstance(v, Number)}

        # Write input to the Cassandra table
        try:
            stmt = "INSERT INTO {} (plugin, time, stat) VALUES (?, ?, ?)".format(self.table)
            query = self.session.prepare(stmt)
            self.session.execute(query, (name, uuid_from_time(datetime.now()), data))
        except Exception as e:
            logger.error("Cannot export {} stats to Cassandra ({})".format(name, e))

    def exit(self):
        """Close the Cassandra export module."""
        # To ensure all connections are properly closed (the handles are None
        # when init() did not open a connection)
        if self.session is not None:
            self.session.shutdown()
        if self.cluster is not None:
            self.cluster.shutdown()
        # Call the father method
        super(Export, self).exit()
128