Test Failed
Push — develop ( 30cc9f...48251c )
by Nicolas
02:03
created

glances.plugins.ports   F

Complexity

Total Complexity 63

Size/Duplication

Total Lines 353
Duplicated Lines 12.46 %

Importance

Changes 0
Metric Value
eloc 212
dl 44
loc 353
rs 3.36
c 0
b 0
f 0
wmc 63

17 Methods

Rating  Name                             Duplication  Size  Complexity
A       PluginModel.get_web_alert()      20           20    5
A       ThreadScanner.stats()            0            4     1
A       PluginModel.update()             0            21    4
B       ThreadScanner.run()              0            18    6
A       PluginModel.exit()               0            6     2
A       ThreadScanner._port_scan()       0            6     2
A       ThreadScanner.stop()             0            4     1
F       PluginModel.msg_curse()          0            52    14
A       PluginModel.get_key()            0            3     1
A       ThreadScanner._resolv_name()     0            8     2
A       ThreadScanner.__init__()         0            10    1
A       ThreadScanner.stopped()          0            3     1
A       ThreadScanner._web_scan()        0            18    3
B       ThreadScanner._port_scan_icmp()  0            45    7
A       ThreadScanner._port_scan_tcp()   0            27    5
B       PluginModel.get_ports_alert()    24           24    6
A       PluginModel.__init__()           0            17    1

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
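
As a minimal sketch of that restructuring, assuming the two alert methods flagged in this module differ only in how a status is judged critical and where the response time comes from, the shared decision logic could move into one helper. The names alert_level, port_alert and web_alert are hypothetical and not part of Glances; the stat-name, threshold and action calls that both methods repeat could be factored out in the same way.

```python
def alert_level(status, is_critical, elapsed=None, rtt_warning=None):
    """Map a scan result to an alert level shared by port and web checks."""
    if status is None:
        # The scan has not produced a result yet
        return 'CAREFUL'
    if is_critical(status):
        return 'CRITICAL'
    if rtt_warning is not None and elapsed is not None and elapsed > rtt_warning:
        return 'WARNING'
    return 'OK'


def port_alert(port):
    """Alert level for a port entry (hypothetical helper)."""
    status = port['status']
    # For ports, a numeric status is the round-trip time in seconds
    elapsed = status if isinstance(status, (float, int)) else None
    return alert_level(status, lambda s: s == 0, elapsed, port['rtt_warning'])


def web_alert(web):
    """Alert level for a web/URL entry (hypothetical helper)."""
    return alert_level(
        web['status'],
        lambda s: s not in (200, 301, 302),
        web.get('elapsed'),
        web['rtt_warning'],
    )
```

With this shape, each public method keeps only what is specific to it (the critical test and the timing source), which is usually enough for a duplication checker to stop flagging the pair.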

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like those in glances.plugins.ports often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefix or suffix. Here, for example, ThreadScanner._port_scan(), _port_scan_icmp() and _port_scan_tcp() all share the _port_scan prefix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
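
A rough sketch of what Extract Class could look like here, assuming the three methods sharing the _port_scan prefix form the cohesive component. PortScanner, its method names and the port_scanner constructor argument are hypothetical; the method bodies are left as placeholders because they would simply move over from the existing class.

```python
import threading


class PortScanner:
    """Hypothetical class extracted from the ``_port_scan*`` methods."""

    def scan(self, port):
        """Dispatch on the port number: 0 selects ICMP, anything else TCP."""
        if int(port['port']) == 0:
            return self.scan_icmp(port)
        return self.scan_tcp(port)

    def scan_icmp(self, port):
        # Placeholder: the body of _port_scan_icmp() would move here
        raise NotImplementedError

    def scan_tcp(self, port):
        # Placeholder: the body of _port_scan_tcp() would move here
        raise NotImplementedError


class ThreadScanner(threading.Thread):
    """Thread that only iterates over the stats and delegates the scanning."""

    def __init__(self, stats, port_scanner=None):
        super().__init__()
        self._stats = stats
        # Injecting the scanner makes it easy to replace in tests
        self._port_scanner = port_scanner or PortScanner()

    def run(self):
        for p in self._stats:
            if 'port' in p:
                self._port_scanner.scan(p)
```

The thread class then carries only the iteration and stop logic, while the scanning complexity is counted against the extracted class and can be tested without starting a thread.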

# -*- coding: utf-8 -*-
#
# This file is part of Glances.
#
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
#
# SPDX-License-Identifier: LGPL-3.0-only
#

"""Ports scanner plugin."""

import os
import subprocess
import threading
import socket
import time
import numbers

from glances.globals import WINDOWS, MACOS, BSD, bool_type
from glances.ports_list import GlancesPortsList
from glances.web_list import GlancesWebList
from glances.timer import Counter
from glances.logger import logger
from glances.plugins.plugin.model import GlancesPluginModel

try:
    import requests

    requests_tag = True
except ImportError as e:
    requests_tag = False
    logger.warning("Missing Python Lib ({}), Ports plugin is limited to port scanning".format(e))


class PluginModel(GlancesPluginModel):
    """Glances ports scanner plugin."""

    def __init__(self, args=None, config=None):
        """Init the plugin."""
        super(PluginModel, self).__init__(args=args, config=config, stats_init_value=[])
        self.args = args
        self.config = config

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Init stats
        self.stats = (
            GlancesPortsList(config=config, args=args).get_ports_list()
            + GlancesWebList(config=config, args=args).get_web_list()
        )

        # Global Thread running all the scans
        self._thread = None

    def exit(self):
        """Overwrite the exit method to close threads."""
        if self._thread is not None:
            self._thread.stop()
        # Call the parent class
        super(PluginModel, self).exit()

    @GlancesPluginModel._check_decorator
    @GlancesPluginModel._log_result_decorator
    def update(self):
        """Update the ports list."""
        if self.input_method == 'local':
            # Only refresh:
            # * if there is no other scanning thread
            # * every refresh seconds (defined in the configuration file)
            if self._thread is None:
                thread_is_running = False
            else:
                thread_is_running = self._thread.is_alive()
            if not thread_is_running:
                # Run ports scanner
                self._thread = ThreadScanner(self.stats)
                self._thread.start()
        else:
            # Not available in SNMP mode
            pass

        return self.stats

    def get_key(self):
        """Return the key of the list."""
        return 'indice'

    # Issue (duplication): this code seems to be duplicated in your project.
    def get_ports_alert(self, port, header="", log=False):
        """Return the alert status relative to the port scan return value."""
        ret = 'OK'
        if port['status'] is None:
            ret = 'CAREFUL'
        elif port['status'] == 0:
            ret = 'CRITICAL'
        elif (
            isinstance(port['status'], (float, int))
            and port['rtt_warning'] is not None
            and port['status'] > port['rtt_warning']
        ):
            ret = 'WARNING'

        # Get stat name
        stat_name = self.get_stat_name(header=header)

        # Manage threshold
        self.manage_threshold(stat_name, ret)

        # Manage action
        self.manage_action(stat_name, ret.lower(), header, port[self.get_key()])

        return ret

    # Issue (duplication): this code seems to be duplicated in your project.
    def get_web_alert(self, web, header="", log=False):
        """Return the alert status relative to the web/url scan return value."""
        ret = 'OK'
        if web['status'] is None:
            ret = 'CAREFUL'
        elif web['status'] not in [200, 301, 302]:
            ret = 'CRITICAL'
        elif web['rtt_warning'] is not None and web['elapsed'] > web['rtt_warning']:
            ret = 'WARNING'

        # Get stat name
        stat_name = self.get_stat_name(header=header)

        # Manage threshold
        self.manage_threshold(stat_name, ret)

        # Manage action
        self.manage_action(stat_name, ret.lower(), header, web[self.get_key()])

        return ret

    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface."""
        # Init the return message
        # Only process if stats exist and the plugin is enabled
        ret = []

        if not self.stats or args.disable_ports:
            return ret

        # Max width for the description column
        name_max_width = max_width - 7

        # Build the string message
        for p in self.stats:
            if 'host' in p:
                if p['host'] is None:
                    status = 'None'
                elif p['status'] is None:
                    status = 'Scanning'
                elif isinstance(p['status'], bool_type) and p['status'] is True:
                    status = 'Open'
                elif p['status'] == 0:
                    status = 'Timeout'
                else:
                    # Convert seconds to ms
                    status = '{0:.0f}ms'.format(p['status'] * 1000.0)

                msg = '{:{width}}'.format(p['description'][0:name_max_width], width=name_max_width)
                ret.append(self.curse_add_line(msg))
                msg = '{:>9}'.format(status)
                ret.append(self.curse_add_line(msg, self.get_ports_alert(p, header=p['indice'] + '_rtt')))
                ret.append(self.curse_new_line())
            elif 'url' in p:
                msg = '{:{width}}'.format(p['description'][0:name_max_width], width=name_max_width)
                ret.append(self.curse_add_line(msg))
                if isinstance(p['status'], numbers.Number):
                    status = 'Code {}'.format(p['status'])
                elif p['status'] is None:
                    status = 'Scanning'
                else:
                    status = p['status']
                msg = '{:>9}'.format(status)
                ret.append(self.curse_add_line(msg, self.get_web_alert(p, header=p['indice'] + '_rtt')))
                ret.append(self.curse_new_line())

        # Delete the last empty line
        try:
            ret.pop()
        except IndexError:
            pass

        return ret


class ThreadScanner(threading.Thread):
    """
    Specific thread for the port/web scanner.

    stats is a list of dict
    """

    def __init__(self, stats):
        """Init the class."""
        logger.debug("ports plugin - Create thread for scan list {}".format(stats))
        super(ThreadScanner, self).__init__()
        # Event needed to properly stop the thread
        self._stopper = threading.Event()
        # The class returns the stats as a list of dict
        self._stats = stats
        # Is part of Ports plugin
        self.plugin_name = "ports"

    def run(self):
        """Grab the stats.

        Scan each item of the list once; call the stop() method to interrupt the scan.
        """
        for p in self._stats:
            # End of the thread has been asked
            if self.stopped():
                break
            # Scan a port (ICMP or TCP)
            if 'port' in p:
                self._port_scan(p)
                # Wait between two scans,
                # otherwise the results are not reliable
                time.sleep(1)
            # Scan a URL
            elif 'url' in p and requests_tag:
                self._web_scan(p)

    @property
    def stats(self):
        """Stats getter."""
        return self._stats

    @stats.setter
    def stats(self, value):
        """Stats setter."""
        self._stats = value

    def stop(self, timeout=None):
        """Stop the thread."""
        logger.debug("ports plugin - Close thread for scan list {}".format(self._stats))
        self._stopper.set()

    def stopped(self):
        """Return True if the thread has been asked to stop."""
        return self._stopper.is_set()

    def _web_scan(self, web):
        """Scan the Web/URL (dict) and update the status key."""
        try:
            req = requests.head(
                web['url'],
                allow_redirects=True,
                verify=web['ssl_verify'],
                proxies=web['proxies'],
                timeout=web['timeout'],
            )
        except Exception as e:
            logger.debug(e)
            web['status'] = 'Error'
            web['elapsed'] = 0
        else:
            web['status'] = req.status_code
            web['elapsed'] = req.elapsed.total_seconds()
        return web

    def _port_scan(self, port):
        """Scan the port structure (dict) and update the status key."""
        if int(port['port']) == 0:
            return self._port_scan_icmp(port)
        else:
            return self._port_scan_tcp(port)

    def _resolv_name(self, hostname):
        """Convert hostname to IP address."""
        ip = hostname
        try:
            ip = socket.gethostbyname(hostname)
        except Exception as e:
            logger.debug("{}: Cannot convert {} to IP address ({})".format(self.plugin_name, hostname, e))
        return ip

    def _port_scan_icmp(self, port):
        """Scan the (ICMP) port structure (dict) and update the status key."""
        ret = None

        # Create the ping command
        # Use the system ping command because it already has the privileges
        # needed to send ICMP packets (Python cannot do it without root rights)
        if WINDOWS:
            timeout_opt = '-w'
            count_opt = '-n'
        elif MACOS or BSD:
            timeout_opt = '-t'
            count_opt = '-c'
        else:
            # Linux and co...
            timeout_opt = '-W'
            count_opt = '-c'
        # Build the command line
        # Note: only strings are allowed
        cmd = [
            'ping',
            count_opt,
            '1',
            timeout_opt,
            # The timeout is a number, not a hostname: no name resolution needed
            str(port['timeout']),
            self._resolv_name(port['host']),
        ]

        # The context manager closes os.devnull once the ping is done
        with open(os.devnull, 'w') as fnull:
            try:
                counter = Counter()
                ret = subprocess.check_call(cmd, stdout=fnull, stderr=fnull, close_fds=True)
                if ret == 0:
                    port['status'] = counter.get()
                else:
                    port['status'] = False
            except subprocess.CalledProcessError:
                # Correct issue #1084: No Offline status for timed-out ports
                port['status'] = False
            except Exception as e:
                logger.debug("{}: Error while pinging host {} ({})".format(self.plugin_name, port['host'], e))

        return ret

    def _port_scan_tcp(self, port):
        """Scan the (TCP) port structure (dict) and update the status key."""
        ret = None

        # Create and configure the scanning socket
        try:
            socket.setdefaulttimeout(port['timeout'])
            _socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except Exception as e:
            logger.debug("{}: Error while creating scanning socket ({})".format(self.plugin_name, e))

        # Scan port
        ip = self._resolv_name(port['host'])
        counter = Counter()
        try:
            # Issue: the variable _socket does not seem to be defined for all execution paths.
            ret = _socket.connect_ex((ip, int(port['port'])))
        except Exception as e:
            logger.debug("{}: Error while scanning port {} ({})".format(self.plugin_name, port, e))
        else:
            if ret == 0:
                port['status'] = counter.get()
            else:
                port['status'] = False
        finally:
            _socket.close()

        return ret
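
One possible way to address the _socket finding reported for _port_scan_tcp(), sketched as a standalone function rather than the plugin's method: bind the name to None before the try block and close the socket only if it was actually created. The function name port_scan_tcp, the module-level logger and the use of time.monotonic() in place of the Glances Counter are assumptions made to keep the example self-contained.

```python
import logging
import socket
import time

logger = logging.getLogger(__name__)


def port_scan_tcp(port):
    """Scan a TCP port described by a dict and update its 'status' key."""
    ret = None
    _socket = None  # bound on every execution path
    try:
        _socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Per-socket timeout instead of changing the process-wide default
        _socket.settimeout(port['timeout'])
        start = time.monotonic()
        ret = _socket.connect_ex((port['host'], int(port['port'])))
    except Exception as e:
        logger.debug("Error while scanning port %s (%s)", port, e)
    else:
        # connect_ex() returns 0 on success and an errno value otherwise
        port['status'] = time.monotonic() - start if ret == 0 else False
    finally:
        if _socket is not None:
            _socket.close()
    return ret
```

Folding socket creation into the same try block also means a creation failure is logged once and the function returns None, instead of failing later with a NameError.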