glances.snmp.GlancesSNMPClient.__bulk_result__()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 6
nop 5
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
import sys
10
11
from glances.logger import logger
12
13
# Import mandatory PySNMP lib
14
try:
15
    from pysnmp.entity.rfc3413.oneliner import cmdgen
16
except ImportError:
17
    logger.critical("PySNMP library not found. To install it: pip install pysnmp")
18
    sys.exit(2)
19
20
21
class GlancesSNMPClient:
    """SNMP client class (based on pysnmp library)."""

    def __init__(self, host='localhost', port=161, version='2c', community='public', user='private', auth=''):
        """Store the connection settings and create the pysnmp command generator.

        :param host: host name or address of the SNMP agent
        :param port: UDP port of the SNMP agent
        :param version: SNMP version as a string (compared against '3' and the
            prefixes '2' / '3' by the request methods)
        :param community: community string used for non-v3 requests
        :param user: user name passed to ``UsmUserData`` for v3 requests
        :param auth: second argument passed to ``UsmUserData`` for v3 requests
        """
        super().__init__()
        self.cmdGen = cmdgen.CommandGenerator()

        self.version = version

        self.host = host
        self.port = port

        self.community = community
        self.user = user
        self.auth = auth

    def __build_result(self, varBinds):
        """Build the results.

        :param varBinds: iterable of ``(name, value)`` pairs
        :return: a dict mapping ``str(name)`` to the pretty-printed value, or
            to ``''`` when the value stringifies to an empty string
        """
        return {str(name): '' if str(val) == '' else val.prettyPrint() for name, val in varBinds}

    def __get_result__(self, errorIndication, errorStatus, errorIndex, varBinds):
        """Put results in table.

        :return: a dict built from ``varBinds``, or an empty dict when both
            ``errorIndication`` and ``errorStatus`` are set
        """
        # NOTE(review): with `or`, results are still built when only one of the
        # two error indicators is set. Kept as-is because callers may rely on
        # it -- confirm whether `and` was intended.
        if not errorIndication or not errorStatus:
            return self.__build_result(varBinds)
        return {}

    def get_by_oid(self, *oid):
        """SNMP simple request (list of OID).

        One request per OID list.

        :param oid: oid list
        :return: a dict
        """
        if self.version == '3':
            credentials = cmdgen.UsmUserData(self.user, self.auth)
        else:
            credentials = cmdgen.CommunityData(self.community)
        errorIndication, errorStatus, errorIndex, varBinds = self.cmdGen.getCmd(
            credentials, cmdgen.UdpTransportTarget((self.host, self.port)), *oid
        )
        return self.__get_result__(errorIndication, errorStatus, errorIndex, varBinds)

    def __bulk_result__(self, errorIndication, errorStatus, errorIndex, varBindTable):
        """Put bulk results in a list.

        :param varBindTable: iterable of rows, each row being an iterable of
            ``(name, value)`` pairs
        :return: a list with one dict per row, or an empty list when both
            ``errorIndication`` and ``errorStatus`` are set
        """
        # NOTE(review): same `or` condition as __get_result__ -- confirm intent.
        if not errorIndication or not errorStatus:
            return [self.__build_result(row) for row in varBindTable]
        return []

    def getbulk_by_oid(self, non_repeaters, max_repetitions, *oid):
        """SNMP getbulk request.

        In contrast to snmpwalk, this information will typically be gathered in
        a single transaction with the agent, rather than one transaction per
        variable found.

        * non_repeaters: This specifies the number of supplied variables that
          should not be iterated over.
        * max_repetitions: This specifies the maximum number of iterations over
          the repeating variables.
        * oid: oid list
        > Return a list of dicts
        """
        if self.version.startswith('3'):
            credentials = cmdgen.UsmUserData(self.user, self.auth)
        elif self.version.startswith('2'):
            credentials = cmdgen.CommunityData(self.community)
        else:
            # Bulk request are not available with SNMP version 1
            return []

        # Both v2 and v3 must go through bulkCmd: the previous code issued a
        # plain getCmd for v3 and then fell into the "version 1" branch, so v3
        # bulk requests always returned an empty list.
        errorIndication, errorStatus, errorIndex, varBindTable = self.cmdGen.bulkCmd(
            credentials,
            cmdgen.UdpTransportTarget((self.host, self.port)),
            non_repeaters,
            max_repetitions,
            *oid,
        )
        return self.__bulk_result__(errorIndication, errorStatus, errorIndex, varBindTable)
113