Completed
Push — develop ( 8b5b19...8ebade )
by Nicolas
05:17 queued 02:08
created

glances/exports/glances_kafka.py (4 issues)

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# Copyright (C) 2019 Nicolargo <[email protected]>
6
#
7
# Glances is free software; you can redistribute it and/or modify
8
# it under the terms of the GNU Lesser General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
11
#
12
# Glances is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
# GNU Lesser General Public License for more details.
16
#
17
# You should have received a copy of the GNU Lesser General Public License
18
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
"""Kafka interface class."""
21
22
import sys
23
24
from glances.logger import logger
25
from glances.compat import iteritems
26
from glances.exports.glances_export import GlancesExport
27
28
from kafka import KafkaProducer
29
import json
30
import codecs
31
32
33
class Export(GlancesExport):

    """This class manages the Kafka export module.

    Each call to export() publishes one message to the configured Kafka
    topic: the message key is the plugin name (UTF-8 encoded) and the
    message value is a JSON-serialized dict of the exported stats.
    """

    def __init__(self, config=None, args=None):
        """Init the Kafka export IF.

        :param config: Glances configuration object, forwarded to the
            parent class
        :param args: command line arguments, forwarded to the parent class

        Exits the process with status 2 when load_conf() reports that the
        'kafka' configuration section could not be loaded.
        """
        super(Export, self).__init__(config=config, args=args)

        # Mandatory configuration keys (additional to host and port)
        self.topic = None

        # Optional configuration keys
        self.compression = None
        self.tags = None

        # Load the Kafka configuration file section
        # NOTE(review): load_conf() presumably fills self.host, self.port and
        # the keys declared above from the configuration file - confirm in
        # GlancesExport.
        self.export_enable = self.load_conf('kafka',
                                            mandatories=['host', 'port',
                                                         'topic'],
                                            options=['compression',
                                                     'tags'])
        if not self.export_enable:
            sys.exit(2)

        # Init the kafka client
        self.client = self.init()

    def init(self):
        """Init the connection to the Kafka server.

        :return: the KafkaProducer instance, or None when the export is
            disabled

        Exits the process with status 2 if the producer cannot be created.
        """
        if not self.export_enable:
            return None

        # Build the server URI with host and port
        server_uri = '{}:{}'.format(self.host, self.port)

        try:
            producer = KafkaProducer(bootstrap_servers=server_uri,
                                     value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                     compression_type=self.compression)
        except Exception as e:
            # The broad catch is deliberate: whatever the reason, failing to
            # create the producer is fatal for this exporter, so the error is
            # logged and the process exits.
            # Arguments are handed to the logger instead of being formatted
            # eagerly with the % operator.
            logger.critical("Cannot connect to Kafka server %s (%s)", server_uri, e)
            sys.exit(2)
        else:
            logger.info("Connected to the Kafka server %s", server_uri)

        return producer

    def export(self, name, columns, points):
        """Write the points to the kafka server.

        :param name: plugin name, used (UTF-8 encoded) as the message key
        :param columns: iterable of stat names
        :param points: iterable of stat values, paired with columns by
            position

        A failure while sending is logged and not re-raised.
        """
        logger.debug("Export %s stats to Kafka", name)

        # Create DB input
        data = dict(zip(columns, points))
        if self.tags is not None:
            data.update(self.parse_tags(self.tags))

        # Send stats to the kafka topic
        # key=<plugin name>
        # value=JSON dict
        try:
            self.client.send(self.topic,
                             # Kafka key name needs to be bytes #1593
                             key=name.encode('utf-8'),
                             value=data)
        except Exception as e:
            # Best effort on purpose: an export failure is reported in the
            # log but is not propagated to the caller.
            logger.error("Cannot export %s stats to Kafka (%s)", name, e)

    def exit(self):
        """Close the Kafka export module."""
        try:
            # init() can return None, so only flush/close a real producer
            if self.client is not None:
                # To ensure all connections are properly closed
                self.client.flush()
                self.client.close()
        finally:
            # Always call the parent class method, even if flushing or
            # closing the producer raised
            super(Export, self).exit()
107