1 | #!/usr/bin/python3 |
||
2 | import argparse |
||
3 | import collections |
||
4 | import json |
||
5 | import os |
||
6 | import yaml |
||
7 | |||
8 | # NOTE: This is not to be confused with the https://pypi.org/project/ssg/ |
||
9 | # package. The ssg package we're referencing here is actually a relative import |
||
10 | # within this repository. Because of this, you need to ensure |
||
11 | # ComplianceAsCode/content/ssg is discoverable from PYTHONPATH before you |
||
12 | # invoke this script. |
||
try:
    from ssg import controls
    import ssg.products
except ModuleNotFoundError as e:
    # NOTE: Only emit this message if we're dealing with an import error for
    # ssg. Since the local ssg module imports other things, like PyYAML, we
    # don't want to emit misleading errors for legit dependencies issues if the
    # user hasn't installed PyYAML or other transitive dependencies from ssg.
    # We should revisit this if or when we decide to implement a python package
    # management strategy for the python scripts provided in this repository.
    if e.name == 'ssg':
        # The example uses shell command substitution, $(pwd), so that it can
        # be copy-pasted from the top-level directory of the repository.
        msg = """Unable to import local 'ssg' module.

The 'ssg' package from within this repository must be discoverable before
invoking this script. Make sure the top-level directory of the
ComplianceAsCode/content repository is available in the PYTHONPATH environment
variable (example: $ export PYTHONPATH=$(pwd)).
HINT: $ source .pyenv.sh
"""
        raise RuntimeError(msg) from e
    # Any other missing module is a genuine dependency problem: re-raise it
    # untouched so the original module name is reported.
    raise
||
34 | |||
35 | |||
# Absolute path of the parent of the directory holding this script
# (presumably the repository root, since the usage example runs the script
# from "utils/").
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
||
37 | |||
38 | |||
def print_options(opts):
    """Print the given options as a bulleted list.

    When ``opts`` is empty, a message stating that the controls file is not
    written appropriately is printed instead.

    :param opts: sized iterable of strings (a list, dict or dict view).
    """
    if len(opts) == 0:
        print("The controls file is not written appropriately.")
        return

    bullet = "\n - "
    print("Available options are:" + bullet + bullet.join(opts))
||
44 | |||
45 | |||
def validate_args(ctrlmgr, args):
    """Validate that ``args.id`` and ``args.level`` are known to the manager.

    The policy is looked up with ``ctrlmgr._get_policy(args.id)`` and the level
    with ``policy.get_level_with_ancestors_sequence(args.level)``. If either
    call raises ``ValueError``, the error and the list of valid options are
    printed and the script terminates with exit status 1.

    :param ctrlmgr: loaded controls manager exposing ``policies``.
    :param args: parsed command-line arguments with ``id`` and ``level``.
    :raises SystemExit: when the policy id or the level is rejected.
    """
    try:
        policy = ctrlmgr._get_policy(args.id)
    except ValueError as e:
        print("Error:", e)
        print_options(ctrlmgr.policies.keys())
        # SystemExit is raised directly: the exit() helper is injected by the
        # site module for interactive use and is not guaranteed to exist.
        raise SystemExit(1)

    try:
        policy.get_level_with_ancestors_sequence(args.level)
    except ValueError as e:
        print("Error:", e)
        print_options(policy.levels_by_id.keys())
        raise SystemExit(1)
||
64 | |||
65 | |||
def get_available_products():
    """Return the entry names found in the ``products`` directory of SSG_ROOT.

    :returns: list of names as returned by ``os.listdir``.
    :raises SystemExit: with status 1 (after printing the error) when the
        directory cannot be listed.
    """
    products_dir = os.path.join(SSG_ROOT, "products")
    try:
        return os.listdir(products_dir)
    except OSError as e:
        # Only filesystem failures (missing directory, permissions, ...) are
        # expected here; anything else is a programming error and should
        # surface with its traceback.
        print(e)
        raise SystemExit(1)
||
73 | |||
74 | |||
def validate_product(product):
    """Terminate the script unless ``product`` is an available product.

    :param product: product name to look up in ``get_available_products()``.
    :raises SystemExit: with status 1, after printing the valid options, when
        the product is unknown.
    """
    products = get_available_products()
    if product not in products:
        print(f"Error: Product '{product}' is not valid.")
        print_options(products)
        # Raised directly instead of calling the site-provided exit() helper,
        # which is intended for the interactive interpreter.
        raise SystemExit(1)
||
81 | |||
82 | |||
def get_parameter_from_yaml(yaml_file: str, section: str) -> list:
    """Return the value stored under ``section`` in a YAML file.

    :param yaml_file: path of the YAML file to read.
    :param section: top-level key whose value is returned.
    :returns: the value of ``section``; an empty list when the file cannot be
        parsed, so that callers iterating over the result keep working.
    :raises KeyError: when the parsed document has no ``section`` key.
    """
    with open(yaml_file, 'r') as file:
        try:
            yaml_content = yaml.safe_load(file)
        except yaml.YAMLError as e:
            # Best effort: report the broken file and carry on with nothing
            # selected instead of implicitly returning None.
            print(e)
            return []
    return yaml_content[section]
||
90 | |||
91 | |||
def get_controls_from_profiles(controls: list, profiles_files: list, used_controls: set) -> set:
    """Collect the selections of the given profiles that start with a control id.

    For every entry of the ``selections`` section of each profile file, the
    text before the first ':' is added to ``used_controls`` when the entry
    starts with one of the names in ``controls``.

    :param controls: control (policy) ids used as prefixes.
    :param profiles_files: paths of the profile files to scan.
    :param used_controls: set updated in place with the matches.
    :returns: the same ``used_controls`` set.
    """
    # str.startswith accepts a tuple of prefixes.
    prefixes = tuple(controls)
    for profile_path in profiles_files:
        for entry in get_parameter_from_yaml(profile_path, 'selections'):
            if not entry.startswith(prefixes):
                continue
            name, _, _ = entry.partition(':')
            used_controls.add(name)
    return used_controls
||
99 | |||
100 | |||
def get_controls_used_by_products(ctrls_mgr: controls.ControlsManager, products: list) -> set:
    """Return the control ids referenced by the profiles of the given products.

    :param ctrls_mgr: loaded controls manager; the keys of its ``policies``
        mapping are the ids searched for.
    :param products: product names whose profile files are scanned.
    :returns: set of the matching selection names.
    """
    used_controls = set()
    # Named policy_ids (not "controls") so the imported ``controls`` module is
    # not shadowed inside this function.
    policy_ids = ctrls_mgr.policies.keys()
    for product in products:
        profiles_files = get_product_profiles_files(product)
        used_controls = get_controls_from_profiles(policy_ids, profiles_files, used_controls)
    return used_controls
||
108 | |||
109 | |||
def get_policy_levels(ctrls_mgr: object, control_id: str) -> list:
    """Return the level ids declared by the policy named ``control_id``.

    :param ctrls_mgr: controls manager providing ``_get_policy``.
    :param control_id: id of the policy to inspect.
    :returns: the keys of the policy's ``levels_by_id`` mapping.
    """
    return ctrls_mgr._get_policy(control_id).levels_by_id.keys()
||
113 | |||
114 | |||
def get_product_dir(product):
    """Return the directory of ``product`` under SSG_ROOT/products.

    The product is validated first, which terminates the script when the
    product is unknown.
    """
    validate_product(product)
    products_root = os.path.join(SSG_ROOT, "products")
    return os.path.join(products_root, product)
||
118 | |||
119 | |||
def get_product_profiles_files(product: str) -> list:
    """Return the profile files of ``product`` as reported by ``ssg.products``.

    The loaded product data is passed as both arguments of
    ``ssg.products.get_profile_files_from_root``.
    """
    loaded_product = load_product_yaml(product)
    return ssg.products.get_profile_files_from_root(loaded_product, loaded_product)
||
123 | |||
124 | |||
def get_product_yaml(product):
    """Return the path of the ``product.yml`` file of ``product``.

    :param product: product name; validated by ``get_product_dir``.
    :returns: path of the existing ``product.yml`` file.
    :raises SystemExit: with status 1, after printing a message, when the
        file does not exist.
    """
    product_dir = get_product_dir(product)
    product_yml = os.path.join(product_dir, "product.yml")
    if not os.path.exists(product_yml):
        print(f"'{product_yml}' file was not found.")
        # Raised directly instead of calling the site-provided exit() helper,
        # which is intended for the interactive interpreter.
        raise SystemExit(1)
    return product_yml
||
132 | |||
133 | |||
def load_product_yaml(product: str) -> object:
    """Load the ``product.yml`` of ``product`` through ``ssg.products``.

    :param product: product name.
    :returns: whatever ``ssg.products.load_product_yaml`` returns for the
        product file (the concrete type is defined by the ssg package).
    """
    product_yaml = get_product_yaml(product)
    return ssg.products.load_product_yaml(product_yaml)
||
137 | |||
138 | |||
def load_controls_manager(controls_dir: str, product: str) -> object:
    """Build and load a ``controls.ControlsManager`` for ``product``.

    :param controls_dir: directory that contains the control files.
    :param product: product whose loaded product data is given to the manager.
    :returns: the manager, after its ``load()`` method has been called.
    """
    manager = controls.ControlsManager(controls_dir, load_product_yaml(product))
    manager.load()
    return manager
||
144 | |||
145 | |||
def get_formatted_name(text_name):
    """Return ``text_name`` with every '-', '.' and ' ' replaced by '_'."""
    return text_name.translate(str.maketrans("-. ", "___"))
||
150 | |||
151 | |||
def count_implicit_status(ctrls, status_count):
    """Add the derived counters to ``status_count`` and return it.

    Derived keys written into the mapping:

    * 'all': number of controls;
    * 'applicable': all minus the "not applicable" count;
    * 'assessed': applicable minus the "pending" count;
    * 'not assessed': applicable minus assessed;
    * 'full coverage': automated + documentation + inherently met + manual.

    :param ctrls: sized collection of controls.
    :param status_count: mapping of status to count, updated in place.
    """
    status = controls.Status
    fully_covered = sum(
        status_count[key]
        for key in (status.AUTOMATED, status.DOCUMENTATION,
                    status.INHERENTLY_MET, status.MANUAL))
    not_applicable = status_count[status.NOT_APPLICABLE]
    pending = status_count[status.PENDING]

    total = len(ctrls)
    applicable = total - not_applicable
    assessed = applicable - pending

    status_count.update({
        'all': total,
        'applicable': applicable,
        'assessed': assessed,
        'not assessed': applicable - assessed,
        'full coverage': fully_covered,
    })
    return status_count
||
166 | |||
167 | |||
def create_implicit_control_lists(ctrls, control_list):
    """Add the derived control sets to ``control_list`` and return it.

    Derived keys written into the mapping:

    * 'all': the given controls;
    * 'applicable': all minus the "not applicable" controls;
    * 'assessed': applicable minus the "pending" controls;
    * 'not assessed': applicable minus assessed;
    * 'full coverage': all minus the does-not-meet, not-applicable, partial,
      pending, planned and supported controls.

    :param ctrls: set of controls.
    :param control_list: mapping of status to set of controls, updated in
        place.
    """
    status = controls.Status
    # Each lookup is performed even when the status has no control: with a
    # defaultdict this registers the key, as the statement-per-status form did.
    excluded = {
        key: control_list[key]
        for key in (status.DOES_NOT_MEET, status.NOT_APPLICABLE, status.PARTIAL,
                    status.PENDING, status.PLANNED, status.SUPPORTED)
    }

    applicable = ctrls - excluded[status.NOT_APPLICABLE]
    assessed = applicable - excluded[status.PENDING]

    control_list['all'] = ctrls
    control_list['applicable'] = applicable
    control_list['assessed'] = assessed
    control_list['not assessed'] = applicable - assessed
    control_list['full coverage'] = ctrls.difference(*excluded.values())
    return control_list
||
183 | |||
184 | |||
def count_rules_and_vars_in_control(ctrl):
    """Count the rules and the variable assignments listed in ``ctrl.rules``.

    An entry containing '=' is counted as a variable; every other entry is
    counted as a rule.

    :returns: ``Counts(rules, variables)`` named tuple.
    """
    Counts = collections.namedtuple('Counts', ['rules', 'variables'])
    entries = list(ctrl.rules)
    variables_count = sum(1 for entry in entries if "=" in entry)
    return Counts(len(entries) - variables_count, variables_count)
||
194 | |||
195 | |||
def count_rules_and_vars(ctrls):
    """Sum the rule and variable counts of every control in ``ctrls``.

    :returns: ``(rules_total, variables_total)`` tuple.
    """
    per_control = [count_rules_and_vars_in_control(ctrl) for ctrl in ctrls]
    rules_total = sum(counts.rules for counts in per_control)
    variables_total = sum(counts.variables for counts in per_control)
    return rules_total, variables_total
||
203 | |||
204 | |||
def count_controls_by_status(ctrls):
    """Group the controls by status and compute the derived statistics.

    :param ctrls: set of controls, each exposing a ``status`` attribute.
    :returns: ``(status_count, control_list)`` where ``status_count`` maps a
        status to the number of controls and ``control_list`` maps a status
        to the set of controls, both extended with the derived keys.
    """
    # Every status reported by Status.get_status_list() starts at zero, so it
    # is present in the counters even when no control carries it.
    status_count = collections.defaultdict(int)
    status_count.update(dict.fromkeys(controls.Status.get_status_list(), 0))
    control_list = collections.defaultdict(set)

    for ctrl in ctrls:
        key = str(ctrl.status)
        status_count[key] += 1
        control_list[key].add(ctrl)

    status_count = count_implicit_status(ctrls, status_count)
    control_list = create_implicit_control_lists(ctrls, control_list)
    return status_count, control_list
||
220 | |||
221 | |||
def print_specific_stat(status, current, total):
    """Print one "status  current / total = percent%" statistics line.

    Nothing is printed when ``current`` is not positive.

    :param status: label of the line.
    :param current: number of controls with this status.
    :param total: reference total used for the percentage.
    """
    if current <= 0:
        return

    # total can be 0 while current is positive (e.g. every control is
    # "not applicable", so there is no applicable control to compare with);
    # report 0.0% instead of failing with ZeroDivisionError.
    percent = round((current / total) * 100.00, 2) if total else 0.0
    print(f"{status:16} {current:6} / {total:3} = {percent:4}%")
||
229 | |||
230 | |||
def sort_controls_by_id(control_list):
    """Return sorted ``(id, title)`` tuples for the given controls.

    The id is converted to ``str``, so the ordering is lexicographic.
    """
    pairs = [(str(ctrl.id), ctrl.title) for ctrl in control_list]
    pairs.sort()
    return pairs
||
233 | |||
234 | |||
def print_controls(status_count, control_list, args):
    """Print the controls whose status matches ``args.status``.

    :param status_count: mapping of status to number of controls.
    :param control_list: mapping of status to set of controls.
    :param args: parsed command-line arguments; ``status`` is read.
    :raises SystemExit: with status 1, after printing the valid statuses,
        when ``args.status`` is not a key of ``status_count``.
    """
    status = args.status
    if status not in status_count:
        print("Error: The informed status is not available")
        print_options(status_count)
        # Raised directly instead of calling the site-provided exit() helper,
        # which is intended for the interactive interpreter.
        raise SystemExit(1)

    if status_count[status] > 0:
        print("\nList of the {status} ({total}) controls:".format(
            total=status_count[status], status=status))

        for ctrl_id, ctrl_title in sort_controls_by_id(control_list[status]):
            print("{id:>16} - {title}".format(id=ctrl_id, title=ctrl_title))
    else:
        print("There is no controls with {status} status.".format(status=status))
||
250 | |||
251 | |||
def print_stats(status_count, control_list, rules_count, vars_count, args):
    """Print the human-readable statistics report.

    The report has three parts: the derived counters relative to the total
    number of controls, the counters of the statuses listed by
    ``controls.Status.get_status_list()`` relative to the applicable controls,
    and the number of rules and variables. The controls themselves are listed
    afterwards when ``args.show_controls`` is set.
    """
    declared_statuses = controls.Status.get_status_list()
    # Keys of status_count that are not declared statuses, i.e. the derived
    # counters such as 'all' or 'applicable'.
    derived_keys = status_count.keys() - declared_statuses

    print("General stats:")
    for key in sorted(derived_keys):
        print_specific_stat(key, status_count[key], status_count['all'])

    print("\nStats grouped by status:")
    for key in sorted(declared_statuses):
        print_specific_stat(key, status_count[key], status_count['applicable'])

    print(f"\nRules and Variables in {args.id} - {args.level}:")
    print(f'{rules_count} rules are selected')
    print(f'{vars_count} variables are explicitly defined')

    if not args.show_controls:
        return
    print_controls(status_count, control_list, args)
||
270 | |||
271 | |||
def print_stats_json(product, id, level, control_list):
    """Print the statistics as a single JSON document.

    :param product: product name, emitted as "product_name".
    :param id: benchmark (policy) id, emitted as "benchmark.name".
    :param level: level, emitted as "benchmark.baseline".
    :param control_list: mapping of status to set of controls.
    """
    total_controls = len(control_list['applicable'])

    addressed_controls = {}
    for status in sorted(control_list.keys()):
        control_ids = sorted(str(ctrl.id) for ctrl in control_list[status])
        # Each value is a one-element list wrapping the sorted ids; the shape
        # is kept as is because consumers of this format may rely on it.
        addressed_controls[get_formatted_name(status)] = [control_ids]

    data = {
        "format_version": "v0.0.3",
        "product_name": product,
        "benchmark": {
            "name": id,
            "baseline": level,
        },
        "total_controls": total_controls,
        "addressed_controls": addressed_controls,
    }
    print(json.dumps(data))
||
287 | |||
288 | |||
def stats(args):
    """Run the ``stats`` sub-command.

    Loads the controls manager for ``args.product``, validates ``args.id`` and
    ``args.level``, gathers the controls of that level and prints the
    statistics either as JSON (``args.output_format == 'json'``) or as text.

    :param args: parsed command-line arguments.
    :raises SystemExit: with status 1 when no control matches the inputs (and
        from the validation helpers when an argument is invalid).
    """
    ctrls_mgr = load_controls_manager(args.controls_dir, args.product)
    validate_args(ctrls_mgr, args)
    ctrls = set(ctrls_mgr.get_all_controls_of_level(args.id, args.level))

    if not ctrls:
        print("No controls found with the given inputs. Maybe try another level.")
        # Raised directly instead of calling the site-provided exit() helper,
        # which is intended for the interactive interpreter.
        raise SystemExit(1)

    status_count, control_list = count_controls_by_status(ctrls)
    rules_count, vars_count = count_rules_and_vars(ctrls)

    if args.output_format == 'json':
        print_stats_json(args.product, args.id, args.level, control_list)
    else:
        print_stats(status_count, control_list, rules_count, vars_count, args)
||
306 | |||
307 | |||
# Maps each sub-command name accepted on the command line to the function
# implementing it.
subcmds = {
    "stats": stats,
}
||
311 | |||
312 | |||
def parse_arguments():
    """Parse the command line of the script.

    :returns: the ``argparse.Namespace`` with ``controls_dir``, ``subcmd`` and,
        for the ``stats`` sub-command, ``id``, ``level``, ``output_format``,
        ``product``, ``show_controls`` and ``status``.
    """
    parser = argparse.ArgumentParser(
        description="Tool used to evaluate control files",
        epilog="Usage example: utils/controleval.py stats -i cis_rhel8 -l l2_server -p rhel8")
    parser.add_argument(
        '--controls-dir', default='./controls/', help=(
            "Directory that contains control files with policy controls. "
            "e.g.: ~/scap-security-guide/controls"))
    subparsers = parser.add_subparsers(dest='subcmd', required=True)

    stats_parser = subparsers.add_parser(
        'stats',
        help="calculate and return the statistics for the given benchmark")
    stats_parser.add_argument(
        '-i', '--id', required=True,
        help="the ID or name of the control file in the 'controls' directory")
    stats_parser.add_argument(
        '-l', '--level', required=True,
        help="the compliance target level to analyze")
    stats_parser.add_argument(
        '-o', '--output-format', choices=['json'],
        help="The output format of the result")
    # The stats sub-command always loads the product definition, so a missing
    # product is rejected here with a clear usage error instead of failing
    # later with "Product 'None' is not valid".
    stats_parser.add_argument(
        '-p', '--product', required=True,
        help="product to check has required references")
    stats_parser.add_argument(
        '--show-controls', action='store_true',
        help="list the controls and their respective status")
    stats_parser.add_argument(
        '-s', '--status', default='all',
        help="status used to filter the controls list output")
    return parser.parse_args()
||
345 | |||
346 | |||
def main():
    """Parse the command line and dispatch to the selected sub-command."""
    args = parse_arguments()
    handler = subcmds[args.subcmd]
    handler(args)


if __name__ == "__main__":
    main()
||
354 |