Passed
Push — master ( 52f41a...a7a81a )
by Matěj
02:01
created

ssg.build_remediations.write_fixes_to_dir()   B

Complexity

Conditions 6

Size

Total Lines 18
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 34.8262

Importance

Changes 0
Metric Value
cc 6
eloc 14
nop 3
dl 0
loc 18
ccs 1
cts 14
cp 0.0714
crap 34.8262
rs 8.6666
c 0
b 0
f 0
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import sys
5 2
import os
6 2
import os.path
7 2
import re
8 2
import codecs
9 2
from collections import defaultdict, namedtuple
10
11 2
from .jinja import process_file as jinja_process_file
12 2
from .xml import ElementTree
13 2
from .products import parse_name, map_name
14 2
from .constants import MULTI_PLATFORM_LIST
15
16 2
# File extension expected for the source file of each remediation type.
REMEDIATION_TO_EXT_MAP = {
    'anaconda': '.anaconda',
    'ansible': '.yml',
    'bash': '.sh',
    'puppet': '.pp',
}

# Lines starting with this marker are dropped when a remediation is parsed.
FILE_GENERATED_HASH_COMMENT = '# THIS FILE IS GENERATED'

# "# key = value" header keys that are recognized as remediation metadata.
REMEDIATION_CONFIG_KEYS = ['complexity', 'disruption', 'platform', 'reboot',
                           'strategy']
# Subset of the metadata keys that is copied onto the XML <fix> element.
REMEDIATION_ELM_KEYS = ['complexity', 'disruption', 'reboot', 'strategy']
28
29
30 2
def is_applicable_for_product(platform, product):
    """Tell whether a remediation's platform specifier covers a product.

    platform is the comma-separated platform string taken from the
    remediation script's configuration; product is the product identifier,
    which is split into a name and an optional version by parse_name().

    Return True when the remediation applies to the product, False
    otherwise (including when platform is None or empty).
    """

    # Without a platform specifier there is nothing to match against.
    if not platform:
        return False

    product, product_version = parse_name(product)

    # Generic specifiers: every product, or every version of this product.
    multi_platforms = ['multi_platform_all',
                       'multi_platform_' + product]

    # The generic specifiers only count for products listed in
    # MULTI_PLATFORM_LIST. They are looked up as substrings of the whole
    # platform string, not as exact comma-separated entries.
    if product in MULTI_PLATFORM_LIST:
        for _platform in multi_platforms:
            if _platform in platform:
                return True

    # Official product name, followed by the version when one was parsed.
    product_name = map_name(product)
    if product_version is not None:
        product_name = product_name + ' ' + product_version

    # Otherwise one of the comma-separated entries has to name exactly this
    # product (and version).
    for _name_part in platform.split(','):
        if product_name == _name_part.strip():
            return True

    # Neither a multi-platform remediation nor one listing this product.
    return False
66
67
68 2
def get_available_functions(build_dir):
    """Return the names of the known bash remediation functions.

    Reads "bash-remediation-functions.xml" from build_dir and returns, in
    file order, the NAME part of every
    '<Value hidden="true" id="function_NAME"' occurrence found in it.

    Writes a message to stderr and exits with status 1 when build_dir is
    None or not a directory, or when the XML file does not exist.
    """

    # The build directory has to exist before the XML file can be located.
    if build_dir is None or not os.path.isdir(build_dir):
        sys.stderr.write("Expected '%s' to be the build directory. It doesn't "
                         "exist or is not a directory.\n" % (build_dir))
        sys.exit(1)

    # Construct the final path of XML file with remediation functions
    xmlfilepath = os.path.join(build_dir, "bash-remediation-functions.xml")

    if not os.path.isfile(xmlfilepath):
        sys.stderr.write("Expected '%s' to contain the remediation functions. "
                         "The file was not found!\n" % (xmlfilepath))
        sys.exit(1)

    with codecs.open(xmlfilepath, "r", encoding="utf-8") as xmlfile:
        filestring = xmlfile.read()

    # This regex relies on "hidden" being serialized as the first attribute
    # and "id" as the second one.
    # NOTE(review): that held while ElementTree sorted attributes
    # alphabetically; since Python 3.8 it keeps insertion order instead, so
    # confirm the generator of this file sets hidden before id.
    return re.findall(
        r'<Value hidden=\"true\" id=\"function_(\S+)\"',
        filestring
    )
100
101
102 2
def get_fixgroup_for_type(fixcontent, remediation_type):
    """
    For a given remediation type, return a new subelement of that type.

    The new "fix-group" element is appended to fixcontent. Its id is the
    remediation type, its system attribute is the URN registered for that
    type below, and its xmlns is the XCCDF 1.1 namespace.

    Exits if passed an unknown remediation type.
    """
    # Value of the "system" attribute for each supported remediation type.
    fix_systems = {
        'anaconda': "urn:redhat:anaconda:pre",
        'ansible': "urn:xccdf:fix:script:ansible",
        'bash': "urn:xccdf:fix:script:sh",
        'puppet': "urn:xccdf:fix:script:puppet",
    }

    if remediation_type not in fix_systems:
        sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                         % (remediation_type))
        sys.exit(1)

    return ElementTree.SubElement(
        fixcontent, "fix-group", id=remediation_type,
        system=fix_systems[remediation_type],
        xmlns="http://checklists.nist.gov/xccdf/1.1")
135
136
137 2
def is_supported_filename(remediation_type, filename):
    """
    Checks if filename has a supported extension for remediation_type.

    Returns True when filename ends with the extension that
    REMEDIATION_TO_EXT_MAP lists for remediation_type, False otherwise.

    Exits when remediation_type is of an unknown type.
    """
    if remediation_type not in REMEDIATION_TO_EXT_MAP:
        sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                         % (remediation_type))
        sys.exit(1)

    return filename.endswith(REMEDIATION_TO_EXT_MAP[remediation_type])
149
150
151 2
def get_populate_replacement(remediation_type, text):
    """
    Return varname, fixtextcontribution

    varname is the variable named by the "populate <variable>" line found
    in text (the call has to occupy a whole line, with no indentation).
    fixtextcontribution is the opening half of the shell assignment,
    '\\n<variable>="', that is placed in front of the XCCDF <sub> element.

    Only the 'bash' remediation type is supported; any other type writes an
    error to stderr and exits with status 1. Raises ValueError when text
    contains no populate call in the expected form.
    """
    if remediation_type != 'bash':
        sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                         % (remediation_type))
        sys.exit(1)

    # Extract variable name
    match = re.search(r'\npopulate (\S+)\n', text)
    if match is None:
        # Fail with a message that names the offending text instead of an
        # AttributeError on None.
        raise ValueError(
            "No 'populate <variable>' call found in: %r" % (text,))
    varname = match.group(1)

    # Define fix text part to contribute to main fix text
    fixtextcontribution = '\n%s="' % varname
    return (varname, fixtextcontribution)
167
168
169 2
# Result type of the remediation parsers: the script lines plus the metadata
# found in its "# key = value" header comments. Built once instead of on
# every call.
_Remediation = namedtuple('remediation', ['contents', 'config'])


def _split_remediation_content_and_metadata(fix_file_lines):
    """Separate the metadata comments of a remediation from its content.

    fix_file_lines is an iterable of the remediation's lines without line
    endings. Returns a (contents, config) named tuple: contents is the list
    of lines that are not metadata, config maps the recognized metadata keys
    to their values and yields None for keys that were not set.
    """
    remediation_contents = []
    config = defaultdict(lambda: None)

    for line in fix_file_lines:
        # The "generated file" banner belongs neither to the content nor to
        # the metadata.
        if line.startswith(FILE_GENERATED_HASH_COMMENT):
            continue

        # A comment holding exactly one '=' is a candidate "# key = value".
        if line.startswith('#') and line.count('=') == 1:
            (key, value) = line.strip('#').split('=')
            key = key.strip()
            if key in REMEDIATION_CONFIG_KEYS:
                config[key] = value.strip()
                continue

        # If our parsed line wasn't a config item, add it to the
        # returned file contents. This includes when the line
        # begins with a '#' and contains an equals sign, but
        # the "key" isn't one of the known keys from
        # REMEDIATION_CONFIG_KEYS.
        remediation_contents.append(line)

    return _Remediation(remediation_contents, config)
193
194
195 2
def parse_from_file_with_jinja(file_path, env_yaml):
    """
    Parses a remediation from a file. As remediations contain jinja macros,
    we need a env_yaml context to process these. In practice, no remediations
    use jinja in the configuration, so for extracting only the configuration,
    env_yaml can be an arbitrary product.yml dictionary.

    Returns the (contents, config) named tuple produced by
    _split_remediation_content_and_metadata() for the rendered file.

    If the logic of configuration parsing changes significantly, please also
    update ssg.fixes.parse_platform(...).
    """

    rendered = jinja_process_file(file_path, env_yaml)
    return _split_remediation_content_and_metadata(rendered.splitlines())
208
209
210 2
def parse_from_file_without_jinja(file_path):
    """
    Parses a remediation from a file. Doesn't process the Jinja macros.
    This function is useful in build phases in which all the Jinja macros
    are already resolved.

    Returns the (contents, config) named tuple produced by
    _split_remediation_content_and_metadata() for the file's lines.
    """
    with open(file_path, "r") as f:
        lines = f.read().splitlines()
    return _split_remediation_content_and_metadata(lines)
219
220
221 2
def process_fix(fixes, remediation_type, env_yaml, product, file_path, fix_name):
    """
    Process a fix, adding it to fixes iff the file is of a valid extension
    for the remediation type and the fix is valid for the current product.

    The parsed remediation is stored as fixes[fix_name]. Files whose
    extension does not match remediation_type are skipped silently.

    Note that platform is a required field in the contents of the fix;
    RuntimeError is raised when it is missing or empty.
    """

    if not is_supported_filename(remediation_type, file_path):
        return

    result = parse_from_file_with_jinja(file_path, env_yaml)

    platform = result.config['platform']
    if not platform:
        raise RuntimeError(
            "The '%s' remediation script does not contain the "
            "platform identifier!" % (file_path))

    if is_applicable_for_product(platform, product):
        fixes[fix_name] = result
241
242
243 2
def write_fixes_to_xml(remediation_type, build_dir, output_path, fixes):
    """
    Builds a fix-content XML tree from the contents of fixes
    and writes it to output_path.

    fixes maps rule names to (contents, config) pairs. Every pair becomes a
    <fix rule="..."> element inside the fix-group of remediation_type. The
    REMEDIATION_ELM_KEYS entries of config that carry a value are copied to
    attributes of that element. build_dir is where the list of known
    remediation functions is read from.
    """

    fixcontent = ElementTree.Element("fix-content", system="urn:xccdf:fix:script:sh",
                                     xmlns="http://checklists.nist.gov/xccdf/1.1")
    fixgroup = get_fixgroup_for_type(fixcontent, remediation_type)

    remediation_functions = get_available_functions(build_dir)

    for fix_name, fix in fixes.items():
        fix_contents, config = fix

        fix_elm = ElementTree.SubElement(fixgroup, "fix")
        fix_elm.set("rule", fix_name)

        for key in REMEDIATION_ELM_KEYS:
            # .get() rather than [] so that probing a missing key does not
            # insert a None entry into a defaultdict-backed config.
            value = config.get(key)
            if value:
                fix_elm.set(key, value)

        fix_elm.text = "\n".join(fix_contents) + "\n"

        # Expand shell variables and remediation functions
        # into corresponding XCCDF <sub> elements
        expand_xccdf_subs(fix_elm, remediation_type, remediation_functions)

    tree = ElementTree.ElementTree(fixcontent)
    tree.write(output_path)
274
275
276 2
def write_fixes_to_dir(fixes, remediation_type, output_dir):
    """
    Writes fixes as files to output_dir, each fix as a separate file

    fixes maps fix names to (contents, config) pairs. Each file is named
    after the fix plus the extension of remediation_type and holds one
    "# key = value" line per config entry followed by the content lines.
    output_dir is created when it does not exist yet.

    Raises ValueError for an unknown remediation type.
    """
    try:
        extension = REMEDIATION_TO_EXT_MAP[remediation_type]
    except KeyError:
        raise ValueError("Unknown remediation type %s." % remediation_type)

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for fix_name, fix in fixes.items():
        fix_contents, config = fix
        fix_path = os.path.join(output_dir, fix_name + extension)
        with open(fix_path, "w") as f:
            for k, v in config.items():
                # A None value means "not set". Writing it out would put the
                # text "None" into the file, which parses back as a real,
                # truthy value.
                if v is None:
                    continue
                f.write("# %s = %s\n" % (k, v))
            f.write("\n".join(fix_contents))
294
295
296 2
def _expand_populate_calls(fix, pattern, text):
    """Rebuild fix from text, turning every pattern match into a <sub> child.

    pattern has to contain exactly one capture group, the variable name.
    """
    # With one capture group re.split yields
    # [text, varname, text, varname, ..., text]
    parts = re.split(pattern, text)

    fix.text = parts[0]  # text in front of the first variable
    for index in range(1, len(parts), 2):
        # Text and child elements cannot be interleaved directly, so the
        # text following a variable is kept in the ".tail" of its element.
        xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=parts[index])
        xccdfvarsub.tail = parts[index + 1]


def expand_xccdf_subs(fix, remediation_type, remediation_functions):
    """Expand variable and function references of a fix into <sub> elements.

    fix is the <fix> element whose text holds the remediation script; it is
    modified in place. remediation_functions lists the names of the known
    bash remediation functions.

    For 'ansible' remediations every list item of the form

            - (xccdf-var variable_name)

    becomes a set_fact task whose value is <sub idref="variable_name"/>.
    Using (ansible-populate variable_name) directly is deprecated and raises
    RuntimeError.

    For 'puppet' and 'anaconda' remediations every

            (puppet-populate variable_name)
            (anaconda-populate variable_name)

    is replaced by <sub idref="variable_name"/>.

    For 'bash' remediations that include the remediation_functions library,
    a 'populate' call in the form of:

            populate variable_name

    is translated into

            variable_name="<sub idref="variable_name"/>"

    and a call of any other known remediation function (e.g.
    'replace_or_append' etc.) in the form of:

            function_name "arg1" "arg2" ... "argN"

    is translated into:

            <sub idref="function_function_name"/>
            function_name "arg1" "arg2" ... "argN"

    Any other remediation type, as well as a bash script that cannot be
    processed, writes an error to stderr and exits with status 1.
    """

    if remediation_type == "ansible":
        fix_text = fix.text

        if "(ansible-populate " in fix_text:
            raise RuntimeError(
                "(ansible-populate VAR) has been deprecated. Please use "
                "(xccdf-var VAR) instead. Keep in mind that the latter will "
                "make an ansible variable out of XCCDF Value as opposed to "
                "substituting directly."
            )

        # If you change this string make sure it still matches the pattern
        # defined in OpenSCAP. Otherwise you break variable handling in
        # 'oscap xccdf generate fix' and the variables won't be customizable!
        # https://github.com/OpenSCAP/openscap/blob/1.2.17/src/XCCDF_POLICY/xccdf_policy_remediate.c#L588
        #   const char *pattern =
        #     "- name: XCCDF Value [^ ]+ # promote to variable\n  set_fact:\n"
        #     "    ([^:]+): (.+)\n  tags:\n    - always\n";
        # We use !!str typecast to prevent treating values as different types
        # eg. yes as a bool or 077 as an octal number
        fix_text = re.sub(
            r"- \(xccdf-var\s+(\S+)\)",
            r"- name: XCCDF Value \1 # promote to variable\n"
            r"  set_fact:\n"
            r"    \1: !!str (ansible-populate \1)\n"
            r"  tags:\n"
            r"    - always",
            fix_text
        )

        _expand_populate_calls(fix, r'\(ansible-populate\s*(\S+)\)', fix_text)
        return

    if remediation_type == "puppet":
        _expand_populate_calls(fix, r'\(puppet-populate\s*(\S+)\)', fix.text)
        return

    if remediation_type == "anaconda":
        _expand_populate_calls(fix, r'\(anaconda-populate\s*(\S+)\)', fix.text)
        return

    if remediation_type != "bash":
        sys.stderr.write("Unknown remediation type '%s'\n" % (remediation_type))
        sys.exit(1)

    # Everything below handles the "bash" remediation type.

    # This remediation script doesn't utilize internal remediation functions
    # Skip it without any further processing
    if 'remediation_functions' not in fix.text:
        return

    # This remediation script utilizes some of internal remediation functions
    # Expand shell variables and remediation functions calls with <xccdf:sub>
    # elements. Splitting on whole lines that start with a known function
    # name yields [text, call, text, call, ..., text].
    pattern = r'\n+(\s*(?:' + r'|'.join(remediation_functions) + r')[^\n]*)\n'
    patcomp = re.compile(pattern, re.DOTALL)
    fixparts = patcomp.split(fix.text)

    # From the part in front of the first function call keep only what
    # follows the inclusion of the remediation_functions library. A matching
    # split always has four items; anything else means that the inclusion is
    # not located in front of the first function call.
    try:
        rfpatcomp = re.compile('(.*remediation_functions)(.*)', re.DOTALL)
        _, _, tail, _ = rfpatcomp.split(fixparts[0], maxsplit=2)
    except ValueError:
        sys.stderr.write("Processing fix.text for: %s rule\n"
                         % fix.get('rule'))
        sys.stderr.write("Unable to extract part of the fix.text "
                         "after inclusion of remediation functions."
                         " Aborting..\n")
        sys.exit(1)
    # If the 'tail' is not empty, make it new fix.text.
    # Otherwise use ''
    fix.text = tail if tail is not None else ''
    # Drop the first element of 'fixparts' since it has been processed
    fixparts.pop(0)
    # Perform sanity check on new 'fixparts' list content (to continue
    # successfully 'fixparts' has to contain even count of elements)
    if len(fixparts) % 2 != 0:
        sys.stderr.write("Error performing XCCDF expansion on "
                         "remediation script: %s\n"
                         % fix.get("rule"))
        sys.stderr.write("Invalid count of elements. Exiting!\n")
        sys.exit(1)

    # Process remaining 'fixparts' elements in pairs
    # First pair element is remediation function to be XCCDF expanded
    # Second pair element (if not empty) is the portion of the original
    # fix text to be used in newly added subelement's tail
    for idx in range(0, len(fixparts), 2):
        # The split removed the newlines enclosing the call. Add them back
        # so that 'pattern' can be reused on it.
        call = "\n%s\n" % fixparts[idx]
        fixparts[idx] = call
        trailing = fixparts[idx + 1] or ''

        # Sanity check (verify the first field truly contains call of
        # some of the remediation functions)
        if patcomp.match(call) is None:
            continue

        if "populate" in call:
            # This chunk contains call of 'populate' function
            varname, fixtextcontrib = get_populate_replacement(remediation_type,
                                                               call)
            # Define new XCCDF <sub> element for the variable
            xccdfvarsub = ElementTree.Element("sub", idref=varname)

            # The opening 'variable="' goes in front of the new element:
            # into the fix text when this is the first subelement, into the
            # tail of the last subelement otherwise.
            if len(fix) == 0:
                fix.text += fixtextcontrib
            else:
                fix[-1].tail += fixtextcontrib

            # The closing '"' and the rest of the text follow the element
            xccdfvarsub.tail = '"' + '\n' + trailing
            fix.append(xccdfvarsub)
        else:
            # This chunk contains call of other remediation function
            # Extract remediation function name
            funcname = re.search(r'\n\s*(\S+)(| .*)\n',
                                 call,
                                 re.DOTALL).group(1)
            # Define new XCCDF <sub> element for the function; the original
            # call and the text after it go into its tail.
            xccdffuncsub = ElementTree.Element(
                "sub", idref='function_%s' % funcname)
            xccdffuncsub.tail = call + trailing

            # Ensure the new <xccdf:sub> element always starts on a new
            # line: whatever precedes it (the fix text for the first child,
            # the tail of the previous child otherwise) has to end with a
            # newline.
            if len(fix) == 0:
                if not fix.text.endswith('\n'):
                    fix.text += '\n'
            else:
                previouselem = fix[-1]
                if not previouselem.tail.endswith('\n'):
                    previouselem.tail += '\n'
            fix.append(xccdffuncsub)

    # Perform a sanity check if all known remediation function calls have been
    # properly XCCDF substituted. Exit with failure if some wasn't

    # First concat the fix text with the text of all children of the fix.
    # Iterating the element replaces Element.getchildren(), which is gone
    # since Python 3.9.
    # NOTE(review): only ".text" is collected here; the <sub> elements
    # created above carry their content in ".tail" - confirm this check
    # covers what it is meant to.
    modfix = [fix.text]
    for child in fix:
        if child is not None and child.text is not None:
            modfix.append(child.text)
    modfixtext = "".join(modfix)
    for func in remediation_functions:
        # Then define expected XCCDF sub element form for this function
        funcxccdfsub = "<sub idref=\"function_%s\"" % func
        # Finally perform the sanity check -- if function was properly XCCDF
        # substituted both the original function call and XCCDF <sub> element
        # for that function need to be present in the modified text of the fix
        # Otherwise something went wrong, thus exit with failure
        if func in modfixtext and funcxccdfsub not in modfixtext:
            sys.stderr.write("Error performing XCCDF <sub> substitution "
                             "for function %s in %s fix. Exiting...\n"
                             % (func, fix.get("rule")))
            sys.exit(1)
550