Test Failed
Pull Request — develop (#2701)
by
unknown
02:08
created

glances.exports.mqtt.Export.__init__()   B

Complexity

Conditions 5

Size

Total Lines 37
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 25
nop 3
dl 0
loc 37
rs 8.8133
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""MQTT interface class."""
11
12
import socket
13
import string
14
import sys
15
16
from glances.logger import logger
17
from glances.exports.export import GlancesExport
18
from glances.globals import json_dumps
19
20
# Import paho for MQTT
21
import certifi
22
import paho.mqtt.client as paho
23
24
25
class Export(GlancesExport):
26
27
    """This class manages the MQTT export module."""
28
29
    def __init__(self, config=None, args=None):
30
        """Init the MQTT export IF."""
31
        super(Export, self).__init__(config=config, args=args)
32
33
        # Mandatory configuration keys (additional to host and port)
34
        self.user = None
35
        self.password = None
36
        self.topic = None
37
        self.tls = 'true'
38
39
        # Load the MQTT configuration file
40
        self.export_enable = self.load_conf(
41
            'mqtt', mandatories=['host', 'password'], options=['port', 'user', 'hostname', 'topic', 'tls', 'topic_structure']
42
        )
43
        if not self.export_enable:
44
            exit('Missing MQTT config')
45
46
        # Get the current hostname
47
        self.hostname = (self.hostname or socket.gethostname())
48
        if self.hostname in ['NONE']:
49
            self.hostname = socket.gethostname()
50
        else:
51
            self.hostname = self.hostname
52
        self.port = int(self.port) or 8883
53
        self.topic = self.topic or 'glances'
54
        self.user = self.user or 'glances'
55
        self.tls = self.tls and self.tls.lower() == 'true'
56
57
        self.topic_structure = (self.topic_structure or 'per-metric').lower()
58
        if self.topic_structure not in ['per-metric', 'per-plugin']:
59
            logger.critical("topic_structure must be either 'per-metric' or 'per-plugin'.")
60
            sys.exit(2)
61
62
        # Init the MQTT client
63
        self.client = self.init()
64
        if not self.client:
65
            exit("MQTT client initialization failed")
66
67
    def init(self):
68
        """Init the connection to the MQTT server."""
69
        if not self.export_enable:
70
            return None
71
        try:
72
            client = paho.Client(client_id='glances_' + self.hostname, clean_session=False)
73
            client.username_pw_set(username=self.user, password=self.password)
74
            if self.tls:
75
                client.tls_set(certifi.where())
76
            client.connect(host=self.host, port=self.port)
77
            client.loop_start()
78
            return client
79
        except Exception as e:
80
            logger.critical("Connection to MQTT server %s:%s failed with error: %s " % (self.host, self.port, e))
81
            return None
82
83
    def export(self, name, columns, points):
84
        """Write the points in MQTT."""
85
86
        WHITELIST = '_-' + string.ascii_letters + string.digits
87
        SUBSTITUTE = '_'
88
89
        def whitelisted(s, whitelist=WHITELIST, substitute=SUBSTITUTE):
90
            return ''.join(c if c in whitelist else substitute for c in s)
91
92
        if self.topic_structure == 'per-metric':
93
            for sensor, value in zip(columns, points):
94
                try:
95
                    sensor = [whitelisted(name) for name in sensor.split('.')]
96
                    to_export = [self.topic, self.hostname, name]
97
                    to_export.extend(sensor)
98
                    topic = '/'.join(to_export)
99
100
                    self.client.publish(topic, value)
101
                except Exception as e:
102
                    logger.error("Can not export stats to MQTT server (%s)" % e)
103
        elif self.topic_structure == 'per-plugin':
104
            try:
105
                topic = '/'.join([self.topic, self.hostname, name])
106
                sensor_values = dict(zip(columns, points))
107
108
                # Build the value to output
109
                output_value = dict()
110
                for key in sensor_values:
111
                    split_key = key.split('.')
112
113
                    # Add the parent keys if they don't exist
114
                    current_level = output_value
115
                    for depth in range(len(split_key) - 1):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable len does not seem to be defined.
Loading history...
116
                        if split_key[depth] not in current_level:
117
                            current_level[split_key[depth]] = dict()
118
                        current_level = current_level[split_key[depth]]
119
120
                    # Add the value
121
                    current_level[split_key[len(split_key) - 1]] = sensor_values[key]
122
123
                json_value = json_dumps(output_value)
124
                self.client.publish(topic, json_value)
125
            except Exception as e:
126
                logger.error("Can not export stats to MQTT server (%s)" % e)
127