1 | # -*- coding: utf-8 -*- |
||
2 | # |
||
3 | # This file is part of Glances. |
||
4 | # |
||
5 | # Copyright (C) 2019 Nicolargo <[email protected]> |
||
6 | # |
||
7 | # Glances is free software; you can redistribute it and/or modify |
||
8 | # it under the terms of the GNU Lesser General Public License as published by |
||
9 | # the Free Software Foundation, either version 3 of the License, or |
||
10 | # (at your option) any later version. |
||
11 | # |
||
12 | # Glances is distributed in the hope that it will be useful, |
||
13 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
14 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||
15 | # GNU Lesser General Public License for more details. |
||
16 | # |
||
17 | # You should have received a copy of the GNU Lesser General Public License |
||
18 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
||
19 | |||
20 | """Kafka interface class.""" |
||
21 | |||
22 | import sys |
||
0 ignored issues
–
show
introduced
by
Loading history...
|
|||
23 | |||
24 | from glances.logger import logger |
||
0 ignored issues
–
show
|
|||
25 | from glances.compat import iteritems |
||
0 ignored issues
–
show
|
|||
26 | from glances.exports.glances_export import GlancesExport |
||
0 ignored issues
–
show
|
|||
27 | |||
28 | from kafka import KafkaProducer |
||
0 ignored issues
–
show
|
|||
29 | import json |
||
0 ignored issues
–
show
|
|||
30 | |||
31 | |||
class Export(GlancesExport):

    """This class manages the Kafka export module."""

    def __init__(self, config=None, args=None):
        """Init the Kafka export IF.

        Load the 'kafka' configuration section and create the Kafka client.
        The process exits with status 2 when the section cannot be loaded.

        :param config: forwarded unchanged to the parent constructor
        :param args: forwarded unchanged to the parent constructor
        """
        super(Export, self).__init__(config=config, args=args)

        # Mandatories configuration keys (additional to host and port)
        self.topic = None

        # Optionals configuration keys
        self.compression = None

        # Load the Kafka configuration file section
        self.export_enable = self.load_conf('kafka',
                                            mandatories=['host', 'port', 'topic'],
                                            options=['compression'])
        if not self.export_enable:
            sys.exit(2)

        # Init the kafka client
        self.client = self.init()

    def init(self):
        """Init the connection to the Kafka server.

        :return: the KafkaProducer instance, or None when the export is
                 disabled. The process exits with status 2 when the producer
                 cannot be created.
        """
        if not self.export_enable:
            return None

        # Build the server URI with host and port
        # NOTE(review): host and port are presumably set by load_conf() - confirm
        server_uri = '{}:{}'.format(self.host, self.port)

        try:
            # Values are serialized as UTF-8 encoded JSON documents
            producer = KafkaProducer(bootstrap_servers=server_uri,
                                     value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                     compression_type=self.compression)
        except Exception as err:
            # Any failure while creating the producer is fatal for the export
            logger.critical("Cannot connect to Kafka server %s (%s)" % (server_uri, err))
            sys.exit(2)
        else:
            logger.info("Connected to the Kafka server %s" % server_uri)

        return producer

    def export(self, name, columns, points):
        """Write the points to the kafka server.

        Errors are logged and never propagated to the caller.

        :param name: plugin name, used as the Kafka message key
        :param columns: iterable of stat names
        :param points: iterable of stat values, paired with columns by position
        """
        logger.debug("Export {} stats to Kafka".format(name))

        # Create DB input
        data = dict(zip(columns, points))

        # Send stats to the kafka topic
        # key=<plugin name>
        # value=JSON dict
        try:
            # No key_serializer is configured on the producer, so the key
            # has to be sent as bytes: encode text keys, keep bytes as is.
            key = name if isinstance(name, bytes) else name.encode('utf-8')
            self.client.send(self.topic,
                             key=key,
                             value=data)
        except Exception as err:
            logger.error("Cannot export {} stats to Kafka ({})".format(name, err))

    def exit(self):
        """Close the Kafka export module."""
        # To ensure all connections are properly closed
        # (init() returns None when the export is disabled)
        if self.client is not None:
            self.client.flush()
            self.client.close()
        # Call the father method
        super(Export, self).exit()
||
100 |