Passed
Pull Request — master (#3188)
by Alexander
02:25
created

ssg.build_remediations.is_supported_filename()   A

Complexity

Conditions 2

Size

Total Lines 12
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2.2559

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 2
dl 0
loc 12
rs 10
c 0
b 0
f 0
ccs 3
cts 5
cp 0.6
crap 2.2559
1 1
from __future__ import absolute_import
0 ignored issues
show
Coding Style introduced by
This module should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below is an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend reading PEP 257: Docstring Conventions.

Loading history...
2 1
from __future__ import print_function
3
4 1
import sys
5 1
import os
6 1
import os.path
7 1
import re
8 1
import codecs
9 1
from collections import defaultdict, namedtuple
10
11 1
from .jinja import process_file as jinja_process_file
12 1
from .xml import ElementTree
13 1
from .products import parse_name, multi_list, map_name
14
15 1
# Filename extension expected for each supported remediation type; consulted
# by is_supported_filename().
REMEDIATION_TO_EXT_MAP = {
    'anaconda': '.anaconda',
    'ansible': '.yml',
    'bash': '.sh',
    'puppet': '.pp',
}

# Comment lines starting with this marker are dropped by parse_from_file().
FILE_GENERATED_HASH_COMMENT = '# THIS FILE IS GENERATED'

# Keys recognised in "# key = value" comment lines of a remediation file.
REMEDIATION_CONFIG_KEYS = [
    'complexity',
    'disruption',
    'platform',
    'reboot',
    'strategy',
]

# Configuration keys copied onto the generated <fix> element as attributes.
REMEDIATION_ELM_KEYS = [
    'complexity',
    'disruption',
    'reboot',
    'strategy',
]
27
28
29 1
def is_applicable_for_product(platform, product):
    """Decide whether a remediation script applies to ``product``.

    ``platform`` is the platform specifier taken from the remediation
    script (a comma-separated string, or a falsy value when the script
    declares none). ``product`` is the product identifier, which is split
    into a name and an optional version by ``parse_name``.

    Return True when the script is applicable, False otherwise.
    """
    # No platform specifier at all: the script cannot apply to anything.
    if not platform:
        return False

    product, product_version = parse_name(product)

    # Generic specifiers: "every platform" and "every version of product".
    # NOTE(review): this is a substring test against the whole specifier
    # string, so one product name that is a prefix of another would also
    # match -- confirm whether that is intended.
    generic_specifiers = ('multi_platform_all', 'multi_platform_' + product)
    if any(specifier in platform and product in multi_list
           for specifier in generic_specifiers):
        return True

    # Official product name, with the version appended when there is one.
    official_name = map_name(product)
    if product_version is not None:
        official_name = official_name + ' ' + product_version

    # Otherwise the script applies only if the concrete product (and
    # version) is listed explicitly in the specifier.
    return any(official_name == listed.strip()
               for listed in platform.split(','))
65
66
67 1
def get_available_functions(build_dir):
    """Return the names of the known internal bash remediation functions.

    The names are read from "bash-remediation-functions.xml" inside
    ``build_dir``: every ``<Value hidden="true" id="function_NAME"`` entry
    contributes ``NAME`` to the returned list, in file order.

    Writes an error to stderr and exits with status 1 when ``build_dir`` is
    None or not a directory, or when the XML file is missing.
    """
    if build_dir is None or not os.path.isdir(build_dir):
        sys.stderr.write("Expected '%s' to be the build directory. It doesn't "
                         "exist or is not a directory.\n" % (build_dir,))
        sys.exit(1)

    # Construct the final path of XML file with remediation functions
    xmlfilepath = os.path.join(build_dir, "bash-remediation-functions.xml")

    if not os.path.isfile(xmlfilepath):
        sys.stderr.write("Expected '%s' to contain the remediation functions. "
                         "The file was not found!\n" % (xmlfilepath,))
        sys.exit(1)

    with codecs.open(xmlfilepath, "r", encoding="utf-8") as xmlfile:
        filestring = xmlfile.read()

    # This regex looks implementation dependent but we can rely on
    # ElementTree sorting XML attrs alphabetically. Hidden is guaranteed
    # to be the first attr and ID is guaranteed to be second.
    return re.findall(r'<Value hidden=\"true\" id=\"function_(\S+)\"',
                      filestring)
99
100
101 1
def get_fixgroup_for_type(fixcontent, remediation_type):
    """Create the <fix-group> child of ``fixcontent`` for a remediation type.

    The new element gets ``id`` set to ``remediation_type``, the ``system``
    URN associated with that type and the XCCDF 1.1 ``xmlns`` attribute. It
    is appended to ``fixcontent`` and returned.

    Writes an error to stderr and exits with status 1 when
    ``remediation_type`` is not one of 'anaconda', 'ansible', 'bash' or
    'puppet'.
    """
    # "system" URN identifying the scripting language of each fix group.
    fix_systems = {
        'anaconda': "urn:redhat:anaconda:pre",
        'ansible': "urn:xccdf:fix:script:ansible",
        'bash': "urn:xccdf:fix:script:sh",
        'puppet': "urn:xccdf:fix:script:puppet",
    }

    if remediation_type in fix_systems:
        return ElementTree.SubElement(
            fixcontent, "fix-group", id=remediation_type,
            system=fix_systems[remediation_type],
            xmlns="http://checklists.nist.gov/xccdf/1.1")

    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                     % (remediation_type))
    sys.exit(1)
129
130
131 1
def is_supported_filename(remediation_type, filename):
    """
    Tell whether filename ends with the extension registered for
    remediation_type in REMEDIATION_TO_EXT_MAP.

    Writes an error to stderr and exits with status 1 when remediation_type
    is not a known type.
    """
    extension = REMEDIATION_TO_EXT_MAP.get(remediation_type)
    if extension is not None:
        return filename.endswith(extension)

    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                     % (remediation_type))
    sys.exit(1)
143
144
145 1
def get_populate_replacement(remediation_type, text):
    """
    Return (varname, fixtextcontribution) for a 'populate' call in text.

    Only the 'bash' remediation type is supported. text must contain a line
    of the exact form "populate VARNAME" enclosed in newlines; varname is
    VARNAME and fixtextcontribution is the opening part of the shell
    assignment ('\\nVARNAME="') that the caller appends before the <sub>
    element.

    Raises RuntimeError when text contains no such call. Writes an error to
    stderr and exits with status 1 for any other remediation type.
    """
    if remediation_type == 'bash':
        # Extract variable name
        match = re.search(r'\npopulate (\S+)\n', text)
        if match is None:
            # Without this guard a malformed call (e.g. an indented one)
            # would surface as an opaque AttributeError on None.
            raise RuntimeError(
                "Unable to find a 'populate <variable>' call in: %r"
                % (text,))
        varname = match.group(1)
        # Define fix text part to contribute to main fix text
        fixtextcontribution = '\n%s="' % varname
        return (varname, fixtextcontribution)

    sys.stderr.write("ERROR: Unknown remediation type '%s'!\n"
                     % (remediation_type))
    sys.exit(1)
161
162
163 1
def parse_from_file(file_path, env_yaml):
    """
    Parse a remediation file into its contents and its configuration.

    The file is first rendered through jinja with env_yaml as context.
    According to the original author, no remediation uses jinja in its
    configuration, so when only the configuration is needed env_yaml can be
    an arbitrary product.yml dictionary.

    Comment lines of the form "# key = value" whose key is listed in
    REMEDIATION_CONFIG_KEYS are stored in the configuration. Comment lines
    starting with FILE_GENERATED_HASH_COMMENT are dropped. Every other line
    is kept, in order, as part of the contents.

    Returns a 'remediation' namedtuple (contents, config), where contents is
    a list of lines and config is a defaultdict yielding None for unset keys.
    """
    contents = []
    config = defaultdict(lambda: None)

    rendered = jinja_process_file(file_path, env_yaml)

    for line in rendered.splitlines():
        if not line.startswith('#'):
            contents.append(line)
            continue

        # Only a comment with exactly one '=' can be a configuration entry.
        pieces = line.strip('#').split('=')
        if len(pieces) == 2:
            key = pieces[0].strip()
            if key in REMEDIATION_CONFIG_KEYS:
                config[key] = pieces[1].strip()
                continue

        # An ordinary comment: keep it unless it is the "generated" marker.
        if not line.startswith(FILE_GENERATED_HASH_COMMENT):
            contents.append(line)

    remediation = namedtuple('remediation', ['contents', 'config'])
    return remediation(contents, config)
194
195
196 1
def process_fix(fixes, remediation_type, env_yaml, product, file_path, fix_name):
    """
    Parse the fix in file_path and store it in fixes under fix_name.

    The fix is stored only if file_path has a valid extension for
    remediation_type and the fix is applicable to product; otherwise fixes
    is left untouched.

    Raises RuntimeError when the fix does not declare a platform, which is
    a required field.
    """
    if is_supported_filename(remediation_type, file_path):
        parsed = parse_from_file(file_path, env_yaml)
        platform = parsed.config['platform']

        if not platform:
            raise RuntimeError(
                "The '%s' remediation script does not contain the "
                "platform identifier!" % (file_path))

        if is_applicable_for_product(platform, product):
            fixes[fix_name] = parsed
216
217
218 1
def write_fixes(remediation_type, build_dir, output_path, fixes):
    """
    Serialize fixes as a fix-content XML document written to output_path.

    fixes maps rule names to (contents, config) pairs. Each pair becomes a
    <fix rule="..."> element inside the fix-group for remediation_type,
    carrying the REMEDIATION_ELM_KEYS settings that are set in config. The
    fix text is then run through expand_xccdf_subs with the remediation
    functions found in build_dir.
    """
    root = ElementTree.Element(
        "fix-content",
        system="urn:xccdf:fix:script:sh",
        xmlns="http://checklists.nist.gov/xccdf/1.1")
    group = get_fixgroup_for_type(root, remediation_type)

    known_functions = get_available_functions(build_dir)

    for rule_name, (lines, config) in fixes.items():
        fix_elm = ElementTree.SubElement(group, "fix")
        fix_elm.set("rule", rule_name)

        for key in REMEDIATION_ELM_KEYS:
            value = config[key]
            if value:
                fix_elm.set(key, value)

        fix_elm.text = "\n".join(lines) + "\n"

        # Turn shell variables and remediation function calls into the
        # corresponding XCCDF <sub> elements.
        expand_xccdf_subs(fix_elm, remediation_type, known_functions)

    ElementTree.ElementTree(root).write(output_path)
249
250
251 1
def expand_xccdf_subs(fix, remediation_type, remediation_functions):
    """Replace variable and function references in a fix with <sub> elements.

    ``fix`` is the <fix> element whose ``text`` holds the remediation
    script; it is modified in place (its ``text`` is rewritten and <sub>
    children are appended). Nothing is returned.

    For 'puppet' and 'anaconda' every

            (puppet-populate variable_name)
            (anaconda-populate variable_name)

    is replaced with

            <sub idref="variable_name"/>

    For 'ansible' every list item of the form

            - (xccdf-var variable_name)

    becomes a "set_fact" task tagged "always" whose value is
    <sub idref="variable_name"/>. Using "(ansible-populate VAR)" directly
    in the script is deprecated and raises RuntimeError.

    For 'bash', scripts whose text does not mention 'remediation_functions'
    are left untouched. Otherwise the text up to and including that
    reference is dropped, and every call of a function listed in
    ``remediation_functions`` is expanded:

            populate variable_name

    becomes

            variable_name="<sub idref="variable_name"/>"

    and any other function call

            function_name "arg1" "arg2" ... "argN"

    becomes

            <sub idref="function_function_name"/>
            function_name "arg1" "arg2" ... "argN"

    Any other ``remediation_type``, or a bash script that cannot be
    processed, writes an error to stderr and exits with status 1.
    """
    if remediation_type == "ansible":
        fix_text = fix.text

        if "(ansible-populate " in fix_text:
            raise RuntimeError(
                "(ansible-populate VAR) has been deprecated. Please use "
                "(xccdf-var VAR) instead. Keep in mind that the latter will "
                "make an ansible variable out of XCCDF Value as opposed to "
                "substituting directly."
            )

        # we use the horrid "!!str |-" syntax to force strings without using
        # quotes. quotes enable yaml escaping rules so we'd have to escape all
        # the backslashes and at this point we don't know if there are any.
        fix_text = re.sub(
            r"- \(xccdf-var\s+(\S+)\)",
            r"- name: XCCDF Value \1 # promote to variable\n"
            r"  set_fact:\n"
            r"    \1: !!str |-\n"
            r"        (ansible-populate \1)\n"
            r"  tags:\n"
            r"    - always",
            fix_text
        )

        _replace_populate_calls(fix, r'\(ansible-populate\s*(\S+)\)', fix_text)

    elif remediation_type == "puppet":
        _replace_populate_calls(fix, r'\(puppet-populate\s*(\S+)\)', fix.text)

    elif remediation_type == "anaconda":
        _replace_populate_calls(fix, r'\(anaconda-populate\s*(\S+)\)', fix.text)

    elif remediation_type == "bash":
        _expand_bash_subs(fix, remediation_type, remediation_functions)

    else:
        sys.stderr.write("Unknown remediation type '%s'\n" % (remediation_type))
        sys.exit(1)


def _replace_populate_calls(fix, pattern, text):
    """Rebuild fix from text, turning each pattern match into a <sub> child.

    pattern must contain exactly one capturing group: the variable name.
    """
    # Splitting on a pattern with one group yields a list that looks like
    # [text, varname, text, varname, ..., text]
    parts = re.split(pattern, text)

    fix.text = parts[0]  # add first "text"
    for index in range(1, len(parts), 2):
        # we cannot combine elements and text easily
        # so text is in ".tail" of element
        xccdfvarsub = ElementTree.SubElement(fix, "sub", idref=parts[index])
        xccdfvarsub.tail = parts[index + 1]


def _expand_bash_subs(fix, remediation_type, remediation_functions):
    """Expand remediation function calls of a bash fix into <sub> elements."""
    # This remediation script doesn't utilize internal remediation functions
    # Skip it without any further processing
    if 'remediation_functions' not in fix.text:
        return

    # A call is a whole line starting (after optional whitespace) with the
    # name of one of the known remediation functions.
    call_re = re.compile(
        r'\n+(\s*(?:' + r'|'.join(remediation_functions) + r')[^\n]*)\n',
        re.DOTALL)

    # [text before first call, call, text, call, text, ...]
    fixparts = call_re.split(fix.text)

    # Of the text preceding the first call keep only what follows the last
    # 'remediation_functions' reference (i.e. drop the inclusion itself and
    # everything before it).
    _, marker, tail = fixparts.pop(0).rpartition('remediation_functions')
    if not marker:
        sys.stderr.write("Processing fix.text for: %s rule\n"
                         % fix.get('rule'))
        sys.stderr.write("Unable to extract part of the fix.text "
                         "after inclusion of remediation functions."
                         " Aborting..\n")
        sys.exit(1)
    fix.text = tail

    # Perform sanity check on new 'fixparts' list content (to continue
    # successfully 'fixparts' has to contain even count of elements)
    if len(fixparts) % 2 != 0:
        sys.stderr.write("Error performing XCCDF expansion on "
                         "remediation script: %s\n"
                         % fix.get("rule"))
        sys.stderr.write("Invalid count of elements. Exiting!\n")
        sys.exit(1)

    # Process remaining 'fixparts' elements in pairs: the remediation
    # function call, then the original text following it.
    for idx in range(0, len(fixparts), 2):
        # The enclosing newlines were consumed by the split; add them back
        # so the call can be re-checked against the same pattern.
        call = "\n%s\n" % fixparts[idx]
        following = fixparts[idx + 1]

        # Sanity check (verify the chunk truly contains a call of some of
        # the remediation functions)
        if call_re.match(call) is None:
            continue

        if "populate" in call:
            _append_bash_populate_sub(fix, remediation_type, call, following)
        else:
            _append_bash_function_sub(fix, call, following)

    _check_bash_functions_substituted(fix, remediation_functions)


def _append_bash_populate_sub(fix, remediation_type, call, following):
    """Append VARNAME="<sub idref=VARNAME/>" for a bash 'populate' call."""
    varname, fixtextcontrib = get_populate_replacement(remediation_type, call)

    # The opening part of the assignment goes right before the new <sub>:
    # into the fix text if this is the first child, otherwise into the tail
    # of the last child.
    if len(fix) == 0:
        fix.text += fixtextcontrib
    else:
        fix[-1].tail += fixtextcontrib

    xccdfvarsub = ElementTree.Element("sub", idref=varname)
    # Closing '"' of the assignment, then the text that followed the call.
    xccdfvarsub.tail = '"' + '\n'
    if following is not None:
        xccdfvarsub.tail += following
    fix.append(xccdfvarsub)


def _append_bash_function_sub(fix, call, following):
    """Append <sub idref="function_NAME"/> followed by the original call."""
    # Extract remediation function name
    funcname = re.search(r'\n\s*(\S+)(| .*)\n', call, re.DOTALL).group(1)

    # Ensure the <sub> element for the function always starts on a new
    # line: terminate whatever precedes it (the fix text for the first
    # child, the tail of the last child otherwise) with a newline.
    if len(fix) == 0:
        if not fix.text.endswith('\n'):
            fix.text += '\n'
    else:
        previouselem = fix[-1]
        if not previouselem.tail.endswith('\n'):
            previouselem.tail += '\n'

    xccdffuncsub = ElementTree.Element("sub", idref='function_%s' % funcname)
    # The original function call, plus the text that followed it, is kept
    # in the tail of the subelement.
    xccdffuncsub.tail = call
    if following is not None:
        xccdffuncsub.tail += following
    fix.append(xccdffuncsub)


def _check_bash_functions_substituted(fix, remediation_functions):
    """Exit with failure if a function call is left in the fix's own text."""
    # Concatenate the fix text with the text of its children. Iterating the
    # element replaces Element.getchildren(), which was removed in
    # Python 3.9.
    # NOTE(review): the <sub> children created by this module carry no text
    # (calls live in their tails), so in practice only fix.text is
    # inspected -- confirm whether tails were meant to be checked too.
    modfix = [fix.text]
    for child in fix:
        if child.text is not None:
            modfix.append(child.text)
    modfixtext = "".join(modfix)

    for func in remediation_functions:
        # Expected XCCDF sub element form for this function
        funcxccdfsub = "<sub idref=\"function_%s\"" % func
        # If the function name is present, its <sub> form has to be present
        # too; otherwise something went wrong, thus exit with failure
        if func in modfixtext and funcxccdfsub not in modfixtext:
            sys.stderr.write("Error performing XCCDF <sub> substitution "
                             "for function %s in %s fix. Exiting...\n"
                             % (func, fix.get("rule")))
            sys.exit(1)
500