Test Failed
Push — master ( 69b639...e7fa0a )
by Nicolas
04:05 queued 01:05
created

glances.plugins.glances_ports.Plugin.get_key()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# Copyright (C) 2021 Nicolargo <[email protected]>
6
#
7
# Glances is free software; you can redistribute it and/or modify
8
# it under the terms of the GNU Lesser General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
11
#
12
# Glances is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
# GNU Lesser General Public License for more details.
16
#
17
# You should have received a copy of the GNU Lesser General Public License
18
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
"""Ports scanner plugin."""
21
22
import os
23
import subprocess
24
import threading
25
import socket
26
import time
27
import numbers
28
29
from glances.globals import WINDOWS, MACOS, BSD
30
from glances.ports_list import GlancesPortsList
31
from glances.web_list import GlancesWebList
32
from glances.timer import Timer, Counter
33
from glances.compat import bool_type
34
from glances.logger import logger
35
from glances.plugins.glances_plugin import GlancesPlugin
36
37
# The requests library is optional: without it only the port scans are
# available and the URL scans are skipped.
try:
    import requests
except ImportError as err:
    requests_tag = False
    logger.warning("Missing Python Lib ({}), Ports plugin is limited to port scanning".format(err))
else:
    requests_tag = True
43
44
45
class Plugin(GlancesPlugin):
    """Glances ports scanner plugin.

    The stats are a list of dicts: the ports list returned by
    GlancesPortsList followed by the web list returned by GlancesWebList.
    The scan itself is delegated to a background ThreadScanner, which
    receives this very list and updates the dicts in place.
    """

    def __init__(self, args=None, config=None):
        """Init the plugin.

        :param args: command line arguments object (may be None)
        :param config: Glances configuration object (may be None)
        """
        super(Plugin, self).__init__(args=args,
                                     config=config,
                                     stats_init_value=[])
        self.args = args
        self.config = config

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Init stats: ports to scan first, then URLs to check
        self.stats = GlancesPortsList(config=config, args=args).get_ports_list() + \
            GlancesWebList(config=config, args=args).get_web_list()

        # Global Thread running all the scans
        self._thread = None

    def exit(self):
        """Overwrite the exit method to close threads."""
        if self._thread is not None:
            self._thread.stop()
        # Call the father class
        super(Plugin, self).exit()

    @GlancesPlugin._check_decorator
    @GlancesPlugin._log_result_decorator
    def update(self):
        """Update the ports list.

        In 'local' mode a new scanning thread is started, unless the
        previous one is still running. Nothing is done for the other input
        methods (not available in SNMP mode).

        :return: the stats list (updated in place by the scanning thread)
        """
        if self.input_method == 'local':
            # Never run two scans at the same time
            # NOTE(review): no per-item refresh interval is applied here; a
            # new scan starts as soon as the previous one is finished.
            if self._thread is None or not self._thread.is_alive():
                self._thread = ThreadScanner(self.stats)
                self._thread.start()

        return self.stats

    def get_key(self):
        """Return the key of the list."""
        return 'indice'

    def _raise_alert(self, item, level, header):
        """Register the alert level of a scanned item and return it.

        Shared tail of get_ports_alert() and get_web_alert().

        :param item: the port or web dict (must contain the get_key() key)
        :param level: 'OK', 'CAREFUL', 'WARNING' or 'CRITICAL'
        :param header: header used to build the stat name
        :return: level, unchanged
        """
        stat_name = self.get_stat_name(header=header)

        # Manage threshold
        self.manage_threshold(stat_name, level)

        # Manage action
        self.manage_action(stat_name,
                           level.lower(),
                           header,
                           item[self.get_key()])

        return level

    def get_ports_alert(self, port, header="", log=False):
        """Return the alert status relative to the port scan return value.

        :param port: port dict, reads the 'status' and 'rtt_warning' keys
        :param header: header used to build the stat name
        :param log: unused, kept for interface compatibility
        :return: 'CAREFUL' if the port is not scanned yet (status is None),
                 'CRITICAL' if the scan failed (status equal to 0, which
                 includes False), 'WARNING' if the numeric status is
                 higher than 'rtt_warning', else 'OK'
        """
        status = port['status']
        if status is None:
            ret = 'CAREFUL'
        elif status == 0:
            ret = 'CRITICAL'
        elif (isinstance(status, (float, int)) and
              port['rtt_warning'] is not None and
              status > port['rtt_warning']):
            ret = 'WARNING'
        else:
            ret = 'OK'

        return self._raise_alert(port, ret, header)

    def get_web_alert(self, web, header="", log=False):
        """Return the alert status relative to the web/url scan return value.

        :param web: web dict, reads the 'status', 'elapsed' and
                    'rtt_warning' keys
        :param header: header used to build the stat name
        :param log: unused, kept for interface compatibility
        :return: 'CAREFUL' if the URL is not scanned yet (status is None),
                 'CRITICAL' if the status is not 200, 301 or 302,
                 'WARNING' if 'elapsed' is higher than 'rtt_warning',
                 else 'OK'
        """
        status = web['status']
        if status is None:
            ret = 'CAREFUL'
        elif status not in [200, 301, 302]:
            ret = 'CRITICAL'
        elif web['rtt_warning'] is not None and web['elapsed'] > web['rtt_warning']:
            ret = 'WARNING'
        else:
            ret = 'OK'

        return self._raise_alert(web, ret, header)

    @staticmethod
    def _port_status_label(port):
        """Return the text displayed in the status column for a port dict."""
        if port['host'] is None:
            return 'None'
        if port['status'] is None:
            return 'Scanning'
        if isinstance(port['status'], bool_type) and port['status'] is True:
            return 'Open'
        if port['status'] == 0:
            return 'Timeout'
        # Convert second to ms
        return '{0:.0f}ms'.format(port['status'] * 1000.0)

    @staticmethod
    def _web_status_label(web):
        """Return the text displayed in the status column for a web dict."""
        if isinstance(web['status'], numbers.Number):
            return 'Code {}'.format(web['status'])
        if web['status'] is None:
            return 'Scanning'
        return web['status']

    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface.

        :param args: command line arguments; the plugin is hidden when
                     args.disable_ports is true
        :param max_width: width available for the plugin; when it is not
                          defined nothing can be laid out and an empty list
                          is returned
        :return: list of curse lines (one row per scanned item)
        """
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        # (args defaults to None: do not fail on the attribute access)
        if not self.stats or getattr(args, 'disable_ports', False):
            return ret

        # Max size for the description column
        if not max_width:
            logger.debug("No max_width defined for the ports plugin, it will not be displayed.")
            return ret
        name_max_width = max_width - 7

        # Build the string message
        for p in self.stats:
            if 'host' in p:
                status = self._port_status_label(p)
                get_alert = self.get_ports_alert
            elif 'url' in p:
                status = self._web_status_label(p)
                get_alert = self.get_web_alert
            else:
                # Neither a port nor an URL: nothing to display
                continue

            msg = '{:{width}}'.format(p['description'][0:name_max_width],
                                      width=name_max_width)
            ret.append(self.curse_add_line(msg))
            msg = '{:>9}'.format(status)
            ret.append(self.curse_add_line(msg,
                                           get_alert(p,
                                                     header=p['indice'] + '_rtt')))
            ret.append(self.curse_new_line())

        # Delete the last empty line
        if ret:
            ret.pop()

        return ret
212
213
214
class ThreadScanner(threading.Thread):
    """Specific thread for the port/web scanner.

    stats is a list of dict. The dicts are updated in place: a scan writes
    its result in the 'status' key (and in the 'elapsed' key for an URL).
    """

    def __init__(self, stats):
        """Init the class.

        :param stats: list of dict to scan (items with a 'port' key are
                      port scans, items with an 'url' key are web scans)
        """
        logger.debug("ports plugin - Create thread for scan list {}".format(stats))
        super(ThreadScanner, self).__init__()
        # Event needed to stop properly the thread
        self._stopper = threading.Event()
        # The class return the stats as a list of dict
        self._stats = stats
        # Is part of Ports plugin
        self.plugin_name = "ports"

    def run(self):
        """Grab the stats.

        Single pass over the stats list: each item is scanned once, then
        the thread ends. The pass is interrupted as soon as possible after
        a call to the stop() method.
        """
        for p in self._stats:
            # End of the thread has been asked
            if self.stopped():
                break
            # Scan a port (ICMP or TCP)
            if 'port' in p:
                self._port_scan(p)
                # Had to wait between two scans
                # If not, result are not ok
                # (wait on the stop event instead of sleeping, so that a
                # stop request does not have to wait for the full second)
                self._stopper.wait(1)
            # Scan an URL
            elif 'url' in p and requests_tag:
                self._web_scan(p)

    @property
    def stats(self):
        """Stats getter."""
        return self._stats

    @stats.setter
    def stats(self, value):
        """Stats setter."""
        self._stats = value

    def stop(self, timeout=None):
        """Stop the thread.

        :param timeout: unused, kept for interface compatibility
        """
        logger.debug("ports plugin - Close thread for scan list {}".format(self._stats))
        self._stopper.set()

    def stopped(self):
        """Return True if the thread is stopped."""
        return self._stopper.is_set()

    def _web_scan(self, web):
        """Scan the Web/URL (dict) and update the status key.

        On success 'status' is the HTTP status code and 'elapsed' the
        request duration in seconds. On any error 'status' is set to
        'Error' and 'elapsed' to 0.

        :param web: web dict, reads the 'url', 'ssl_verify', 'proxies' and
                    'timeout' keys
        :return: the web dict
        """
        try:
            req = requests.head(web['url'],
                                allow_redirects=True,
                                verify=web['ssl_verify'],
                                proxies=web['proxies'],
                                timeout=web['timeout'])
        except Exception as e:
            # Best effort: whatever the failure, flag the URL as in error
            logger.debug(e)
            web['status'] = 'Error'
            web['elapsed'] = 0
        else:
            web['status'] = req.status_code
            web['elapsed'] = req.elapsed.total_seconds()
        return web

    def _port_scan(self, port):
        """Scan the port structure (dict) and update the status key.

        A port number equal to 0 means ICMP (ping), else it is a TCP scan.
        """
        if int(port['port']) == 0:
            return self._port_scan_icmp(port)
        return self._port_scan_tcp(port)

    def _resolv_name(self, hostname):
        """Convert hostname to IP address.

        :return: the IP address, or hostname itself if it can not be
                 resolved
        """
        ip = hostname
        try:
            ip = socket.gethostbyname(hostname)
        except Exception as e:
            logger.debug("{}: Cannot convert {} to IP address ({})".format(self.plugin_name, hostname, e))
        return ip

    @staticmethod
    def _ping_command(ip, timeout, count_opt, timeout_opt):
        """Return the ping command line (list of strings) for one echo request."""
        # Note: Only string are allowed
        return ['ping',
                count_opt, '1',
                timeout_opt, str(timeout),
                ip]

    def _port_scan_icmp(self, port):
        """Scan the (ICMP) port structure (dict) and update the status key.

        'status' is set to the duration of the ping command when the host
        answers, to False when the ping command fails. It is left untouched
        if the command can not be run.

        :param port: port dict, reads the 'host' and 'timeout' keys
        :return: 0 if the host answers, else None
        """
        ret = None

        # Create the ping command
        # Use the system ping command because it already have the setuid bit set
        # Python can not create ICMP packet with non root right
        if WINDOWS:
            # NOTE(review): Windows ping expects -w in milliseconds while the
            # timeout looks to be expressed in seconds elsewhere - confirm
            timeout_opt = '-w'
            count_opt = '-n'
        elif MACOS or BSD:
            timeout_opt = '-t'
            count_opt = '-c'
        else:
            # Linux and co...
            timeout_opt = '-W'
            count_opt = '-c'

        # Build the command line
        # The timeout is used as it is: it is not a host name to resolve
        cmd = self._ping_command(self._resolv_name(port['host']),
                                 port['timeout'],
                                 count_opt,
                                 timeout_opt)

        # The context manager closes the handle on devnull after each ping
        with open(os.devnull, 'w') as fnull:
            try:
                counter = Counter()
                # check_call returns 0 or raises CalledProcessError
                ret = subprocess.check_call(cmd, stdout=fnull, stderr=fnull, close_fds=True)
                port['status'] = counter.get()
            except subprocess.CalledProcessError:
                # Correct issue #1084: No Offline status for timeouted ports
                port['status'] = False
            except Exception as e:
                logger.debug("{}: Error while pinging host {} ({})".format(self.plugin_name, port['host'], e))

        return ret

    def _port_scan_tcp(self, port):
        """Scan the (TCP) port structure (dict) and update the status key.

        'status' is set to the duration of the connection when the port is
        open, to False when the connection is refused or timed out. It is
        left untouched if the scan can not be done.

        :param port: port dict, reads the 'host', 'port' and 'timeout' keys
        :return: the connect_ex return code (0 if the port is open), or
                 None if the scan can not be done
        """
        ret = None

        # Create the scanning socket
        try:
            _socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except Exception as e:
            logger.debug("{}: Error while creating scanning socket ({})".format(self.plugin_name, e))
            # Without socket there is nothing to scan (and nothing to close)
            return ret

        # Scan port
        try:
            # The timeout is set on this socket only: the process wide
            # default timeout is not modified
            _socket.settimeout(port['timeout'])
            ip = self._resolv_name(port['host'])
            counter = Counter()
            ret = _socket.connect_ex((ip, int(port['port'])))
        except Exception as e:
            logger.debug("{}: Error while scanning port {} ({})".format(self.plugin_name, port, e))
        else:
            if ret == 0:
                port['status'] = counter.get()
            else:
                port['status'] = False
        finally:
            _socket.close()

        return ret
370