Passed
Pull Request — master (#4216)
by Matěj
02:33 queued 10s
created

ssg.utils.parse_name()   A

Complexity

Conditions 2

Size

Total Lines 17
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 9
nop 1
dl 0
loc 17
ccs 9
cts 9
cp 1
crap 2
rs 9.95
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import multiprocessing
5 2
import errno
6 2
import os
7 2
import re
8 2
from collections import namedtuple
9
10 2
from .constants import MULTI_PLATFORM_LIST, MAKEFILE_ID_TO_PRODUCT_MAP
11
12
13 2
class SSGError(RuntimeError):
    """Project-specific exception; a plain subclass of RuntimeError."""
15
16
17 2
# Splits a product identifier into a name made of ASCII letters/hyphens and
# the run of digits that immediately follows it, e.g. "rhel7" -> "rhel", "7".
PRODUCT_NAME_PARSER = re.compile(r"([a-zA-Z\-]+)([0-9]+)")
18
19
20 2
def map_name(version):
    """Maps SSG Makefile internal product name to official product name.

    A leading "multi_platform_" prefix is stripped first; the remainder must
    be listed in MULTI_PLATFORM_LIST and is then mapped recursively.
    Otherwise the first key of MAKEFILE_ID_TO_PRODUCT_MAP (in reverse sorted
    order) that is a prefix of ``version`` selects the returned value.

    Raises RuntimeError if the multi_platform suffix is not in
    MULTI_PLATFORM_LIST, or if no key of the map is a prefix of ``version``.
    """

    prefix = "multi_platform_"
    if version.startswith(prefix):
        trimmed_version = version[len(prefix):]
        if trimmed_version not in MULTI_PLATFORM_LIST:
            raise RuntimeError(
                "%s is an invalid product version. If it's multi_platform the "
                "suffix has to be from (%s)."
                % (version, ", ".join(MULTI_PLATFORM_LIST))
            )
        return map_name(trimmed_version)

    # In reverse sorted order a key that extends another key (shares it as a
    # prefix) is visited before the shorter one, so the longest match wins.
    for key in sorted(MAKEFILE_ID_TO_PRODUCT_MAP, reverse=True):
        if version.startswith(key):
            return MAKEFILE_ID_TO_PRODUCT_MAP[key]

    # One-element tuple so the argument is always formatted as a single value.
    raise RuntimeError("Can't map version '%s' to any known product!"
                       % (version,))
41
42
43 2
# Result type of parse_name(); created once at import time rather than on
# every call, so all results share a single class.
_ProductName = namedtuple('product', ['name', 'version'])


def parse_name(product):
    """
    Returns a namedtuple of (name, version) from parsing a given product;
    e.g., "rhel7" -> ("rhel", "7")

    The pattern is applied at the start of the string only. When it does not
    match, the whole input is returned as the name and the version is None.
    """

    match = PRODUCT_NAME_PARSER.match(product)
    if match is None:
        return _ProductName(product, None)

    return _ProductName(match.group(1), match.group(2))
60
61
62 2
def is_applicable_for_product(platform, product):
    """Decide whether a remediation script's platform specifier covers product.

    ``platform`` is the platform string of the remediation script (a
    comma-separated list of names); ``product`` is a product identifier such
    as "rhel7".

    Returns True if the platform names 'multi_platform_all' or
    'multi_platform_<product name>' and the product name is in
    MULTI_PLATFORM_LIST, or if one of the comma-separated entries equals the
    official product name (plus " <version>" when a version was parsed).
    Returns False otherwise, including when ``platform`` is None or empty.

    May propagate RuntimeError from map_name() for unknown products.
    """

    # If the platform is None, platform must not exist in the config, so exit with False.
    if not platform:
        return False

    product, product_version = parse_name(product)

    # The generic multi-platform markers only count for products that are
    # listed in MULTI_PLATFORM_LIST, so check that once up front.
    if product in MULTI_PLATFORM_LIST:
        for _platform in ('multi_platform_all', 'multi_platform_' + product):
            # NOTE(review): this is a substring test on the whole platform
            # string, not an exact match against one comma-separated entry.
            if _platform in platform:
                return True

    # Get official name for product, with the version appended if there is one
    product_name = map_name(product)
    if product_version is not None:
        product_name = product_name + ' ' + product_version

    # Test if this is for the concrete product version
    for _name_part in platform.split(','):
        if product_name == _name_part.strip():
            return True

    # The remediation script is neither a multi platform one nor applicable
    # for this product => return False to indicate that
    return False
98
99
100 2
def is_applicable(platform, product):
    """
    Function to check if a platform is applicable for the product.
    Handles when a platform is really a list of products, i.e., a
    prodtype field from a rule.yml.

    Returns true iff product is applicable for the platform or list
    of products. A None or empty platform is never applicable.
    """

    if platform == 'all' or platform == 'multi_platform_all':
        return True

    # is_applicable_for_product() already answers False for a missing
    # platform; returning here keeps the string operations below from
    # failing on None.
    if not platform:
        return False

    if is_applicable_for_product(platform, product):
        return True

    if 'osp7' in product and 'osp7' in platform:
        return True

    # Strip each entry so "a, b" is treated like "a,b", matching how
    # is_applicable_for_product() compares entries.
    return product in [entry.strip() for entry in platform.split(',')]
120
121
122 2
def required_key(_dict, _key):
    """
    Returns the value of _key if it is in _dict; otherwise, raise an
    exception stating that it was not found but is required.

    Raises ValueError (not KeyError) when the key is missing; the message
    includes the repr of the whole mapping.
    """

    if _key not in _dict:
        raise ValueError("%s is required but was not found in:\n%s" %
                         (_key, repr(_dict)))

    return _dict[_key]
133
134
135 2
def get_cpu_count():
    """
    Returns the most likely estimate of the number of CPUs in the machine
    for threading purposes, gracefully handling errors and possible
    exceptions.

    The result is always at least 1; when multiprocessing cannot determine
    the count (NotImplementedError), 2 is returned.
    """

    try:
        detected = multiprocessing.cpu_count()
    except NotImplementedError:
        # 2 CPUs is the most probable
        return 2

    return max(1, detected)
148
149
150 2
def merge_dicts(left, right):
    """
    Merges two dictionaries, keeping left and right as passed. If there are
    any common keys between left and right, the value from right is used.

    Returns the merger of the left and right dictionaries
    """
    # Work on a copy of left so neither argument is modified.
    result = left.copy()
    result.update(right)
    return result
160
161
162 2
def subset_dict(dictionary, keys):
    """
    Restricts dictionary to only have keys from keys. Does not modify either
    dictionary or keys, returning the result instead.

    Keys listed in keys but absent from dictionary are ignored.
    """

    # Start from dictionary.copy() and delete from the copy, so the input is
    # untouched and it is safe to iterate over the original while deleting.
    result = dictionary.copy()
    for original_key in dictionary:
        if original_key not in keys:
            del result[original_key]

    return result
174
175
176 2
def read_file_list(path):
    """
    Reads the given file path and returns the contents as a list.

    The text is split on "\\n"; a single trailing empty element (produced by
    a final newline, or by an empty file) is dropped.
    """

    # The context manager closes the handle even if read() raises.
    with open(path, 'r') as file_handle:
        file_contents = file_handle.read().split("\n")

    if file_contents[-1] == '':
        file_contents = file_contents[:-1]
    return file_contents
185
186
187 2
def write_list_file(path, contents):
    """
    Writes the given contents to path.

    The items of contents are joined with "\\n" and a final newline is
    appended; an existing file at path is overwritten.
    """

    _contents = "\n".join(contents) + "\n"
    # Leaving the with-block flushes and closes the file, also when write()
    # raises, so no handle is leaked.
    with open(path, 'w') as _f:
        _f.write(_contents)
197
198
199
# Taken from https://stackoverflow.com/a/600612/592892
def mkdir_p(path):
    """
    Creates the directory path, including missing parent directories.

    Does nothing if path already exists as a directory. Any other OSError,
    including path existing as a non-directory, is re-raised.
    """
    try:
        os.makedirs(path)
    except OSError as exc:  # Python >2.5
        # Only "already exists, and it is a directory" is tolerated.
        if exc.errno != errno.EEXIST or not os.path.isdir(path):
            raise
208