Test Failed
Push — master ( b1f310...582393 )
by Jan
03:05 queued 14s
created

utils.create_srg_export.get_severity()   A

Complexity

Conditions 3

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
cc 3
eloc 6
nop 1
dl 0
loc 7
ccs 0
cts 6
cp 0
crap 12
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
3
import argparse
4
import csv
5
import datetime
6
import json
7
import pathlib
8
import os
9
import re
10
import sys
11
from typing.io import TextIO
12
import xml.etree.ElementTree as ET
13
14
try:
15
    import ssg.build_yaml
16
    import ssg.constants
17
    import ssg.controls
18
    import ssg.environment
19
    import ssg.rules
20
    import ssg.yaml
21
except (ModuleNotFoundError, ImportError):
22
    sys.stderr.write("Unable to load ssg python modules.\n")
23
    sys.stderr.write("Hint: run source ./.pyenv.sh\n")
24
    exit(3)
25
26
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
27
RULES_JSON = os.path.join(SSG_ROOT, "build", "rule_dirs.json")
28
BUILD_CONFIG = os.path.join(SSG_ROOT, "build", "build_config.yml")
29
OUTPUT = os.path.join(SSG_ROOT, 'build',
30
                      f'{datetime.datetime.now().strftime("%s")}_stig_export.csv')
31
SRG_PATH = os.path.join(SSG_ROOT, 'shared', 'references', 'disa-os-srg-v2r1.xml')
32
NS = {'scap': ssg.constants.datastream_namespace,
33
      'xccdf-1.2': ssg.constants.XCCDF12_NS,
34
      'xccdf-1.1': ssg.constants.XCCDF11_NS}
35
SEVERITY = {'low': 'CAT III', 'medium': 'CAT II', 'high': 'CAT I'}
36
37
38
def get_severity(input_severity: str) -> str:
39
    if input_severity not in ['CAT I', 'CAT II', 'CAT III', 'low', 'medium', 'high']:
40
        raise ValueError(f'Severity of {input_severity}')
41
    elif input_severity in ['CAT I', 'CAT II', 'CAT III']:
42
        return input_severity
43
    else:
44
        return SEVERITY[input_severity]
45
46
47
class DisaStatus:
48
    """
49
    Convert control status to a string for the spreadsheet
50
    """
51
    PENDING = "pending"
52
    PLANNED = "planned"
53
    NOT_APPLICABLE = "Not Applicable"
54
    INHERENTLY_MET = "Applicable - Inherently Met"
55
    DOCUMENTATION = "documentation"
56
    PARTIAL = "Applicable - Configurable"
57
    SUPPORTED = "supported"
58
    AUTOMATED = "Applicable - Configurable"
59
    DOES_NOT_MEET = "Applicable – Does Not Meet"
60
61
    @staticmethod
62
    def from_string(source: str) -> str:
63
        if source == ssg.controls.Status.INHERENTLY_MET:
64
            return DisaStatus.INHERENTLY_MET
65
        elif source == ssg.controls.Status.DOES_NOT_MEET:
66
            return DisaStatus.DOES_NOT_MEET
67
        elif source == ssg.controls.Status.AUTOMATED:
68
            return DisaStatus.AUTOMATED
69
        return source
70
71
    STATUSES = {AUTOMATED, INHERENTLY_MET, DOES_NOT_MEET, NOT_APPLICABLE}
72
73
74
def html_plain_text(source: str) -> str:
75
    if source is None:
76
        return ""
77
    # Quick and dirty way to clean up HTML fields.
78
    # Add line breaks
79
    result = source.replace("<br />", "\n")
80
    result = result.replace("<tt>", '"')
81
    result = result.replace("</tt>", '"')
82
    # Remove all other tags
83
    result = re.sub(r"(?s)<.*?>", " ", result)
84
    return result
85
86
87
def get_description_root(srg: ET.Element) -> ET.Element:
88
    # DISA adds escaped XML to the description field
89
    # This method unescapes that XML and parses it
90
    description_xml = "<root>"
91
    description_xml += srg.find('xccdf-1.1:description', NS).text.replace('&lt;', '<') \
92
        .replace('&gt;', '>').replace(' & ', '')
93
    description_xml += "</root>"
94
    description_root = ET.ElementTree(ET.fromstring(description_xml)).getroot()
95
    return description_root
96
97
98
def get_srg_dict(xml_path: str) -> dict:
99
    if not pathlib.Path(xml_path).exists():
100
        sys.stderr.write("XML for SRG was not found\n")
101
        exit(1)
102
    root = ET.parse(xml_path).getroot()
103
    srgs = dict()
104
    for group in root.findall('xccdf-1.1:Group', NS):
105
        for srg in group.findall('xccdf-1.1:Rule', NS):
106
            srg_id = srg.find('xccdf-1.1:version', NS).text
107
            srgs[srg_id] = dict()
108
            srgs[srg_id]['severity'] = get_severity(srg.get('severity'))
109
            srgs[srg_id]['title'] = srg.find('xccdf-1.1:title', NS).text
110
            description_root = get_description_root(srg)
111
            srgs[srg_id]['vuln_discussion'] = \
112
                html_plain_text(description_root.find('VulnDiscussion').text)
113
            srgs[srg_id]['cci'] = \
114
                srg.find("xccdf-1.1:ident[@system='http://cyber.mil/cci']", NS).text
115
            srgs[srg_id]['fix'] = srg.find('xccdf-1.1:fix', NS).text
116
            srgs[srg_id]['check'] = \
117
                html_plain_text(srg.find('xccdf-1.1:check/xccdf-1.1:check-content', NS).text)
118
            srgs[srg_id]['ia_controls'] = description_root.find('IAControls').text
119
    return srgs
120
121
122
def handle_rule_yaml(product: str, rule_dir: str, env_yaml: dict) -> ssg.build_yaml.Rule:
123
    rule_file = ssg.rules.get_rule_dir_yaml(rule_dir)
124
125
    rule_yaml = ssg.build_yaml.Rule.from_yaml(rule_file, env_yaml=env_yaml)
126
    rule_yaml.normalize(product)
127
128
    return rule_yaml
129
130
131
def parse_args() -> argparse.Namespace:
132
    parser = argparse.ArgumentParser()
133
    parser.add_argument('-c', '--control', type=str, action="store", required=True,
134
                        help="The control file to parse")
135
    parser.add_argument('-o', '--output', type=str,
136
                        help=f"The path to the output. Defaults to {OUTPUT}",
137
                        default=OUTPUT)
138
    parser.add_argument("-r", "--root", type=str, action="store", default=SSG_ROOT,
139
                        help=f"Path to SSG root directory (defaults to {SSG_ROOT})")
140
    parser.add_argument("-j", "--json", type=str, action="store", default=RULES_JSON,
141
                        help=f"Path to the rules_dir.json (defaults to {RULES_JSON})")
142
    parser.add_argument("-p", "--product", type=str, action="store", required=True,
143
                        help="What product to get STIGs for")
144
    parser.add_argument("-b", "--build-config-yaml", default=BUILD_CONFIG,
145
                        help="YAML file with information about the build configuration.")
146
    parser.add_argument("-m", "--manual", type=str, action="store",
147
                        help="Path to XML XCCDF manual file to use as the source of the SRGs",
148
                        default=SRG_PATH)
149
    return parser.parse_args()
150
151
152
def handle_control(product: str, control: ssg.controls.Control, csv_writer: csv.DictWriter,
153
                   env_yaml: ssg.environment, rule_json: dict, srgs: dict,
154
                   used_rules: list) -> None:
155
    if len(control.selections) > 0:
156
        for rule in control.selections:
157
            if rule not in used_rules:
158
                rule_object = handle_rule_yaml(product, rule_json[rule]['dir'], env_yaml)
159
                row = create_base_row(control, srgs, rule_object)
160
                if control.levels is not None:
161
                    row['Severity'] = get_severity(control.levels[0])
162
                row['Requirement'] = srgs[control.id]['title'].replace('The operating system',
163
                                                                       env_yaml['full_name'])
164
                row['Vul Discussion'] = html_plain_text(rule_object.rationale)
165
                row['Check'] = f'{html_plain_text(rule_object.ocil)}\n\n' \
166
                               f'If {rule_object.ocil_clause}, then this is a finding.'
167
                row['Fix'] = html_plain_text(rule_object.fix)
168
                if control.status is not None:
169
                    row['Status'] = DisaStatus.from_string(control.status)
170
                else:
171
                    row['Status'] = DisaStatus.AUTOMATED
172
                csv_writer.writerow(row)
173
                used_rules.append(rule)
174
    else:
175
        row = create_base_row(control, srgs, ssg.build_yaml.Rule('null'))
176
        row['Requirement'] = control.description
177
        row['Status'] = DisaStatus.from_string(control.status)
178
        row['Vul Discussion'] = control.rationale
179
        row['Fix'] = control.fix
180
        row['Check'] = control.check
181
        row['Vul Discussion'] = html_plain_text(control.rationale)
182
        csv_writer.writerow(row)
183
184
185
def create_base_row(item: ssg.controls.Control, srgs: dict,
186
                    rule_object: ssg.build_yaml.Rule) -> dict:
187
    row = dict()
188
    srg_id = item.id
189
    if srg_id not in srgs:
190
        print(f"Unable to find SRG {srg_id}. Id in the control must be a valid SRGID.")
191
        exit(1)
192
    srg = srgs[srg_id]
193
194
    row['SRGID'] = rule_object.references.get('srg', srg_id)
195
    row['CCI'] = rule_object.references.get('disa', srg['cci'])
196
    row['SRG Requirement'] = srg['title']
197
    row['SRG VulDiscussion'] = srg['vuln_discussion']
198
    row['SRG Check'] = srg['check']
199
    row['SRG Fix'] = srg['fix']
200
    row['Severity'] = get_severity(srg.get('severity'))
201
    row['IA Control'] = srg['ia_controls']
202
    row['Mitigation'] = item.mitigation
203
    row['Artifact Description'] = item.artifact_description
204
    row['Status Justification'] = item.status_justification
205
    return row
206
207
208
def setup_csv_writer(csv_file: TextIO) -> csv.DictWriter:
209
    headers = ['IA Control', 'CCI', 'SRGID', 'SRG Requirement', 'Requirement',
210
               'SRG VulDiscussion', 'Vul Discussion', 'Status', 'SRG Check', 'Check', 'SRG Fix',
211
               'Fix', 'Severity', 'Mitigation', 'Artifact Description', 'Status Justification']
212
    csv_writer = csv.DictWriter(csv_file, headers)
213
    csv_writer.writeheader()
214
    return csv_writer
215
216
217
def get_rule_json(json_path: str) -> dict:
218
    with open(json_path, 'r') as json_file:
219
        rule_json = json.load(json_file)
220
    return rule_json
221
222
223
def main() -> None:
224
    args = parse_args()
225
226
    control_full_path = pathlib.Path(args.control).absolute()
227
    if not pathlib.Path.exists(control_full_path):
228
        sys.stderr.write(f"Unable to find control file {control_full_path}\n")
229
        exit(1)
230
    if not os.path.exists(args.json):
231
        sys.stderr.write(f"Unable to find rule_dirs.json file {args.json}\n")
232
        sys.stderr.write("Hint: run ./utils/rule_dir_json.py\n")
233
        exit(2)
234
    srgs = get_srg_dict(args.manual)
235
    product_dir = os.path.join(args.root, "products", args.product)
236
    product_yaml_path = os.path.join(product_dir, "product.yml")
237
    env_yaml = ssg.environment.open_environment(args.build_config_yaml, str(product_yaml_path))
238
239
    policy = ssg.controls.Policy(args.control, env_yaml=env_yaml)
240
    policy.load()
241
    rule_json = get_rule_json(args.json)
242
    full_output = pathlib.Path(args.output)
243
    used_rules = list()
244
    with open(full_output, 'w') as csv_file:
245
        csv_writer = setup_csv_writer(csv_file)
246
247
        for control in policy.controls:
248
            handle_control(args.product, control, csv_writer, env_yaml, rule_json, srgs,
249
                           used_rules)
250
        print(f"File written to {full_output}")
251
252
253
if __name__ == '__main__':
254
    main()
255