Test Failed
Push — develop ( fa57d5...734632 )
by Nicolas
03:11 queued 40s
created

glances.programs.compute_nprocs()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
from collections import Counter
10
11
# from glances.logger import logger
12
13
# This constant defines the list of available processes sort key
14
sort_programs_key_list = ['cpu_percent', 'memory_percent', 'cpu_times', 'io_counters', 'name']
15
16
17
def create_program_dict(p):
18
    """Create a new entry in the dict (new program)"""
19
    return {
20
        'time_since_update': p['time_since_update'],
21
        # some values can be None, e.g. macOS system processes
22
        'num_threads': p['num_threads'] or 0,
23
        'cpu_percent': p['cpu_percent'] or 0,
24
        'memory_percent': p['memory_percent'] or 0,
25
        'cpu_times': p['cpu_times'] or {},
26
        'memory_info': p['memory_info'] or {},
27
        'io_counters': p['io_counters'] or {},
28
        'childrens': [p['pid']],
29
        # Others keys are not used
30
        # but should be set to be compliant with the existing process_list
31
        'name': p['name'],
32
        'cmdline': [p['name']],
33
        'pid': '_',
34
        'username': p.get('username', '_'),
35
        'nice': p['nice'],
36
        'status': p['status'],
37
    }
38
39
40
def update_program_dict(program, p):
41
    """Update an existing entry in the dict (existing program)"""
42
    # some values can be None, e.g. macOS system processes
43
    program['num_threads'] += p['num_threads'] or 0
44
    program['cpu_percent'] += p['cpu_percent'] or 0
45
    program['memory_percent'] += p['memory_percent'] or 0
46
    program['cpu_times'] = dict(Counter(program['cpu_times'] or {}) + Counter(p['cpu_times'] or {}))
47
    program['memory_info'] = dict(Counter(program['memory_info'] or {}) + Counter(p['memory_info'] or {}))
48
49
    program['io_counters'] += p['io_counters']
50
    program['childrens'].append(p['pid'])
51
    # If all the subprocess has the same value, display it
52
    program['username'] = p.get('username', '_') if p.get('username') == program['username'] else '_'
53
    program['nice'] = p['nice'] if p['nice'] == program['nice'] else '_'
54
    program['status'] = p['status'] if p['status'] == program['status'] else '_'
55
56
57
def compute_nprocs(p):
58
    p['nprocs'] = len(p['childrens'])
59
    return p
60
61
62
def processes_to_programs(processes):
63
    """Convert a list of processes to a list of programs."""
64
    # Start to build a dict of programs (key is program name)
65
    programs_dict = {}
66
    key = 'name'
67
    for p in processes:
68
        if p[key] not in programs_dict:
69
            programs_dict[p[key]] = create_program_dict(p)
70
        else:
71
            update_program_dict(programs_dict[p[key]], p)
72
73
    # Convert the dict to a list of programs
74
    return [compute_nprocs(p) for p in programs_dict.values()]
75