1 | #!/usr/bin/python3 |
||
2 | import argparse |
||
3 | import csv |
||
4 | import enum |
||
5 | import os.path |
||
6 | import sys |
||
7 | |||
8 | from utils.create_srg_export import DisaStatus |
||
9 | |||
10 | try: |
||
11 | import ssg.constants |
||
12 | except (ModuleNotFoundError, ImportError): |
||
13 | sys.stderr.write("Unable to load ssg python modules.\n") |
||
14 | sys.stderr.write("Hint: run source ./.pyenv.sh\n") |
||
15 | exit(3) |
||
16 | |||
17 | |||
18 | class Problems(enum.Enum): |
||
19 | MISSING_REQUIREMENT = "Missing requirement" |
||
20 | REQUIREMENT_HAS_INVALID_CHARS = "Requirement contains \", &, ', <, or > and is invalid." |
||
21 | CHECK_BADWORDS = "Check contains should, shall, or please." |
||
22 | FIX_BADWORDS = "Fix contains should, shall, or please." |
||
23 | CHECK_BAD_START = "Check starts with ensure or interview" |
||
24 | FIX_BAD_START = "Fix starts with ensure or interview" |
||
25 | CHECK_IS_A_FIND = "Check doesn't contain ', this is a finding'" |
||
26 | INVALID_STATUS = "The status is not valid" |
||
27 | INVALID_SEVERITY = "Severity is not CAT I, CAT II, or CAT III" |
||
28 | |||
29 | |||
30 | def has_element(search: list, target: str) -> bool: |
||
31 | return len([elem for elem in search if (elem in target)]) != 0 |
||
32 | |||
33 | |||
34 | def has_startswith(search: list, target: str) -> bool: |
||
35 | return len([elem for elem in search if (target.startswith(elem))]) != 0 |
||
36 | |||
37 | |||
38 | def parse_args() -> argparse.Namespace: |
||
39 | parser = argparse.ArgumentParser(description="Audits a csv from utils/create_srg_export.py", |
||
40 | epilog="example: ./utils/srg_audit.py build/1642443529_stig_" |
||
41 | "export.csv") |
||
42 | parser.add_argument('file', type=str, help="A CSV file created by utils/create_srg_export.py") |
||
43 | return parser.parse_args() |
||
44 | |||
45 | |||
46 | def create_reduction_dicts() -> (dict, dict, dict, dict): |
||
47 | # The goal these dictionaries is to reduce all values the dictionary to zero, |
||
48 | # save optional values. |
||
49 | # |
||
50 | # Optional values should reduce to 2 if defined or if not defined should remain 3. |
||
51 | config_dict = {'Requirement': 1, |
||
52 | 'Vul Discussion': 1, |
||
53 | 'Status': 1, |
||
54 | 'Check': 1, |
||
55 | 'Fix': 1, |
||
56 | 'Severity': 1, |
||
57 | 'Mitigation': 0, |
||
58 | 'Artifact Description': 0, |
||
59 | 'Status Justification': 3, |
||
60 | 'CCI': 1, |
||
61 | 'SRGID': 1} |
||
62 | meets_dict = {'Requirement': 1, |
||
63 | 'VulDiscussion': 1, |
||
64 | 'Status': 1, |
||
65 | 'Check': 0, |
||
66 | 'Fix': 0, |
||
67 | 'Severity': 1, |
||
68 | 'Mitigation': 0, |
||
69 | 'Artifact Description': 1, |
||
70 | 'Status Justification': 1, |
||
71 | 'CCI': 1, |
||
72 | 'SRGID': 1} |
||
73 | does_not_meet = {'Requirement': 1, |
||
74 | 'Vul Discussion': 1, |
||
75 | 'Status': 1, |
||
76 | 'Check': 0, |
||
77 | 'Fix': 0, |
||
78 | 'Severity': 1, |
||
79 | 'Mitigation': 1, |
||
80 | 'Artifact Description': 0, |
||
81 | 'Status Justification': 1, |
||
82 | 'CCI': 1, |
||
83 | 'SRGID': 1} |
||
84 | not_applicable = {'Requirement': 1, |
||
85 | 'Vul Discussion': 0, |
||
86 | 'Status': 1, |
||
87 | 'Check': 0, |
||
88 | 'Fix': 0, |
||
89 | 'Severity': 1, |
||
90 | 'Mitigation': 1, |
||
91 | 'Artifact Description': 0, |
||
92 | 'Status Justification': 1, |
||
93 | 'CCI': 1, |
||
94 | 'SRGID': 1} |
||
95 | return config_dict, does_not_meet, meets_dict, not_applicable |
||
96 | |||
97 | |||
98 | def check_paths(file: str) -> None: |
||
99 | if not os.path.exists(file): |
||
100 | sys.stderr.write("Unable to perform audit.\n") |
||
101 | sys.stderr.write(f"File not found: {file}\n") |
||
102 | exit(1) |
||
103 | if not os.path.isfile(file): |
||
104 | sys.stderr.write("Unable to perform audit.\n") |
||
105 | sys.stderr.write(f"Input must be a file: {file}\n") |
||
106 | exit(2) |
||
107 | |||
108 | |||
109 | def validate_check_fix(results: dict, row: dict, row_id: str) -> None: |
||
110 | check_invalid = ['shall', 'should', 'please'] |
||
111 | check_invalid_start = ['ensure', 'interview'] |
||
112 | if has_element(check_invalid, row['Fix']): |
||
113 | results[row_id].append(Problems.FIX_BADWORDS.value) |
||
114 | if has_element(check_invalid, row['Check']): |
||
115 | results[row_id].append(Problems.CHECK_BADWORDS.value) |
||
116 | if has_startswith(check_invalid_start, row['Check']): |
||
117 | results[row_id].append(Problems.CHECK_BAD_START.value) |
||
118 | if has_startswith(check_invalid_start, row['Fix']): |
||
119 | results[row_id].append(Problems.FIX_BAD_START.value) |
||
120 | if 'this is a finding.' not in row['Check']: |
||
121 | results[row_id].append(Problems.CHECK_IS_A_FIND.value) |
||
122 | |||
123 | |||
124 | def get_results_dict(row: dict) -> dict: |
||
125 | config_dict, does_not_meet, meets_dict, not_applicable = create_reduction_dicts() |
||
126 | if row['Status'] == DisaStatus.AUTOMATED: |
||
127 | results = config_dict.copy() |
||
128 | elif row['Status'] == DisaStatus.INHERENTLY_MET: |
||
129 | results = meets_dict.copy() |
||
130 | elif row['Status'] == DisaStatus.DOES_NOT_MEET: |
||
131 | results = does_not_meet.copy() |
||
132 | elif row['Status'] == DisaStatus.NOT_APPLICABLE: |
||
133 | results = not_applicable.copy() |
||
134 | else: |
||
135 | results = None |
||
136 | return results |
||
137 | |||
138 | |||
139 | def process_row_results(errors: dict, results: dict, row: dict) -> None: |
||
140 | for result in results: |
||
141 | if results[result] == 1: |
||
142 | errors[get_row_id(row)].append(f'Field {result} is not defined.') |
||
143 | elif results[result] == -1: |
||
144 | errors[get_row_id(row)].append(f'Field {result} is defined and should not be.') |
||
145 | elif results[result] not in [0, 2, 3]: |
||
146 | raise AttributeError(f"Config data is not defined correctly. " |
||
147 | f"Result = {result}, {results[result]}") |
||
148 | |||
149 | |||
150 | def validate_base_rows(errors: dict, row: dict) -> None: |
||
151 | if get_row_id(row) not in errors: |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
![]() |
|||
152 | errors[get_row_id(row)] = list() |
||
153 | if not row['Requirement']: |
||
154 | errors[get_row_id(row)].append(Problems.MISSING_REQUIREMENT.value) |
||
155 | req_bad_chars = ['\"', "&", "'", "<", ">"] |
||
156 | if has_element(req_bad_chars, row['Requirement']): |
||
157 | errors[get_row_id(row)].append(Problems.REQUIREMENT_HAS_INVALID_CHARS.value) |
||
158 | |||
159 | |||
160 | def set_blank_rows_results(results: dict, row: dict) -> None: |
||
161 | for col in row: |
||
162 | col_value = row[col].strip() |
||
163 | if col_value != '' and col in results: |
||
164 | results[col] -= 1 |
||
165 | |||
166 | |||
167 | def print_results(errors: dict) -> None: |
||
168 | for result in errors: |
||
169 | result_errors = errors[str(result)] |
||
170 | if len(result_errors) > 0: |
||
171 | print(result) |
||
172 | for error in result_errors: |
||
173 | print(f'\t{error}') |
||
174 | |||
175 | |||
176 | def get_row_id(row: dict) -> str: |
||
177 | if row['STIGID'] is not None and row['STIGID'] != '': |
||
178 | return row['STIGID'] |
||
179 | else: |
||
180 | return row['SRGID'] |
||
181 | |||
182 | |||
183 | def main(): |
||
184 | args = parse_args() |
||
185 | check_paths(args.file) |
||
186 | |||
187 | with open(args.file, 'r') as f: |
||
188 | reader = csv.DictReader(f) |
||
189 | errors = dict() |
||
190 | for row in reader: |
||
191 | validate_base_rows(errors, row) |
||
192 | results = get_results_dict(row) |
||
193 | if row['Status'] == DisaStatus.AUTOMATED: |
||
194 | validate_check_fix(errors, row, get_row_id(row)) |
||
195 | if row['Status'] not in DisaStatus.STATUSES: |
||
196 | errors[get_row_id(row)].append(Problems.INVALID_STATUS.value) |
||
197 | continue |
||
198 | set_blank_rows_results(results, row) |
||
199 | process_row_results(errors, results, row) |
||
200 | print_results(errors) |
||
201 | |||
202 | |||
203 | if __name__ == '__main__': |
||
204 | main() |
||
205 |