Passed
Push — master ( 8b9f7f...914492 )
by Marek
02:15
created

ssg.build_remediations   F

Complexity

Total Complexity 66

Size/Duplication

Total Lines 510
Duplicated Lines 0 %

Test Coverage

Coverage 28.36%

Importance

Changes 0
Metric Value
eloc 243
dl 0
loc 510
ccs 57
cts 201
cp 0.2836
rs 3.12
c 0
b 0
f 0
wmc 66

9 Functions

Rating   Name   Duplication   Size   Complexity  
B is_applicable_for_product() 0 36 8
A get_available_functions() 0 32 5
A is_supported_filename() 0 12 2
B get_fixgroup_for_type() 0 33 5
A get_populate_replacement() 0 16 2
A write_fixes() 0 31 4
F expand_xccdf_subs() 0 249 29
B parse_from_file() 0 36 7
A process_fix() 0 20 4

How to fix   Complexity   

Complexity

Complex classes like ssg.build_remediations often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 2
from __future__ import absolute_import
0 ignored issues
show
Coding Style introduced by
This module should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
2 2
from __future__ import print_function
3
4 2
import sys
5 2
import os
6 2
import os.path
7 2
import re
8 2
import codecs
9 2
from collections import defaultdict, namedtuple
10
11 2
from .jinja import process_file as jinja_process_file
12 2
from .xml import ElementTree
13 2
from .products import parse_name, multi_list, map_name
14
15 2
REMEDIATION_TO_EXT_MAP = {
16
    'anaconda': '.anaconda',
17
    'ansible': '.yml',
18
    'bash': '.sh',
19
    'puppet': '.pp'
20
}
21
22 2
FILE_GENERATED_HASH_COMMENT = '# THIS FILE IS GENERATED'
23
24 2
REMEDIATION_CONFIG_KEYS = ['complexity', 'disruption', 'platform', 'reboot',
25
                           'strategy']
26 2
REMEDIATION_ELM_KEYS = ['complexity', 'disruption', 'reboot', 'strategy']
27
28
29 2
def is_applicable_for_product(platform, product):
30
    """Based on the platform dict specifier of the remediation script to
31
    determine if this remediation script is applicable for this product.
32
    Return 'True' if so, 'False' otherwise"""
33
34
    # If the platform is None, platform must not exist in the config, so exit with False.
35 2
    if not platform:
36
        return False
37
38 2
    product, product_version = parse_name(product)
39
40
    # Define general platforms
41 2
    multi_platforms = ['multi_platform_all',
42
                       'multi_platform_' + product]
43
44
    # First test if platform isn't for 'multi_platform_all' or
45
    # 'multi_platform_' + product
46 2
    for _platform in multi_platforms:
47 2
        if _platform in platform and product in multi_list:
48 2
            return True
49
50 2
    product_name = ""
51
    # Get official name for product
52 2
    if product_version is not None:
53 2
        product_name = map_name(product) + ' ' + product_version
54
    else:
55
        product_name = map_name(product)
56
57
    # Test if this is for the concrete product version
58 2
    for _name_part in platform.split(','):
59 2
        if product_name == _name_part.strip():
60 2
            return True
61
62
    # Remediation script isn't neither a multi platform one, nor isn't
63
    # applicable for this product => return False to indicate that
64 2
    return False
65
66
67 2
def get_available_functions(build_dir):
68
    """Parse the content of "$CMAKE_BINARY_DIR/bash-remediation-functions.xml"
69
    XML file to obtain the list of currently known SCAP Security Guide internal
70
    remediation functions"""
71
72
    # If location of /shared directory is known
73
    if build_dir is None or not os.path.isdir(build_dir):
74
        sys.stderr.write("Expected '%s' to be the build directory. It doesn't "
75
                         "exist or is not a directory." % (build_dir))
76
        sys.exit(1)
77
78
    # Construct the final path of XML file with remediation functions
79
    xmlfilepath = \
80
        os.path.join(build_dir, "bash-remediation-functions.xml")
81
82
    if not os.path.isfile(xmlfilepath):
83
        sys.stderr.write("Expected '%s' to contain the remediation functions. "
84
                         "The file was not found!\n" % (xmlfilepath))
85
        sys.exit(1)
86
87
    remediation_functions = []
88
    with codecs.open(xmlfilepath, "r", encoding="utf-8") as xmlfile:
89
        filestring = xmlfile.read()
90
        # This regex looks implementation dependent but we can rely on
91
        # ElementTree sorting XML attrs alphabetically. Hidden is guaranteed
92
        # to be the first attr and ID is guaranteed to be second.
93
        remediation_functions = re.findall(
94
            r'<Value hidden=\"true\" id=\"function_(\S+)\"',
95
            filestring, re.DOTALL
96
        )
97
98
    return remediation_functions
99
100
101 2
def get_fixgroup_for_type(fixcontent, remediation_type):
102
    """
103
    For a given remediation type, return a new subelement of that type.
104
105
    Exits if passed an unknown remediation type.
106
    """
107
    if remediation_type == 'anaconda':
108
        return ElementTree.SubElement(
109
            fixcontent, "fix-group", id="anaconda",
110
            system="urn:redhat:anaconda:pre",
111
            xmlns="http://checklists.nist.gov/xccdf/1.1")
112
113
    elif remediation_type == 'ansible':
114
        return ElementTree.SubElement(
115
            fixcontent, "fix-group", id="ansible",
116
            system="urn:xccdf:fix:script:ansible",
117
            xmlns="http://checklists.nist.gov/xccdf/1.1")
118
119
    elif remediation_type == 'bash':
120
        return ElementTree.SubElement(
121
            fixcontent, "fix-group", id="bash",
122
            system="urn:xccdf:fix:script:sh",
123
            xmlns="http://checklists.nist.gov/xccdf/1.1")
124
125
    elif remediation_type == 'puppet':
126
        return ElementTree.SubElement(
127
            fixcontent, "fix-group", id="puppet",
128
            system="urn:xccdf:fix:script:puppet",
129
            xmlns="http://checklists.nist.gov/xccdf/1.1")
130
131
    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
132
                     % (remediation_type))
133
    sys.exit(1)
134
135
136 2
def is_supported_filename(remediation_type, filename):
137
    """
138
    Checks if filename has a supported extension for remediation_type.
139
140
    Exits when remediation_type is of an unknown type.
141
    """
142 2
    if remediation_type in REMEDIATION_TO_EXT_MAP:
143 2
        return filename.endswith(REMEDIATION_TO_EXT_MAP[remediation_type])
144
145
    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
146
                     % (remediation_type))
147
    sys.exit(1)
148
149
150 2
def get_populate_replacement(remediation_type, text):
151
    """
152
    Return varname, fixtextcontribution
153
    """
154
155
    if remediation_type == 'bash':
156
        # Extract variable name
157
        varname = re.search(r'\npopulate (\S+)\n',
158
                            text, re.DOTALL).group(1)
159
        # Define fix text part to contribute to main fix text
160
        fixtextcontribution = '\n%s="' % varname
161
        return (varname, fixtextcontribution)
162
163
    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
164
                     % (remediation_type))
165
    sys.exit(1)
166
167
168 2
def parse_from_file(file_path, env_yaml):
169
    """
170
    Parses a remediation from a file. As remediations contain jinja macros,
171
    we need a env_yaml context to process these. In practice, no remediations
172
    use jinja in the configuration, so for extracting only the configuration,
173
    env_yaml can be an abritrary product.yml dictionary.
174
175
    If the logic of configuration parsing changes significantly, please also
176
    update ssg.fixes.parse_platform(...).
177
    """
178
179 2
    mod_file = []
180 2
    config = defaultdict(lambda: None)
181
182 2
    fix_file_lines = jinja_process_file(file_path, env_yaml).splitlines()
183
184
    # Assignment automatically escapes shell characters for XML
185 2
    for line in fix_file_lines:
186 2
        if line.startswith(FILE_GENERATED_HASH_COMMENT):
187
            continue
188
189 2
        if line.startswith('#') and line.count('=') == 1:
190 2
            (key, value) = line.strip('#').split('=')
191 2
            if key.strip() in REMEDIATION_CONFIG_KEYS:
192 2
                config[key.strip()] = value.strip()
193 2
                continue
194
195
        # If our parsed line wasn't a config item, add it to the
196
        # returned file contents. This includes when the line
197
        # begins with a '#' and contains an equals sign, but
198
        # the "key" isn't one of the known keys from
199
        # REMEDIATION_CONFIG_KEYS.
200 2
        mod_file.append(line)
201
202 2
    remediation = namedtuple('remediation', ['contents', 'config'])
203 2
    return remediation(mod_file, config)
204
205
206 2
def process_fix(fixes, remediation_type, env_yaml, product, file_path, fix_name):
0 ignored issues
show
best-practice introduced by
Too many arguments (6/5)
Loading history...
207
    """
208
    Process a fix, adding it to fixes iff the file is of a valid extension
209
    for the remediation type and the fix is valid for the current product.
210
211
    Note that platform is a required field in the contents of the fix.
212
    """
213
214 2
    if not is_supported_filename(remediation_type, file_path):
215
        return
216
217 2
    result = parse_from_file(file_path, env_yaml)
218
219 2
    if not result.config['platform']:
220
        raise RuntimeError(
221
            "The '%s' remediation script does not contain the "
222
            "platform identifier!" % (file_path))
223
224 2
    if is_applicable_for_product(result.config['platform'], product):
225 2
        fixes[fix_name] = result
226
227
228 2
def write_fixes(remediation_type, build_dir, output_path, fixes):
229
    """
230
    Builds a fix-content XML tree from the contents of fixes
231
    and writes it to output_path.
232
    """
233
234
    fixcontent = ElementTree.Element("fix-content", system="urn:xccdf:fix:script:sh",
235
                                     xmlns="http://checklists.nist.gov/xccdf/1.1")
236
    fixgroup = get_fixgroup_for_type(fixcontent, remediation_type)
237
238
    remediation_functions = get_available_functions(build_dir)
239
240
    for fix_name in fixes:
241
        fix_contents, config = fixes[fix_name]
242
243
        fix_elm = ElementTree.SubElement(fixgroup, "fix")
244
        fix_elm.set("rule", fix_name)
245
246
        for key in REMEDIATION_ELM_KEYS:
247
            if config[key]:
248
                fix_elm.set(key, config[key])
249
250
        fix_elm.text = "\n".join(fix_contents)
251
        fix_elm.text += "\n"
252
253
        # Expand shell variables and remediation functions
254
        # into corresponding XCCDF <sub> elements
255
        expand_xccdf_subs(fix_elm, remediation_type, remediation_functions)
256
257
    tree = ElementTree.ElementTree(fixcontent)
258
    tree.write(output_path)
259
260
261 2
def expand_xccdf_subs(fix, remediation_type, remediation_functions):
0 ignored issues
show
Comprehensibility introduced by
This function exceeds the maximum number of variables (27/15).
Loading history...
262
    """For those remediation scripts utilizing some of the internal SCAP
263
    Security Guide remediation functions expand the selected shell variables
264
    and remediation functions calls with <xccdf:sub> element
265
266
    This routine translates any instance of the 'populate' function call in
267
    the form of:
268
269
            populate variable_name
270
271
    into
272
273
            variable_name="<sub idref="variable_name"/>"
274
275
    Also transforms any instance of the 'ansible-populate' function call in the
276
    form of:
277
            (ansible-populate variable_name)
278
    into
279
280
            <sub idref="variable_name"/>
281
282
    Also transforms any instance of some other known remediation function (e.g.
283
    'replace_or_append' etc.) from the form of:
284
285
            function_name "arg1" "arg2" ... "argN"
286
287
    into:
288
289
            <sub idref="function_function_name"/>
290
            function_name "arg1" "arg2" ... "argN"
291
    """
292
293
    if remediation_type == "ansible":
294
        fix_text = fix.text
295
296
        if "(ansible-populate " in fix_text:
297
            raise RuntimeError(
298
                "(ansible-populate VAR) has been deprecated. Please use "
299
                "(xccdf-var VAR) instead. Keep in mind that the latter will "
300
                "make an ansible variable out of XCCDF Value as opposed to "
301
                "substituting directly."
302
            )
303
304
        # we use the horrid "!!str |-" syntax to force strings without using
305
        # quotes. quotes enable yaml escaping rules so we'd have to escape all
306
        # the backslashes and at this point we don't know if there are any.
307
        fix_text = re.sub(
308
            r"- \(xccdf-var\s+(\S+)\)",
309
            r"- name: XCCDF Value \1 # promote to variable\n"
310
            r"  set_fact:\n"
311
            r"    \1: !!str |-\n"
312
            r"        (ansible-populate \1)\n"
313
            r"  tags:\n"
314
            r"    - always",
315
            fix_text
316
        )
317
318
        pattern = r'\(ansible-populate\s*(\S+)\)'
319
320
        # we will get list what looks like
321
        # [text, varname, text, varname, ..., text]
322
        parts = re.split(pattern, fix_text)
323
324
        fix.text = parts[0]  # add first "text"
325
        for index in range(1, len(parts), 2):
326
            varname = parts[index]
327
            text_between_vars = parts[index + 1]
328
329
            # we cannot combine elements and text easily
330
            # so text is in ".tail" of element
331
            xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
332
            xccdfvarsub.tail = text_between_vars
333
        return
334
335
    elif remediation_type == "puppet":
336
        pattern = r'\(puppet-populate\s*(\S+)\)'
337
338
        # we will get list what looks like
339
        # [text, varname, text, varname, ..., text]
340
        parts = re.split(pattern, fix.text)
341
342
        fix.text = parts[0]  # add first "text"
343
        for index in range(1, len(parts), 2):
344
            varname = parts[index]
345
            text_between_vars = parts[index + 1]
346
347
            # we cannot combine elements and text easily
348
            # so text is in ".tail" of element
349
            xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
350
            xccdfvarsub.tail = text_between_vars
351
        return
352
353
    elif remediation_type == "anaconda":
354
        pattern = r'\(anaconda-populate\s*(\S+)\)'
355
356
        # we will get list what looks like
357
        # [text, varname, text, varname, ..., text]
358
        parts = re.split(pattern, fix.text)
359
360
        fix.text = parts[0]  # add first "text"
361
        for index in range(1, len(parts), 2):
362
            varname = parts[index]
363
            text_between_vars = parts[index + 1]
364
365
            # we cannot combine elements and text easily
366
            # so text is in ".tail" of element
367
            xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=varname)
368
            xccdfvarsub.tail = text_between_vars
369
        return
370
371
    elif remediation_type == "bash":
0 ignored issues
show
unused-code introduced by
Too many nested blocks (7/5)
Loading history...
372
        # This remediation script doesn't utilize internal remediation functions
373
        # Skip it without any further processing
374
        if 'remediation_functions' not in fix.text:
375
            return
376
377
        # This remediation script utilizes some of internal remediation functions
378
        # Expand shell variables and remediation functions calls with <xccdf:sub>
379
        # elements
380
        pattern = r'\n+(\s*(?:' + r'|'.join(remediation_functions) + r')[^\n]*)\n'
381
        patcomp = re.compile(pattern, re.DOTALL)
382
        fixparts = re.split(patcomp, fix.text)
383
        if fixparts[0] is not None:
384
            # Split the portion of fix.text from fix start to first call of
385
            # remediation function, keeping only the third part:
386
            # * tail        to hold part of the fix.text after inclusion,
387
            #               but before first call of remediation function
388
            try:
389
                rfpattern = '(.*remediation_functions)(.*)'
390
                rfpatcomp = re.compile(rfpattern, re.DOTALL)
391
                _, _, tail, _ = re.split(rfpatcomp, fixparts[0], maxsplit=2)
392
            except ValueError:
393
                sys.stderr.write("Processing fix.text for: %s rule\n"
394
                                 % fix.get('rule'))
395
                sys.stderr.write("Unable to extract part of the fix.text "
396
                                 "after inclusion of remediation functions."
397
                                 " Aborting..\n")
398
                sys.exit(1)
399
            # If the 'tail' is not empty, make it new fix.text.
400
            # Otherwise use ''
401
            fix.text = tail if tail is not None else ''
0 ignored issues
show
introduced by
The variable tail does not seem to be defined for all execution paths.
Loading history...
402
            # Drop the first element of 'fixparts' since it has been processed
403
            fixparts.pop(0)
404
            # Perform sanity check on new 'fixparts' list content (to continue
405
            # successfully 'fixparts' has to contain even count of elements)
406
            if len(fixparts) % 2 != 0:
407
                sys.stderr.write("Error performing XCCDF expansion on "
408
                                 "remediation script: %s\n"
409
                                 % fix.get("rule"))
410
                sys.stderr.write("Invalid count of elements. Exiting!\n")
411
                sys.exit(1)
412
            # Process remaining 'fixparts' elements in pairs
413
            # First pair element is remediation function to be XCCDF expanded
414
            # Second pair element (if not empty) is the portion of the original
415
            # fix text to be used in newly added sublement's tail
416
            for idx in range(0, len(fixparts), 2):
417
                # We previously removed enclosing newlines when creating
418
                # fixparts list. Add them back and reuse the above 'pattern'
419
                fixparts[idx] = "\n%s\n" % fixparts[idx]
420
                # Sanity check (verify the first field truly contains call of
421
                # some of the remediation functions)
422
                if re.match(pattern, fixparts[idx], re.DOTALL) is not None:
423
                    # This chunk contains call of 'populate' function
424
                    if "populate" in fixparts[idx]:
425
                        varname, fixtextcontrib = get_populate_replacement(remediation_type,
426
                                                                           fixparts[idx])
427
                        # Define new XCCDF <sub> element for the variable
428
                        xccdfvarsub = ElementTree.Element("sub", idref=varname)
429
430
                        # If this is first sub element,
431
                        # the textcontribution needs to go to fix text
432
                        # otherwise, append to last subelement
433
                        nfixchildren = len(list(fix))
434
                        if nfixchildren == 0:
435
                            fix.text += fixtextcontrib
436
                        else:
437
                            previouselem = fix[nfixchildren-1]
438
                            previouselem.tail += fixtextcontrib
439
440
                        # If second pair element is not empty, append it as
441
                        # tail for the subelement (prefixed with closing '"')
442
                        if fixparts[idx + 1] is not None:
443
                            xccdfvarsub.tail = '"' + '\n' + fixparts[idx + 1]
444
                        # Otherwise append just enclosing '"'
445
                        else:
446
                            xccdfvarsub.tail = '"' + '\n'
447
                        # Append the new subelement to the fix element
448
                        fix.append(xccdfvarsub)
449
                    # This chunk contains call of other remediation function
450
                    else:
451
                        # Extract remediation function name
452
                        funcname = re.search(r'\n\s*(\S+)(| .*)\n',
453
                                             fixparts[idx],
454
                                             re.DOTALL).group(1)
455
                        # Define new XCCDF <sub> element for the function
456
                        xccdffuncsub = ElementTree.Element(
457
                            "sub", idref='function_%s' % funcname)
458
                        # Append original function call into tail of the
459
                        # subelement
460
                        xccdffuncsub.tail = fixparts[idx]
461
                        # If the second element of the pair is not empty,
462
                        # append it to the tail of the subelement too
463
                        if fixparts[idx + 1] is not None:
464
                            xccdffuncsub.tail += fixparts[idx + 1]
465
                        # Append the new subelement to the fix element
466
                        fix.append(xccdffuncsub)
467
                        # Ensure the newly added <xccdf:sub> element for the
468
                        # function will be always inserted at newline
469
                        # If xccdffuncsub is the first <xccdf:sub> element
470
                        # being added as child of <fix> and fix.text doesn't
471
                        # end up with newline character, append the newline
472
                        # to the fix.text
473
                        if list(fix).index(xccdffuncsub) == 0:
474
                            if re.search(r'.*\n$', fix.text) is None:
475
                                fix.text += '\n'
476
                        # If xccdffuncsub isn't the first child (first
477
                        # <xccdf:sub> being added), and tail of previous
478
                        # child doesn't end up with newline, append the newline
479
                        # to the tail of previous child
480
                        else:
481
                            previouselem = fix[list(fix).index(xccdffuncsub) - 1]
482
                            if re.search(r'.*\n$', previouselem.tail) is None:
483
                                previouselem.tail += '\n'
484
485
        # Perform a sanity check if all known remediation function calls have been
486
        # properly XCCDF substituted. Exit with failure if some wasn't
487
488
        # First concat output form of modified fix text (including text appended
489
        # to all children of the fix)
490
        modfix = [fix.text]
491
        for child in fix.getchildren():
492
            if child is not None and child.text is not None:
493
                modfix.append(child.text)
494
        modfixtext = "".join(modfix)
495
        for func in remediation_functions:
496
            # Then efine expected XCCDF sub element form for this function
497
            funcxccdfsub = "<sub idref=\"function_%s\"" % func
498
            # Finally perform the sanity check -- if function was properly XCCDF
499
            # substituted both the original function call and XCCDF <sub> element
500
            # for that function need to be present in the modified text of the fix
501
            # Otherwise something went wrong, thus exit with failure
502
            if func in modfixtext and funcxccdfsub not in modfixtext:
503
                sys.stderr.write("Error performing XCCDF <sub> substitution "
504
                                 "for function %s in %s fix. Exiting...\n"
505
                                 % (func, fix.get("rule")))
506
                sys.exit(1)
507
    else:
508
        sys.stderr.write("Unknown remediation type '%s'\n" % (remediation_type))
509
        sys.exit(1)
510