Test Failed
Push — develop ( 66c9ff...e21229 )
by Nicolas
05:06
created

glances/exports/glances_kafka.py (1 issue)

Checks for unused imports

Unused Code Minor
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# Copyright (C) 2019 Nicolargo <[email protected]>
6
#
7
# Glances is free software; you can redistribute it and/or modify
8
# it under the terms of the GNU Lesser General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
11
#
12
# Glances is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
# GNU Lesser General Public License for more details.
16
#
17
# You should have received a copy of the GNU Lesser General Public License
18
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
"""Kafka interface class."""
21
22
import sys
23
24
from glances.logger import logger
25
from glances.compat import iteritems
0 ignored issues
show
Unused iteritems imported from glances.compat
Loading history...
26
from glances.exports.glances_export import GlancesExport
27
28
from kafka import KafkaProducer
29
import json
30
31
32
class Export(GlancesExport):

    """This class manages the Kafka export module.

    Stats are published to a single Kafka topic: the message key is the
    plugin name and the message value is a JSON-encoded dict of the stats.
    """

    def __init__(self, config=None, args=None):
        """Init the Kafka export IF.

        Loads the 'kafka' configuration section and opens the connection
        to the Kafka server. The process exits with status 2 if the
        section cannot be loaded or the server cannot be reached.
        """
        super(Export, self).__init__(config=config, args=args)

        # Mandatories configuration keys (additional to host and port)
        self.topic = None

        # Optionals configuration keys
        self.compression = None

        # Load the Kafka configuration file section
        self.export_enable = self.load_conf('kafka',
                                            mandatories=['host', 'port', 'topic'],
                                            options=['compression'])
        if not self.export_enable:
            sys.exit(2)

        # Init the kafka client
        self.client = self.init()

    def init(self):
        """Init the connection to the Kafka server.

        Return the KafkaProducer instance, or None if the export is not
        enabled. Exit with status 2 if the producer cannot be created.
        """
        if not self.export_enable:
            return None

        # Build the server URI with host and port
        server_uri = '{}:{}'.format(self.host, self.port)

        try:
            # Values are serialized to UTF-8 encoded JSON by the producer.
            # No key serializer is configured: keys must be given as bytes.
            s = KafkaProducer(bootstrap_servers=server_uri,
                              value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                              compression_type=self.compression)
        except Exception as e:
            logger.critical("Cannot connect to Kafka server %s (%s)" % (server_uri, e))
            sys.exit(2)
        else:
            logger.info("Connected to the Kafka server %s" % server_uri)

        return s

    def export(self, name, columns, points):
        """Write the points to the kafka server.

        name: plugin name, used as the Kafka message key
        columns: list of stat names
        points: list of stat values (paired with columns by position)

        Errors raised while sending are logged and not propagated.
        """
        logger.debug("Export {} stats to Kafka".format(name))

        # Create DB input
        data = dict(zip(columns, points))

        # Send stats to the kafka topic
        # key=<plugin name>
        # value=JSON dict
        try:
            # The producer has no key_serializer, so the key must already
            # be bytes: a text key is rejected by kafka-python and the
            # stats would never be sent. None and bytes are left untouched.
            key = name
            if key is not None and not isinstance(key, bytes):
                key = key.encode('utf-8')
            self.client.send(self.topic,
                             key=key,
                             value=data)
        except Exception as e:
            logger.error("Cannot export {} stats to Kafka ({})".format(name, e))

    def exit(self):
        """Close the Kafka export module."""
        # To ensure all connections are properly closed
        # (flush the producer before closing it)
        self.client.flush()
        self.client.close()
        # Call the father method
        super(Export, self).exit()
100