Completed
Push — master ( 7fc0f8...8b57d9 )
by Nicolas
37s
created

glances/exports/glances_cassandra.py (1 issue)

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# Copyright (C) 2017 Nicolargo <[email protected]>
6
#
7
# Glances is free software; you can redistribute it and/or modify
8
# it under the terms of the GNU Lesser General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
11
#
12
# Glances is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
# GNU Lesser General Public License for more details.
16
#
17
# You should have received a copy of the GNU Lesser General Public License
18
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
"""Cassandra/Scylla interface class."""
21
22
import sys
23
from datetime import datetime
24
from numbers import Number
25
26
from glances.logger import logger
27
from glances.exports.glances_export import GlancesExport
28
from glances.compat import iteritems
29
30
from cassandra.cluster import Cluster
31
from cassandra.util import uuid_from_time
32
from cassandra import InvalidRequest
33
34
35
class Export(GlancesExport):

    """This class manages the Cassandra/Scylla export module."""

    def __init__(self, config=None, args=None):
        """Init the Cassandra export IF.

        Loads the 'cassandra' configuration section and opens the
        connection to the cluster. The process exits with status 2 if the
        configuration cannot be loaded.
        """
        super(Export, self).__init__(config=config, args=args)

        # Mandatories configuration keys (additional to host and port)
        self.keyspace = None

        # Optionals configuration keys
        self.protocol_version = 3
        self.replication_factor = 2
        # Default table name: matches the table the export historically
        # wrote to, so CREATE TABLE and INSERT agree when no 'table' option
        # is configured.
        # NOTE(review): assumes load_conf leaves unset options untouched.
        self.table = 'localhost'

        # Load the Cassandra configuration file section
        self.export_enable = self.load_conf('cassandra',
                                            mandatories=['host', 'port', 'keyspace'],
                                            options=['protocol_version',
                                                     'replication_factor',
                                                     'table'])
        if not self.export_enable:
            sys.exit(2)

        # Init the Cassandra client
        self.cluster, self.session = self.init()

    def init(self):
        """Init the connection to the Cassandra cluster.

        Connects to the cluster, selects the keyspace (creating it on
        InvalidRequest) and tries to create the export table.

        Return a (cluster, session) tuple, or (None, None) if the export
        is disabled. Exits with status 2 if the connection fails.
        """
        if not self.export_enable:
            # Keep the 2-tuple shape: the caller unpacks the result.
            return None, None

        # Cluster
        try:
            cluster = Cluster([self.host],
                              port=int(self.port),
                              protocol_version=int(self.protocol_version))
            session = cluster.connect()
        except Exception as e:
            logger.critical("Cannot connect to Cassandra cluster '%s:%s' (%s)" % (self.host, self.port, e))
            sys.exit(2)

        # Keyspace
        try:
            session.set_keyspace(self.keyspace)
        except InvalidRequest:
            # The keyspace is assumed to be missing: create it and retry
            logger.info("Create keyspace {} on the Cassandra cluster".format(self.keyspace))
            c = "CREATE KEYSPACE %s WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '%s' }" % (self.keyspace, self.replication_factor)
            session.execute(c)
            session.set_keyspace(self.keyspace)

        logger.info(
            "Stats will be exported to Cassandra cluster {} ({}) in keyspace {}".format(
                cluster.metadata.cluster_name, cluster.metadata.all_hosts(), self.keyspace))

        # Table
        try:
            session.execute("CREATE TABLE %s (plugin text, time timeuuid, stat map<text,float>, PRIMARY KEY (plugin, time)) WITH CLUSTERING ORDER BY (time DESC)" % self.table)
        except Exception:
            # NOTE(review): any failure is reported as "already exist";
            # a real error would only surface on the first INSERT.
            logger.debug("Cassandra table %s already exist" % self.table)

        return cluster, session

    def export(self, name, columns, points):
        """Write the points to the Cassandra cluster.

        name: plugin name, stored in the 'plugin' column
        columns: list of stat names
        points: list of stat values (same order as columns)

        Only numeric values are exported. Errors raised by the insert are
        logged and not propagated.
        """
        logger.debug("Export {} stats to Cassandra".format(name))

        # Remove non number stats and convert all to float (for Boolean)
        # iteritems() from glances.compat works on both Python 2 and 3
        data = {k: float(v) for (k, v) in iteritems(dict(zip(columns, points))) if isinstance(v, Number)}

        # Write input to the Cassandra table created in init().
        # The table name cannot be a bound parameter, so it is formatted
        # into the statement; the values stay bound parameters.
        stmt = "INSERT INTO {} (plugin, time, stat) VALUES (%s, %s, %s)".format(self.table)
        try:
            self.session.execute(
                stmt,
                (name, uuid_from_time(datetime.now()), data)
            )
        except Exception as e:
            logger.error("Cannot export {} stats to Cassandra ({})".format(name, e))

    def exit(self):
        """Close the Cassandra export module."""
        # To ensure all connections are properly closed
        self.session.shutdown()
        self.cluster.shutdown()
        # Call the father method
        super(Export, self).exit()
125