Test Failed
Push — master ( ee826a...d9056e )
by Nicolas
03:09
created

glances.exports.glances_mqtt.Export.init()   B

Complexity

Conditions 5

Size

Total Lines 28
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 21
nop 1
dl 0
loc 28
rs 8.9093
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""MQTT interface class."""
11
12
import socket
13
import string
14
import sys
15
16
from glances.logger import logger
17
from glances.exports.export import GlancesExport
18
from glances.globals import json_dumps
19
20
# Import paho for MQTT
21
import certifi
22
import paho.mqtt.client as paho
23
24
25
class Export(GlancesExport):
26
    """This class manages the MQTT export module."""
27
28
    def __init__(self, config=None, args=None):
29
        """Init the MQTT export IF."""
30
        super(Export, self).__init__(config=config, args=args)
31
32
        # Mandatory configuration keys (additional to host and port)
33
        self.user = None
34
        self.password = None
35
        self.topic = None
36
        self.tls = 'true'
37
38
        # Load the MQTT configuration file
39
        self.export_enable = self.load_conf(
40
            'mqtt',
41
            mandatories=['host', 'password'],
42
            options=['port', 'devicename', 'user', 'topic', 'tls', 'topic_structure', 'callback_api_version'],
43
        )
44
        if not self.export_enable:
45
            exit('Missing MQTT config')
46
47
        # Get the current hostname
48
        self.devicename = self.devicename or socket.gethostname()
49
        self.port = int(self.port) or 8883
50
        self.topic = self.topic or 'glances'
51
        self.user = self.user or 'glances'
52
        self.tls = self.tls and self.tls.lower() == 'true'
53
54
        self.topic_structure = (self.topic_structure or 'per-metric').lower()
55
        if self.topic_structure not in ['per-metric', 'per-plugin']:
56
            logger.critical("topic_structure must be either 'per-metric' or 'per-plugin'.")
57
            sys.exit(2)
58
59
        # Init the MQTT client
60
        self.client = self.init()
61
        if not self.client:
62
            exit("MQTT client initialization failed")
63
64
    def init(self):
65
        # Get the current callback api version
66
        self.callback_api_version = int(self.callback_api_version) or 2
67
68
        # Set enum for connection
69
        if self.callback_api_version == 1:
70
            self.callback_api_version = paho.CallbackAPIVersion.VERSION1
71
        else:
72
            self.callback_api_version = paho.CallbackAPIVersion.VERSION2
73
74
        """Init the connection to the MQTT server."""
75
        if not self.export_enable:
76
            return None
77
        try:
78
            client = paho.Client(
79
                callback_api_version=self.callback_api_version,
80
                client_id='glances_' + self.devicename,
81
                clean_session=False,
82
            )
83
            client.username_pw_set(username=self.user, password=self.password)
84
            if self.tls:
85
                client.tls_set(certifi.where())
86
            client.connect(host=self.host, port=self.port)
87
            client.loop_start()
88
            return client
89
        except Exception as e:
90
            logger.critical("Connection to MQTT server %s:%s failed with error: %s " % (self.host, self.port, e))
91
            return None
92
93
    def export(self, name, columns, points):
94
        """Write the points in MQTT."""
95
96
        WHITELIST = '_-' + string.ascii_letters + string.digits
97
        SUBSTITUTE = '_'
98
99
        def whitelisted(s, whitelist=WHITELIST, substitute=SUBSTITUTE):
100
            return ''.join(c if c in whitelist else substitute for c in s)
101
102
        if self.topic_structure == 'per-metric':
103
            for sensor, value in zip(columns, points):
104
                try:
105
                    sensor = [whitelisted(name) for name in sensor.split('.')]
106
                    to_export = [self.topic, self.devicename, name]
107
                    to_export.extend(sensor)
108
                    topic = '/'.join(to_export)
109
110
                    self.client.publish(topic, value)
111
                except Exception as e:
112
                    logger.error("Can not export stats to MQTT server (%s)" % e)
113
        elif self.topic_structure == 'per-plugin':
114
            try:
115
                topic = '/'.join([self.topic, self.devicename, name])
116
                sensor_values = dict(zip(columns, points))
117
118
                # Build the value to output
119
                output_value = dict()
120
                for key in sensor_values:
121
                    split_key = key.split('.')
122
123
                    # Add the parent keys if they don't exist
124
                    current_level = output_value
125
                    for depth in range(len(split_key) - 1):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable len does not seem to be defined.
Loading history...
126
                        if split_key[depth] not in current_level:
127
                            current_level[split_key[depth]] = dict()
128
                        current_level = current_level[split_key[depth]]
129
130
                    # Add the value
131
                    current_level[split_key[len(split_key) - 1]] = sensor_values[key]
132
133
                json_value = json_dumps(output_value)
134
                self.client.publish(topic, json_value)
135
            except Exception as e:
136
                logger.error("Can not export stats to MQTT server (%s)" % e)
137