utils.create_srg_export.main()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 20
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 17
nop 0
dl 0
loc 20
rs 9.55
c 0
b 0
f 0
1
#!/usr/bin/python3
2
3
import argparse
4
import csv
5
import datetime
6
import json
7
import pathlib
8
import os
9
import re
10
import sys
11
12
import yaml
13
from typing import TextIO
14
import xml.etree.ElementTree as ET
15
16
from utils.srg_export import html, md, xlsx
17
from utils.srg_export.data import HEADERS, get_iacontrol_mapping
18
19
try:
20
    import ssg.build_stig
21
    import ssg.build_yaml
22
    import ssg.constants
23
    import ssg.controls
24
    import ssg.environment
25
    import ssg.rules
26
    import ssg.yaml
27
except (ModuleNotFoundError, ImportError):
28
    sys.stderr.write("Unable to load ssg python modules.\n")
29
    sys.stderr.write("Hint: run source ./.pyenv.sh\n")
30
    exit(3)
31
32
# Repository root: the parent of the directory that holds this script.
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
RULES_JSON = os.path.join(SSG_ROOT, "build", "rule_dirs.json")
BUILD_CONFIG = os.path.join(SSG_ROOT, "build", "build_config.yml")
# Default output name is prefixed with the current epoch seconds.  The "%s"
# strftime directive is a platform extension and is not available everywhere,
# so the timestamp is computed portably instead.
OUTPUT = os.path.join(SSG_ROOT, 'build',
                      f'{int(datetime.datetime.now().timestamp())}_stig_export.csv')
SRG_PATH = os.path.join(SSG_ROOT, 'shared', 'references', 'disa-os-srg-v2r3.xml')
# Namespace prefixes used in the ElementTree find()/findall() queries below.
NS = {'scap': ssg.constants.datastream_namespace,
      'xccdf-1.2': ssg.constants.XCCDF12_NS,
      'xccdf-1.1': ssg.constants.XCCDF11_NS}
41
42
43
def get_iacontrol(srg_str: str) -> str:
    """Return the IA controls mapped to a comma-separated list of SRG ids.

    Each id in ``srg_str`` is looked up with ``get_iacontrol_mapping``.  Ids
    for which the lookup returns ``None``, or a mapping that does not contain
    the id, are skipped.  Duplicate values are dropped and the remainder is
    joined with commas.

    The values are emitted in first-seen order.  Joining a ``set`` instead
    would make the order vary between runs, producing unstable exports.
    """
    # A dict is used as an insertion-ordered set.
    controls = {}
    for srg in srg_str.split(','):
        mapping = get_iacontrol_mapping(srg)
        if mapping is None or srg not in mapping:
            continue
        controls.setdefault(mapping[srg], None)
    return ','.join(str(control) for control in controls)
54
55
56
class DisaStatus:
    """Status strings written to the export, plus a translator for control statuses."""

    PENDING = "pending"
    PLANNED = "planned"
    NOT_APPLICABLE = "Not Applicable"
    INHERENTLY_MET = "Applicable - Inherently Met"
    DOCUMENTATION = "documentation"
    # PARTIAL and AUTOMATED deliberately share the same text.
    PARTIAL = "Applicable - Configurable"
    SUPPORTED = "supported"
    AUTOMATED = "Applicable - Configurable"
    DOES_NOT_MEET = "Applicable - Does Not Meet"

    STATUSES = {AUTOMATED, INHERENTLY_MET, DOES_NOT_MEET, NOT_APPLICABLE}

    @staticmethod
    def from_string(source: str) -> str:
        """Translate an ``ssg.controls.Status`` value into its DisaStatus text.

        Values without a translation are returned unchanged.
        """
        translation = {
            ssg.controls.Status.INHERENTLY_MET: DisaStatus.INHERENTLY_MET,
            ssg.controls.Status.DOES_NOT_MEET: DisaStatus.DOES_NOT_MEET,
            ssg.controls.Status.NOT_APPLICABLE: DisaStatus.NOT_APPLICABLE,
            ssg.controls.Status.AUTOMATED: DisaStatus.AUTOMATED,
            # Manual controls are reported with the same text as automated ones.
            ssg.controls.Status.MANUAL: DisaStatus.AUTOMATED,
        }
        if source in translation:
            return translation[source]
        return source
79
80
81
def html_plain_text(source: str) -> str:
    """Convert a small HTML fragment to plain text.

    ``None`` yields an empty string.  ``<br />`` and ``<br/>`` become
    newlines, ``<tt>``/``</tt>`` become double quotes, every other tag is
    replaced by a single space, and finally ``&gt;``/``&lt;`` are decoded.
    """
    if source is None:
        return ""

    text = source
    # Quick and dirty clean-up: handle the tags that carry meaning first.
    for markup, plain in (("<br />", "\n"), ("<br/>", "\n"),
                          ("<tt>", '"'), ("</tt>", '"')):
        text = text.replace(markup, plain)

    # Any remaining tag (possibly spanning lines) turns into a space.
    text = re.sub(r"(?s)<.*?>", " ", text)

    # Entities are decoded last; decoding earlier would let the generic
    # tag substitution above swallow the resulting < and > characters.
    for entity, char in (("&gt;", ">"), ("&lt;", "<")):
        text = text.replace(entity, char)
    return text
98
99
100
def get_variable_value(root_path: str, product: str, name: str, selector: str) -> str:
    """Look up the value of variable ``name`` for ``product``.

    The variable is read from ``<root_path>/build/<product>/values/<name>.yml``.
    With a falsy ``selector`` the ``default`` option is returned, otherwise
    the option named by ``selector``.  The value is returned as a string.

    The process exits with code 7 when the file is missing, 8 when it has no
    ``options`` key, 10 when no selector was given and there is no default
    option, and 9 when the selector is not among the options.
    """
    value_file = (pathlib.Path(root_path) / 'build' / product / 'values'
                  / f'{name}.yml').absolute()
    if not value_file.exists():
        sys.stderr.write(f'Undefined variable {name}\n')
        exit(7)
    with open(value_file, 'r') as stream:
        var_yaml = yaml.load(stream=stream, Loader=yaml.BaseLoader)
        if 'options' not in var_yaml:
            sys.stderr.write(f'No options for {name}\n')
            exit(8)
        options = var_yaml['options']

        if not selector:
            if 'default' not in options:
                sys.stderr.write(f'No default selector for {name}\n')
                exit(10)
            return str(options['default'])

        if selector not in options:
            sys.stderr.write(f'Option {selector} does not exist for {name}\n')
            exit(9)
        return str(options[selector])
122
123
124
def replace_variables(source: str, variables: dict, root_path: str, product: str) -> str:
    """Substitute ``<sub idref="NAME" />`` elements in ``source`` with values.

    For every such element the value comes from ``get_variable_value``, using
    ``variables.get(NAME)`` as the selector.  A falsy ``source`` is returned
    unchanged.
    """
    if not source:
        return source

    sub_element = re.compile(r'<sub idref="([a-z0-9_]+)" \/>', re.MULTILINE)
    text = source
    for found in sub_element.finditer(source):
        var_name = found.group(1)
        selector = variables.get(var_name)
        replacement = get_variable_value(root_path, product, var_name, selector)
        text = text.replace(found.group(), replacement)
    return text
135
136
137
def handle_variables(source: str, variables: dict, root_path: str, product: str) -> str:
    """Expand variable references in ``source``, then reduce it to plain text."""
    return html_plain_text(replace_variables(source, variables, root_path, product))
140
141
142
def get_description_root(srg: ET.Element) -> ET.Element:
    """Parse the description of ``srg`` as XML and return its root element.

    DISA adds escaped XML to the description field; the text is unescaped
    (``&lt;``/``&gt;``), occurrences of ``' & '`` are removed, and the result
    is wrapped in a ``<root>`` element before parsing.
    """
    escaped = srg.find('xccdf-1.1:description', NS).text
    unescaped = escaped.replace('&lt;', '<').replace('&gt;', '>').replace(' & ', '')
    return ET.fromstring("<root>" + unescaped + "</root>")
151
152
153
def get_srg_dict(xml_path: str) -> dict:
    """Load the SRG XCCDF file at ``xml_path`` into a dict keyed by SRG id.

    The id is the text of each rule's ``version`` element.  Every entry holds
    the keys ``severity``, ``title``, ``vuln_discussion``, ``cci``, ``fix``,
    ``check`` and ``ia_controls``.  Exits with code 1 when the file does not
    exist.
    """
    if not pathlib.Path(xml_path).exists():
        sys.stderr.write("XML for SRG was not found\n")
        sys.stderr.write(f"Could not open file {xml_path} \n")
        exit(1)

    srgs = {}
    benchmark = ET.parse(xml_path).getroot()
    for group in benchmark.findall('xccdf-1.1:Group', NS):
        for rule in group.findall('xccdf-1.1:Rule', NS):
            srg_id = rule.find('xccdf-1.1:version', NS).text
            severity = ssg.build_stig.get_severity(rule.get('severity'))
            title = rule.find('xccdf-1.1:title', NS).text
            # The description carries nested XML with the discussion and IA controls.
            description_root = get_description_root(rule)
            discussion = html_plain_text(description_root.find('VulnDiscussion').text)
            cci = rule.find("xccdf-1.1:ident[@system='http://cyber.mil/cci']", NS).text
            fix = rule.find('xccdf-1.1:fix', NS).text
            check_content = rule.find('xccdf-1.1:check/xccdf-1.1:check-content', NS)
            check = html_plain_text(check_content.text)
            ia_controls = description_root.find('IAControls').text
            srgs[srg_id] = {
                'severity': severity,
                'title': title,
                'vuln_discussion': discussion,
                'cci': cci,
                'fix': fix,
                'check': check,
                'ia_controls': ia_controls,
            }
    return srgs
176
177
178
def handle_rule_yaml(product: str, rule_dir: str, env_yaml: dict) -> ssg.build_yaml.Rule:
    """Load the rule stored in ``rule_dir`` and normalize it for ``product``."""
    rule = ssg.build_yaml.Rule.from_yaml(
        ssg.rules.get_rule_dir_yaml(rule_dir), env_yaml=env_yaml)
    rule.normalize(product)
    return rule
185
186
187
def parse_args() -> argparse.Namespace:
    """Define the command line interface and parse the process arguments.

    ``--control`` and ``--product`` are required; every other option falls
    back to a module-level default.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        '-c', '--control', type=str, action="store", required=True,
        help="The control file to parse")
    cli.add_argument(
        '-o', '--output', type=str,
        help=f"The path to the output. Defaults to {OUTPUT}",
        default=OUTPUT)
    cli.add_argument(
        "-r", "--root", type=str, action="store", default=SSG_ROOT,
        help=f"Path to SSG root directory (defaults to {SSG_ROOT})")
    cli.add_argument(
        "-j", "--json", type=str, action="store", default=RULES_JSON,
        help=f"Path to the rules_dir.json (defaults to {RULES_JSON})")
    cli.add_argument(
        "-p", "--product", type=str, action="store", required=True,
        help="What product to get STIGs for")
    cli.add_argument(
        "-b", "--build-config-yaml", default=BUILD_CONFIG,
        help="YAML file with information about the build configuration.")
    cli.add_argument(
        "-m", "--manual", type=str, action="store",
        help="Path to XML XCCDF manual file to use as the source of the SRGs",
        default=SRG_PATH)
    cli.add_argument(
        "--prefer-controls", action="store_true",
        help="When creating rows prefer checks and fixes from controls over rules",
        default=False)
    cli.add_argument(
        "-f", "--out-format", type=str, choices=("csv", "xlsx", "html", "md"),
        action="store", help="The format the output should take. Defaults to csv",
        default="csv")
    return cli.parse_args()
212
213
214
def get_policy_specific_content(key: str, rule_object: ssg.build_yaml.Rule) -> str:
    """Return ``key`` from the rule's ``stig`` policy specific content.

    An empty string is returned when the rule has no policy specific content,
    no ``stig`` section in it, or the section lacks ``key``.
    """
    stig = (rule_object.policy_specific_content or {}).get('stig')
    return stig.get(key, "") if stig else ""
222
223
224
def use_rule_content(control: ssg.controls.Control, prefer_controls: bool) -> bool:
    """Tell whether row text should come from rules rather than the control.

    Control text is used only when control text is preferred and the control
    defines both a fix text and a check.
    """
    control_has_content = control.fixtext is not None and control.check is not None
    return not (control_has_content and prefer_controls)
228
229
230
def handle_control(product: str, control: ssg.controls.Control, env_yaml: ssg.environment,
                   rule_json: dict, srgs: dict, used_rules: list, root_path: str,
                   prefer_controls: bool) -> list:
    """Build the export rows for one control.

    When the control has no selections, or its own content is to be used (see
    ``use_rule_content``), a single row built from the control is returned.
    Otherwise one row is produced per selection that is in
    ``control.selected`` and not yet in ``used_rules``; each such selection is
    appended to ``used_rules``.
    """
    if len(control.selections) == 0 or not use_rule_content(control, prefer_controls):
        return [no_selections_row(control, srgs)]

    # Row column -> key in the rule's STIG policy specific content.
    text_fields = (
        ('Requirement', 'srg_requirement'),
        ('Vul Discussion', 'vuldiscussion'),
        ('Check', 'checktext'),
        ('Fix', 'fixtext'),
    )

    rows = []
    for selection in control.selections:
        if selection in used_rules or selection not in control.selected:
            continue

        rule_object = handle_rule_yaml(product, rule_json[selection]['dir'], env_yaml)
        row = create_base_row(control, srgs, rule_object)
        # The first level of the control, when present, overrides the SRG severity.
        if control.levels is not None:
            row['Severity'] = ssg.build_stig.get_severity(control.levels[0])

        for column, content_key in text_fields:
            content = get_policy_specific_content(content_key, rule_object)
            row[column] = handle_variables(content, control.variables, root_path, product)

        row['STIGID'] = rule_object.identifiers.get('cce', "")
        if control.status is None:
            row['Status'] = DisaStatus.AUTOMATED
        else:
            row['Status'] = DisaStatus.from_string(control.status)

        used_rules.append(selection)
        rows.append(row)
    return rows
264
265
266
def no_selections_row(control, srgs):
    """Build a row whose text comes from the control itself instead of a rule.

    The base row is created with a placeholder rule named ``'null'``, and the
    STIG id is left empty.
    """
    row = create_base_row(control, srgs, ssg.build_yaml.Rule('null'))
    row.update({
        'Requirement': control.title,
        'Status': DisaStatus.from_string(control.status),
        'Fix': control.fixtext,
        'Check': control.check,
        'Vul Discussion': html_plain_text(control.rationale),
        'STIGID': "",
    })
    return row
275
276
277
def create_base_row(item: ssg.controls.Control, srgs: dict,
                    rule_object: ssg.build_yaml.Rule) -> dict:
    """Create the part of an export row shared by all row kinds.

    ``item.id`` must be a key of ``srgs``; otherwise an error is written to
    stderr and the process exits with code 4.  The SRG id and CCI are taken
    from the rule's references when present, falling back to the control id
    and the SRG's CCI.
    """
    srg_id = item.id
    if srg_id not in srgs:
        # Reported on stderr like every other fatal error in this script.
        sys.stderr.write(
            f"Unable to find SRG {srg_id}. Id in the control must be a valid SRGID.\n")
        exit(4)
    srg = srgs[srg_id]

    row = dict()
    row['SRGID'] = rule_object.references.get('srg', srg_id)
    row['CCI'] = rule_object.references.get('disa', srg['cci'])
    row['SRG Requirement'] = srg['title']
    row['SRG VulDiscussion'] = html_plain_text(srg['vuln_discussion'])
    row['SRG Check'] = html_plain_text(srg['check'])
    row['SRG Fix'] = srg['fix']
    row['Severity'] = ssg.build_stig.get_severity(srg.get('severity'))
    # Computed from the row's SRGID, which may come from the rule's references.
    row['IA Control'] = get_iacontrol(row['SRGID'])
    row['Mitigation'] = item.mitigation
    row['Artifact Description'] = item.artifact_description
    row['Status Justification'] = item.status_justification
    return row
298
299
300
def setup_csv_writer(csv_file: TextIO) -> csv.DictWriter:
    """Return a DictWriter over ``csv_file`` with the HEADERS row already written."""
    writer = csv.DictWriter(csv_file, fieldnames=HEADERS)
    writer.writeheader()
    return writer
304
305
306
def get_rule_json(json_path: str) -> dict:
    """Read the JSON document at ``json_path`` and return the parsed content."""
    with open(json_path, 'r') as handle:
        return json.load(handle)
310
311
312
def check_paths(control_path: str, rule_json_path: str) -> None:
    """Verify that the control file and the rule directory JSON both exist.

    Exits with code 5 when the control file is missing and with code 2 when
    the JSON file is missing; the control file is checked first.
    """
    control_file = pathlib.Path(control_path).absolute()
    if not control_file.exists():
        sys.stderr.write(f"Unable to find control file {control_file}\n")
        exit(5)

    rule_dirs_file = pathlib.Path(rule_json_path).absolute()
    if not rule_dirs_file.exists():
        sys.stderr.write(f"Unable to find rule_dirs.json file {rule_dirs_file}\n")
        sys.stderr.write("Hint: run ./utils/rule_dir_json.py\n")
        exit(2)
322
323
324
def check_product_value_path(root_path: str, product: str) -> None:
    """Verify that ``<root_path>/build/<product>/values`` exists.

    Exits with code 6 when the directory is missing.
    """
    values_dir = (pathlib.Path(root_path) / 'build' / product / 'values').absolute()
    if values_dir.exists():
        return
    sys.stderr.write(f"Unable to find values directory for"
                     f" {product} in {values_dir}\n")
    sys.stderr.write(f"Have you built {product}\n")
    exit(6)
332
333
334
def get_policy(args, env_yaml) -> ssg.controls.Policy:
    """Create the policy for the control file in ``args.control`` and load it."""
    loaded_policy = ssg.controls.Policy(args.control, env_yaml=env_yaml)
    loaded_policy.load()
    return loaded_policy
338
339
340
def handle_csv_output(output: str, results: list) -> str:
    """Write ``results`` as CSV to ``output`` and return the path written.

    The header row is written by ``setup_csv_writer`` before the data rows.
    """
    # The csv module requires newline='' on the file object; without it,
    # newlines embedded in fields are mishandled and platforms that use
    # \r\n line endings get an extra \r on every row.
    with open(output, 'w', newline='') as csv_file:
        csv_writer = setup_csv_writer(csv_file)
        csv_writer.writerows(results)
    return output
346
347
348
def handle_xlsx_output(output: str, product: str, results: list) -> str:
    """Write ``results`` as a spreadsheet and return the path used.

    The path is ``output`` with ``.csv`` replaced by ``.xlsx``.  The
    ``IA Control`` column of every row is recomputed from its ``SRGID``.
    """
    xlsx_path = output.replace('.csv', '.xlsx')
    for entry in results:
        entry['IA Control'] = get_iacontrol(entry['SRGID'])
    xlsx.handle_dict(results, xlsx_path, f'{product} SRG Mapping')
    return xlsx_path
354
355
356
def handle_html_output(output: str, product: str, results: list) -> str:
    """Write ``results`` as HTML and return the path used.

    The path is ``output`` with ``.csv`` replaced by ``.html``.  The
    ``IA Control`` column of every row is recomputed from its ``SRGID``.
    """
    for entry in results:
        entry['IA Control'] = get_iacontrol(entry['SRGID'])
    html_path = output.replace('.csv', '.html')
    html.handle_dict(results, html_path, f'{product} SRG Mapping')
    return html_path
362
363
364
def handle_md_output(output: str, product: str, results: list) -> str:
    """Write ``results`` as Markdown and return the path used.

    The path is ``output`` with ``.csv`` replaced by ``.md``.  The
    ``IA Control`` column of every row is recomputed from its ``SRGID``.
    """
    md_path = output.replace('.csv', '.md')
    for entry in results:
        entry['IA Control'] = get_iacontrol(entry['SRGID'])
    md.handle_dict(results, md_path, f'{product} SRG Mapping')
    return md_path
370
371
372
def handle_output(output: str, results: list, format_type: str, product: str) -> None:
    """Write ``results`` in the requested format and print the path used.

    ``format_type`` selects one of the csv, xlsx, md or html writers.  For any
    other value nothing is written and the unchanged ``output`` is printed.
    """
    # Lambdas keep the lookup lazy; only the selected writer is invoked.
    writers = {
        'csv': lambda: handle_csv_output(output, results),
        'xlsx': lambda: handle_xlsx_output(output, product, results),
        'md': lambda: handle_md_output(output, product, results),
        'html': lambda: handle_html_output(output, product, results),
    }
    writer = writers.get(format_type)
    if writer is not None:
        output = writer()

    print(f'Wrote output to {output}')
383
384
385
def get_env_yaml(root: str, product_path: str, build_config_yaml: str) -> dict:
    """Open the build environment for a product.

    The environment is created by ``ssg.environment.open_environment`` from
    the build configuration, ``<root>/products/<product_path>/product.yml``
    and the ``<root>/product_properties`` directory.
    """
    product_yaml_path = os.path.join(root, "products", product_path, "product.yml")
    properties_dir = os.path.join(root, "product_properties")
    return ssg.environment.open_environment(
        build_config_yaml, product_yaml_path, properties_dir)
391
392
393
def rows_match(old: dict, new: dict) -> bool:
    """Return True when both rows have equal Requirement, Fix and Check values."""
    return not any(old[column] != new[column]
                   for column in ('Requirement', 'Fix', 'Check'))
399
400
401
def merge_rows(old: dict, new: dict) -> None:
    """Append the SRGID of ``new`` to the SRGID of ``old``, comma separated."""
    old["SRGID"] = ",".join((old["SRGID"], new["SRGID"]))
403
404
405
def extend_results(results: list, rows: list, prefer_controls: bool) -> None:
    """Add ``rows`` to ``results``, merging a lone row into a matching one.

    Rows are always appended when there is more than one of them (they were
    generated from rule selections) or when control-based data is not
    preferred.  Otherwise a single row, whose check and fix may come from the
    control file, is merged into the first existing row with the same
    requirement, fix and check; without such a row it is appended.
    """
    if not prefer_controls or len(rows) > 1:
        results.extend(rows)
        return

    if len(rows) == 0:
        return

    candidate = rows[0]
    for existing in results:
        if rows_match(existing, candidate):
            merge_rows(existing, candidate)
            break
    else:
        # No existing row matched, so the row is added as new.
        results.extend(rows)
427
428
429
def main() -> None:
    """Export the rows for every control of a policy in the requested format.

    Parses the command line, validates the input paths, loads the SRGs, the
    policy and the rule directory JSON, builds the rows control by control and
    hands them to ``handle_output``.
    """
    args = parse_args()
    check_paths(args.control, args.json)
    check_product_value_path(args.root, args.product)

    srgs = ssg.build_stig.parse_srgs(args.manual)
    env_yaml = get_env_yaml(args.root, args.product, args.build_config_yaml)
    policy = get_policy(args, env_yaml)
    rule_json = get_rule_json(args.json)

    # Shared across controls so a rule produces a row only once.
    used_rules = list()
    results = list()
    for control in policy.controls:
        rows = handle_control(args.product, control, env_yaml, rule_json, srgs, used_rules,
                              args.root, args.prefer_controls)
        extend_results(results, rows, args.prefer_controls)

    handle_output(args.output, results, args.out_format, args.product)
449
450
451
# Run the export only when this file is executed as a script.
if __name__ == "__main__":
    main()
453