Total Complexity | 131 |
Total Lines | 516 |
Duplicated Lines | 15.12 % |
Coverage | 0% |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like fix-rules often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | #!/usr/bin/env python2 |
||
2 | |||
3 | import sys |
||
4 | import os |
||
5 | import jinja2 |
||
6 | import argparse |
||
7 | |||
8 | from ssg import yaml, checks |
||
9 | from ssg.shims import input_func |
||
10 | import ssg |
||
11 | |||
12 | |||
def has_empty_identifier(yaml_file, product_yaml=None):
    # Report whether the rule's 'identifiers' section is present but empty:
    # either the section itself is None, or at least one of its values is
    # blank once converted to a string and stripped of whitespace.
    rule = yaml.open_and_macro_expand(yaml_file, product_yaml)
    if 'identifiers' not in rule:
        return False

    identifiers = rule['identifiers']
    if identifiers is None:
        return True

    return any(str(value).strip() == "" for value in identifiers.values())
||
23 | |||
24 | |||
def has_empty_references(yaml_file, product_yaml=None):
    # Report whether the rule's 'references' section is present but empty:
    # either the section itself is None, or at least one of its values is
    # blank once converted to a string and stripped of whitespace.
    rule = yaml.open_and_macro_expand(yaml_file, product_yaml)
    if 'references' not in rule:
        return False

    references = rule['references']
    if references is None:
        return True

    return any(str(value).strip() == "" for value in references.values())
||
35 | |||
36 | |||
def has_prefix_cce(yaml_file, product_yaml=None):
    # Report whether any identifier whose key starts with 'cce' has a value
    # that begins with "CCE" (case-insensitive) and whose remainder -- after
    # dropping the first 3 or 4 characters -- is accepted by
    # checks.is_cce_format_valid() once "CCE-" is prepended.
    #
    # Every 'cce*' key is inspected: a rule may carry several of them and a
    # well-formed first entry must not hide a prefixed later one.
    rule = yaml.open_and_macro_expand(yaml_file, product_yaml)
    if 'identifiers' in rule and rule['identifiers'] is not None:
        for i_type, i_value in rule['identifiers'].items():
            if i_type[0:3] != 'cce':
                continue

            # The parsed value is not guaranteed to be a string (it may be
            # an integer, for example), and non-strings cannot be sliced.
            i_value = str(i_value)
            has_prefix = i_value[0:3].upper() == 'CCE'
            remainder_valid = (checks.is_cce_format_valid("CCE-" + i_value[3:])
                               or checks.is_cce_format_valid("CCE-" + i_value[4:]))
            if has_prefix and remainder_valid:
                return True
    return False
||
47 | |||
48 | |||
def has_invalid_cce(yaml_file, product_yaml=None):
    # Report whether any identifier whose key starts with 'cce' holds a value
    # that checks.is_cce_value_valid() rejects once "CCE-" is prepended.
    rule = yaml.open_and_macro_expand(yaml_file, product_yaml)
    if 'identifiers' not in rule or rule['identifiers'] is None:
        return False

    for i_type, i_value in rule['identifiers'].items():
        if i_type[0:3] != 'cce':
            continue
        if not checks.is_cce_value_valid("CCE-" + str(i_value)):
            return True
    return False
||
57 | |||
58 | |||
def has_int_identifier(yaml_file, product_yaml=None):
    # Report whether any value in the rule's 'identifiers' section was parsed
    # as something other than a plain str.
    rule = yaml.open_and_macro_expand(yaml_file, product_yaml)
    identifiers = rule['identifiers'] if 'identifiers' in rule else None
    if identifiers is None:
        return False

    return any(type(value) != str for value in identifiers.values())
||
66 | |||
67 | |||
def has_int_reference(yaml_file, product_yaml=None):
    # Report whether any value in the rule's 'references' section was parsed
    # as something other than a plain str.
    rule = yaml.open_and_macro_expand(yaml_file, product_yaml)
    references = rule['references'] if 'references' in rule else None
    if references is None:
        return False

    return any(type(value) != str for value in references.values())
||
75 | |||
76 | |||
def find_rules(directory, func):
    # Iterates over passed directory to correctly parse rules (which are
    # YAML files with internal macros). The most recently seen product.yml
    # takes precedence over previous product.yml, e.g.:
    #
    # a/product.yml
    # a/b/product.yml       -- will be selected for the following rule:
    # a/b/c/something.rule
    #
    # The corresponding rule and contents of the product.yml are then passed
    # into func(/path/to/rule, product_yaml_contents); if the result evaluates
    # to true, the tuple (/path/to/rule, /path/to/product.yml) is saved as a
    # result. Both product values are None until a product.yml has been seen.
    #
    # Only paths ending in "rule.yml" and not containing "tests/" are
    # considered. Rules for which func raises jinja2's UndefinedError are
    # reported on stdout and skipped.
    results = []
    product_yaml = None
    product_yaml_path = None
    rule_filename_id = 'rule.yml'

    for root, _, files in os.walk(directory):
        # A product.yml in this directory replaces the one seen previously;
        # otherwise the last one seen simply stays in effect.
        if "product.yml" in files:
            product_yaml_path = os.path.join(root, "product.yml")
            product_yaml = yaml.open_raw(product_yaml_path)

        for filename in files:
            path = os.path.join(root, filename)
            if not path.endswith(rule_filename_id) or "tests/" in path:
                continue
            try:
                if func(path, product_yaml):
                    results.append((path, product_yaml_path))
            except jinja2.exceptions.UndefinedError:
                print("Failed to parse file %s (with product.yaml: %s). Skipping"
                      % (path, product_yaml_path))

    return results
||
134 | |||
135 | |||
def print_file(file_contents):
    # Print every line of file_contents to stdout, prefixed with its
    # zero-based index and a colon.
    for line_num, line in enumerate(file_contents):
        print("%d: %s" % (line_num, line))
||
139 | |||
140 | |||
def find_section_lines(file_contents, sec):
    # Hack to find a global key ("section"/sec) in a YAML-like file.
    # All indented (or empty) lines until the next global key are included
    # in the range. For example:
    #
    # 0: not_it:
    # 1:     - value
    # 2: this_one:
    # 3:      - 2
    # 4:      - 5
    # 5:
    # 6: nor_this:
    #
    # for the section "this_one", the result [(2, 5)] will be returned.
    # Note that multiple sections may exist in a file and each will be
    # identified and returned as an inclusive (begin, end) tuple of
    # zero-based line numbers.
    sec_ranges = []

    sec_id = sec + ":"
    end_num = len(file_contents)
    line_num = 0

    while line_num < end_num:
        if not file_contents[line_num].startswith(sec_id):
            line_num += 1
            continue

        begin = line_num
        line_num += 1
        # Consume the section body: empty lines and lines starting with a
        # space belong to the section; anything else is the next global key.
        while line_num < end_num:
            line = file_contents[line_num]
            if line and line[0] != ' ':
                break
            line_num += 1

        sec_ranges.append((begin, line_num - 1))
        # Deliberately no increment here: line_num now points at the next
        # global key (or past the end), and that key may itself open another
        # instance of the section, so it has to be examined too.

    return sec_ranges
||
178 | |||
179 | |||
def remove_lines(file_contents, lines):
    # Removes a series of lines and returns a new copy: the result holds
    # every entry of file_contents whose zero-based index is not in lines.
    # file_contents itself is left unmodified.
    return [line for line_num, line in enumerate(file_contents)
            if line_num not in lines]
||
188 | |||
189 | |||
def remove_section_keys(file_contents, yaml_contents, section, removed_keys):
    # Remove a series of keys from a section. Refuses to operate (raising
    # RuntimeError) unless the section appears exactly once. If the section
    # is empty (because it is None or all of its keys are removed), then the
    # section is also removed. Otherwise, only matching keys are removed.
    # Note that all instances of the keys will be removed, if it appears
    # more than once. Returns a new list of lines.
    sec_ranges = find_section_lines(file_contents, section)
    if len(sec_ranges) != 1:
        raise RuntimeError("Refusing to fix file -- could not find exactly one "
                           "section %r: found %d" % (section, len(sec_ranges)))

    begin, end = sec_ranges[0]

    section_contents = yaml_contents[section]
    if section_contents is None or len(section_contents.keys()) == len(removed_keys):
        print("Removing entire section since all keys are empty")
        return remove_lines(file_contents, set(range(begin, end + 1)))

    # A line is dropped when, ignoring indentation, it starts with "<key>:".
    key_prefixes = tuple(key + ":" for key in removed_keys)
    r_lines = set()
    # Don't include section header
    for line_num in range(begin + 1, end + 1):
        if file_contents[line_num].strip().startswith(key_prefixes):
            r_lines.add(line_num)

    return remove_lines(file_contents, r_lines)
||
221 | |||
222 | |||
def rewrite_value_int_str(line):
    # Take a "key: value" line and return it with the value wrapped in
    # double quotes so it is explicitly a string. Everything before the
    # first ':' is kept verbatim (indentation included); the value is
    # stripped of surrounding whitespace before quoting.
    separator = line.index(':')
    quoted = '"%s"' % line[separator + 1:].strip()
    return "%s: %s" % (line[:separator], quoted)
||
231 | |||
232 | |||
def rewrite_value_remove_prefix(line):
    # Take a "key: value" line and return it with a leading "CCE" prefix
    # removed from the value. The prefix is dropped only when what remains
    # passes checks.is_cce_format_valid() with "CCE-" prepended; otherwise
    # the (stripped) value is kept as it is.
    separator = line.index(':')
    key = line[:separator]
    value = line[separator + 1:].strip()

    # Try dropping 3 characters first, then 4 (prefix plus one separator).
    for prefix_len in (3, 4):
        candidate = value[prefix_len:]
        if checks.is_cce_format_valid("CCE-" + candidate):
            return key + ": " + candidate

    return key + ": " + value
||
244 | |||
245 | |||
def rewrite_section_value(file_contents, yaml_contents, section, keys, transform):
    # For a given section, rewrite every line whose key is listed in keys by
    # passing the original line through transform(line) and storing the
    # returned line in its place. Refuses to operate (raising RuntimeError)
    # unless the given section appears exactly once in the file. All
    # instances of a key inside the section get updated. Returns a new list
    # of lines; file_contents is not modified. yaml_contents is accepted for
    # signature parity with the other section helpers and is not consulted.
    sec_ranges = find_section_lines(file_contents, section)
    if len(sec_ranges) != 1:
        raise RuntimeError("Refusing to fix file -- could not find exactly one "
                           "section %r: found %d" % (section, len(sec_ranges)))

    begin, end = sec_ranges[0]
    new_contents = file_contents[:]

    # A line matches when, ignoring indentation, it starts with "<key>:".
    key_prefixes = tuple(key + ":" for key in keys)
    # Don't include section header
    for line_num in range(begin + 1, end + 1):
        if file_contents[line_num].strip().startswith(key_prefixes):
            new_contents[line_num] = transform(file_contents[line_num])

    return new_contents
||
274 | |||
275 | |||
def rewrite_section_value_int_str(file_contents, yaml_contents, section, int_keys):
    # Convenience wrapper around rewrite_section_value() that applies
    # rewrite_value_int_str to every line belonging to one of int_keys.
    transform = rewrite_value_int_str
    return rewrite_section_value(file_contents, yaml_contents, section,
                                 int_keys, transform)
||
279 | |||
280 | |||
def fix_empty_identifier(file_contents, yaml_contents):
    # Collect the keys of the 'identifiers' section whose value is blank
    # (after str() and strip()) and return the file lines with those keys
    # removed via remove_section_keys().
    section = 'identifiers'
    entries = yaml_contents[section]

    blank_keys = []
    if entries is not None:
        blank_keys = [key for key, value in entries.items()
                      if str(value).strip() == ""]

    return remove_section_keys(file_contents, yaml_contents, section, blank_keys)
||
291 | |||
292 | |||
def fix_empty_reference(file_contents, yaml_contents):
    # Collect the keys of the 'references' section whose value is blank
    # (after str() and strip()) and return the file lines with those keys
    # removed via remove_section_keys().
    section = 'references'
    entries = yaml_contents[section]

    blank_keys = []
    if entries is not None:
        blank_keys = [key for key, value in entries.items()
                      if str(value).strip() == ""]

    return remove_section_keys(file_contents, yaml_contents, section, blank_keys)
||
304 | |||
305 | |||
def fix_prefix_cce(file_contents, yaml_contents):
    # Find every 'cce*' key in the 'identifiers' section whose value starts
    # with "CCE" (case-insensitive) and whose remainder -- after dropping the
    # first 3 or 4 characters -- passes checks.is_cce_format_valid() with
    # "CCE-" prepended, then return the file lines with those values
    # rewritten by rewrite_value_remove_prefix.
    section = 'identifiers'

    prefixed_identifiers = []

    if yaml_contents[section] is not None:
        for i_type, i_value in yaml_contents[section].items():
            if i_type[0:3] != 'cce':
                continue

            # Convert before slicing: the parsed value is not guaranteed to
            # be a string, and non-strings (e.g. integers) cannot be sliced.
            i_value = str(i_value)
            has_prefix = i_value[0:3].upper() == 'CCE'
            remainder_valid = (checks.is_cce_format_valid("CCE-" + i_value[3:])
                               or checks.is_cce_format_valid("CCE-" + i_value[4:]))
            if has_prefix and remainder_valid:
                prefixed_identifiers.append(i_type)

    return rewrite_section_value(file_contents, yaml_contents, section, prefixed_identifiers,
                                 rewrite_value_remove_prefix)
||
322 | |||
323 | |||
def fix_invalid_cce(file_contents, yaml_contents):
    # Collect the 'cce*' keys of the 'identifiers' section whose value is
    # rejected by checks.is_cce_value_valid() (with "CCE-" prepended) and
    # return the file lines with those keys removed.
    section = 'identifiers'
    entries = yaml_contents[section]

    invalid_keys = []
    if entries is not None:
        invalid_keys = [
            key for key, value in entries.items()
            if key[0:3] == 'cce'
            and not checks.is_cce_value_valid("CCE-" + str(value))
        ]

    return remove_section_keys(file_contents, yaml_contents, section, invalid_keys)
||
336 | |||
337 | |||
def fix_int_identifier(file_contents, yaml_contents):
    # Collect the keys of the 'identifiers' section whose parsed value is not
    # a plain str and return the file lines with those values rewritten as
    # quoted strings.
    section = 'identifiers'

    int_identifiers = []
    # Guard against an empty (None) section, as the other fixers do, rather
    # than failing on None.items().
    if yaml_contents[section] is not None:
        for i_type, i_value in yaml_contents[section].items():
            # Exact type test kept on purpose so the selection matches the
            # has_int_identifier() detection predicate.
            if type(i_value) != str:
                int_identifiers.append(i_type)

    return rewrite_section_value_int_str(file_contents, yaml_contents, section, int_identifiers)
||
347 | |||
348 | |||
def fix_int_reference(file_contents, yaml_contents):
    # Collect the keys of the 'references' section whose parsed value is not
    # a plain str and return the file lines with those values rewritten as
    # quoted strings.
    section = 'references'

    int_references = []
    # Guard against an empty (None) section, as the other fixers do, rather
    # than failing on None.items().
    if yaml_contents[section] is not None:
        for i_type, i_value in yaml_contents[section].items():
            # Exact type test kept on purpose so the selection matches the
            # has_int_reference() detection predicate.
            if type(i_value) != str:
                int_references.append(i_type)

    return rewrite_section_value_int_str(file_contents, yaml_contents, section, int_references)
||
358 | |||
359 | |||
def fix_file(path, product_yaml, func):
    # Interactively apply one fixer to one rule file.
    #
    # The file at path is read as a list of lines (a single trailing empty
    # line is dropped) and parsed with yaml.open_and_macro_expand(path,
    # product_yaml). func(file_contents, yaml_contents) must return the new
    # list of lines. The before/after contents are printed, and the file is
    # only overwritten when the user answers 'y' at the prompt.
    with open(path, 'r') as rule_file:
        file_contents = rule_file.read().split("\n")
    if file_contents[-1] == '':
        file_contents = file_contents[:-1]

    yaml_contents = yaml.open_and_macro_expand(path, product_yaml)

    print("====BEGIN BEFORE====")
    print_file(file_contents)
    print("====END BEFORE====")

    file_contents = func(file_contents, yaml_contents)

    print("====BEGIN AFTER====")
    print_file(file_contents)
    print("====END AFTER====")
    response = input_func("Confirm writing output to %s: (y/n): " % path)
    if response.strip() == 'y':
        # The context manager flushes and closes the file even if a write
        # fails part-way through.
        with open(path, 'w') as rule_file:
            rule_file.writelines(line + "\n" for line in file_contents)
||
384 | |||
385 | |||
def fix_empty_identifiers(directory):
    # Find every rule under directory flagged by has_empty_identifier, print
    # how many were found, and run fix_empty_identifier on each via fix_file.
    results = find_rules(directory, has_empty_identifier)
    print("Number of rules with empty identifiers: %d" % len(results))

    for rule_path, product_yaml_path in results:
        # Reload the product.yml that was in effect for this rule, if any.
        if product_yaml_path is None:
            product_yaml = None
        else:
            product_yaml = yaml.open_raw(product_yaml_path)

        fix_file(rule_path, product_yaml, fix_empty_identifier)
||
399 | |||
400 | |||
def fix_empty_references(directory):
    # Find every rule under directory flagged by has_empty_references, print
    # how many were found, and run fix_empty_reference on each via fix_file.
    results = find_rules(directory, has_empty_references)
    print("Number of rules with empty references: %d" % len(results))

    for rule_path, product_yaml_path in results:
        # Reload the product.yml that was in effect for this rule, if any.
        if product_yaml_path is None:
            product_yaml = None
        else:
            product_yaml = yaml.open_raw(product_yaml_path)

        fix_file(rule_path, product_yaml, fix_empty_reference)
||
414 | |||
415 | |||
def find_prefix_cce(directory):
    # Find every rule under directory flagged by has_prefix_cce, print how
    # many were found, and run fix_prefix_cce on each via fix_file.
    results = find_rules(directory, has_prefix_cce)
    print("Number of rules with prefixed CCEs: %d" % len(results))

    for rule_path, product_yaml_path in results:
        # Reload the product.yml that was in effect for this rule, if any.
        if product_yaml_path is None:
            product_yaml = None
        else:
            product_yaml = yaml.open_raw(product_yaml_path)

        fix_file(rule_path, product_yaml, fix_prefix_cce)
||
429 | |||
430 | |||
def find_invalid_cce(directory):
    # Find every rule under directory flagged by has_invalid_cce, print how
    # many were found, and run fix_invalid_cce on each via fix_file.
    results = find_rules(directory, has_invalid_cce)
    print("Number of rules with invalid CCEs: %d" % len(results))

    for rule_path, product_yaml_path in results:
        # Reload the product.yml that was in effect for this rule, if any.
        if product_yaml_path is None:
            product_yaml = None
        else:
            product_yaml = yaml.open_raw(product_yaml_path)

        fix_file(rule_path, product_yaml, fix_invalid_cce)
||
444 | |||
445 | |||
def find_int_identifiers(directory):
    # Find every rule under directory flagged by has_int_identifier, print
    # how many were found, and run fix_int_identifier on each via fix_file.
    results = find_rules(directory, has_int_identifier)
    print("Number of rules with integer identifiers: %d" % len(results))

    for rule_path, product_yaml_path in results:
        # Reload the product.yml that was in effect for this rule, if any.
        if product_yaml_path is None:
            product_yaml = None
        else:
            product_yaml = yaml.open_raw(product_yaml_path)

        fix_file(rule_path, product_yaml, fix_int_identifier)
||
459 | |||
460 | |||
def find_int_references(directory):
    # Find every rule under directory flagged by has_int_reference, print
    # how many were found, and run fix_int_reference on each via fix_file.
    results = find_rules(directory, has_int_reference)
    print("Number of rules with integer references: %d" % len(results))

    for rule_path, product_yaml_path in results:
        # Reload the product.yml that was in effect for this rule, if any.
        if product_yaml_path is None:
            product_yaml = None
        else:
            product_yaml = yaml.open_raw(product_yaml_path)

        fix_file(rule_path, product_yaml, fix_int_reference)
||
474 | |||
475 | |||
def parse_args():
    # Build the command-line parser (one positional command chosen from a
    # fixed list, plus the path to the ssg checkout) and return the parsed
    # arguments taken from sys.argv.
    commands_help = """
Commands:
\tempty_identifiers - check and fix rules with empty identifiers
\tprefixed_identifiers - check and fix rules with prefixed (CCE-) identifiers
\tinvalid_identifiers - check and fix rules with invalid identifiers
\tint_identifiers - check and fix rules with pseudo-integer identifiers
\tempty_references - check and fix rules with empty references
\tint_references - check and fix rules with pseudo-integer references
      """
    available_commands = [
        'empty_identifiers', 'prefixed_identifiers',
        'invalid_identifiers', 'int_identifiers',
        'empty_references', 'int_references',
    ]

    # RawDescriptionHelpFormatter keeps the epilog's line breaks and tabs.
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Utility for fixing mistakes in .rule files",
        epilog=commands_help)
    parser.add_argument("command", help="Which fix to perform.",
                        choices=available_commands)
    parser.add_argument("ssg_root", help="Path to root of ssg git directory")
    return parser.parse_args()
||
494 | |||
495 | |||
def __main__():
    # Entry point: parse the command line and run the handler registered for
    # the chosen command against the ssg root; exit with status 1 when the
    # command has no handler.
    args = parse_args()

    handlers = {
        'empty_identifiers': fix_empty_identifiers,
        'prefixed_identifiers': find_prefix_cce,
        'invalid_identifiers': find_invalid_cce,
        'int_identifiers': find_int_identifiers,
        'empty_references': fix_empty_references,
        'int_references': find_int_references,
    }

    handler = handlers.get(args.command)
    if handler is None:
        sys.exit(1)
    handler(args.ssg_root)


if __name__ == "__main__":
    __main__()
||
516 |