1 | # -*- coding: utf-8 -*- |
||
2 | # |
||
3 | # This file is part of Glances. |
||
4 | # |
||
5 | # SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]> |
||
6 | # |
||
7 | # SPDX-License-Identifier: LGPL-3.0-only |
||
8 | # |
||
9 | |||
10 | # flake8: noqa |
||
11 | # pylint: skip-file |
||
12 | """Python 2/3 compatibility shims.""" |
||
13 | |||
14 | from __future__ import print_function, unicode_literals |
||
15 | |||
16 | import operator |
||
17 | import sys |
||
18 | import unicodedata |
||
19 | import types |
||
20 | import subprocess |
||
21 | import os |
||
22 | from datetime import datetime |
||
23 | import re |
||
24 | |||
25 | from glances.logger import logger |
||
26 | |||
27 | PY3 = sys.version_info[0] == 3 |
||
28 | |||
29 | if PY3: |
||
30 | import queue |
||
31 | from configparser import ConfigParser, NoOptionError, NoSectionError |
||
32 | from statistics import mean |
||
33 | from xmlrpc.client import Fault, ProtocolError, ServerProxy, Transport, Server |
||
34 | from xmlrpc.server import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer |
||
35 | from urllib.request import Request, urlopen, base64 |
||
36 | from urllib.error import HTTPError, URLError |
||
37 | from urllib.parse import urlparse |
||
38 | |||
39 | # Correct issue #1025 by monkey patching the xmlrpc lib |
||
40 | from defusedxml.xmlrpc import monkey_patch |
||
41 | |||
42 | monkey_patch() |
||
43 | |||
# Python 3 branch: publish the builtins under the shim names that the
# Python 2 branch also defines, so callers can import them from this module.
input, range, map = input, range, map

# Text / bytes / boolean / integer type aliases.
text_type, binary_type, bool_type, long = str, bytes, bool, int

# NOTE(review): PermissionError is rebound to the broader OSError here, as in
# the Python 2 branch - confirm this widening is intended on Python 3.
PermissionError, FileNotFoundError = OSError, FileNotFoundError

# Callables returning the keys/values/items view of the mapping they are given.
viewkeys, viewvalues, viewitems = (operator.methodcaller(method) for method in ('keys', 'values', 'items'))
||
59 | |||
def printandflush(string):
    """Print ``string`` on stdout and flush the stream immediately.

    Used by the stdout* outputs modules.
    """
    print(string, file=sys.stdout, flush=True)
||
63 | |||
def to_ascii(s):
    """Convert a bytes or text string to an ASCII text string.

    Useful to remove accents (diacritics): text is first decomposed (NFKD) so
    that an accented letter becomes its base letter followed by combining
    marks, then every non-ASCII character is dropped. Characters without an
    ASCII decomposition are removed.

    Bytes input is only decoded (default UTF-8 codec); it is not filtered.

    :param s: bytes or str to convert
    :return: the resulting str
    """
    # binary_type is bytes in this Python 3 branch
    if isinstance(s, bytes):
        return s.decode()
    # Same normalization as the Python 2 implementation of this helper, so
    # that 'e' + acute accent yields 'e' instead of disappearing entirely.
    return unicodedata.normalize('NFKD', s).encode('ascii', 'ignore').decode()
||
72 | |||
def to_hex(s):
    """Return the hexadecimal string produced by ``s.hex()`` (bytes input)."""
    hex_digits = s.hex()
    return hex_digits
||
76 | |||
def listitems(d):
    """Return the (key, value) pairs of ``d`` as a new list."""
    return [*d.items()]
||
79 | |||
def listkeys(d):
    """Return the keys of ``d`` as a new list."""
    return [*d.keys()]
||
82 | |||
def listvalues(d):
    """Return the values of ``d`` as a new list."""
    return [*d.values()]
||
85 | |||
def iteritems(d):
    """Return an iterator over the (key, value) pairs of ``d``."""
    pairs = d.items()
    return iter(pairs)
||
88 | |||
def iterkeys(d):
    """Return an iterator over the keys of ``d``."""
    keys = d.keys()
    return iter(keys)
||
91 | |||
def itervalues(d):
    """Return an iterator over the values of ``d``."""
    values = d.values()
    return iter(values)
||
94 | |||
def u(s, errors='replace'):
    """Return ``s`` as text.

    A ``text_type`` value is returned unchanged; anything else is decoded
    from UTF-8 using the ``errors`` policy.
    """
    already_text = isinstance(s, text_type)
    return s if already_text else s.decode('utf-8', errors=errors)
||
99 | |||
def b(s, errors='replace'):
    """Return ``s`` as bytes.

    A ``binary_type`` value is returned unchanged; anything else is encoded
    to UTF-8 using the ``errors`` policy.
    """
    if not isinstance(s, binary_type):
        s = s.encode('utf-8', errors=errors)
    return s
||
104 | |||
def n(s):
    """Return ``s`` unchanged.

    A real conversion only exists in the Python 2 branch, where ``n`` is
    ``future.utils.bytes_to_native_str``.
    """
    return s
||
110 | |||
def nativestr(s, errors='replace'):
    """Return ``s`` as a native string.

    ``text_type`` values are returned as is, int and float values are
    converted to their string form, and anything else is decoded from UTF-8
    using the ``errors`` policy.
    """
    if isinstance(s, text_type):
        return s
    if isinstance(s, (int, float)):
        return str(s)
    return s.decode('utf-8', errors=errors)
||
118 | |||
def system_exec(command):
    """Execute a system command and return its standard output as a str.

    The command line is split on single spaces and run without a shell. The
    captured output is decoded as UTF-8 and right-stripped. On any error a
    debug message is logged and an empty string is returned.
    """
    try:
        completed = subprocess.run(command.split(' '), stdout=subprocess.PIPE)
        output = completed.stdout.decode('utf-8')
    except Exception as e:
        logger.debug('Can not evaluate command {} ({})'.format(command, e))
        return ''
    return output.rstrip()
||
127 | |||
128 | else: |
||
129 | from future.utils import bytes_to_native_str as n |
||
130 | import Queue as queue |
||
131 | from itertools import imap as map |
||
132 | from ConfigParser import SafeConfigParser as ConfigParser, NoOptionError, NoSectionError |
||
133 | from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer |
||
134 | from xmlrpclib import Fault, ProtocolError, ServerProxy, Transport, Server |
||
135 | from urllib2 import Request, urlopen, HTTPError, URLError, base64 |
||
136 | from urlparse import urlparse |
||
137 | |||
138 | # Correct issue #1025 by monkey patching the xmlrpc lib |
||
139 | from defusedxml.xmlrpc import monkey_patch |
||
140 | |||
141 | monkey_patch() |
||
142 | |||
# Python 2 branch: expose the lazy/raw builtins under their Python 3 names.
input, range = raw_input, xrange
# Make the Python 3 method name read_file available on the Python 2 parser.
ConfigParser.read_file = ConfigParser.readfp

# Text / bytes / boolean / integer type aliases.
text_type, binary_type, bool_type, long = unicode, str, types.BooleanType, long

# Exception names used by callers, mapped onto exceptions this branch provides.
PermissionError, FileNotFoundError = OSError, IOError

# Callables returning the keys/values/items view of the mapping they are given.
viewkeys, viewvalues, viewitems = (
    operator.methodcaller(method) for method in ('viewkeys', 'viewvalues', 'viewitems')
)
||
158 | |||
def printandflush(string):
    """Print ``string`` on stdout, then flush the stream.

    Used by the stdout* outputs modules.
    """
    print(string, file=sys.stdout)
    sys.stdout.flush()
||
163 | |||
def mean(numbers):
    """Return the arithmetic mean of ``numbers`` as a float.

    An empty sequence yields 0.0 because the divisor is never below 1.
    """
    count = len(numbers)
    divisor = count if count > 1 else 1
    return float(sum(numbers)) / divisor
||
166 | |||
def to_ascii(s):
    """Convert the unicode string ``s`` to an ASCII string.

    Useful to remove accents (diacritics): the text is NFKD-normalized and
    then encoded to ASCII, ignoring what cannot be encoded. A ``binary_type``
    value is returned unchanged.
    """
    if not isinstance(s, binary_type):
        decomposed = unicodedata.normalize('NFKD', s)
        return decomposed.encode('ascii', 'ignore')
    return s
||
175 | |||
def to_hex(s):
    """Convert the string ``s`` to a hex string using the 'hex' codec (Python 2)."""
    codec = 'hex'
    return s.encode(codec)
||
179 | |||
def listitems(d):
    """Return ``d.items()``."""
    pairs = d.items()
    return pairs
||
182 | |||
def listkeys(d):
    """Return ``d.keys()``."""
    keys = d.keys()
    return keys
||
185 | |||
def listvalues(d):
    """Return ``d.values()``."""
    values = d.values()
    return values
||
188 | |||
def iteritems(d):
    """Return ``d.iteritems()`` (Python 2 mapping API)."""
    pair_iterator = d.iteritems()
    return pair_iterator
||
191 | |||
def iterkeys(d):
    """Return ``d.iterkeys()`` (Python 2 mapping API)."""
    key_iterator = d.iterkeys()
    return key_iterator
||
194 | |||
def itervalues(d):
    """Return ``d.itervalues()`` (Python 2 mapping API)."""
    value_iterator = d.itervalues()
    return value_iterator
||
197 | |||
def u(s, errors='replace'):
    """Convert ``s`` with the UTF-8 codec.

    A ``text_type`` value is encoded to UTF-8; anything else is decoded from
    UTF-8. ``errors`` is the codec error policy in both directions.
    """
    convert = s.encode if isinstance(s, text_type) else s.decode
    return convert('utf-8', errors=errors)
||
202 | |||
def b(s, errors='replace'):
    """Return ``s`` as a byte string.

    A ``binary_type`` value is returned unchanged; anything else is encoded
    to UTF-8 using the ``errors`` policy.
    """
    is_binary = isinstance(s, binary_type)
    return s if is_binary else s.encode('utf-8', errors=errors)
||
207 | |||
def nativestr(s, errors='replace'):
    """Return ``s`` as a native string.

    ``binary_type`` values are returned as is, int and float values are
    converted to their string form, and anything else is encoded to UTF-8
    using the ``errors`` policy.
    """
    if isinstance(s, binary_type):
        return s
    if isinstance(s, (int, float)):
        return str(s)
    return s.encode('utf-8', errors=errors)
||
215 | |||
def system_exec(command):
    """Execute a system command and return its output as a str.

    The command line is split on single spaces and run without a shell; the
    output is right-stripped. On any error a debug message is logged and an
    empty string is returned.
    """
    try:
        output = subprocess.check_output(command.split(' '))
    except Exception as e:
        logger.debug('Can not execute command {} ({})'.format(command, e))
        return ''
    return output.rstrip()
||
224 | |||
225 | |||
226 | # Global functions for both Python 2 and 3 |
||
227 | |||
228 | |||
def subsample(data, sampling):
    """Compute a simple mean subsampling.

    Data should be a list of numerical values. When it holds more than
    ``sampling`` items, it is cut into ``sampling`` consecutive buckets that
    together cover the whole list, and each bucket is replaced by its mean.

    :param data: list of numbers
    :param sampling: maximum number of values to return
    :return: ``data`` itself when it is short enough, otherwise a sub-sampled
        list of ``sampling`` length
    """
    size = len(data)
    if size <= sampling:
        return data
    # Proportional integer boundaries instead of a rounded fixed bucket size:
    # since size > sampling, every bucket holds at least one value (so mean()
    # never receives an empty slice) and the last bucket ends exactly at size
    # (so no trailing value is dropped).
    bounds = [s * size // sampling for s in range(sampling + 1)]
    return [mean(data[start:end]) for start, end in zip(bounds, bounds[1:])]
||
240 | |||
241 | |||
def time_serie_subsample(data, sampling):
    """Compute a simple mean subsampling of a time serie.

    Data should be a list of (time, value) pairs. When it holds more than
    ``sampling`` items, it is cut into ``sampling`` consecutive buckets that
    together cover the whole list. Each bucket becomes one pair made of the
    time of its first item and the mean of its values.

    :param data: list of (time, value) pairs
    :param sampling: maximum number of pairs to return
    :return: ``data`` itself when it is short enough, otherwise a sub-sampled
        list of ``sampling`` length
    """
    size = len(data)
    if size <= sampling:
        return data
    # Proportional integer boundaries instead of a rounded fixed bucket size:
    # since size > sampling, every bucket holds at least one pair (no empty
    # slice to index or average) and the last bucket ends exactly at size.
    bounds = [s * size // sampling for s in range(sampling + 1)]
    return [
        (data[start][0], mean([point[1] for point in data[start:end]]))
        for start, end in zip(bounds, bounds[1:])
    ]
||
257 | |||
258 | |||
def to_fahrenheit(celsius):
    """Convert a temperature from Celsius to Fahrenheit."""
    scale = 1.8
    offset = 32
    return celsius * scale + offset
||
262 | |||
263 | |||
def is_admin():
    """Return if current user is an admin or not.

    On Windows the answer comes from shell32.IsUserAnAdmin(), which requires
    Windows XP SP2 or higher. If that call fails, the traceback is printed
    and False is returned. On other systems the user is an admin when its
    uid is 0 (root).

    https://stackoverflow.com/a/19719292

    :return: True if the current user is an 'Admin' whatever that means
        (root on Unix), otherwise False.
    """
    if os.name != 'nt':
        # Check for root on Posix
        return os.getuid() == 0

    import ctypes
    import traceback

    # WARNING: requires Windows XP SP2 or higher!
    try:
        # ctypes hands the result back as an integer; normalize it so the
        # function always returns a real bool, as documented.
        return bool(ctypes.windll.shell32.IsUserAnAdmin())
    except Exception:
        traceback.print_exc()
        return False
||
288 | |||
289 | |||
def key_exist_value_not_none(k, d):
    """Return True if key ``k`` exists in ``d`` and ``d[k]`` is not None."""
    if k not in d:
        return False
    return d[k] is not None
||
295 | |||
296 | |||
def key_exist_value_not_none_not_v(k, d, value='', lengh=None):
    """Check that ``d[k]`` holds a usable value.

    Return True if:
    - key k exists
    - d[k] is not None
    - d[k] != value
    - lengh is None, or len(d[k]) >= lengh
    """
    if k not in d:
        return False
    found = d[k]
    if found is None:
        return False
    if not found != value:
        return False
    return lengh is None or len(found) >= lengh
||
304 | |||
305 | |||
def disable(class_name, var):
    """Mark ``var`` as disabled on ``class_name``.

    Sets ``enable_<var>`` to False and ``disable_<var>`` to True.
    """
    for prefix, flag in (('enable_', False), ('disable_', True)):
        setattr(class_name, prefix + var, flag)
||
310 | |||
311 | |||
def enable(class_name, var):
    """Mark ``var`` as enabled on ``class_name``.

    Sets ``enable_<var>`` to True and ``disable_<var>`` to False.
    """
    for prefix, flag in (('enable_', True), ('disable_', False)):
        setattr(class_name, prefix + var, flag)
||
316 | |||
317 | |||
def pretty_date(time=False):
    """Return a short human-friendly description of how long ago ``time`` was.

    Get a datetime object or an int/float Epoch timestamp and return a
    pretty string like 'just now', '42 secs', 'an hour', 'yesterday',
    '3 months', etc. A falsy non-numeric value (the default False, or None)
    stands for "now" and gives 'just now'. A date in the future gives an
    empty string.

    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python

    :param time: datetime, int/float Epoch timestamp, or a falsy value
    :return: the pretty string
    :raises TypeError: if ``time`` is of any other kind
    """
    # Local import: only ``datetime`` is imported at file level.
    from datetime import timedelta

    now = datetime.now()
    if isinstance(time, datetime):
        diff = now - time
    elif isinstance(time, (int, float)) and not isinstance(time, bool):
        # bool is a subclass of int: keep False out of the timestamp branch
        diff = now - datetime.fromtimestamp(time)
    elif not time:
        # No date given: a zero delta, so the attribute reads below work
        diff = timedelta(0)
    else:
        raise TypeError('Unsupported type for pretty_date: {}'.format(type(time).__name__))
    second_diff = diff.seconds
    day_diff = diff.days

    if day_diff < 0:
        return ''

    if day_diff == 0:
        if second_diff < 10:
            return "just now"
        if second_diff < 60:
            return str(second_diff) + " secs"
        if second_diff < 120:
            return "a min"
        if second_diff < 3600:
            return str(second_diff // 60) + " mins"
        if second_diff < 7200:
            return "an hour"
        if second_diff < 86400:
            return str(second_diff // 3600) + " hours"
    if day_diff == 1:
        return "yesterday"
    if day_diff < 7:
        return str(day_diff) + " days"
    if day_diff < 31:
        return str(day_diff // 7) + " weeks"
    if day_diff < 365:
        return str(day_diff // 30) + " months"
    return str(day_diff // 365) + " years"
||
360 | |||
361 | |||
def urlopen_auth(url, username, password):
    """Open a url with basic auth.

    The ``username:password`` pair is base64-encoded and sent in an
    ``Authorization: Basic`` header; the result of ``urlopen`` is returned.
    """
    credentials = ('%s:%s' % (username, password)).encode()
    token = base64.b64encode(credentials).decode()
    request = Request(url, headers={'Authorization': 'Basic ' + token})
    return urlopen(request)
||
370 | |||
371 | |||
def string_value_to_float(s):
    """Convert a string with a value and an optional unit to a float.

    Units are decimal multiples of the byte (KB = 1000 B) and are matched
    case-insensitively; spaces are ignored.

    Example:
        '12.5 MB' -> 12500000.0
        '32.5 GB' -> 32500000000.0
        '100'     -> 100.0
    Args:
        s (string): Input string with value and unit
    Output:
        float: The value in float, or None if the string cannot be converted
            (value is not a number, unit is unknown, or the string holds
            more than one value and one unit)
    """
    convert_dict = {
        None: 1,
        'B': 1,
        'KB': 1000,
        'MB': 1000000,
        'GB': 1000000000,
        'TB': 1000000000000,
        'PB': 1000000000000000,
    }
    # Split into alternating numeric runs (digits and dots) and other runs;
    # non-numeric runs are upper-cased to match the keys of convert_dict.
    unpack_string = [
        number if other == '' else other.upper()
        for number, other in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
    ]
    if len(unpack_string) == 2:
        value, unit = unpack_string
    elif len(unpack_string) == 1:
        value, unit = unpack_string[0], None
    else:
        return None
    # An unknown unit is reported like any other unparsable input
    if unit not in convert_dict:
        return None
    try:
        return float(value) * convert_dict[unit]
    except ValueError:
        return None
||
406 |