ssg.utils   F

Complexity

Total Complexity 106

Size/Duplication

Total Lines 502
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 270
dl 0
loc 502
ccs 0
cts 229
cp 0
rs 2
c 0
b 0
f 0
wmc 106

30 Functions

Rating   Name   Duplication   Size   Complexity  
A name_to_platform() 0 8 2
A map_name() 0 21 5
A enum() 0 3 1
A get_cpu_count() 0 13 2
A escape_id() 0 5 1
A recurse_or_substitute_or_do_nothing() 0 8 3
A comparison_to_oval() 0 5 1
A apply_formatting_on_dict_values() 0 15 3
A escape_yaml_key() 0 6 1
A parse_template_boolean_value() 0 12 4
A merge_dicts() 0 10 1
A prodtype_to_name() 0 10 5
A escape_comparison() 0 5 1
A ensure_file_paths_and_file_regexes_are_correctly_defined() 0 27 5
A prodtype_to_platform() 0 8 2
A read_file_list() 0 7 2
A write_list_file() 0 10 1
A required_key() 0 11 2
A _map_comparison_op() 0 5 2
A sha256() 0 2 1
A mkdir_p() 0 13 5
C is_applicable_for_product() 0 38 10
A parse_name() 0 17 2
A subset_dict() 0 12 3
B check_conflict_regex_directory() 0 20 6
A split_string_content() 0 9 2
A banner_anchor_wrap() 0 2 1
A escape_regex() 0 6 1
A banner_regexify() 0 5 1
B is_applicable() 0 20 6

18 Methods

Rating   Name   Duplication   Size   Complexity  
A VersionSpecifierSet.__init__() 0 6 3
A VersionSpecifier.__repr__() 0 2 1
A VersionSpecifier.__init__() 0 3 1
A VersionSpecifier.evr_dict_to_str() 0 13 5
A VersionSpecifierSet.oval_id() 0 3 1
A VersionSpecifier.oval_id() 0 3 1
A VersionSpecifier.evr_op() 0 3 1
A VersionSpecifier.__hash__() 0 2 1
A VersionSpecifier.__lt__() 0 2 1
A VersionSpecifier.title() 0 3 1
A VersionSpecifier.__str__() 0 2 1
A VersionSpecifier.ver() 0 3 1
A VersionSpecifierSet.cpe_id() 0 3 1
A VersionSpecifier.__eq__() 0 2 1
A VersionSpecifierSet.title() 0 3 1
A VersionSpecifier.evr_ver() 0 3 1
A VersionSpecifier.cpe_id() 0 3 1
A VersionSpecifier.ev_ver() 0 3 1

How to fix

Complexity

Complex classes like ssg.utils often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
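For example, in the listing below the evr_-prefixed members of VersionSpecifier form such a cohesive component. A minimal sketch of what extracting them could look like (EvrString is an invented name, not part of ssg.utils):

class EvrString:
    # Holds an epoch/version/release dict and renders it as a string,
    # taking over the formatting currently done inside VersionSpecifier.
    def __init__(self, evr_ver_dict):
        self._evr_ver_dict = evr_ver_dict

    def render(self, fully_formed=False):
        evr = self._evr_ver_dict
        res = ''
        if evr['epoch'] is not None:
            res += evr['epoch'] + ':'
        elif fully_formed:
            res += '0:'
        res += evr['version']
        if evr['release'] is not None:
            res += '-' + evr['release']
        elif fully_formed:
            res += '-0'
        return res


class VersionSpecifier:
    def __init__(self, op, evr_ver_dict):
        self.op = op
        self._evr = EvrString(evr_ver_dict)  # delegate EVR formatting

    @property
    def ver(self):
        return self._evr.render()

    @property
    def evr_ver(self):
        return self._evr.render(fully_formed=True)

The analyzed source of ssg.utils follows.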

from __future__ import absolute_import
from __future__ import print_function

import multiprocessing
import errno
import os
import re
from collections import namedtuple
import hashlib

from .constants import (FULL_NAME_TO_PRODUCT_MAPPING,
                        MAKEFILE_ID_TO_PRODUCT_MAP,
                        MULTI_PLATFORM_LIST,
                        MULTI_PLATFORM_MAPPING)


class SSGError(RuntimeError):
    pass


PRODUCT_NAME_PARSER = re.compile(r"([a-zA-Z\-]+)([0-9]+)")


class VersionSpecifierSet(set):
    def __init__(self, s=()):
        for el in s:
            if not isinstance(el, VersionSpecifier):
                raise ValueError('VersionSpecifierSet can only work with VersionSpecifier objects,'
                                 ' invalid object: {0}'.format(repr(el)))
        super(VersionSpecifierSet, self).__init__(s)

    @property
    def title(self):
        return ' and '.join([ver_spec.title for ver_spec in sorted(self)])

    @property
    def cpe_id(self):
        return ':'.join([ver_spec.cpe_id for ver_spec in sorted(self)])

    @property
    def oval_id(self):
        return '_'.join([ver_spec.oval_id for ver_spec in sorted(self)])


class VersionSpecifier:
    def __init__(self, op, evr_ver_dict):
        self._evr_ver_dict = evr_ver_dict
        self.op = op

    def __str__(self):
        return '{0} {1}'.format(self.op, self.ver)

    def __repr__(self):
        return '<VersionSpecifier({0},{1})>'.format(self.op, self.ver)

    def __hash__(self):
        return hash(self.op + self.ver)

    def __eq__(self, other):
        return self.op+self.ver == other.op+other.ver

    def __lt__(self, other):
        return self.op+self.ver < other.op+other.ver

    @property
    def evr_op(self):
        return comparison_to_oval(self.op)

    @property
    def ver(self):
        return VersionSpecifier.evr_dict_to_str(self._evr_ver_dict)

    @property
    def evr_ver(self):
        return VersionSpecifier.evr_dict_to_str(self._evr_ver_dict, True)

    @property
    def ev_ver(self):
        return VersionSpecifier.evr_dict_to_str(self._evr_ver_dict, True).split("-")[0]

    @property
    def title(self):
        return '{0} {1}'.format(comparison_to_oval(self.op), self.ver)

    @property
    def cpe_id(self):
        return '{0}:{1}'.format(escape_comparison(self.op), self.ver)

    @property
    def oval_id(self):
        return '{0}_{1}'.format(escape_comparison(self.op), escape_id(self.ver))

    @staticmethod
    def evr_dict_to_str(evr, fully_formed_evr_string=False):
        res = ''
        if evr['epoch'] is not None:
            res += evr['epoch'] + ':'
        elif fully_formed_evr_string:
            res += '0:'
        res += evr['version']
        if evr['release'] is not None:
            res += '-' + evr['release']
        elif fully_formed_evr_string:
            res += '-0'
        return res


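# Illustrative usage (not part of the analyzed module): a VersionSpecifier is
# built from a comparison operator and an epoch/version/release dict, and its
# properties derive the human-readable and OVAL/CPE-friendly forms, e.g.
#   spec = VersionSpecifier('>=', {'epoch': None, 'version': '1.2', 'release': None})
#   spec.ver      -> '1.2'
#   spec.evr_ver  -> '0:1.2-0'
#   spec.title    -> 'greater than or equal 1.2'
#   spec.oval_id  -> 'gt_or_eq_1_2'
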
def map_name(version):
    """Maps SSG Makefile internal product name to official product name"""

    if version.startswith("multi_platform_"):
        trimmed_version = version[len("multi_platform_"):]
        if trimmed_version not in MULTI_PLATFORM_LIST:
            raise RuntimeError(
                "%s is an invalid product version. If it's multi_platform the "
                "suffix has to be from (%s)."
                % (version, ", ".join(MULTI_PLATFORM_LIST))
            )
        return map_name(trimmed_version)

    # By sorting in reversed order, keys which are a longer version of other keys are
    # visited first (e.g., rhosp vs. rhel)
    for key in sorted(MAKEFILE_ID_TO_PRODUCT_MAP, reverse=True):
        if version.startswith(key):
            return MAKEFILE_ID_TO_PRODUCT_MAP[key]

    raise RuntimeError("Can't map version '%s' to any known product!"
                       % (version))


def prodtype_to_name(prod):
    """
    Converts a vaguely-prodtype-like thing into one or more full product names.
    """
    for name, prod_type in FULL_NAME_TO_PRODUCT_MAPPING.items():
        if prod == prod_type:
            return name
    if prod in MULTI_PLATFORM_LIST or prod == 'all':
        return "multi_platform_" + prod
    raise RuntimeError("Unknown product name: %s" % prod)


def name_to_platform(names):
    """
    Converts one or more full names to a string containing one or more
    <platform> elements.
    """
    if isinstance(names, str):
        return "<platform>%s</platform>" % names
    return "\n".join(map(name_to_platform, names))


def prodtype_to_platform(prods):
    """
    Converts one or more prodtypes into a string with one or more <platform>
    elements.
    """
    if isinstance(prods, str):
        return name_to_platform(prodtype_to_name(prods))
    return "\n".join(map(prodtype_to_platform, prods))


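# Illustrative usage (not part of the analyzed module):
#   name_to_platform("Red Hat Enterprise Linux 7")
#   -> '<platform>Red Hat Enterprise Linux 7</platform>'
# A list of names yields one <platform> element per line; prodtype_to_platform()
# does the same after first resolving prodtypes via prodtype_to_name().
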
def parse_name(product):
    """
    Returns a namedtuple of (name, version) from parsing a given product;
    e.g., "rhel7" -> ("rhel", "7")
    """

    prod_tuple = namedtuple('product', ['name', 'version'])

    _product = product
    _product_version = None
    match = PRODUCT_NAME_PARSER.match(product)

    if match:
        _product = match.group(1)
        _product_version = match.group(2)

    return prod_tuple(_product, _product_version)


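# Illustrative usage (not part of the analyzed module):
#   parse_name("rhel7")   -> product(name='rhel', version='7')
#   parse_name("firefox") -> product(name='firefox', version=None)
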
def is_applicable_for_product(platform, product):
    """Based on the platform dict specifier of the remediation script,
    determine whether the remediation script is applicable to this product.
    Return 'True' if so, 'False' otherwise"""

    # If the platform is None, platform must not exist in the config, so exit with False.
    if not platform:
        return False

    product, product_version = parse_name(product)

    # Define general platforms
    multi_platforms = ['multi_platform_all',
                       'multi_platform_' + product]

    # First test if platform isn't for 'multi_platform_all' or
    # 'multi_platform_' + product
    for _platform in multi_platforms:
        if _platform in platform and product in MULTI_PLATFORM_LIST:
            return True

    product_name = ""
    # Get official name for product
    if product_version is not None:
        if product == "ubuntu" or product == "macos":
            product_version = product_version[:2] + "." + product_version[2:]
        product_name = map_name(product) + ' ' + product_version
    else:
        product_name = map_name(product)

    # Test if this is for the concrete product version
    for _name_part in platform.split(','):
        if product_name == _name_part.strip():
            return True

    # The remediation script is neither a multi-platform one, nor is it
    # applicable for this product => return False to indicate that
    return False


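# Illustrative usage (not part of the analyzed module; the exact results depend
# on the product data in ssg.constants):
#   is_applicable_for_product("multi_platform_all", "rhel7")          -> True
#   is_applicable_for_product("Red Hat Enterprise Linux 7", "rhel7")  -> True
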
def is_applicable(platform, product):
    """
    Function to check if a platform is applicable for the product.
    Handles when a platform is really a list of products, i.e., a
    prodtype field from a rule.yml.

    Returns true iff product is applicable for the platform or list
    of products
    """

    if platform == 'all' or platform == 'multi_platform_all':
        return True

    if is_applicable_for_product(platform, product):
        return True

    if 'osp7' in product and 'osp7' in platform:
        return True

    return product in platform.split(',')


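# Illustrative usage (not part of the analyzed module): the platform may also be
# a comma-separated prodtype list, e.g.
#   is_applicable("rhel7,fedora", "rhel7") -> True
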
def required_key(_dict, _key):
    """
    Returns the value of _key if it is in _dict; otherwise, raise an
    exception stating that it was not found but is required.
    """

    if _key in _dict:
        return _dict[_key]

    raise ValueError("%s is required but was not found in:\n%s" %
                     (_key, repr(_dict)))


def get_cpu_count():
    """
    Returns the most likely estimate of the number of CPUs in the machine
    for threading purposes, gracefully handling errors and possible
    exceptions.
    """

    try:
        return max(1, multiprocessing.cpu_count())

    except NotImplementedError:
        # 2 CPUs is the most probable
        return 2


def merge_dicts(left, right):
    """
    Merges two dictionaries, keeping left and right as passed. If there are any
    common keys between left and right, the value from right is used.

    Returns the merger of the left and right dictionaries
    """
    result = left.copy()
    result.update(right)
    return result


def subset_dict(dictionary, keys):
    """
    Restricts dictionary to only have keys from keys. Does not modify either
    dictionary or keys, returning the result instead.
    """

    result = dictionary.copy()
    for original_key in dictionary:
        if original_key not in keys:
            del result[original_key]

    return result


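# Illustrative usage (not part of the analyzed module):
#   merge_dicts({'a': 1, 'b': 2}, {'b': 3})  -> {'a': 1, 'b': 3}
#   subset_dict({'a': 1, 'b': 2}, ['a'])     -> {'a': 1}
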
def read_file_list(path):
    """
    Reads the given file path and returns the contents as a list.
    """

    with open(path, 'r') as f:
        return split_string_content(f.read())


def split_string_content(content):
    """
    Splits the string content and returns it as a list.
    """

    file_contents = content.split("\n")
    if file_contents[-1] == '':
        file_contents = file_contents[:-1]
    return file_contents


def write_list_file(path, contents):
    """
    Writes the given contents to path.
    """

    _contents = "\n".join(contents) + "\n"
    _f = open(path, 'w')
    _f.write(_contents)
    _f.flush()
    _f.close()


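# Illustrative usage (not part of the analyzed module): a trailing newline does
# not produce an empty trailing element, e.g.
#   split_string_content("foo\nbar\n") -> ['foo', 'bar']
#   write_list_file(path, ['foo', 'bar']) writes "foo\nbar\n" to the file.
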
# Taken from https://stackoverflow.com/a/600612/592892
def mkdir_p(path):
    if os.path.isdir(path):
        return False
    # Python >=3.4.1
    # os.makedirs(path, exist_ok=True)
    try:
        os.makedirs(path)
        return True
    except OSError as exc:  # Python >2.5
        if exc.errno == errno.EEXIST and os.path.isdir(path):
            return False
        else:
            raise


def escape_regex(text):
    # We could use re.escape(), but it escapes too many characters, including plain white space.
    # In Python 3.7 the set of characters escaped by re.escape is reasonable, so let's mimic it.
    # See https://docs.python.org/3/library/re.html#re.sub
    # '!', '"', '%', "'", ',', '/', ':', ';', '<', '=', '>', '@', and "`" are not escaped.
    return re.sub(r"([#$&*+.^`|~:()-])", r"\\\1", text)


def escape_id(text):
    # Make a string used as an Id for OSCAP/XCCDF/OVAL entities more readable
    # and compatible with:
    # OVAL: r'oval:[A-Za-z0-9_\-\.]+:ste:[1-9][0-9]*'
    return re.sub(r"[^\w]+", "_", text).strip("_")


def escape_yaml_key(text):
    # Due to the limitation of OVAL's name argument of the field type
    # we have to avoid using uppercase letters for keys. The probe would escape
    # them with the '^' symbol.
    # myCamelCase^Key -> my^camel^case^^^key
    return re.sub(r'([A-Z^])', '^\\1', text).lower()


def _map_comparison_op(op, table):
    if op not in table:
        raise KeyError("Invalid comparison operator: %s (expected one of: %s)"
                       % (op, ', '.join(table.keys())))
    return table[op]


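# Illustrative usage (not part of the analyzed module):
#   escape_regex("Authorized uses only. (v1.0)") -> 'Authorized uses only\. \(v1\.0\)'
#   escape_id("RHEL 8.1!")                       -> 'RHEL_8_1'
#   escape_yaml_key("myCamelCaseKey")            -> 'my^camel^case^key'
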
def escape_comparison(op):
    return _map_comparison_op(op, {
        '==': 'eq',       '!=': 'ne',
        '>': 'gt',        '<': 'le',
        '>=': 'gt_or_eq', '<=': 'le_or_eq',
    })


def comparison_to_oval(op):
    return _map_comparison_op(op, {
        '==': 'equals',                '!=': 'not equal',
        '>': 'greater than',           '<': 'less than',
        '>=': 'greater than or equal', '<=': 'less than or equal',
    })


def sha256(text):
    return hashlib.sha256(text.encode('utf-8')).hexdigest()


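# Illustrative usage (not part of the analyzed module):
#   escape_comparison('>=')  -> 'gt_or_eq'
#   comparison_to_oval('<')  -> 'less than'
#   sha256('some text')      -> hex digest of the UTF-8 encoded text
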
def banner_regexify(banner_text):
    return escape_regex(banner_text) \
        .replace("\n", "BFLMPSVZ") \
        .replace(" ", "[\\s\\n]+") \
        .replace("BFLMPSVZ", "(?:[\\n]+|(?:\\\\n)+)")


def banner_anchor_wrap(banner_text):
    return "^" + banner_text + "$"


def parse_template_boolean_value(data, parameter, default_value):
    value = data.get(parameter)
    if not value:
        return default_value
    if value == "true":
        return True
    elif value == "false":
        return False
    else:
        raise ValueError(
            "Template parameter {} used in rule {} cannot accept the "
            "value {}".format(parameter, data["_rule_id"], value))


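# Illustrative usage (not part of the analyzed module):
#   parse_template_boolean_value({"strict": "true", "_rule_id": "example"}, "strict", False) -> True
# A missing or empty parameter returns the supplied default; any value other
# than "true"/"false" raises ValueError.
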
def check_conflict_regex_directory(data):
    """
    Validate that either all paths are directories OR file_regex exists.

    Throws ValueError.
    """
    for f in data["filepath"]:
        if "is_directory" in data and data["is_directory"] != f.endswith("/"):
            raise ValueError(
                "If passing a list of filepaths, all of them need to be "
                "either directories or files. Mixing is not possible. "
                "Please fix rules '{0}' filepath '{1}'".format(data["_rule_id"], f))

        data["is_directory"] = f.endswith("/")

        if "file_regex" in data and not data["is_directory"]:
            raise ValueError(
                "Used 'file_regex' key in rule '{0}' but filepath '{1}' does not "
                "specify a directory. Append '/' to the filepath or remove the "
                "'file_regex' key.".format(data["_rule_id"], f))


def enum(*args):
    enums = dict(zip(args, range(len(args))))
    return type('Enum', (), enums)


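# Illustrative usage (not part of the analyzed module): enum() builds a simple
# enumeration class from the given names, numbered from 0, e.g.
#   Colors = enum('RED', 'GREEN')
#   Colors.RED -> 0, Colors.GREEN -> 1
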
def recurse_or_substitute_or_do_nothing(
        v, string_dict, ignored_keys=frozenset()):
    if isinstance(v, dict):
        return apply_formatting_on_dict_values(v, string_dict, ignored_keys)
    elif isinstance(v, str):
        return v.format(**string_dict)
    else:
        return v


def apply_formatting_on_dict_values(source_dict, string_dict, ignored_keys=frozenset()):
    """
    Uses Python built-in string replacement.
    It replaces strings marked by {token} if "token" is a key in the string_dict parameter.
    It skips keys in source_dict which are listed in the ignored_keys parameter.
    This works only for dictionaries whose values are dicts or strings;
    other values are returned unchanged.
    """
    new_dict = {}
    for k, v in source_dict.items():
        if k not in ignored_keys:
            new_dict[k] = recurse_or_substitute_or_do_nothing(
                v, string_dict, ignored_keys)
        else:
            new_dict[k] = v
    return new_dict


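# Illustrative usage (not part of the analyzed module):
#   apply_formatting_on_dict_values({"msg": "Hello {name}", "count": 3}, {"name": "world"})
#   -> {"msg": "Hello world", "count": 3}
# Nested dicts are processed recursively; keys listed in ignored_keys keep
# their original values.
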
def ensure_file_paths_and_file_regexes_are_correctly_defined(data):
    """
    This function is common for the file_owner, file_groupowner
    and file_permissions templates.
    It ensures that the data structure meets certain rules, e.g. that the
    file_path item is a list and that the number of list items in file_regex
    equals the number of items in file_path.
    """
    # this avoids code duplicates
    if isinstance(data["filepath"], str):
        data["filepath"] = [data["filepath"]]

    if "file_regex" in data:
        # we can have a list of filepaths, but only one regex,
        # instead of declaring the same regex multiple times
        if isinstance(data["file_regex"], str):
            data["file_regex"] = [data["file_regex"]] * len(data["filepath"])

        # if the lengths of filepath and file_regex are not the same, then error.
        # in case we have multiple regexes for just one filepath, then we need
        # to declare that filepath multiple times
        if len(data["filepath"]) != len(data["file_regex"]):
            raise ValueError(
                "You should have one file_path per file_regex. Please check "
                "rule '{0}'".format(data["_rule_id"]))

    check_conflict_regex_directory(data)
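# Illustrative usage (not part of the analyzed module): a single filepath string
# and a single regex are normalized into parallel lists, and the trailing "/"
# marks the path as a directory, e.g.
#   data = {"_rule_id": "example", "filepath": "/etc/ssh/", "file_regex": "^sshd_config$"}
#   ensure_file_paths_and_file_regexes_are_correctly_defined(data)
#   data -> {"_rule_id": "example", "filepath": ["/etc/ssh/"],
#            "file_regex": ["^sshd_config$"], "is_directory": True}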