GlancesCursesBrowser.__catch_key()   F
last analyzed

Complexity

Conditions 15

Size

Total Lines 53
Code Lines 40

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 15
eloc 40
nop 2
dl 0
loc 53
rs 2.9998
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like glances.outputs.glances_curses_browser.GlancesCursesBrowser.__catch_key() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Curses browser interface class ."""
11
12
import math
13
import curses
14
from glances.outputs.glances_curses import _GlancesCurses
15
16
from glances.logger import logger
17
from glances.timer import Timer
18
19
20
class GlancesCursesBrowser(_GlancesCurses):
21
    """Class for the Glances curse client browser."""
22
23
    def __init__(self, args=None):
24
        """Init the father class."""
25
        super(GlancesCursesBrowser, self).__init__(args=args)
26
27
        _colors_list = {
28
            'UNKNOWN': self.no_color,
29
            'SNMP': self.default_color2,
30
            'ONLINE': self.default_color2,
31
            'OFFLINE': self.ifCRITICAL_color2,
32
            'PROTECTED': self.ifWARNING_color2,
33
        }
34
        self.colors_list.update(_colors_list)
35
36
        # First time scan tag
37
        # Used to display a specific message when the browser is started
38
        self.first_scan = True
39
40
        # Init refresh time
41
        self.__refresh_time = args.time
42
43
        # Init the cursor position for the client browser
44
        self.cursor_position = 0
45
46
        # Active Glances server number
47
        self._active_server = None
48
49
        self._current_page = 0
50
        self._page_max = 0
51
        self._page_max_lines = 0
52
53
        self.is_end = False
54
        self._revesed_sorting = False
55
        self._stats_list = None
56
57
    @property
58
    def active_server(self):
59
        """Return the active server or None if it's the browser list."""
60
        return self._active_server
61
62
    @active_server.setter
63
    def active_server(self, index):
64
        """Set the active server or None if no server selected."""
65
        self._active_server = index
66
67
    @property
68
    def cursor(self):
69
        """Get the cursor position."""
70
        return self.cursor_position
71
72
    @cursor.setter
73
    def cursor(self, position):
74
        """Set the cursor position."""
75
        self.cursor_position = position
76
77
    def get_pagelines(self, stats):
78
        if self._current_page == self._page_max - 1:
79
            page_lines = len(stats) % self._page_max_lines
80
        else:
81
            page_lines = self._page_max_lines
82
        return page_lines
83
84
    def _get_status_count(self, stats):
85
        counts = {}
86
        for item in stats:
87
            color = item['status']
88
            counts[color] = counts.get(color, 0) + 1
89
90
        result = ''
91
        for key in counts.keys():
92
            result += key + ': ' + str(counts[key]) + ' '
93
94
        return result
95
96
    def _get_stats(self, stats):
97
        stats_list = None
98
        if self._stats_list is not None:
99
            stats_list = self._stats_list
100
            stats_list.sort(
101
                reverse=self._revesed_sorting,
102
                key=lambda x: {'UNKNOWN': 0, 'OFFLINE': 1, 'PROTECTED': 2, 'SNMP': 3, 'ONLINE': 4}.get(x['status'], 99),
103
            )
104
        else:
105
            stats_list = stats
106
107
        return stats_list
108
109
    def cursor_up(self, stats):
110
        """Set the cursor to position N-1 in the list."""
111
        if 0 <= self.cursor_position - 1:
112
            self.cursor_position -= 1
113
        else:
114
            if self._current_page - 1 < 0:
115
                self._current_page = self._page_max - 1
116
                self.cursor_position = (len(stats) - 1) % self._page_max_lines
117
            else:
118
                self._current_page -= 1
119
                self.cursor_position = self._page_max_lines - 1
120
121
    def cursor_down(self, stats):
122
        """Set the cursor to position N-1 in the list."""
123
124
        if self.cursor_position + 1 < self.get_pagelines(stats):
125
            self.cursor_position += 1
126
        else:
127
            if self._current_page + 1 < self._page_max:
128
                self._current_page += 1
129
            else:
130
                self._current_page = 0
131
            self.cursor_position = 0
132
133
    def cursor_pageup(self, stats):
134
        """Set prev page."""
135
        if self._current_page - 1 < 0:
136
            self._current_page = self._page_max - 1
137
        else:
138
            self._current_page -= 1
139
        self.cursor_position = 0
140
141
    def cursor_pagedown(self, stats):
142
        """Set next page."""
143
        if self._current_page + 1 < self._page_max:
144
            self._current_page += 1
145
        else:
146
            self._current_page = 0
147
        self.cursor_position = 0
148
149
    def __catch_key(self, stats):
150
        # Catch the browser pressed key
151
        self.pressedkey = self.get_key(self.term_window)
152
        refresh = False
153
        if self.pressedkey != -1:
154
            logger.debug("Key pressed. Code=%s" % self.pressedkey)
155
156
        # Actions...
157
        if self.pressedkey == ord('\x1b') or self.pressedkey == ord('q'):
158
            # 'ESC'|'q' > Quit
159
            self.end()
160
            logger.info("Stop Glances client browser")
161
            # sys.exit(0)
162
            self.is_end = True
163
        elif self.pressedkey == 10:
164
            # 'ENTER' > Run Glances on the selected server
165
            self.active_server = self._current_page * self._page_max_lines + self.cursor_position
166
            logger.debug("Server {}/{} selected".format(self.active_server, len(stats)))
167
        elif self.pressedkey == curses.KEY_UP or self.pressedkey == 65:
168
            # 'UP' > Up in the server list
169
            self.cursor_up(stats)
170
            logger.debug("Server {}/{} selected".format(self.cursor + 1, len(stats)))
171
        elif self.pressedkey == curses.KEY_DOWN or self.pressedkey == 66:
172
            # 'DOWN' > Down in the server list
173
            self.cursor_down(stats)
174
            logger.debug("Server {}/{} selected".format(self.cursor + 1, len(stats)))
175
        elif self.pressedkey == curses.KEY_PPAGE:
176
            # 'Page UP' > Prev page in the server list
177
            self.cursor_pageup(stats)
178
            logger.debug("PageUP: Server ({}/{}) pages.".format(self._current_page + 1, self._page_max))
179
        elif self.pressedkey == curses.KEY_NPAGE:
180
            # 'Page Down' > Next page in the server list
181
            self.cursor_pagedown(stats)
182
            logger.debug("PageDown: Server {}/{} pages".format(self._current_page + 1, self._page_max))
183
        elif self.pressedkey == ord('1'):
184
            self._stats_list = None
185
            refresh = True
186
        elif self.pressedkey == ord('2'):
187
            self._revesed_sorting = False
188
            self._stats_list = stats.copy()
189
            refresh = True
190
        elif self.pressedkey == ord('3'):
191
            self._revesed_sorting = True
192
            self._stats_list = stats.copy()
193
            refresh = True
194
195
        if refresh:
196
            self._current_page = 0
197
            self.cursor_position = 0
198
            self.flush(stats)
199
200
        # Return the key code
201
        return self.pressedkey
202
203
    def update(self, stats, duration=3, cs_status=None, return_to_browser=False):
204
        """Update the servers' list screen.
205
206
        Wait for __refresh_time sec / catch key every 100 ms.
207
208
        :param stats: Dict of dict with servers stats
209
        :param cs_status:
210
        :param duration:
211
        :param return_to_browser:
212
        """
213
        # Flush display
214
        logger.debug('Servers list: {}'.format(stats))
215
        self.flush(stats)
216
217
        # Wait
218
        exitkey = False
219
        countdown = Timer(self.__refresh_time)
220
        while not countdown.finished() and not exitkey:
221
            # Getkey
222
            pressedkey = self.__catch_key(stats)
223
            # Is it an exit or select server key ?
224
            exitkey = pressedkey == ord('\x1b') or pressedkey == ord('q') or pressedkey == 10
225
            if not exitkey and pressedkey > -1:
226
                # Redraw display
227
                self.flush(stats)
228
            # Wait 100ms...
229
            self.wait()
230
231
        return self.active_server
232
233
    def flush(self, stats):
234
        """Update the servers' list screen.
235
236
        :param stats: List of dict with servers stats
237
        """
238
        self.erase()
239
        self.display(stats)
240
241
    def display(self, stats, cs_status=None):
242
        """Display the servers list.
243
244
        :return: True if the stats have been displayed else False (no server available)
245
        """
246
        # Init the internal line/column for Glances Curses
247
        self.init_line_column()
248
249
        # Get the current screen size
250
        screen_x = self.screen.getmaxyx()[1]
251
        screen_y = self.screen.getmaxyx()[0]
252
        stats_max = screen_y - 3
253
        stats_len = len(stats)
254
255
        self._page_max_lines = stats_max
256
        self._page_max = int(math.ceil(stats_len / stats_max))
257
        # Init position
258
        x = 0
259
        y = 0
260
261
        # Display top header
262
        if stats_len == 0:
263
            if self.first_scan and not self.args.disable_autodiscover:
264
                msg = 'Glances is scanning your network. Please wait...'
265
                self.first_scan = False
266
            else:
267
                msg = 'No Glances server available'
268
        elif len(stats) == 1:
269
            msg = 'One Glances server available'
270
        else:
271
            msg = '{} Glances servers available'.format(stats_len)
272
        if self.args.disable_autodiscover:
273
            msg += ' (auto discover is disabled)'
274
        if screen_y > 1:
275
            self.term_window.addnstr(y, x, msg, screen_x - x, self.colors_list['TITLE'])
276
277
            msg = '{}'.format(self._get_status_count(stats))
278
            self.term_window.addnstr(y + 1, x, msg, screen_x - x)
279
280
        if stats_len > stats_max and screen_y > 2:
281
            msg = '{} servers displayed.({}/{}) {}'.format(
282
                self.get_pagelines(stats), self._current_page + 1, self._page_max, self._get_status_count(stats)
283
            )
284
            self.term_window.addnstr(y + 1, x, msg, screen_x - x)
285
286
        if stats_len == 0:
287
            return False
288
289
        # Display the Glances server list
290
        # ================================
291
292
        # Table of table
293
        # Item description: [stats_id, column name, column size]
294
        column_def = [
295
            ['name', 'Name', 16],
296
            ['alias', None, None],
297
            ['load_min5', 'LOAD', 6],
298
            ['cpu_percent', 'CPU%', 5],
299
            ['mem_percent', 'MEM%', 5],
300
            ['status', 'STATUS', 9],
301
            ['ip', 'IP', 15],
302
            # ['port', 'PORT', 5],
303
            ['hr_name', 'OS', 16],
304
        ]
305
        y = 2
306
307
        # Display table header
308
        xc = x + 2
309
        for cpt, c in enumerate(column_def):
310
            if xc < screen_x and y < screen_y and c[1] is not None:
311
                self.term_window.addnstr(y, xc, c[1], screen_x - x, self.colors_list['BOLD'])
312
                xc += c[2] + self.space_between_column
313
        y += 1
314
315
        # If a servers has been deleted from the list...
316
        # ... and if the cursor is in the latest position
317
        if self.cursor > len(stats) - 1:
318
            # Set the cursor position to the latest item
319
            self.cursor = len(stats) - 1
320
321
        stats_list = self._get_stats(stats)
322
        start_line = self._page_max_lines * self._current_page
323
        end_line = start_line + self.get_pagelines(stats_list)
324
        current_page = stats_list[start_line:end_line]
325
326
        # Display table
327
        line = 0
328
        for v in current_page:
329
            # Limit the number of displayed server (see issue #1256)
330
            if line >= stats_max:
331
                continue
332
            # Get server stats
333
            server_stat = {}
334
            for c in column_def:
335
                try:
336
                    server_stat[c[0]] = v[c[0]]
337
                except KeyError as e:
338
                    logger.debug("Cannot grab stats {} from server (KeyError: {})".format(c[0], e))
339
                    server_stat[c[0]] = '?'
340
                # Display alias instead of name
341
                try:
342
                    if c[0] == 'alias' and v[c[0]] is not None:
343
                        server_stat['name'] = v[c[0]]
344
                except KeyError:
345
                    pass
346
347
            # Display line for server stats
348
            cpt = 0
349
            xc = x
350
351
            # Is the line selected ?
352
            if line == self.cursor:
353
                # Display cursor
354
                self.term_window.addnstr(y, xc, ">", screen_x - xc, self.colors_list['BOLD'])
355
356
            # Display the line
357
            xc += 2
358
            for c in column_def:
359
                if xc < screen_x and y < screen_y and c[1] is not None:
360
                    # Display server stats
361
                    self.term_window.addnstr(y, xc, format(server_stat[c[0]]), c[2], self.colors_list[v['status']])
362
                    xc += c[2] + self.space_between_column
363
                cpt += 1
364
            # Next line, next server...
365
            y += 1
366
            line += 1
367
368
        return True
369