Test Failed
Push — master ( 3872b0...d954a3 )
by Jan
01:15 queued 15s
created

utils.create_srg_export.get_srg_dict()   A

Complexity

Conditions 4

Size

Total Lines 22
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
cc 4
eloc 22
nop 1
dl 0
loc 22
ccs 0
cts 19
cp 0
crap 20
rs 9.352
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
3
import argparse
4
import csv
5
import datetime
6
import json
7
import pathlib
8
import os
9
import re
10
import sys
11
from typing.io import TextIO
12
import xml.etree.ElementTree as ET
13
14
import ssg.build_yaml
15
import ssg.constants
16
import ssg.controls
17
import ssg.environment
18
import ssg.rules
19
import ssg.yaml
20
21
# Repository root: the parent of the directory that contains this script.
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
# Build artefacts that are read by default.
RULES_JSON = os.path.join(SSG_ROOT, "build", "rule_dirs.json")
BUILD_CONFIG = os.path.join(SSG_ROOT, "build", "build_config.yml")
# Default output file: build/<unix timestamp>.csv.
# The timestamp comes from datetime.timestamp() because the "%s" strftime
# directive is a platform-specific extension and is not available everywhere.
OUTPUT = os.path.join(SSG_ROOT, 'build', f'{int(datetime.datetime.now().timestamp())}.csv')
# Default XCCDF manual used as the source of the SRGs.
SRG_PATH = os.path.join(SSG_ROOT, 'shared', 'references', 'disa-os-srg-v2r1.xml')
# Namespace prefixes used in the ElementTree find()/findall() queries.
NS = {'scap': ssg.constants.datastream_namespace,
      'xccdf-1.2': ssg.constants.XCCDF12_NS,
      'xccdf-1.1': ssg.constants.XCCDF11_NS}
# Maps the severity attribute of an SRG rule to the label written to the
# "Severity" column.
SEVERITY = {'low': 'CAT III', 'medium': 'CAT II', 'high': 'CAT I'}
30
31
32
class DisaStatus:
    """
    Status wording used in the exported spreadsheet
    """
    PENDING = "pending"
    PLANNED = "planned"
    NOT_APPLICABLE = "Not Applicable"
    INHERENTLY_MET = "Applicable - Inherently Met"
    DOCUMENTATION = "documentation"
    # PARTIAL and AUTOMATED intentionally share the same wording.
    PARTIAL = "Applicable - Configurable"
    SUPPORTED = "supported"
    AUTOMATED = "Applicable - Configurable"

    @staticmethod
    def from_string(source: str) -> str:
        """
        Translate a control status into its spreadsheet wording.

        Only the inherently-met status has a dedicated wording; every other
        value is handed back unchanged.
        """
        is_inherently_met = source == ssg.controls.Status.INHERENTLY_MET
        return DisaStatus.INHERENTLY_MET if is_inherently_met else source
50
51
52
def html_plain_text(source: str) -> str:
    """
    Reduce an HTML fragment to plain text suitable for a spreadsheet cell.

    Every spelling of the line-break tag (``<br>``, ``<br/>``, ``<br />``,
    in any letter case) becomes a newline; every other tag is replaced by a
    single space. Character entities are left untouched.

    :param source: HTML fragment; ``None`` is accepted and treated as empty.
    :return: the text without tags, or ``""`` when ``source`` is ``None``.
    """
    if source is None:
        return ""
    # Quick and dirty way to clean up HTML fields.
    # Convert line breaks first so they are not flattened into spaces below.
    result = re.sub(r"(?i)<br\s*/?>", "\n", source)
    # Remove all other tags; (?s) lets a tag that spans several lines match.
    return re.sub(r"(?s)<.*?>", " ", result)
61
62
63
def get_description_root(srg: ET.Element) -> ET.Element:
    """
    Parse the XML that DISA embeds, escaped, in a rule's description.

    The description text is unescaped, wrapped in a synthetic ``<root>``
    element and parsed, so that its children can be queried with ``find()``.

    :param srg: rule element holding an ``xccdf-1.1:description`` child.
    :return: the synthetic ``<root>`` element; it has no children when the
             description is missing or empty.
    :raises xml.etree.ElementTree.ParseError: if the unescaped description
             is still not well-formed XML.
    """
    description = srg.find('xccdf-1.1:description', NS)
    text = ""
    if description is not None and description.text:
        text = description.text
    # DISA adds escaped XML to the description field; undo that escaping.
    text = text.replace('&lt;', '<').replace('&gt;', '>')
    # A bare ampersand is not well-formed XML. Escape it instead of deleting
    # it, so "A & B" keeps its meaning and "R&D" no longer breaks the parser.
    text = re.sub(r'&(?!(?:amp|lt|gt|quot|apos|#[0-9]+|#x[0-9A-Fa-f]+);)', '&amp;', text)
    # fromstring() already returns the root element.
    return ET.fromstring(f"<root>{text}</root>")
72
73
74
def get_srg_dict(xml_path: str) -> dict:
    """
    Load the SRGs from an XCCDF manual.

    Every ``Rule`` found inside a top-level ``Group`` becomes one entry,
    keyed by the text of the rule's ``version`` element.

    :param xml_path: path to the XCCDF manual.
    :return: dict mapping the SRG id to a dict with the keys ``severity``,
             ``title``, ``vuln_discussion``, ``cci``, ``fix``, ``check`` and
             ``ia_controls``.
    :raises SystemExit: with status 1 when ``xml_path`` does not exist.
    :raises KeyError: when a rule's severity is not a key of ``SEVERITY``.
    :raises AttributeError: when a rule lacks one of the expected elements.
    """
    if not pathlib.Path(xml_path).exists():
        sys.stderr.write("XML for SRG was not found\n")
        # sys.exit() instead of exit(): the latter is injected by the site
        # module for interactive use and is not guaranteed to exist.
        sys.exit(1)
    root = ET.parse(xml_path).getroot()
    srgs = {}
    for group in root.findall('xccdf-1.1:Group', NS):
        for srg in group.findall('xccdf-1.1:Rule', NS):
            srg_id = srg.find('xccdf-1.1:version', NS).text
            # Several fields live in the XML embedded in the description.
            description_root = get_description_root(srg)
            check_content = srg.find('xccdf-1.1:check/xccdf-1.1:check-content', NS)
            srgs[srg_id] = {
                'severity': SEVERITY[srg.get('severity')],
                'title': srg.find('xccdf-1.1:title', NS).text,
                'vuln_discussion':
                    html_plain_text(description_root.find('VulnDiscussion').text),
                'cci': srg.find("xccdf-1.1:ident[@system='http://cyber.mil/cci']", NS).text,
                'fix': srg.find('xccdf-1.1:fix', NS).text,
                'check': html_plain_text(check_content.text),
                'ia_controls': description_root.find('IAControls').text,
            }
    return srgs
96
97
98
def handle_rule_yaml(product: str, rule_dir: str, env_yaml: dict) -> ssg.build_yaml.Rule:
    """
    Load the rule stored in ``rule_dir`` and normalize it for ``product``.

    :param product: product passed to the rule's ``normalize()``.
    :param rule_dir: directory of the rule.
    :param env_yaml: environment handed to ``Rule.from_yaml()``.
    :return: the loaded and normalized rule.
    """
    rule = ssg.build_yaml.Rule.from_yaml(
        ssg.rules.get_rule_dir_yaml(rule_dir), env_yaml=env_yaml)
    rule.normalize(product)
    return rule
105
106
107
def parse_args() -> argparse.Namespace:
    """
    Parse the command line.

    ``--control`` and ``--product`` are required; every other option falls
    back to a path below the SSG root.

    :return: the parsed arguments.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--control', type=str, action="store", required=True,
                        help="The control file to parse")
    parser.add_argument('-o', '--output', type=str, help="The path to the output",
                        default=OUTPUT)
    parser.add_argument("-r", "--root", type=str, action="store", default=SSG_ROOT,
                        help=f"Path to SSG root directory (defaults to {SSG_ROOT})")
    # The help text names the file as it is actually called (rule_dirs.json).
    parser.add_argument("-j", "--json", type=str, action="store", default=RULES_JSON,
                        help=f"Path to the rule_dirs.json (defaults to {RULES_JSON})")
    parser.add_argument("-p", "--product", type=str, action="store", required=True,
                        help="What product to get STIGs for")
    parser.add_argument("-b", "--build-config-yaml", default=BUILD_CONFIG,
                        help="YAML file with information about the build configuration. ")
    parser.add_argument("-m", "--manual", type=str, action="store",
                        help="Path to XML XCCDF manual file to use as the source of the SRGs",
                        default=SRG_PATH)
    return parser.parse_args()
125
126
127
def handle_control(product: str, control: ssg.controls.Control, csv_writer: csv.DictWriter,
                   env_yaml: ssg.environment, rule_json: dict, srgs: dict) -> None:
    """
    Write the spreadsheet row(s) for a single control.

    A control without selected rules yields one row carrying the control's
    own mitigation, artifact description, status justification and status.
    Otherwise one row is written per selected rule, filled from that rule.

    :param product: product the rules are normalized for.
    :param control: control to export; its id must be a key of ``srgs``.
    :param csv_writer: writer that receives the rows.
    :param env_yaml: environment used when loading the rules.
    :param rule_json: maps a rule id to a dict holding the rule's ``dir``.
    :param srgs: SRG data keyed by SRG id.
    """
    if not control.selections:
        row = create_base_row(control, srgs)
        row.update({
            'Mitigation': control.mitigation,
            'Artifact Description': control.artifact_description,
            'Status Justification': control.status_justification,
            'Status': DisaStatus.from_string(control.status),
        })
        csv_writer.writerow(row)
        return

    for selected_rule in control.selections:
        row = create_base_row(control, srgs)
        rule_dir = rule_json[selected_rule]['dir']
        rule_object = handle_rule_yaml(product, rule_dir, env_yaml)
        row['Requirement'] = html_plain_text(rule_object.description)
        row['Vul Discussion'] = html_plain_text(rule_object.rationale)
        row['Check'] = html_plain_text(rule_object.ocil)
        row['Fix'] = html_plain_text(rule_object.fix)
        # A control that has rules is by definition "Applicable - Configurable".
        row['Status'] = DisaStatus.AUTOMATED
        csv_writer.writerow(row)
147
148
149
def create_base_row(item: ssg.controls.Control, srgs: dict) -> dict:
    """
    Build the SRG-derived columns shared by every row of a control.

    :param item: control whose ``id`` names the SRG to look up.
    :param srgs: SRG data keyed by SRG id.
    :return: a new dict with the keys ``SRGID``, ``CCI``, ``SRG Requirement``,
             ``SRG VulDiscussion``, ``SRG Check``, ``SRG Fix``, ``Severity``
             and ``IA Control``.
    :raises SystemExit: with status 1 when the control's id is not in ``srgs``.
    """
    srg_id = item.id
    if srg_id not in srgs:
        # Report on stderr like the other fatal errors in this script, and
        # use sys.exit() because the bare exit() is only provided by site.
        sys.stderr.write(f"Unable to find SRG {srg_id}. Id in the control must be an srg.\n")
        sys.exit(1)
    srg = srgs[srg_id]
    return {
        'SRGID': srg_id,
        'CCI': srg['cci'],
        'SRG Requirement': srg['title'],
        'SRG VulDiscussion': srg['vuln_discussion'],
        'SRG Check': srg['check'],
        'SRG Fix': srg['fix'],
        'Severity': srg['severity'],
        'IA Control': srg['ia_controls'],
    }
165
166
167
def setup_csv_writer(csv_file: TextIO) -> csv.DictWriter:
    """
    Create the spreadsheet writer and emit its header row.

    :param csv_file: open text file that receives the CSV data.
    :return: a ``DictWriter`` bound to ``csv_file``; the header row has
             already been written when this returns.
    """
    # The order of this list is the column order of the spreadsheet.
    columns = [
        'IA Control', 'CCI', 'SRGID', 'SRG Requirement', 'Requirement',
        'SRG VulDiscussion', 'Vul Discussion', 'Status', 'SRG Check', 'Check',
        'SRG Fix', 'Fix', 'Severity', 'Mitigation', 'Artifact Description',
        'Status Justification',
    ]
    writer = csv.DictWriter(csv_file, fieldnames=columns)
    writer.writeheader()
    return writer
174
175
176
def get_rule_json(json_path: str) -> dict:
    """
    Read the JSON document stored at ``json_path``.

    :param json_path: path to the JSON file.
    :return: the decoded JSON content.
    :raises OSError: when the file cannot be opened.
    :raises json.JSONDecodeError: when the file is not valid JSON.
    """
    # Decode as UTF-8 explicitly instead of relying on the platform's default
    # encoding, which is not UTF-8 everywhere.
    with open(json_path, 'r', encoding='utf-8') as json_file:
        return json.load(json_file)
180
181
182
def main() -> None:
    """
    Export the controls of a control file to a CSV spreadsheet.

    Loads the SRGs, the control file and the rule directory index, then
    writes one or more rows per control to the output file.

    :raises SystemExit: with status 1 when the control file does not exist.
    """
    args = parse_args()

    control_full_path = pathlib.Path(args.control).absolute()
    if not control_full_path.exists():
        sys.stderr.write(f"Unable to find control file {control_full_path}\n")
        # sys.exit() instead of exit(): the latter is only provided by site.
        sys.exit(1)
    srgs = get_srg_dict(args.manual)
    policy = ssg.controls.Policy(args.control)
    policy.load()
    rule_json = get_rule_json(args.json)

    # Load the environment before the output file is created, so a failure
    # here does not leave an empty CSV behind.
    product_dir = os.path.join(args.root, "products", args.product)
    product_yaml_path = os.path.join(product_dir, "product.yml")
    env_yaml = ssg.environment.open_environment(args.build_config_yaml, str(product_yaml_path))

    full_output = pathlib.Path(args.output)
    # newline='' is what the csv module requires of files it writes to;
    # without it, fields with embedded newlines can be corrupted and extra
    # carriage returns appear on platforms that translate "\n".
    with open(full_output, 'w', newline='') as csv_file:
        csv_writer = setup_csv_writer(csv_file)
        for control in policy.controls:
            handle_control(args.product, control, csv_writer, env_yaml, rule_json, srgs)
    # Report success only after the file has been closed.
    print(f"File written to {full_output}")
205
206
207
# Run the export only when this file is executed as a script.
if __name__ == "__main__":
    main()
209