utils.create_srg_export.handle_control()   B
last analyzed

Complexity

Conditions 8

Size

Total Lines 34
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 31
nop 8
dl 0
loc 34
rs 7.2693
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
#!/usr/bin/python3
2
3
import argparse
4
import csv
5
import datetime
6
import json
7
import pathlib
8
import os
9
import re
10
import sys
11
12
import yaml
13
from typing import TextIO
14
import xml.etree.ElementTree as ET
15
16
from utils.srg_export import html, md, xlsx
17
from utils.srg_export.data import HEADERS, get_iacontrol_mapping
18
19
try:
20
    import ssg.build_stig
21
    import ssg.build_yaml
22
    import ssg.constants
23
    import ssg.controls
24
    import ssg.environment
25
    import ssg.rules
26
    import ssg.yaml
27
except (ModuleNotFoundError, ImportError):
28
    sys.stderr.write("Unable to load ssg python modules.\n")
29
    sys.stderr.write("Hint: run source ./.pyenv.sh\n")
30
    exit(3)
31
32
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
33
RULES_JSON = os.path.join(SSG_ROOT, "build", "rule_dirs.json")
34
BUILD_CONFIG = os.path.join(SSG_ROOT, "build", "build_config.yml")
35
OUTPUT = os.path.join(SSG_ROOT, 'build',
36
                      f'{datetime.datetime.now().strftime("%s")}_stig_export.csv')
37
SRG_PATH = os.path.join(SSG_ROOT, 'shared', 'references', 'disa-os-srg-v2r3.xml')
38
NS = {'scap': ssg.constants.datastream_namespace,
39
      'xccdf-1.2': ssg.constants.XCCDF12_NS,
40
      'xccdf-1.1': ssg.constants.XCCDF11_NS}
41
42
43
def get_iacontrol(srg_str: str) -> str:
44
    srgs = srg_str.split(',')
45
    result = list()
46
    for srg in srgs:
47
        mapping = get_iacontrol_mapping(srg)
48
        if mapping is None:
49
            continue
50
        if srg in mapping:
51
            result.append(mapping[srg])
52
    result_set = set(result)
53
    return ','.join(str(srg) for srg in result_set)
54
55
56
class DisaStatus:
57
    PENDING = "pending"
58
    PLANNED = "planned"
59
    NOT_APPLICABLE = "Not Applicable"
60
    INHERENTLY_MET = "Applicable - Inherently Met"
61
    DOCUMENTATION = "documentation"
62
    PARTIAL = "Applicable - Configurable"
63
    SUPPORTED = "supported"
64
    AUTOMATED = "Applicable - Configurable"
65
    DOES_NOT_MEET = "Applicable - Does Not Meet"
66
67
    @staticmethod
68
    def from_string(source: str) -> str:
69
        data = {
70
            ssg.controls.Status.INHERENTLY_MET: DisaStatus.INHERENTLY_MET,
71
            ssg.controls.Status.DOES_NOT_MEET: DisaStatus.DOES_NOT_MEET,
72
            ssg.controls.Status.NOT_APPLICABLE: DisaStatus.NOT_APPLICABLE,
73
            ssg.controls.Status.AUTOMATED: DisaStatus.AUTOMATED,
74
            ssg.controls.Status.MANUAL: DisaStatus.AUTOMATED
75
        }
76
        return data.get(source, source)
77
78
    STATUSES = {AUTOMATED, INHERENTLY_MET, DOES_NOT_MEET, NOT_APPLICABLE}
79
80
81
def html_plain_text(source: str) -> str:
82
    if source is None:
83
        return ""
84
    # Quick and dirty way to clean up HTML fields.
85
    # Add line breaks
86
    result = source.replace("<br />", "\n")
87
    result = result.replace("<br/>", "\n")
88
    result = result.replace("<tt>", '"')
89
    result = result.replace("</tt>", '"')
90
    # Remove all other tags
91
    result = re.sub(r"(?s)<.*?>", " ", result)
92
93
    # only replace this after replacing other tags as < and >
94
    # would be caught by the generic substitution
95
    result = result.replace("&gt;", ">")
96
    result = result.replace("&lt;", "<")
97
    return result
98
99
100
def get_variable_value(root_path: str, product: str, name: str, selector: str) -> str:
101
    product_value_full_path = pathlib.Path(root_path).joinpath('build').joinpath(product)\
102
        .joinpath('values').joinpath(f'{name}.yml').absolute()
103
    if not product_value_full_path.exists():
104
        sys.stderr.write(f'Undefined variable {name}\n')
105
        exit(7)
106
    with open(product_value_full_path, 'r') as f:
107
        var_yaml = yaml.load(Loader=yaml.BaseLoader, stream=f)
108
        if 'options' not in var_yaml:
109
            sys.stderr.write(f'No options for {name}\n')
110
            exit(8)
111
        if not selector and 'default' not in var_yaml['options']:
112
            sys.stderr.write(f'No default selector for {name}\n')
113
            exit(10)
114
        if not selector:
115
            return str(var_yaml['options']['default'])
116
117
        if selector not in var_yaml['options']:
118
            sys.stderr.write(f'Option {selector} does not exist for {name}\n')
119
            exit(9)
120
        else:
121
            return str(var_yaml['options'][selector])
122
123
124
def replace_variables(source: str, variables: dict, root_path: str, product: str) -> str:
125
    result = source
126
    if source:
127
        sub_element_regex = r'<sub idref="([a-z0-9_]+)" \/>'
128
        matches = re.finditer(sub_element_regex, source, re.MULTILINE)
129
        for match in matches:
130
            name = match.group(1)
131
            value = get_variable_value(
132
                root_path, product, name, variables.get(name))
133
            result = result.replace(match.group(), value)
134
    return result
135
136
137
def handle_variables(source: str, variables: dict, root_path: str, product: str) -> str:
138
    result = replace_variables(source, variables, root_path, product)
139
    return html_plain_text(result)
140
141
142
def get_description_root(srg: ET.Element) -> ET.Element:
143
    # DISA adds escaped XML to the description field
144
    # This method unescapes that XML and parses it
145
    description_xml = "<root>"
146
    description_xml += srg.find('xccdf-1.1:description', NS).text.replace('&lt;', '<') \
147
        .replace('&gt;', '>').replace(' & ', '')
148
    description_xml += "</root>"
149
    description_root = ET.ElementTree(ET.fromstring(description_xml)).getroot()
150
    return description_root
151
152
153
def get_srg_dict(xml_path: str) -> dict:
154
    if not pathlib.Path(xml_path).exists():
155
        sys.stderr.write("XML for SRG was not found\n")
156
        sys.stderr.write(f"Could not open file {xml_path} \n")
157
        exit(1)
158
    root = ET.parse(xml_path).getroot()
159
    srgs = dict()
160
    for group in root.findall('xccdf-1.1:Group', NS):
161
        for srg in group.findall('xccdf-1.1:Rule', NS):
162
            srg_id = srg.find('xccdf-1.1:version', NS).text
163
            srgs[srg_id] = dict()
164
            srgs[srg_id]['severity'] = ssg.build_stig.get_severity(srg.get('severity'))
165
            srgs[srg_id]['title'] = srg.find('xccdf-1.1:title', NS).text
166
            description_root = get_description_root(srg)
167
            srgs[srg_id]['vuln_discussion'] = \
168
                html_plain_text(description_root.find('VulnDiscussion').text)
169
            srgs[srg_id]['cci'] = \
170
                srg.find("xccdf-1.1:ident[@system='http://cyber.mil/cci']", NS).text
171
            srgs[srg_id]['fix'] = srg.find('xccdf-1.1:fix', NS).text
172
            srgs[srg_id]['check'] = \
173
                html_plain_text(srg.find('xccdf-1.1:check/xccdf-1.1:check-content', NS).text)
174
            srgs[srg_id]['ia_controls'] = description_root.find('IAControls').text
175
    return srgs
176
177
178
def handle_rule_yaml(product: str, rule_dir: str, env_yaml: dict) -> ssg.build_yaml.Rule:
179
    rule_file = ssg.rules.get_rule_dir_yaml(rule_dir)
180
181
    rule_yaml = ssg.build_yaml.Rule.from_yaml(rule_file, env_yaml=env_yaml)
182
    rule_yaml.normalize(product)
183
184
    return rule_yaml
185
186
187
def parse_args() -> argparse.Namespace:
188
    parser = argparse.ArgumentParser()
189
    parser.add_argument('-c', '--control', type=str, action="store", required=True,
190
                        help="The control file to parse")
191
    parser.add_argument('-o', '--output', type=str,
192
                        help=f"The path to the output. Defaults to {OUTPUT}",
193
                        default=OUTPUT)
194
    parser.add_argument("-r", "--root", type=str, action="store", default=SSG_ROOT,
195
                        help=f"Path to SSG root directory (defaults to {SSG_ROOT})")
196
    parser.add_argument("-j", "--json", type=str, action="store", default=RULES_JSON,
197
                        help=f"Path to the rules_dir.json (defaults to {RULES_JSON})")
198
    parser.add_argument("-p", "--product", type=str, action="store", required=True,
199
                        help="What product to get STIGs for")
200
    parser.add_argument("-b", "--build-config-yaml", default=BUILD_CONFIG,
201
                        help="YAML file with information about the build configuration.")
202
    parser.add_argument("-m", "--manual", type=str, action="store",
203
                        help="Path to XML XCCDF manual file to use as the source of the SRGs",
204
                        default=SRG_PATH)
205
    parser.add_argument("--prefer-controls", action="store_true",
206
                        help="When creating rows prefer checks and fixes from controls over rules",
207
                        default=False)
208
    parser.add_argument("-f", "--out-format", type=str, choices=("csv", "xlsx", "html", "md"),
209
                        action="store", help="The format the output should take. Defaults to csv",
210
                        default="csv")
211
    return parser.parse_args()
212
213
214
def get_policy_specific_content(key: str, rule_object: ssg.build_yaml.Rule) -> str:
215
    psc = rule_object.policy_specific_content
216
    if not psc:
217
        return ""
218
    stig = psc.get('stig')
219
    if not stig:
220
        return ""
221
    return stig.get(key, "")
222
223
224
def use_rule_content(control: ssg.controls.Control, prefer_controls: bool) -> bool:
225
    if control.fixtext is not None and control.check is not None and prefer_controls:
226
        return False
227
    return True
228
229
230
def handle_control(product: str, control: ssg.controls.Control, env_yaml: ssg.environment,
231
                   rule_json: dict, srgs: dict, used_rules: list, root_path: str,
232
                   prefer_controls: bool) -> list:
233
    if len(control.selections) > 0 and use_rule_content(control, prefer_controls):
234
        rows = list()
235
        for selection in control.selections:
236
            if selection not in used_rules and selection in control.selected:
237
                rule_object = handle_rule_yaml(product, rule_json[selection]['dir'], env_yaml)
238
                row = create_base_row(control, srgs, rule_object)
239
                if control.levels is not None:
240
                    row['Severity'] = ssg.build_stig.get_severity(control.levels[0])
241
                requirement = get_policy_specific_content('srg_requirement', rule_object)
242
                row['Requirement'] = handle_variables(requirement,
243
                                                      control.variables, root_path,
244
                                                      product)
245
                rationale = get_policy_specific_content('vuldiscussion', rule_object)
246
                row['Vul Discussion'] = handle_variables(rationale, control.variables,
247
                                                         root_path, product)
248
                checktext = get_policy_specific_content('checktext', rule_object)
249
                row['Check'] = handle_variables(checktext, control.variables, root_path, product)
250
                fixtext = get_policy_specific_content('fixtext', rule_object)
251
                row['Fix'] = handle_variables(fixtext, control.variables, root_path,
252
                                              product)
253
                row['STIGID'] = rule_object.identifiers.get('cce', "")
254
                if control.status is not None:
255
                    row['Status'] = DisaStatus.from_string(control.status)
256
                else:
257
                    row['Status'] = DisaStatus.AUTOMATED
258
                used_rules.append(selection)
259
                rows.append(row)
260
        return rows
261
262
    else:
263
        return [no_selections_row(control, srgs)]
264
265
266
def no_selections_row(control, srgs):
267
    row = create_base_row(control, srgs, ssg.build_yaml.Rule('null'))
268
    row['Requirement'] = control.title
269
    row['Status'] = DisaStatus.from_string(control.status)
270
    row['Fix'] = control.fixtext
271
    row['Check'] = control.check
272
    row['Vul Discussion'] = html_plain_text(control.rationale)
273
    row["STIGID"] = ""
274
    return row
275
276
277
def create_base_row(item: ssg.controls.Control, srgs: dict,
278
                    rule_object: ssg.build_yaml.Rule) -> dict:
279
    row = dict()
280
    srg_id = item.id
281
    if srg_id not in srgs:
282
        print(f"Unable to find SRG {srg_id}. Id in the control must be a valid SRGID.")
283
        exit(4)
284
    srg = srgs[srg_id]
285
286
    row['SRGID'] = rule_object.references.get('srg', srg_id)
287
    row['CCI'] = rule_object.references.get('disa', srg['cci'])
288
    row['SRG Requirement'] = srg['title']
289
    row['SRG VulDiscussion'] = html_plain_text(srg['vuln_discussion'])
290
    row['SRG Check'] = html_plain_text(srg['check'])
291
    row['SRG Fix'] = srg['fix']
292
    row['Severity'] = ssg.build_stig.get_severity(srg.get('severity'))
293
    row['IA Control'] = get_iacontrol(row['SRGID'])
294
    row['Mitigation'] = item.mitigation
295
    row['Artifact Description'] = item.artifact_description
296
    row['Status Justification'] = item.status_justification
297
    return row
298
299
300
def setup_csv_writer(csv_file: TextIO) -> csv.DictWriter:
301
    csv_writer = csv.DictWriter(csv_file, HEADERS)
302
    csv_writer.writeheader()
303
    return csv_writer
304
305
306
def get_rule_json(json_path: str) -> dict:
307
    with open(json_path, 'r') as json_file:
308
        rule_json = json.load(json_file)
309
    return rule_json
310
311
312
def check_paths(control_path: str, rule_json_path: str) -> None:
313
    control_full_path = pathlib.Path(control_path).absolute()
314
    if not control_full_path.exists():
315
        sys.stderr.write(f"Unable to find control file {control_full_path}\n")
316
        exit(5)
317
    rule_json_full_path = pathlib.Path(rule_json_path).absolute()
318
    if not rule_json_full_path.exists():
319
        sys.stderr.write(f"Unable to find rule_dirs.json file {rule_json_full_path}\n")
320
        sys.stderr.write("Hint: run ./utils/rule_dir_json.py\n")
321
        exit(2)
322
323
324
def check_product_value_path(root_path: str, product: str) -> None:
325
    product_value_full_path = pathlib.Path(root_path).joinpath('build')\
326
        .joinpath(product).joinpath('values').absolute()
327
    if not pathlib.Path.exists(product_value_full_path):
328
        sys.stderr.write(f"Unable to find values directory for"
329
                         f" {product} in {product_value_full_path}\n")
330
        sys.stderr.write(f"Have you built {product}\n")
331
        exit(6)
332
333
334
def get_policy(args, env_yaml) -> ssg.controls.Policy:
335
    policy = ssg.controls.Policy(args.control, env_yaml=env_yaml)
336
    policy.load()
337
    return policy
338
339
340
def handle_csv_output(output: str, results: list) -> str:
341
    with open(output, 'w') as csv_file:
342
        csv_writer = setup_csv_writer(csv_file)
343
        for row in results:
344
            csv_writer.writerow(row)
345
        return output
346
347
348
def handle_xlsx_output(output: str, product: str, results: list) -> str:
349
    output = output.replace('.csv', '.xlsx')
350
    for row in results:
351
        row['IA Control'] = get_iacontrol(row['SRGID'])
352
    xlsx.handle_dict(results, output, f'{product} SRG Mapping')
353
    return output
354
355
356
def handle_html_output(output: str, product: str, results: list) -> str:
357
    for row in results:
358
        row['IA Control'] = get_iacontrol(row['SRGID'])
359
    output = output.replace('.csv', '.html')
360
    html.handle_dict(results, output, f'{product} SRG Mapping')
361
    return output
362
363
364
def handle_md_output(output: str, product: str, results: list) -> str:
365
    output = output.replace('.csv', '.md')
366
    for row in results:
367
        row['IA Control'] = get_iacontrol(row['SRGID'])
368
    md.handle_dict(results, output, f'{product} SRG Mapping')
369
    return output
370
371
372
def handle_output(output: str, results: list, format_type: str, product: str) -> None:
373
    if format_type == 'csv':
374
        output = handle_csv_output(output, results)
375
    elif format_type == 'xlsx':
376
        output = handle_xlsx_output(output, product, results)
377
    elif format_type == 'md':
378
        output = handle_md_output(output, product, results)
379
    elif format_type == 'html':
380
        output = handle_html_output(output, product, results)
381
382
    print(f'Wrote output to {output}')
383
384
385
def get_env_yaml(root: str, product_path: str, build_config_yaml: str) -> dict:
386
    product_dir = os.path.join(root, "products", product_path)
387
    product_yaml_path = os.path.join(product_dir, "product.yml")
388
    env_yaml = ssg.environment.open_environment(
389
            build_config_yaml, product_yaml_path, os.path.join(root, "product_properties"))
390
    return env_yaml
391
392
393
def rows_match(old: dict, new: dict) -> bool:
394
    must_match = ['Requirement', 'Fix', 'Check']
395
    for k in must_match:
396
        if old[k] != new[k]:
397
            return False
398
    return True
399
400
401
def merge_rows(old: dict, new: dict) -> None:
402
    old["SRGID"] = old["SRGID"] + "," + new["SRGID"]
403
404
405
def extend_results(results: list, rows: list, prefer_controls: bool) -> None:
406
    # We always extend with rows that are generated based on rule selection
407
    # We also only attempt to merge the new row if we prefer control-based
408
    # policy data over rule-based
409
    if len(rows) > 1 or not prefer_controls:
410
        results.extend(rows)
411
        return
412
413
    if len(rows) == 0:
414
        return
415
416
    # If we only have one row, possibly with check and fix from the control
417
    # file and not rules, let's find a row with the same requirement, fix
418
    # and check and if they match, merge
419
    new_row = rows[0]
420
    for r in results:
421
        if rows_match(r, new_row):
422
            merge_rows(r, new_row)
423
            return
424
425
    # We didn't find any match, so we just add the row as new
426
    results.extend(rows)
427
428
429
def main() -> None:
430
    args = parse_args()
431
    check_paths(args.control, args.json)
432
    check_product_value_path(args.root, args.product)
433
434
    srgs = ssg.build_stig.parse_srgs(args.manual)
435
    product_dir = os.path.join(args.root, "products", args.product)
436
    env_yaml = get_env_yaml(
437
            args.root, args.product, args.build_config_yaml)
438
    policy = get_policy(args, env_yaml)
439
    rule_json = get_rule_json(args.json)
440
441
    used_rules = list()
442
    results = list()
443
    for control in policy.controls:
444
        rows = handle_control(args.product, control, env_yaml, rule_json, srgs, used_rules,
445
                              args.root, args.prefer_controls)
446
        extend_results(results, rows, args.prefer_controls)
447
448
    handle_output(args.output, results, args.out_format, args.product)
449
450
451
if __name__ == '__main__':
452
    main()
453