PluginModel.get_conds_if_port()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 7
nop 2
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""Ports scanner plugin."""
10
11
import numbers
12
import os
13
import socket
14
import subprocess
15
import threading
16
import time
17
from functools import partial, reduce
18
19
from glances.globals import BSD, MACOS, WINDOWS, bool_type
20
from glances.logger import logger
21
from glances.plugins.plugin.model import GlancesPluginModel
22
from glances.ports_list import GlancesPortsList
23
from glances.timer import Counter
24
from glances.web_list import GlancesWebList
25
26
# Optional dependency: the 'requests' lib is only needed to scan URLs.
# requests_tag records whether it could be imported.
try:
    import requests
except ImportError as e:
    requests_tag = False
    logger.warning(f"Missing Python Lib ({e}), Ports plugin is limited to port scanning")
else:
    requests_tag = True
33
34
# Fields description
# Keys that an entry may provide:
# description: human readable description
# short_name: short name to use in the UI
# unit: unit type
# rate: is it a rate? If yes, divided by time_since_update when displayed
# min_symbol: auto unit should be used if value > 1 'X' (K, M, G)...
fields_description = dict(
    host=dict(description='Measurement is be done on this host (or IP address)'),
    port=dict(description='Measurement is be done on this port (0 for ICMP)'),
    description=dict(description='Human readable description for the host/port'),
    refresh=dict(description='Refresh time (in seconds) for this host/port'),
    timeout=dict(description='Timeout (in seconds) for the measurement'),
    status=dict(description='Measurement result (in seconds)', unit='second'),
    rtt_warning=dict(description='Warning threshold (in seconds) for the measurement', unit='second'),
    indice=dict(description='Unique indice for the host/port'),
)
68
69
70
class PluginModel(GlancesPluginModel):
    """Glances ports scanner plugin.

    The stats are a list of dicts: the entries returned by GlancesPortsList
    followed by the entries returned by GlancesWebList. The same list object
    is handed to a ThreadScanner, which updates the entries in place.
    """

    def __init__(self, args=None, config=None):
        """Init the plugin.

        :param args: forwarded to the parent class and to the list builders
        :param config: forwarded to the parent class and to the list builders
        """
        super().__init__(args=args, config=config, stats_init_value=[], fields_description=fields_description)
        self.args = args
        self.config = config

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Init stats: host/port entries first, then web/URL entries
        self.stats = (
            GlancesPortsList(config=config, args=args).get_ports_list()
            + GlancesWebList(config=config, args=args).get_web_list()
        )

        # Global Thread running all the scans
        self._thread = None

    def exit(self):
        """Overwrite the exit method to close threads."""
        if self._thread is not None:
            self._thread.stop()
        # Call the father class
        super().exit()

    def get_key(self):
        """Return the key of the list."""
        return 'indice'

    @GlancesPluginModel._check_decorator
    @GlancesPluginModel._log_result_decorator
    def update(self):
        """Update the ports list.

        In 'local' mode, start a new scanning thread unless the previous one
        is still alive. In any other mode the stats are reset to None.

        :return: the current stats (list of dict, or None)
        """
        if self.input_method == 'local':
            # Only one scanning thread at a time.
            # NOTE(review): the per-entry refresh interval is not checked
            # here; presumably it is enforced by the caller/decorators.
            thread_is_running = self._thread is not None and self._thread.is_alive()
            if not thread_is_running:
                # Run ports scanner
                self._thread = ThreadScanner(self.stats)
                self._thread.start()
        else:
            # Not available in SNMP mode
            self.stats = None

        return self.stats

    def get_conds_if_port(self, port):
        """Return the alert conditions of a host/port entry.

        The returned dict maps an alert level to a boolean, in priority order.
        """
        return {
            'CAREFUL': port['status'] is None,
            'CRITICAL': port['status'] == 0,
            'WARNING': isinstance(port['status'], (float, int))
            and port['rtt_warning'] is not None
            and port['status'] > port['rtt_warning'],
        }

    def get_ports_alert(self, port, header="", log=False):
        """Return the alert status relative to the port scan return value."""

        return self.get_p_alert(self.get_conds_if_port(port), port, header, log)

    def get_conds_if_url(self, web):
        """Return the alert conditions of a web/URL entry.

        The returned dict maps an alert level to a boolean, in priority order.
        """
        return {
            'CAREFUL': web['status'] is None,
            'CRITICAL': web['status'] not in [200, 301, 302],
            'WARNING': web['rtt_warning'] is not None and web['elapsed'] > web['rtt_warning'],
        }

    def get_web_alert(self, web, header="", log=False):
        """Return the alert status relative to the web/url scan return value."""

        return self.get_p_alert(self.get_conds_if_url(web), web, header, log)

    def get_p_alert(self, conds, p, header="", log=False):
        """Compute the alert level of an entry and forward it to the plugin.

        :param conds: dict mapping an alert level to a boolean
        :param p: the scanned entry (dict)
        :param header: prefix used to build the stat name
        :param log: not used by this method
        :return: the alert level (str)
        """
        ret = self.get_default_ret_value(conds)

        # Get stat name
        stat_name = self.get_stat_name(header=header)

        # Manage threshold
        self.manage_threshold(stat_name, ret)

        # Manage action
        self.manage_action(stat_name, ret.lower(), header, p[self.get_key()])

        return ret

    def get_default_ret_value(self, conds):
        """Return the first alert level whose condition is true, else 'OK'.

        Levels are evaluated in the order they are declared in conds, so an
        earlier level takes precedence over a later one. Several conditions
        can be true at once: for a URL entry still being scanned, status is
        None, which is also "not in [200, 301, 302]"; the entry must stay
        CAREFUL rather than become CRITICAL.
        """
        return next((level for level, cond in conds.items() if cond), 'OK')

    def set_status_if_host(self, p):
        """Return the status string to display for a host/port entry."""
        if p['host'] is None:
            status = 'None'
        elif p['status'] is None:
            status = 'Scanning'
        elif isinstance(p['status'], bool_type) and p['status'] is True:
            status = 'Open'
        elif p['status'] == 0:
            # Also matches False, which the scanner stores on failure
            status = 'Timeout'
        else:
            # Convert second to ms
            status = '{:.0f}ms'.format(p['status'] * 1000.0)

        return status

    def set_status_if_url(self, p):
        """Return the status string to display for a web/URL entry."""
        if isinstance(p['status'], numbers.Number):
            status = 'Code {}'.format(p['status'])
        elif p['status'] is None:
            status = 'Scanning'
        else:
            status = p['status']

        return status

    def build_str(self, name_max_width, ret, p):
        """Append the curse lines of entry p to ret and return ret.

        :param name_max_width: width reserved for the description column
        :param ret: list of curse lines built so far (modified in place)
        :param p: the entry to display (dict)
        """
        helper_and_status = self.get_status_and_helper(p).get(True)
        if helper_and_status is None:
            # Neither a host/port nor a URL entry: nothing can be displayed
            return ret

        helper, status = helper_and_status
        msg = '{:{width}}'.format(p['description'][0:name_max_width], width=name_max_width)
        ret.append(self.curse_add_line(msg))
        msg = f'{status:>9}'
        ret.append(self.curse_add_line(msg, helper(p, header=p['indice'] + '_rtt')))
        ret.append(self.curse_new_line())

        return ret

    def get_status_and_helper(self, p):
        """Return a dict keyed by booleans describing how to display p.

        The value stored under True (if any) is a tuple
        (alert method, status string). When p has both 'host' and 'url', the
        URL tuple is the one stored under True.
        """
        is_host = 'host' in p
        is_url = 'url' in p
        return {
            is_host: (self.get_ports_alert, self.set_status_if_host(p)) if is_host else None,
            is_url: (self.get_web_alert, self.set_status_if_url(p)) if is_url else None,
        }

    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface."""
        # Init the return message
        # Only process if stats exist and display plugin enable...
        init = []

        # getattr: args defaults to None, which has no disable_ports attribute
        if not self.stats or getattr(args, 'disable_ports', False):
            return init

        # Max size for the interface name
        if max_width:
            name_max_width = max_width - 7
        else:
            # No max_width defined, return an empty curse message
            logger.debug(f"No max_width defined for the {self.plugin_name} plugin, it will not be displayed.")
            return init

        # Build the string message
        build_str_with_this_max_width = partial(self.build_str, name_max_width)
        ret = reduce(build_str_with_this_max_width, self.stats, init)

        # Delete the last empty line
        if ret:
            ret.pop()

        return ret
238
239
240
class ThreadScanner(threading.Thread):
    """
    Specific thread for the port/web scanner.

    stats is a list of dict. Each dict is updated in place by the scan.
    """

    def __init__(self, stats):
        """Init the class.

        :param stats: list of dict describing the ports/URLs to scan
        """
        logger.debug(f"ports plugin - Create thread for scan list {stats}")
        super().__init__()
        # Event needed to stop properly the thread
        self._stopper = threading.Event()
        # The class return the stats as a list of dict
        self._stats = stats
        # Is part of Ports plugin
        self.plugin_name = "ports"

    def get_key(self):
        """Return the key of the list."""
        return 'indice'

    def run(self):
        """Grab the stats.

        Make a single pass over the stats list, scanning each entry in turn.
        The pass is interrupted between two entries if stop() has been called.
        """
        for p in self._stats:
            # End of the thread has been asked
            if self.stopped():
                break
            # Scan a port (ICMP or TCP)
            if 'port' in p:
                self._port_scan(p)
                # Had to wait between two scans
                # If not, result are not ok
                time.sleep(1)
            # Scan an URL
            elif 'url' in p and requests_tag:
                self._web_scan(p)
            # Add the key for every element
            p['key'] = self.get_key()

    @property
    def stats(self):
        """Stats getter."""
        return self._stats

    @stats.setter
    def stats(self, value):
        """Stats setter."""
        self._stats = value

    def stop(self, timeout=None):
        """Stop the thread.

        Only sets the stop event; the timeout parameter is not used.
        """
        logger.debug(f"ports plugin - Close thread for scan list {self._stats}")
        self._stopper.set()

    def stopped(self):
        """Return True is the thread is stopped."""
        return self._stopper.is_set()

    def _web_scan(self, web):
        """Scan the  Web/URL (dict) and update the status key.

        Send a HEAD request (following redirects) to web['url']. On success,
        'status' is the HTTP status code and 'elapsed' the request duration in
        seconds. On any error, 'status' is 'Error' and 'elapsed' is 0.

        :return: the updated web dict
        """
        try:
            req = requests.head(
                web['url'],
                allow_redirects=True,
                verify=web['ssl_verify'],
                proxies=web['proxies'],
                timeout=web['timeout'],
            )
        except Exception as e:
            logger.debug(e)
            web['status'] = 'Error'
            web['elapsed'] = 0
        else:
            web['status'] = req.status_code
            web['elapsed'] = req.elapsed.total_seconds()
        return web

    def _port_scan(self, port):
        """Scan the port structure (dict) and update the status key.

        Port 0 means ICMP (ping); any other port is scanned with TCP.
        """
        if int(port['port']) == 0:
            return self._port_scan_icmp(port)
        return self._port_scan_tcp(port)

    def _resolv_name(self, hostname):
        """Convert hostname to IP address.

        Return hostname unchanged if the resolution fails.
        """
        ip = hostname
        try:
            ip = socket.gethostbyname(hostname)
        except Exception as e:
            logger.debug(f"{self.plugin_name}: Cannot convert {hostname} to IP address ({e})")
        return ip

    def _port_scan_icmp(self, port):
        """Scan the (ICMP) port structure (dict) and update the status key.

        'status' is set to the elapsed time when the ping succeeds and to
        False when the ping command fails. It is left untouched if the
        command cannot be run at all.

        :return: 0 if the ping succeeded, None otherwise
        """
        ret = None

        # Create the ping command
        # Use the system ping command because it already have the sticky bit set
        # Python can not create ICMP packet with non root right
        timeout_value = port['timeout']
        if WINDOWS:
            timeout_opt = '-w'
            count_opt = '-n'
            # Windows ping expects milliseconds, the configuration is in seconds
            if isinstance(timeout_value, (int, float)):
                timeout_value = int(timeout_value * 1000)
        elif MACOS or BSD:
            timeout_opt = '-t'
            count_opt = '-c'
        else:
            # Linux and co...
            timeout_opt = '-W'
            count_opt = '-c'
        # Build the command line
        # Note: Only string are allowed
        cmd = [
            'ping',
            count_opt,
            '1',
            timeout_opt,
            # The timeout is a number, not a hostname: do not resolve it
            str(timeout_value),
            self._resolv_name(port['host']),
        ]

        # The ping output is not needed, only its exit status
        with open(os.devnull, 'w') as fnull:
            try:
                counter = Counter()
                ret = subprocess.check_call(cmd, stdout=fnull, stderr=fnull, close_fds=True)
                if ret == 0:
                    port['status'] = counter.get()
                else:
                    port['status'] = False
            except subprocess.CalledProcessError:
                # Correct issue #1084: No Offline status for timed-out ports
                port['status'] = False
            except Exception as e:
                logger.debug("{}: Error while pinging host {} ({})".format(self.plugin_name, port['host'], e))

        return ret

    def _port_scan_tcp(self, port):
        """Scan the (TCP) port structure (dict) and update the status key.

        'status' is set to the elapsed time when the connection succeeds and
        to False when it is refused or times out. It is left untouched if the
        socket cannot be created or if the connection attempt raises.

        :return: the connect_ex() error indicator (0 on success), or None if
            the scan could not be performed
        """
        ret = None

        # Create and configure the scanning socket
        try:
            # NOTE: setdefaulttimeout() applies to every socket created
            # afterwards in the process, not only to this one
            socket.setdefaulttimeout(port['timeout'])
            _socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except Exception as e:
            logger.debug(f"{self.plugin_name}: Error while creating scanning socket ({e})")
            # Without a socket there is nothing to scan (and nothing to close)
            return ret

        # Scan port
        ip = self._resolv_name(port['host'])
        counter = Counter()
        try:
            ret = _socket.connect_ex((ip, int(port['port'])))
        except Exception as e:
            logger.debug(f"{self.plugin_name}: Error while scanning port {port} ({e})")
        else:
            if ret == 0:
                port['status'] = counter.get()
            else:
                port['status'] = False
        finally:
            _socket.close()

        return ret
409