Test Failed
Push — master ( 2b3e2b...0c4269 )
by Jan
01:15 queued 15s
created

utils.create_srg_export.main()   B

Complexity

Conditions 5

Size

Total Lines 26
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 0
Metric Value
cc 5
eloc 23
nop 0
dl 0
loc 26
ccs 0
cts 23
cp 0
crap 30
rs 8.8613
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
3
import argparse
4
import csv
5
import datetime
6
import json
7
import pathlib
8
import os
9
import re
10
import sys
11
from typing.io import TextIO
12
import xml.etree.ElementTree as ET
13
14
try:
15
    import ssg.build_yaml
16
    import ssg.constants
17
    import ssg.controls
18
    import ssg.environment
19
    import ssg.rules
20
    import ssg.yaml
21
except (ModuleNotFoundError, ImportError):
22
    sys.stderr.write("Unable to load ssg python modules.\n")
23
    sys.stderr.write("Hint: run source ./.pyenv.sh\n")
24
    exit(3)
25
26
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
27
RULES_JSON = os.path.join(SSG_ROOT, "build", "rule_dirs.json")
28
BUILD_CONFIG = os.path.join(SSG_ROOT, "build", "build_config.yml")
29
OUTPUT = os.path.join(SSG_ROOT, 'build',
30
                      f'{datetime.datetime.now().strftime("%s")}_stig_export.csv')
31
SRG_PATH = os.path.join(SSG_ROOT, 'shared', 'references', 'disa-os-srg-v2r1.xml')
32
NS = {'scap': ssg.constants.datastream_namespace,
33
      'xccdf-1.2': ssg.constants.XCCDF12_NS,
34
      'xccdf-1.1': ssg.constants.XCCDF11_NS}
35
SEVERITY = {'low': 'CAT III', 'medium': 'CAT II', 'high': 'CAT I'}
36
37
38
class DisaStatus:
39
    """
40
    Convert control status to a string for the spreadsheet
41
    """
42
    PENDING = "pending"
43
    PLANNED = "planned"
44
    NOT_APPLICABLE = "Not Applicable"
45
    INHERENTLY_MET = "Applicable - Inherently Met"
46
    DOCUMENTATION = "documentation"
47
    PARTIAL = "Applicable - Configurable"
48
    SUPPORTED = "supported"
49
    AUTOMATED = "Applicable - Configurable"
50
    DOES_NOT_MEET = "Applicable – Does Not Meet"
51
52
    @staticmethod
53
    def from_string(source: str) -> str:
54
        if source == ssg.controls.Status.INHERENTLY_MET:
55
            return DisaStatus.INHERENTLY_MET
56
        elif source == ssg.controls.Status.DOES_NOT_MEET:
57
            return DisaStatus.DOES_NOT_MEET
58
        return source
59
60
61
def html_plain_text(source: str) -> str:
62
    if source is None:
63
        return ""
64
    # Quick and dirty way to clean up HTML fields.
65
    # Add line breaks
66
    result = source.replace("<br />", "\n")
67
    result = result.replace("<tt>", '"')
68
    result = result.replace("</tt>", '"')
69
    # Remove all other tags
70
    result = re.sub(r"(?s)<.*?>", " ", result)
71
    return result
72
73
74
def get_description_root(srg: ET.Element) -> ET.Element:
75
    # DISA adds escaped XML to the description field
76
    # This method unescapes that XML and parses it
77
    description_xml = "<root>"
78
    description_xml += srg.find('xccdf-1.1:description', NS).text.replace('&lt;', '<') \
79
        .replace('&gt;', '>').replace(' & ', '')
80
    description_xml += "</root>"
81
    description_root = ET.ElementTree(ET.fromstring(description_xml)).getroot()
82
    return description_root
83
84
85
def get_srg_dict(xml_path: str) -> dict:
86
    if not pathlib.Path(xml_path).exists():
87
        sys.stderr.write("XML for SRG was not found\n")
88
        exit(1)
89
    root = ET.parse(xml_path).getroot()
90
    srgs = dict()
91
    for group in root.findall('xccdf-1.1:Group', NS):
92
        for srg in group.findall('xccdf-1.1:Rule', NS):
93
            srg_id = srg.find('xccdf-1.1:version', NS).text
94
            srgs[srg_id] = dict()
95
            srgs[srg_id]['severity'] = SEVERITY[srg.get('severity')]
96
            srgs[srg_id]['title'] = srg.find('xccdf-1.1:title', NS).text
97
            description_root = get_description_root(srg)
98
            srgs[srg_id]['vuln_discussion'] = \
99
                html_plain_text(description_root.find('VulnDiscussion').text)
100
            srgs[srg_id]['cci'] = \
101
                srg.find("xccdf-1.1:ident[@system='http://cyber.mil/cci']", NS).text
102
            srgs[srg_id]['fix'] = srg.find('xccdf-1.1:fix', NS).text
103
            srgs[srg_id]['check'] = \
104
                html_plain_text(srg.find('xccdf-1.1:check/xccdf-1.1:check-content', NS).text)
105
            srgs[srg_id]['ia_controls'] = description_root.find('IAControls').text
106
    return srgs
107
108
109
def handle_rule_yaml(product: str, rule_dir: str, env_yaml: dict) -> ssg.build_yaml.Rule:
110
    rule_file = ssg.rules.get_rule_dir_yaml(rule_dir)
111
112
    rule_yaml = ssg.build_yaml.Rule.from_yaml(rule_file, env_yaml=env_yaml)
113
    rule_yaml.normalize(product)
114
115
    return rule_yaml
116
117
118
def parse_args() -> argparse.Namespace:
119
    parser = argparse.ArgumentParser()
120
    parser.add_argument('-c', '--control', type=str, action="store", required=True,
121
                        help="The control file to parse")
122
    parser.add_argument('-o', '--output', type=str,
123
                        help=f"The path to the output. Defaults to {OUTPUT}",
124
                        default=OUTPUT)
125
    parser.add_argument("-r", "--root", type=str, action="store", default=SSG_ROOT,
126
                        help=f"Path to SSG root directory (defaults to {SSG_ROOT})")
127
    parser.add_argument("-j", "--json", type=str, action="store", default=RULES_JSON,
128
                        help=f"Path to the rules_dir.json (defaults to {RULES_JSON})")
129
    parser.add_argument("-p", "--product", type=str, action="store", required=True,
130
                        help="What product to get STIGs for")
131
    parser.add_argument("-b", "--build-config-yaml", default=BUILD_CONFIG,
132
                        help="YAML file with information about the build configuration.")
133
    parser.add_argument("-m", "--manual", type=str, action="store",
134
                        help="Path to XML XCCDF manual file to use as the source of the SRGs",
135
                        default=SRG_PATH)
136
    return parser.parse_args()
137
138
139
def handle_control(product: str, control: ssg.controls.Control, csv_writer: csv.DictWriter,
140
                   env_yaml: ssg.environment, rule_json: dict, srgs: dict) -> None:
141
    if control.selections:
142
        for rule in control.selections:
143
            row = create_base_row(control, srgs)
144
            rule_object = handle_rule_yaml(product, rule_json[rule]['dir'], env_yaml)
145
            if control.levels is not None:
146
                row['Severity'] = control.levels[0]
147
            row['Requirement'] = html_plain_text(rule_object.description)
148
            row['Vul Discussion'] = html_plain_text(rule_object.rationale)
149
            row['Check'] = f'{html_plain_text(rule_object.ocil)}\n\n' \
150
                           f'If {rule_object.ocil_clause}, then this is a finding.'
151
            row['Fix'] = html_plain_text(rule_object.fix)
152
            # If then control has rule by definition the status is "Applicable - Configurable"
153
            row['Status'] = DisaStatus.AUTOMATED
154
            csv_writer.writerow(row)
155
    else:
156
        row = create_base_row(control, srgs)
157
        row['Mitigation'] = control.mitigation
158
        row['Artifact Description'] = control.artifact_description
159
        row['Status Justification'] = control.status_justification
160
        row['Status'] = DisaStatus.from_string(control.status)
161
        csv_writer.writerow(row)
162
163
164
def create_base_row(item: ssg.controls.Control, srgs: dict) -> dict:
165
    row = dict()
166
    srg_id = item.id
167
    if srg_id not in srgs:
168
        print(f"Unable to find SRG {srg_id}. Id in the control must be an srg.")
169
        exit(1)
170
    srg = srgs[srg_id]
171
    row['SRGID'] = srg_id
172
    row['CCI'] = srg['cci']
173
    row['SRG Requirement'] = srg['title']
174
    row['SRG VulDiscussion'] = srg['vuln_discussion']
175
    row['SRG Check'] = srg['check']
176
    row['SRG Fix'] = srg['fix']
177
    row['Severity'] = srg['severity']
178
    row['IA Control'] = srg['ia_controls']
179
    return row
180
181
182
def setup_csv_writer(csv_file: TextIO) -> csv.DictWriter:
183
    headers = ['IA Control', 'CCI', 'SRGID', 'SRG Requirement', 'Requirement',
184
               'SRG VulDiscussion', 'Vul Discussion', 'Status', 'SRG Check', 'Check', 'SRG Fix',
185
               'Fix', 'Severity', 'Mitigation', 'Artifact Description', 'Status Justification']
186
    csv_writer = csv.DictWriter(csv_file, headers)
187
    csv_writer.writeheader()
188
    return csv_writer
189
190
191
def get_rule_json(json_path: str) -> dict:
192
    with open(json_path, 'r') as json_file:
193
        rule_json = json.load(json_file)
194
    return rule_json
195
196
197
def main() -> None:
198
    args = parse_args()
199
200
    control_full_path = pathlib.Path(args.control).absolute()
201
    if not pathlib.Path.exists(control_full_path):
202
        sys.stderr.write(f"Unable to find control file {control_full_path}\n")
203
        exit(1)
204
    if not os.path.exists(args.json):
205
        sys.stderr.write(f"Unable to find rule_dirs.json file {args.json}\n")
206
        sys.stderr.write("Hint: run ./utils/rule_dir_json.py\n")
207
        exit(2)
208
    srgs = get_srg_dict(args.manual)
209
    control = ssg.controls.Policy(args.control)
210
    control.load()
211
    rule_json = get_rule_json(args.json)
212
    full_output = pathlib.Path(args.output)
213
    with open(full_output, 'w') as csv_file:
214
        csv_writer = setup_csv_writer(csv_file)
215
216
        product_dir = os.path.join(args.root, "products", args.product)
217
        product_yaml_path = os.path.join(product_dir, "product.yml")
218
        env_yaml = ssg.environment.open_environment(args.build_config_yaml, str(product_yaml_path))
219
220
        for control in control.controls:
221
            handle_control(args.product, control, csv_writer, env_yaml, rule_json, srgs)
222
        print(f"File written to {full_output}")
223
224
225
if __name__ == '__main__':
226
    main()
227