Test Failed
Push — develop ( 1153d1...e38e2c )
by Nicolas
02:28
created

VmExtension.generate_stats()   C

Complexity

Conditions 10

Size

Total Lines 15
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 14
nop 3
dl 0
loc 15
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex code units like glances.plugins.vms.engines.multipass.VmExtension.generate_stats() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""Multipass Extension unit for Glances' Vms plugin."""
10
11
import os
12
from typing import Any, Dict, List, Tuple
13
14
import orjson
15
16
from glances.globals import nativestr
17
from glances.secure import secure_popen
18
19
# Sub-command arguments passed to the multipass binary (JSON output requested).
MULTIPASS_INFO_OPTIONS = 'info --format json'
MULTIPASS_VERSION_OPTIONS = 'version --format json'

# Location of the multipass binary.
# TODO: make this path configurable from the Glances configuration file
MULTIPASS_PATH = '/snap/bin/multipass'

# True when nothing exists at MULTIPASS_PATH at import time.
import_multipass_error_tag = not os.path.exists(MULTIPASS_PATH)
25
26
27
class VmExtension:
    """Glances' Vms Plugin's Vm Extension unit.

    Gathers the multipass version and per-instance statistics by running the
    multipass binary and decoding its JSON output.
    """

    CONTAINER_ACTIVE_STATUS = ['running']

    def __init__(self):
        """Initialise the extension.

        Raises:
            Exception: when nothing was found at MULTIPASS_PATH at import time.
        """
        if import_multipass_error_tag:
            raise Exception(f"Multipass binary ({MULTIPASS_PATH}) is mandatory to get VM stats")

        self.ext_name = "Multipass (Vm)"

    @staticmethod
    def _run_json(options):
        """Run the multipass binary with *options* and decode its JSON output.

        Returns:
            The decoded dict, or None when the output is not valid JSON or
            does not decode to a JSON object.
        """
        ret_cmd = secure_popen(f'{MULTIPASS_PATH} {options}')
        try:
            ret = orjson.loads(ret_cmd)
        except orjson.JSONDecodeError:
            return None
        # Guard against valid JSON that is not an object (null, list, ...):
        # callers rely on dict access.
        return ret if isinstance(ret, dict) else None

    @staticmethod
    def _int_or_none(value):
        """Return *value* converted to int, or None if it is empty or not numeric."""
        if value is None or value == '':
            return None
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _item_or_none(values, index):
        """Return values[index], or None if *values* is not a list/tuple or is too short."""
        if isinstance(values, (list, tuple)) and len(values) > index:
            return values[index]
        return None

    def update_version(self):
        """Return the value of the 'multipass' field of the version command.

        Returns:
            The 'multipass' entry of the decoded output (None when the entry
            is absent), or an empty dict when the output cannot be decoded.
        """
        # > multipass version --format json
        # {
        #     "multipass": "1.13.1",
        #     "multipassd": "1.13.1"
        # }
        ret = self._run_json(MULTIPASS_VERSION_OPTIONS)
        if ret is None:
            # Historical return value on decode failure; kept for callers.
            return {}
        return ret.get('multipass', None)

    def update_info(self):
        """Return the 'info' mapping of the info command (instance name -> raw stats).

        Returns:
            The 'info' dict of the decoded output, or an empty dict when the
            output cannot be decoded or holds no 'info' object.
        """
        # > multipass info  --format json
        # {
        #     "errors": [
        #     ],
        #     "info": {
        #         "adapted-budgerigar": {
        #             "cpu_count": "1",
        #             "disks": {
        #                 "sda1": {
        #                     "total": "5116440064",
        #                     "used": "2287162880"
        #                 }
        #             },
        #             "image_hash": "182dc760bfca26c45fb4e4668049ecd4d0ecdd6171b3bae81d0135e8f1e9d93e",
        #             "image_release": "24.04 LTS",
        #             "ipv4": [
        #                 "10.160.166.174"
        #             ],
        #             "load": [
        #                 0,
        #                 0.03,
        #                 0
        #             ],
        #             "memory": {
        #                 "total": 1002500096,
        #                 "used": 432058368
        #             },
        #             "mounts": {
        #             },
        #             "release": "Ubuntu 24.04 LTS",
        #             "snapshot_count": "0",
        #             "state": "Running"
        #         }
        #     }
        # }
        ret = self._run_json(MULTIPASS_INFO_OPTIONS)
        if ret is None:
            return {}
        info = ret.get('info', {})
        # update() iterates over .items(), so never hand back a non-dict.
        return info if isinstance(info, dict) else {}

    def update(self, all_tag) -> Tuple[Dict, List[Dict]]:
        """Update Vm stats using the input method.

        Args:
            all_tag: currently unused (see TODO below).

        Returns:
            A tuple (version, stats) where version is the result of
            update_version() and stats holds one generate_stats() dict per
            entry returned by update_info().
        """
        version_stats = self.update_version()

        # TODO: manage all_tag option
        info_stats = self.update_info()

        returned_stats = [self.generate_stats(k, v) for k, v in info_stats.items()]

        return version_stats, returned_stats

    @property
    def key(self) -> str:
        """Return the key of the list."""
        return 'name'

    def generate_stats(self, vm_name, vm_stats) -> Dict[str, Any]:
        """Build the stats dict of one vm from its raw multipass info entry.

        Fields that are missing, empty or too short in *vm_stats* are reported
        as None instead of raising.

        Args:
            vm_name: name of the vm (key of the 'info' mapping).
            vm_stats: raw stats dict of that vm.

        Returns:
            A dict with the keys 'key', 'name', 'id', 'status', 'release',
            'cpu_count', 'memory_usage', 'memory_total', 'load_1min',
            'load_5min', 'load_15min' and 'ipv4'.
        """
        state = vm_stats.get('state')
        # An absent or empty memory section yields None for both memory fields.
        memory = vm_stats.get('memory') or {}
        load = vm_stats.get('load')

        # Init the stats for the current vm
        return {
            'key': self.key,
            'name': nativestr(vm_name),
            'id': vm_stats.get('image_hash'),
            'status': state.lower() if state else None,
            # Fall back on the image release when no release is reported.
            'release': vm_stats.get('release') or vm_stats.get('image_release'),
            # A missing cpu_count defaults to 1; an empty one gives None.
            'cpu_count': self._int_or_none(vm_stats.get('cpu_count', 1)),
            'memory_usage': memory.get('used'),
            'memory_total': memory.get('total'),
            'load_1min': self._item_or_none(load, 0),
            'load_5min': self._item_or_none(load, 1),
            'load_15min': self._item_or_none(load, 2),
            'ipv4': self._item_or_none(vm_stats.get('ipv4'), 0),
            # TODO: disk
        }
131