Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2debug/st2debug/utils/system_info.py (1 issue)

Labels
Severity
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import re
17
18
from st2common.util.shell import run_command
19
from st2common.util.shell import quote_unix
20
21
# Public API of this module (the names exported by a star import).
__all__ = [
    'get_cpu_info',
    'get_memory_info',
    'get_package_list',
    'get_deb_package_list',
    'get_rpm_package_list'
]
28
29
# File the CPU details are read from.
CPU_INFO_PATH = '/proc/cpuinfo'
# File the memory details are read from.
MEMORY_INFO_PATH = '/proc/meminfo'
31
32
33
def get_cpu_info():
    """
    Retrieve CPU information.

    The content of ``CPU_INFO_PATH`` is parsed as ``name : value`` lines. A line named
    ``processor`` starts the description of a new core / CPU and a blank line ends it.
    Tabs are removed from names and values, and spaces in names are replaced with
    underscores (e.g. ``model name`` becomes ``model_name``).

    :return: List which contain dictionary with information for each core / CPU. An empty
             list is returned if the file can't be read.
    :rtype: ``list`` of ``dict``
    """
    try:
        with open(CPU_INFO_PATH) as fp:
            content = fp.read()
    except IOError:
        # Keep the documented return type even when the information is not available
        return []

    result = []
    item = None

    for line in content.split('\n'):
        line = line.strip()

        if not line:
            # Blank line ends the current block. Resetting the item makes sure the same
            # block is not stored again when multiple blank lines follow each other.
            if item:
                result.append(item)
            item = None
            continue

        # Only split on the first colon so values which contain a colon are preserved
        name, separator, value = line.partition(':')

        if not separator:
            continue

        name = name.replace('\t', '').strip().replace(' ', '_')
        value = value.replace('\t', '').strip()

        if name == 'processor':
            # Info about new core / CPU. Store the previous one in case the blocks are not
            # separated by a blank line.
            if item:
                result.append(item)
            item = {}
        elif item is None:
            # Field which doesn't belong to any core / CPU block
            continue

        item[name] = value

    # Content doesn't necessarily end with a blank line
    if item:
        result.append(item)

    return result
74
75
76
def get_memory_info():
    """
    Retrieve memory information.

    Each ``name: value`` line of ``MEMORY_INFO_PATH`` is parsed, the ``kB`` suffix is removed
    from the value and the value is converted to an integer. Lines without a ``:`` separator
    and lines with a non-integer value are skipped.

    :return: Dictionary which maps a field name to its integer value. An empty dictionary is
             returned if the file can't be read.
    :rtype: ``dict``
    """
    try:
        with open(MEMORY_INFO_PATH) as fp:
            content = fp.read()
    except IOError:
        return {}

    result = {}
    for line in content.split('\n'):
        line = line.strip()

        if not line:
            continue

        split = line.split(':')

        if len(split) < 2:
            # Malformed line without a name / value separator
            continue

        name = split[0].strip()
        value = split[1].replace('kB', '').strip()

        try:
            value = int(value)
        except ValueError:
            # Not a numeric field
            continue

        result[name] = value

    return result
109
110
111
def get_package_list(name_startswith):
    """
    Retrieve system packages which name matches the provided startswith filter.

    The package manager is detected by running the ``dpkg`` and ``rpm`` binary through a shell
    and checking that the exit code is not 127. ``dpkg`` takes precedence when both binaries
    are available.

    Note: This function only supports Debian and RedHat based systems.

    :param name_startswith: Package name startswith filter string.
    :type name_startswith: ``str``

    :raises Exception: If neither ``dpkg`` nor ``rpm`` binary is available.

    :rtype: ``list`` of ``dict``
    """
    # Exit code a shell uses to indicate that the command was not found
    command_not_found_exit_code = 127

    dpkg_exit_code, _, _ = run_command(cmd='dpkg', shell=True)

    if dpkg_exit_code != command_not_found_exit_code:
        return get_deb_package_list(name_startswith=name_startswith)

    # rpm is only probed when dpkg is not available so no unnecessary process is spawned
    rpm_exit_code, _, _ = run_command(cmd='rpm', shell=True)

    if rpm_exit_code != command_not_found_exit_code:
        return get_rpm_package_list(name_startswith=name_startswith)

    raise Exception('Unsupported platform (dpkg or rpm binary not available)')
133
134
135
def get_deb_package_list(name_startswith):
    """
    Retrieve Debian packages which name matches the provided startswith filter.

    The output of ``dpkg -l | grep <filter>`` is split into whitespace delimited columns; the
    second column is used as the package name and the third one as the package version.

    :param name_startswith: Package name startswith filter string.
    :type name_startswith: ``str``

    :return: List of dictionaries with ``name`` and ``version`` keys.
    :rtype: ``list`` of ``dict``
    """
    cmd = 'dpkg -l | grep %s' % (quote_unix(name_startswith))
    _, stdout, _ = run_command(cmd=cmd, shell=True)

    packages = []
    for line in stdout.split('\n'):
        # str.split() without arguments splits on runs of whitespace and ignores leading and
        # trailing whitespace, so no regular expression (and no escaping of "\s") is needed
        columns = line.split()

        if len(columns) < 3:
            # Blank line or a line without name and version columns
            continue

        name = columns[1]
        version = columns[2]

        # grep matches the filter anywhere in the line so the name needs to be checked
        if not name.startswith(name_startswith):
            continue

        item = {
            'name': name,
            'version': version
        }
        packages.append(item)

    return packages
162
163
164
def get_rpm_package_list(name_startswith):
    """
    Retrieve RPM packages which name matches the provided startswith filter.

    Each line of the ``rpm -qa | grep <filter>`` output is assumed to have the
    ``<name>-<version>-<release>.<arch>`` format (NOTE: assumption based on the default rpm
    query format - confirm if a custom query format is configured). The returned version
    has the ``<version>-<release>`` form.

    :param name_startswith: Package name startswith filter string.
    :type name_startswith: ``str``

    :return: List of dictionaries with ``name`` and ``version`` keys.
    :rtype: ``list`` of ``dict``
    """
    cmd = 'rpm -qa | grep %s' % (quote_unix(name_startswith))
    _, stdout, _ = run_command(cmd=cmd, shell=True)

    packages = []
    for line in stdout.split('\n'):
        line = line.strip()

        if not line:
            continue

        # Drop everything after the last dot (the architecture suffix)
        name_version_release = line.rsplit('.', 1)[0]

        # Package name itself can contain dashes (e.g. python-devel) so the version and
        # release are split off from the right hand side
        parts = name_version_release.rsplit('-', 2)

        if len(parts) < 2:
            # Line doesn't contain a version
            continue

        name = parts[0]
        version = '-'.join(parts[1:])

        # grep matches the filter anywhere in the line so the name needs to be checked
        if not name.startswith(name_startswith):
            continue

        item = {
            'name': name,
            'version': version
        }
        packages.append(item)

    return packages
192