Test Failed
Push — develop ( f27c39...6e485a )
by Nicolas
04:57 queued 02:14
created

glances.plugins.ports.PluginModel.msg_curse()   A

Complexity

Conditions 5

Size

Total Lines 28
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 15
nop 3
dl 0
loc 28
rs 9.1832
c 0
b 0
f 0
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""Ports scanner plugin."""
10
11
import numbers
12
import os
13
import socket
14
import subprocess
15
import threading
16
import time
17
from functools import partial, reduce
18
19
from glances.globals import BSD, MACOS, WINDOWS, bool_type
20
from glances.logger import logger
21
from glances.plugins.plugin.model import GlancesPluginModel
22
from glances.ports_list import GlancesPortsList
23
from glances.timer import Counter
24
from glances.web_list import GlancesWebList
25
26
# The requests library is optional: without it only host/port scanning is
# available and the URL entries are ignored by the scanner thread.
try:
    import requests
except ImportError as e:
    requests_tag = False
    logger.warning(f"Missing Python Lib ({e}), Ports plugin is limited to port scanning")
else:
    requests_tag = True
33
34
# Fields description
# description: human readable description
# short_name: short name to use in the UI
# unit: unit type
# rate: is it a rate? If yes, divide by time_since_update when displayed
# min_symbol: auto unit should be used if the value is greater than 1 'X' (K, M, G)...
#
# Only 'description' and 'unit' are used by the fields of this plugin.
fields_description = {
    'host': {
        'description': 'Measurement is be done on this host (or IP address)',
    },
    'port': {
        'description': 'Measurement is be done on this port (0 for ICMP)',
    },
    'description': {
        'description': 'Human readable description for the host/port',
    },
    'refresh': {
        'description': 'Refresh time (in seconds) for this host/port',
    },
    'timeout': {
        'description': 'Timeout (in seconds) for the measurement',
    },
    'status': {
        'description': 'Measurement result (in seconds)',
        'unit': 'second',
    },
    'rtt_warning': {
        'description': 'Warning threshold (in seconds) for the measurement',
        'unit': 'second',
    },
    'indice': {
        'description': 'Unique indice for the host/port',
    },
}
68
69
70
class PluginModel(GlancesPluginModel):
    """Glances ports scanner plugin.

    The stats are a list of dicts: the entries returned by GlancesPortsList
    followed by the entries returned by GlancesWebList. The scans run in a
    background ThreadScanner, which updates those dicts in place.
    """

    def __init__(self, args=None, config=None):
        """Init the plugin."""
        super().__init__(args=args, config=config, stats_init_value=[], fields_description=fields_description)
        self.args = args
        self.config = config

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Init stats: host/port entries first, then web/URL entries
        self.stats = (
            GlancesPortsList(config=config, args=args).get_ports_list()
            + GlancesWebList(config=config, args=args).get_web_list()
        )

        # Global Thread running all the scans
        self._thread = None

    def exit(self):
        """Overwrite the exit method to close threads."""
        if self._thread is not None:
            self._thread.stop()
        # Call the father class
        super().exit()

    def get_key(self):
        """Return the key of the list."""
        return 'indice'

    @GlancesPluginModel._check_decorator
    @GlancesPluginModel._log_result_decorator
    def update(self):
        """Update the ports list.

        In 'local' mode a new scanner thread is started whenever no scan is
        currently running. The thread writes its results directly into
        self.stats, which is returned as is.
        """
        if self.input_method == 'local':
            # Only start a scan if there is no other scanning thread alive
            if self._thread is None or not self._thread.is_alive():
                self._thread = ThreadScanner(self.stats)
                self._thread.start()
        # Other input methods (SNMP): not available, stats are left untouched

        return self.stats

    def _trigger_alert(self, level, header, item):
        """Register the alert level for an item and return that level.

        Shared tail of get_ports_alert() and get_web_alert(): resolve the stat
        name from the header, then forward the level to the threshold and
        action managers of the plugin.
        """
        stat_name = self.get_stat_name(header=header)

        # Manage threshold
        self.manage_threshold(stat_name, level)

        # Manage action
        self.manage_action(stat_name, level.lower(), header, item[self.get_key()])

        return level

    def get_ports_alert(self, port, header="", log=False):
        """Return the alert status relative to the port scan return value.

        * status is None (no result yet): 'CAREFUL'
        * status equal to 0 (this includes False, the value stored by the
          scanner when the scan fails): 'CRITICAL'
        * numeric status greater than the rtt_warning threshold: 'WARNING'
        * anything else: 'OK'

        The log parameter is accepted but not used.
        """
        status = port['status']
        if status is None:
            ret = 'CAREFUL'
        elif status == 0:
            ret = 'CRITICAL'
        elif isinstance(status, (float, int)) and port['rtt_warning'] is not None and status > port['rtt_warning']:
            ret = 'WARNING'
        else:
            ret = 'OK'

        return self._trigger_alert(ret, header, port)

    def get_web_alert(self, web, header="", log=False):
        """Return the alert status relative to the web/url scan return value.

        * status is None (no result yet): 'CAREFUL'
        * status other than 200, 301 or 302: 'CRITICAL'
        * elapsed time greater than the rtt_warning threshold: 'WARNING'
        * anything else: 'OK'

        The log parameter is accepted but not used.
        """
        status = web['status']
        if status is None:
            ret = 'CAREFUL'
        elif status not in (200, 301, 302):
            ret = 'CRITICAL'
        elif web['rtt_warning'] is not None and web['elapsed'] > web['rtt_warning']:
            ret = 'WARNING'
        else:
            ret = 'OK'

        return self._trigger_alert(ret, header, web)

    def set_status_if_host(self, p):
        """Return the text displayed in the status column for a host/port entry."""
        if p['host'] is None:
            status = 'None'
        elif p['status'] is None:
            status = 'Scanning'
        elif isinstance(p['status'], bool_type) and p['status'] is True:
            status = 'Open'
        elif p['status'] == 0:
            # Also matches False, stored by the scanner when the scan fails
            status = 'Timeout'
        else:
            # Convert second to ms
            status = '{:.0f}ms'.format(p['status'] * 1000.0)

        return status

    def set_status_if_url(self, p):
        """Return the text displayed in the status column for a web/URL entry."""
        if isinstance(p['status'], numbers.Number):
            status = 'Code {}'.format(p['status'])
        elif p['status'] is None:
            status = 'Scanning'
        else:
            # Non numeric status (the scanner stores 'Error' on failure)
            status = p['status']

        return status

    def build_str(self, name_max_width, ret, p):
        """Append the curse lines of the entry p to ret and return ret.

        Used as the reducer of msg_curse(): ret is the accumulator. An entry
        that is neither a host/port nor a web/URL item is skipped instead of
        failing on an unbound helper.
        """
        if 'host' in p:
            helper = self.get_ports_alert
            status = self.set_status_if_host(p)
        elif 'url' in p:
            helper = self.get_web_alert
            status = self.set_status_if_url(p)
        else:
            logger.debug(f"{self.plugin_name}: Entry without host or url is not displayed ({p})")
            return ret

        # Description column, truncated and padded to name_max_width
        msg = '{:{width}}'.format(p['description'][0:name_max_width], width=name_max_width)
        ret.append(self.curse_add_line(msg))
        # Status column, decorated with the alert level of the entry
        msg = f'{status:>9}'
        ret.append(self.curse_add_line(msg, helper(p, header=p['indice'] + '_rtt')))
        ret.append(self.curse_new_line())

        return ret

    def msg_curse(self, args=None, max_width=None):
        """Return the list of lines to display in the curse interface.

        An empty list is returned when there is no stat, when the plugin is
        disabled through args.disable_ports, or when max_width is not set.
        """
        # Init the return message
        init = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or (args is not None and args.disable_ports):
            return init

        if not max_width:
            # No max_width defined, return an empty curse message
            logger.debug(f"No max_width defined for the {self.plugin_name} plugin, it will not be displayed.")
            return init

        # Max size for the description column
        name_max_width = max_width - 7

        # Build the string message
        build_str_with_this_max_width = partial(self.build_str, name_max_width)
        ret = reduce(build_str_with_this_max_width, self.stats, init)

        # Delete the last empty line
        if ret:
            ret.pop()

        return ret
239
240
241
class ThreadScanner(threading.Thread):
    """
    Specific thread for the port/web scanner.

    stats is a list of dict. Each dict is updated in place by the scan:
    the 'status' key (and 'elapsed' for URLs) receives the result.
    """

    def __init__(self, stats):
        """Init the class."""
        logger.debug(f"ports plugin - Create thread for scan list {stats}")
        super().__init__()
        # Event needed to stop properly the thread
        self._stopper = threading.Event()
        # The class return the stats as a list of dict
        self._stats = stats
        # Is part of Ports plugin
        self.plugin_name = "ports"

    def get_key(self):
        """Return the key of the list."""
        return 'indice'

    def run(self):
        """Grab the stats.

        Scan every entry of the stats list once, then return. The loop ends
        earlier if the stop() method has been called.
        """
        for p in self._stats:
            # End of the thread has been asked
            if self.stopped():
                break
            # Scan a port (ICMP or TCP)
            if 'port' in p:
                self._port_scan(p)
                # Had to wait between two scans
                # If not, result are not ok
                # Waiting on the stop event keeps the one second pause but
                # lets stop() interrupt it.
                self._stopper.wait(1)
            # Scan an URL
            elif 'url' in p and requests_tag:
                self._web_scan(p)
            # Add the key for every element
            p['key'] = self.get_key()

    @property
    def stats(self):
        """Stats getter."""
        return self._stats

    @stats.setter
    def stats(self, value):
        """Stats setter."""
        self._stats = value

    def stop(self, timeout=None):
        """Stop the thread.

        Only raises the stop flag, the thread is not joined. The timeout
        parameter is accepted but not used.
        """
        logger.debug(f"ports plugin - Close thread for scan list {self._stats}")
        self._stopper.set()

    def stopped(self):
        """Return True if the thread has been asked to stop."""
        return self._stopper.is_set()

    def _web_scan(self, web):
        """Scan the Web/URL (dict) and update the status key.

        On success 'status' is the HTTP status code and 'elapsed' the request
        duration in seconds. On any error 'status' is 'Error' and 'elapsed' 0.
        """
        try:
            req = requests.head(
                web['url'],
                allow_redirects=True,
                verify=web['ssl_verify'],
                proxies=web['proxies'],
                timeout=web['timeout'],
            )
        except Exception as e:
            logger.debug(e)
            web['status'] = 'Error'
            web['elapsed'] = 0
        else:
            web['status'] = req.status_code
            web['elapsed'] = req.elapsed.total_seconds()
        return web

    def _port_scan(self, port):
        """Scan the port structure (dict) and update the status key.

        Port 0 means ICMP (ping), any other port is scanned with TCP.
        """
        if int(port['port']) == 0:
            return self._port_scan_icmp(port)
        return self._port_scan_tcp(port)

    def _resolv_name(self, hostname):
        """Convert hostname to IP address.

        The hostname is returned unchanged if it can not be resolved.
        """
        ip = hostname
        try:
            ip = socket.gethostbyname(hostname)
        except Exception as e:
            logger.debug(f"{self.plugin_name}: Cannot convert {hostname} to IP address ({e})")
        return ip

    def _port_scan_icmp(self, port):
        """Scan the (ICMP) port structure (dict) and update the status key.

        'status' is the duration of the ping command when it succeeds and
        False when it fails. Return the exit code of the command (0) or None.
        """
        ret = None

        try:
            # Create the ping command
            # Use the system ping command because it already have the sticky bit set
            # Python can not create ICMP packet with non root right
            if WINDOWS:
                timeout_opt = '-w'
                count_opt = '-n'
                # The Windows ping timeout is given in milliseconds
                timeout_value = str(int(float(port['timeout']) * 1000))
            elif MACOS or BSD:
                timeout_opt = '-t'
                count_opt = '-c'
                timeout_value = str(port['timeout'])
            else:
                # Linux and co...
                timeout_opt = '-W'
                count_opt = '-c'
                timeout_value = str(port['timeout'])
            # Build the command line
            # Note: Only string are allowed
            cmd = [
                'ping',
                count_opt,
                '1',
                timeout_opt,
                timeout_value,
                self._resolv_name(port['host']),
            ]

            counter = Counter()
            ret = subprocess.check_call(
                cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, close_fds=True
            )
            # check_call only returns when the exit code is 0
            port['status'] = counter.get()
        except subprocess.CalledProcessError:
            # Correct issue #1084: No Offline status for timed-out ports
            port['status'] = False
        except Exception as e:
            logger.debug("{}: Error while pinging host {} ({})".format(self.plugin_name, port['host'], e))

        return ret

    def _port_scan_tcp(self, port):
        """Scan the (TCP) port structure (dict) and update the status key.

        'status' is the connection duration when the port is open and False
        when the connection is refused or times out. Return the connect_ex()
        error code, or None if the scan could not be done.
        """
        ret = None

        # Create the scanning socket
        try:
            _socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except Exception as e:
            logger.debug(f"{self.plugin_name}: Error while creating scanning socket ({e})")
            # No socket, so nothing to scan and nothing to close
            return ret

        # Scan port
        try:
            # The timeout is set on this socket only, not process wide
            _socket.settimeout(port['timeout'])
            ip = self._resolv_name(port['host'])
            counter = Counter()
            ret = _socket.connect_ex((ip, int(port['port'])))
        except Exception as e:
            logger.debug(f"{self.plugin_name}: Error while scanning port {port} ({e})")
        else:
            if ret == 0:
                port['status'] = counter.get()
            else:
                port['status'] = False
        finally:
            _socket.close()

        return ret
410