Passed
Branch master (53139b)
by Matěj
02:07
created

ssg.build_remediations.is_supported_filename()   A

Complexity

Conditions 2

Size

Total Lines 12
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2.2559

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 2
dl 0
loc 12
rs 10
c 0
b 0
f 0
ccs 3
cts 5
cp 0.6
crap 2.2559
1 2
from __future__ import absolute_import
2 2
from __future__ import print_function
3
4 2
import sys
5 2
import os
6 2
import os.path
7 2
import re
8 2
import codecs
9 2
from collections import defaultdict, namedtuple
10
11 2
from .jinja import process_file as jinja_process_file
12 2
from .xml import ElementTree
13 2
from .products import parse_name, map_name
14 2
from .constants import MULTI_PLATFORM_LIST
15
16 2
# File extension used by the remediation scripts of each supported type.
REMEDIATION_TO_EXT_MAP = dict(
    anaconda='.anaconda',
    ansible='.yml',
    bash='.sh',
    puppet='.pp',
)

# Lines starting with this marker are skipped when a remediation is parsed.
FILE_GENERATED_HASH_COMMENT = '# THIS FILE IS GENERATED'

# Keys accepted in the "# key = value" header lines of a remediation file.
REMEDIATION_CONFIG_KEYS = [
    'complexity',
    'disruption',
    'platform',
    'reboot',
    'strategy',
]
# Configuration keys that are copied as attributes onto the <fix> element.
REMEDIATION_ELM_KEYS = [
    'complexity',
    'disruption',
    'reboot',
    'strategy',
]
28
29
30 2
def is_applicable_for_product(platform, product):
    """Decide whether a remediation script is applicable for a product.

    ``platform`` is the comma-separated platform string from the remediation
    script's configuration. ``product`` is the product identifier; it is
    split by ``parse_name`` into a product name and an optional version.

    Return True when:

    * the product name is in MULTI_PLATFORM_LIST and the platform string
      lists 'multi_platform_all' or 'multi_platform_<product name>', or
    * one of the platform entries equals the official product name returned
      by ``map_name``, followed by a space and the version when there is one.

    Return False otherwise, including when ``platform`` is empty or None.
    """
    # No platform in the config means there is nothing to match against.
    if not platform:
        return False

    product, product_version = parse_name(product)

    # Compare whole, whitespace-stripped entries instead of testing substring
    # containment on the raw string: otherwise 'multi_platform_<name>' would
    # also match any longer entry that merely starts with it
    # (e.g. 'multi_platform_foo' inside 'multi_platform_foo-bar').
    platform_entries = [entry.strip() for entry in platform.split(',')]

    # General (multi platform) specifiers only count for products that are
    # part of MULTI_PLATFORM_LIST.
    if product in MULTI_PLATFORM_LIST:
        for general_platform in ('multi_platform_all',
                                 'multi_platform_' + product):
            if general_platform in platform_entries:
                return True

    # Build the official name of the concrete product (and version).
    product_name = map_name(product)
    if product_version is not None:
        product_name = product_name + ' ' + product_version

    # The script is neither a multi platform one nor listed for this concrete
    # product unless one entry equals the official product name.
    return product_name in platform_entries
66
67
68 2
def get_available_functions(build_dir):
    """Return the names of the known internal bash remediation functions.

    Reads "bash-remediation-functions.xml" from ``build_dir`` and returns,
    in document order, the NAME part of every
    ``<Value hidden="true" id="function_NAME"`` occurrence found in it.

    Writes a message to stderr and exits with status 1 when ``build_dir`` is
    None or not a directory, or when the XML file does not exist.
    """
    if build_dir is None or not os.path.isdir(build_dir):
        sys.stderr.write("Expected '%s' to be the build directory. It doesn't "
                         "exist or is not a directory.\n" % (build_dir))
        sys.exit(1)

    # Construct the final path of XML file with remediation functions
    xmlfilepath = os.path.join(build_dir, "bash-remediation-functions.xml")

    if not os.path.isfile(xmlfilepath):
        sys.stderr.write("Expected '%s' to contain the remediation functions. "
                         "The file was not found!\n" % (xmlfilepath))
        sys.exit(1)

    with codecs.open(xmlfilepath, "r", encoding="utf-8") as xmlfile:
        filestring = xmlfile.read()

    # This regex looks implementation dependent but we can rely on
    # ElementTree sorting XML attrs alphabetically. Hidden is guaranteed
    # to be the first attr and ID is guaranteed to be second.
    # The name stops at the first quote or whitespace so that it can never
    # run past the closing quote of the id attribute.
    return re.findall(
        r'<Value hidden="true" id="function_([^"\s]+)"', filestring)
100
101
102 2
def get_fixgroup_for_type(fixcontent, remediation_type):
    """
    Create the <fix-group> child of fixcontent that belongs to the given
    remediation type and return it.

    An unknown remediation type is reported on stderr and the process exits
    with status 1.
    """
    # (remediation type / fix-group id, value of the "system" attribute)
    known_groups = (
        ('anaconda', "urn:redhat:anaconda:pre"),
        ('ansible', "urn:xccdf:fix:script:ansible"),
        ('bash', "urn:xccdf:fix:script:sh"),
        ('puppet', "urn:xccdf:fix:script:puppet"),
    )

    for group_id, group_system in known_groups:
        if remediation_type == group_id:
            return ElementTree.SubElement(
                fixcontent, "fix-group", id=group_id,
                system=group_system,
                xmlns="http://checklists.nist.gov/xccdf/1.1")

    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                     % (remediation_type))
    sys.exit(1)
135
136
137 2
def is_supported_filename(remediation_type, filename):
    """
    Tell whether filename ends with the extension that
    REMEDIATION_TO_EXT_MAP registers for remediation_type.

    An unknown remediation_type is reported on stderr and the process exits
    with status 1.
    """
    if remediation_type not in REMEDIATION_TO_EXT_MAP:
        sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                         % (remediation_type))
        sys.exit(1)

    expected_extension = REMEDIATION_TO_EXT_MAP[remediation_type]
    return filename.endswith(expected_extension)
149
150
151 2
def get_populate_replacement(remediation_type, text):
    """
    Return varname, fixtextcontribution

    ``text`` is expected to contain a ``populate VARNAME`` call on a line of
    its own (leading indentation is accepted). ``varname`` is the variable
    name passed to ``populate`` and ``fixtextcontribution`` is the string
    '\\nVARNAME="' that opens the shell assignment for that variable.

    Only the 'bash' remediation type is supported. An unknown type, or text
    without a ``populate`` call, is reported on stderr and the process exits
    with status 1.
    """
    if remediation_type != 'bash':
        sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                         % (remediation_type))
        sys.exit(1)

    # Extract variable name. Leading whitespace is tolerated so that an
    # indented call does not end in an AttributeError on a failed match.
    match = re.search(r'\n\s*populate\s+(\S+)\s*\n', text)
    if match is None:
        sys.stderr.write("ERROR: Unable to find a 'populate' call in '%s'!\n"
                         % (text.strip()))
        sys.exit(1)

    varname = match.group(1)
    # Define fix text part to contribute to main fix text
    fixtextcontribution = '\n%s="' % varname
    return (varname, fixtextcontribution)
167
168
169 2
# Result type of parse_from_file(): the lines kept from the remediation file
# and the configuration parsed from its "# key = value" header lines.
# Defined once here instead of building a new class on every call.
_Remediation = namedtuple('remediation', ['contents', 'config'])


def parse_from_file(file_path, env_yaml):
    """
    Parses a remediation from a file. As remediations contain jinja macros,
    we need a env_yaml context to process these. In practice, no remediations
    use jinja in the configuration, so for extracting only the configuration,
    env_yaml can be an arbitrary product.yml dictionary.

    Returns a ``(contents, config)`` named tuple: ``contents`` is the list of
    lines that are not configuration lines, ``config`` is a defaultdict
    (missing keys yield None) holding the values of the recognised
    configuration keys.

    If the logic of configuration parsing changes significantly, please also
    update ssg.fixes.parse_platform(...).
    """
    mod_file = []
    config = defaultdict(lambda: None)

    fix_file_lines = jinja_process_file(file_path, env_yaml).splitlines()

    for line in fix_file_lines:
        # The "generated file" marker is never part of the contents.
        if line.startswith(FILE_GENERATED_HASH_COMMENT):
            continue

        # A config item is a comment line with exactly one equals sign
        # whose key is one of REMEDIATION_CONFIG_KEYS.
        if line.startswith('#') and line.count('=') == 1:
            (key, value) = line.strip('#').split('=')
            key = key.strip()
            if key in REMEDIATION_CONFIG_KEYS:
                config[key] = value.strip()
                continue

        # If our parsed line wasn't a config item, add it to the
        # returned file contents. This includes when the line
        # begins with a '#' and contains an equals sign, but
        # the "key" isn't one of the known keys from
        # REMEDIATION_CONFIG_KEYS.
        mod_file.append(line)

    return _Remediation(mod_file, config)
205
206
207 2
def process_fix(fixes, remediation_type, env_yaml, product, file_path, fix_name):
    """
    Parse the remediation at file_path and store it in fixes under fix_name,
    but only when the file has a valid extension for remediation_type and
    the remediation is applicable for product. Files with another extension
    are silently ignored.

    The platform configuration item is mandatory: a RuntimeError is raised
    when the parsed remediation does not provide it.
    """
    if is_supported_filename(remediation_type, file_path):
        parsed = parse_from_file(file_path, env_yaml)
        platform = parsed.config['platform']

        if not platform:
            raise RuntimeError(
                "The '%s' remediation script does not contain the "
                "platform identifier!" % (file_path))

        if is_applicable_for_product(platform, product):
            fixes[fix_name] = parsed
227
228
229 2
def write_fixes(remediation_type, build_dir, output_path, fixes):
    """
    Assemble the fix-content XML tree for all entries of fixes and
    write it to output_path.

    Every entry of fixes maps a rule name to a (contents, config) pair as
    stored by process_fix.
    """
    root = ElementTree.Element(
        "fix-content", system="urn:xccdf:fix:script:sh",
        xmlns="http://checklists.nist.gov/xccdf/1.1")
    group = get_fixgroup_for_type(root, remediation_type)

    known_functions = get_available_functions(build_dir)

    for rule_name, (content_lines, config) in fixes.items():
        fix_elm = ElementTree.SubElement(group, "fix")
        fix_elm.set("rule", rule_name)

        # Only config items that carry a value become attributes
        for key in REMEDIATION_ELM_KEYS:
            value = config[key]
            if value:
                fix_elm.set(key, value)

        fix_elm.text = "\n".join(content_lines) + "\n"

        # Expand shell variables and remediation functions
        # into corresponding XCCDF <sub> elements
        expand_xccdf_subs(fix_elm, remediation_type, known_functions)

    ElementTree.ElementTree(root).write(output_path)
260
261
262 2
def _append_xccdf_var_subs(fix, text, pattern):
    """Turn every match of pattern in text into a <sub> child of fix.

    pattern must contain exactly one capturing group that holds the variable
    name. The text before the first match becomes fix.text and the text
    following each match becomes the tail of the corresponding <sub>.
    """
    # we will get list what looks like
    # [text, varname, text, varname, ..., text]
    parts = re.split(pattern, text)

    fix.text = parts[0]  # add first "text"
    for index in range(1, len(parts), 2):
        # we cannot combine elements and text easily
        # so text is in ".tail" of element
        xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=parts[index])
        xccdfvarsub.tail = parts[index + 1]


def _expand_bash_subs(fix, remediation_type, remediation_functions):
    """Expand calls of internal remediation functions in a bash fix.

    Does nothing unless fix.text mentions 'remediation_functions'. Exits
    with status 1 when the fix text cannot be processed.
    """
    # This remediation script doesn't utilize internal remediation functions
    # Skip it without any further processing
    if 'remediation_functions' not in fix.text:
        return

    # This remediation script utilizes some of internal remediation functions
    # Expand shell variables and remediation functions calls with <xccdf:sub>
    # elements. Names are escaped so they are always matched literally.
    pattern = (r'\n+(\s*(?:'
               + r'|'.join(re.escape(func) for func in remediation_functions)
               + r')[^\n]*)\n')
    patcomp = re.compile(pattern, re.DOTALL)
    # fixparts is [leading text, call, text, call, text, ...]
    fixparts = patcomp.split(fix.text)

    # Split the portion of fix.text from fix start to first call of
    # remediation function, keeping only the third part:
    # * tail        to hold part of the fix.text after inclusion,
    #               but before first call of remediation function
    try:
        rfpatcomp = re.compile('(.*remediation_functions)(.*)', re.DOTALL)
        _, _, tail, _ = rfpatcomp.split(fixparts[0], maxsplit=2)
    except ValueError:
        sys.stderr.write("Processing fix.text for: %s rule\n"
                         % fix.get('rule'))
        sys.stderr.write("Unable to extract part of the fix.text "
                         "after inclusion of remediation functions."
                         " Aborting..\n")
        sys.exit(1)
    # If the 'tail' is not empty, make it new fix.text.
    # Otherwise use ''
    fix.text = tail if tail is not None else ''
    # Drop the first element of 'fixparts' since it has been processed
    fixparts.pop(0)
    # Perform sanity check on new 'fixparts' list content (to continue
    # successfully 'fixparts' has to contain even count of elements)
    if len(fixparts) % 2 != 0:
        sys.stderr.write("Error performing XCCDF expansion on "
                         "remediation script: %s\n"
                         % fix.get("rule"))
        sys.stderr.write("Invalid count of elements. Exiting!\n")
        sys.exit(1)

    # Process remaining 'fixparts' elements in pairs
    # First pair element is remediation function to be XCCDF expanded
    # Second pair element (if not empty) is the portion of the original
    # fix text to be used in newly added sublement's tail
    for idx in range(0, len(fixparts), 2):
        # We previously removed enclosing newlines when creating
        # fixparts list. Add them back and reuse the above 'pattern'
        call = "\n%s\n" % fixparts[idx]
        following = fixparts[idx + 1]
        if following is None:
            following = ''

        # Sanity check (verify the first field truly contains call of
        # some of the remediation functions)
        if patcomp.match(call) is None:
            continue

        if "populate" in call:
            # This chunk contains call of 'populate' function
            varname, fixtextcontrib = get_populate_replacement(
                remediation_type, call)
            # Define new XCCDF <sub> element for the variable
            xccdfvarsub = ElementTree.Element("sub", idref=varname)

            # If this is first sub element,
            # the textcontribution needs to go to fix text
            # otherwise, append to last subelement
            if len(fix) == 0:
                fix.text += fixtextcontrib
            else:
                fix[-1].tail += fixtextcontrib

            # The tail closes the '"' opened by the text contribution and
            # carries the text that follows the call (if any)
            xccdfvarsub.tail = '"' + '\n' + following
            fix.append(xccdfvarsub)
        else:
            # This chunk contains call of other remediation function
            # Extract remediation function name
            funcname = re.search(r'\n\s*(\S+)(| .*)\n',
                                 call, re.DOTALL).group(1)
            # Define new XCCDF <sub> element for the function; its tail
            # holds the original function call and the text following it
            xccdffuncsub = ElementTree.Element(
                "sub", idref='function_%s' % funcname)
            xccdffuncsub.tail = call + following

            # Ensure the newly added <xccdf:sub> element for the
            # function will be always inserted at newline: whatever
            # precedes it (fix.text for the first child, the tail of the
            # previous child otherwise) has to end with a newline
            if len(fix) == 0:
                if not fix.text.endswith('\n'):
                    fix.text += '\n'
            else:
                previouselem = fix[-1]
                if not previouselem.tail.endswith('\n'):
                    previouselem.tail += '\n'
            fix.append(xccdffuncsub)

    # Perform a sanity check if all known remediation function calls have been
    # properly XCCDF substituted. Exit with failure if some wasn't

    # First concat output form of modified fix text (including the text of
    # all children of the fix). Iterate the element directly:
    # Element.getchildren() no longer exists in Python 3.9+.
    # NOTE(review): the <sub> children created above only get a tail, never
    # a text, so in practice this inspects fix.text alone -- confirm whether
    # the tails were meant to be checked as well.
    modfix = [fix.text]
    for child in fix:
        if child is not None and child.text is not None:
            modfix.append(child.text)
    modfixtext = "".join(modfix)
    for func in remediation_functions:
        # Then define expected XCCDF sub element form for this function
        funcxccdfsub = "<sub idref=\"function_%s\"" % func
        # Finally perform the sanity check -- if function was properly XCCDF
        # substituted both the original function call and XCCDF <sub> element
        # for that function need to be present in the modified text of the fix
        # Otherwise something went wrong, thus exit with failure
        if func in modfixtext and funcxccdfsub not in modfixtext:
            sys.stderr.write("Error performing XCCDF <sub> substitution "
                             "for function %s in %s fix. Exiting...\n"
                             % (func, fix.get("rule")))
            sys.exit(1)


def expand_xccdf_subs(fix, remediation_type, remediation_functions):
    """For those remediation scripts utilizing some of the internal SCAP
    Security Guide remediation functions expand the selected shell variables
    and remediation functions calls with <xccdf:sub> element

    The fix element is modified in place; nothing is returned.

    For bash, this routine translates any instance of the 'populate'
    function call in the form of:

            populate variable_name

    into

            variable_name="<sub idref="variable_name"/>"

    and any instance of some other known remediation function (e.g.
    'replace_or_append' etc.) from the form of:

            function_name "arg1" "arg2" ... "argN"

    into:

            <sub idref="function_function_name"/>
            function_name "arg1" "arg2" ... "argN"

    For ansible, every list item of the form:

            - (xccdf-var variable_name)

    becomes a 'set_fact' task whose value is

            !!str <sub idref="variable_name"/>

    The former '(ansible-populate variable_name)' form is deprecated and
    raises RuntimeError.

    For puppet and anaconda, '(puppet-populate variable_name)' and
    '(anaconda-populate variable_name)' are each replaced by

            <sub idref="variable_name"/>

    An unknown remediation type is reported on stderr and the process exits
    with status 1.
    """
    if remediation_type == "ansible":
        fix_text = fix.text

        if "(ansible-populate " in fix_text:
            raise RuntimeError(
                "(ansible-populate VAR) has been deprecated. Please use "
                "(xccdf-var VAR) instead. Keep in mind that the latter will "
                "make an ansible variable out of XCCDF Value as opposed to "
                "substituting directly."
            )

        # If you change this string make sure it still matches the pattern
        # defined in OpenSCAP. Otherwise you break variable handling in
        # 'oscap xccdf generate fix' and the variables won't be customizable!
        # https://github.com/OpenSCAP/openscap/blob/1.2.17/src/XCCDF_POLICY/xccdf_policy_remediate.c#L588
        #   const char *pattern =
        #     "- name: XCCDF Value [^ ]+ # promote to variable\n  set_fact:\n"
        #     "    ([^:]+): (.+)\n  tags:\n    - always\n";
        # We use !!str typecast to prevent treating values as different types
        # eg. yes as a bool or 077 as an octal number
        fix_text = re.sub(
            r"- \(xccdf-var\s+(\S+)\)",
            r"- name: XCCDF Value \1 # promote to variable\n"
            r"  set_fact:\n"
            r"    \1: !!str (ansible-populate \1)\n"
            r"  tags:\n"
            r"    - always",
            fix_text
        )

        # The placeholder generated above is what gets turned into <sub>
        _append_xccdf_var_subs(fix, fix_text,
                               r'\(ansible-populate\s*(\S+)\)')
        return

    if remediation_type == "puppet":
        _append_xccdf_var_subs(fix, fix.text,
                               r'\(puppet-populate\s*(\S+)\)')
        return

    if remediation_type == "anaconda":
        _append_xccdf_var_subs(fix, fix.text,
                               r'\(anaconda-populate\s*(\S+)\)')
        return

    if remediation_type == "bash":
        _expand_bash_subs(fix, remediation_type, remediation_functions)
        return

    sys.stderr.write("Unknown remediation type '%s'\n" % (remediation_type))
    sys.exit(1)
516