Test Failed
Push — develop ( 66c9ff...e21229 )
by Nicolas
05:06
created

glances/snmp.py (3 issues)

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# Copyright (C) 2019 Nicolargo <[email protected]>
6
#
7
# Glances is free software; you can redistribute it and/or modify
8
# it under the terms of the GNU Lesser General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
11
#
12
# Glances is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
# GNU Lesser General Public License for more details.
16
#
17
# You should have received a copy of the GNU Lesser General Public License
18
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
import sys
21
22
from glances.logger import logger
0 ignored issues
show
import missing from __future__ import absolute_import
Loading history...
23
24
# Import mandatory PySNMP lib
25
try:
26
    from pysnmp.entity.rfc3413.oneliner import cmdgen
27
except ImportError:
28
    logger.critical("PySNMP library not found. To install it: pip install pysnmp")
0 ignored issues
show
This line is too long as per the coding-style (82/80).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
29
    sys.exit(2)
30
31
32
class GlancesSNMPClient(object):

    """SNMP client class (based on pysnmp library).

    Wraps a pysnmp ``CommandGenerator`` and converts the returned variable
    bindings into plain dicts of pretty-printed strings.
    """

    def __init__(self, host='localhost', port=161, version='2c',
                 community='public', user='private', auth=''):
        """Store the connection settings and create the command generator.

        * host / port: address of the SNMP agent
        * version: SNMP version as a string ('1', '2c', '3'...)
        * community: community string (used when version is not 3)
        * user / auth: USM user name and authentication key (version 3)
        """
        super(GlancesSNMPClient, self).__init__()
        self.cmdGen = cmdgen.CommandGenerator()

        self.version = version

        self.host = host
        self.port = port

        self.community = community
        self.user = user
        self.auth = auth

    def __buid_result(self, varBinds):
        """Build the results.

        * varBinds: iterable of (name, value) pairs, both exposing
          prettyPrint()
        > Return a dict {pretty-printed name: pretty-printed value}
        """
        ret = {}
        for name, val in varBinds:
            key = name.prettyPrint()
            if str(val) == '':
                ret[key] = ''
                continue
            value = val.prettyPrint()
            # In Python 3, prettyPrint() may return "b'linux'" instead of
            # "linux": strip the bytes-literal decoration.
            if value.startswith('b\''):
                value = value[2:-1]
            ret[key] = value
        return ret

    def __get_result__(self, errorIndication, errorStatus, errorIndex, varBinds):
        """Put results in table.

        The variable bindings are only meaningful when the request succeeded,
        i.e. when neither errorIndication nor errorStatus is set.

        > Return a dict (empty if the request failed)
        """
        if errorIndication or errorStatus:
            return {}
        return self.__buid_result(varBinds)

    def get_by_oid(self, *oid):
        """SNMP simple request (list of OID).

        One request per OID list.

        * oid: oid list
        > Return a dict
        """
        if self.version == '3':
            auth_data = cmdgen.UsmUserData(self.user, self.auth)
        else:
            auth_data = cmdgen.CommunityData(self.community)
        errorIndication, errorStatus, errorIndex, varBinds = self.cmdGen.getCmd(
            auth_data,
            cmdgen.UdpTransportTarget((self.host, self.port)),
            *oid
        )
        return self.__get_result__(errorIndication, errorStatus, errorIndex, varBinds)

    def __bulk_result__(self, errorIndication, errorStatus, errorIndex, varBindTable):
        """Put bulk results in a list (one dict per row of varBindTable).

        > Return a list of dicts (empty if the request failed)
        """
        if errorIndication or errorStatus:
            return []
        return [self.__buid_result(row) for row in varBindTable]

    def getbulk_by_oid(self, non_repeaters, max_repetitions, *oid):
        """SNMP getbulk request.

        In contrast to snmpwalk, this information will typically be gathered in
        a single transaction with the agent, rather than one transaction per
        variable found.

        * non_repeaters: This specifies the number of supplied variables that
          should not be iterated over.
        * max_repetitions: This specifies the maximum number of iterations over
          the repeating variables.
        * oid: oid list
        > Return a list of dicts
        """
        if self.version.startswith('3'):
            auth_data = cmdgen.UsmUserData(self.user, self.auth)
        elif self.version.startswith('2'):
            auth_data = cmdgen.CommunityData(self.community)
        else:
            # Bulk request are not available with SNMP version 1
            return []
        errorIndication, errorStatus, errorIndex, varBindTable = self.cmdGen.bulkCmd(
            auth_data,
            cmdgen.UdpTransportTarget((self.host, self.port)),
            non_repeaters,
            max_repetitions,
            *oid
        )
        return self.__bulk_result__(errorIndication, errorStatus, errorIndex, varBindTable)
134