| Total Complexity | 45 |
| Total Lines | 302 |
| Duplicated Lines | 8.94 % |
| Coverage | 13.14% |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like ssg.rule_yaml often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | """ |
||
| 2 | The rule_yaml module provides various utility functions for handling YAML files |
||
| 3 | containing Jinja macros, without having to parse the macros. |
||
| 4 | """ |
||
| 5 | |||
| 6 | 2 | from __future__ import absolute_import |
|
| 7 | 2 | from __future__ import print_function |
|
| 8 | |||
| 9 | 2 | import os |
|
| 10 | 2 | import sys |
|
| 11 | 2 | from collections import namedtuple, defaultdict |
|
| 12 | 2 | import yaml |
|
| 13 | |||
| 14 | 2 | from .rules import get_rule_dir_yaml |
|
| 15 | 2 | from .utils import read_file_list |
|
| 16 | |||
| 17 | |||
def find_section_lines(file_contents, sec):
    """
    Parse the given file_contents (a list of lines) to find the global YAML
    key with the given identifier sec.

    This is a deliberate hack: it does not call into the yaml library and thus
    correctly handles Jinja macros, at the expense of not being a strictly
    valid YAML parse. A section starts on the line "sec:" at column zero and
    extends over every following line that is blank or indented.

    For example:

    0: not_it:
    1:  - value
    2: this_one:
    3:  - 2
    4:  - 5
    5:
    6: nor_this:

    for the section "this_one", the result [(2, 5)] is returned. Multiple
    occurrences of the section may exist in a file and each is returned.

    Returns a list of namedtuples (start, end) of the lines where the
    section exists.
    """

    section = namedtuple('section', ['start', 'end'])

    sec_ranges = []
    sec_id = sec + ":"
    end_num = len(file_contents)
    line_num = 0

    while line_num < end_num:
        if file_contents[line_num].startswith(sec_id):
            begin = line_num
            line_num += 1
            # Consume all blank or indented lines; a non-blank line at
            # column zero terminates the section.
            while line_num < end_num:
                line = file_contents[line_num]
                if line and not line.startswith(' '):
                    break
                line_num += 1

            sec_ranges.append(section(begin, line_num - 1))
            # Deliberately do NOT advance past the terminating line here:
            # it is itself a global key and may start another section (the
            # previous version skipped it, missing back-to-back duplicates).
        else:
            line_num += 1

    return sec_ranges
| 66 | |||
| 67 | |||
def add_key_value(contents, key, start_line, new_value):
    """
    Return a copy of contents with "key: new_value" inserted before line
    start_line, followed by a blank line.

    The passed-in contents list is left unmodified.
    """

    result = list(contents[:start_line])
    result += ["%s: %s" % (key, new_value), ""]
    result += contents[start_line:]
    return result
| 82 | |||
| 83 | |||
def update_key_value(contents, key, old_value, new_value):
    """
    Return a copy of contents in which the line "key: old_value" has been
    replaced with "key: new_value". This validates that the old value is
    constant and hasn't changed since parsing its value.

    Raises ValueError when the key/old_value pair cannot be found in the
    given contents.

    Does not modify the passed-in contents.
    """

    result = contents[:]
    target = key + ": " + old_value

    for index, line in enumerate(result):
        if line == target:
            result[index] = key + ": " + new_value
            return result

    raise ValueError("For key:%s, cannot find the old value (%s) in the given "
                     "contents." % (key, old_value))
| 111 | |||
| 112 | |||
def remove_lines(contents, lines):
    """
    Return a copy of the parsed file with the inclusive line range described
    by lines (an object with start/end attributes) removed.

    Does not modify the passed-in contents.
    """

    return contents[:lines.start] + contents[lines.end + 1:]
| 123 | |||
| 124 | |||
def parse_from_yaml(file_contents, lines):
    """
    Join the inclusive line range described by lines into a single document
    and parse it as YAML, returning the parsed object.
    """

    snippet = "\n".join(file_contents[lines.start:lines.end + 1])
    return yaml.load(snippet, Loader=yaml.Loader)
| 133 | |||
| 134 | |||
def get_yaml_contents(rule_obj):
    """
    From a rule_obj description, return a namedtuple of (path, contents); where
    path is the path to the rule YAML and contents is the list of lines in
    the file.

    Raises ValueError when no YAML file exists for the rule.
    """

    file_description = namedtuple('file_description', ('path', 'contents'))

    yaml_file = get_rule_dir_yaml(rule_obj['dir'])
    if not os.path.exists(yaml_file):
        # Exceptions take no file= keyword argument; the previous code passed
        # file=sys.stderr here, which made this line raise TypeError instead
        # of the intended ValueError.
        raise ValueError("Error: yaml file does not exist for rule_id:%s" %
                         rule_obj['id'])

    yaml_contents = read_file_list(yaml_file)

    return file_description(yaml_file, yaml_contents)
| 152 | |||
| 153 | |||
def parse_prodtype(prodtype):
    """
    From a comma-separated prodtype line, return the set of products listed,
    with surrounding whitespace stripped from each entry.
    """

    return {product.strip() for product in prodtype.split(',')}
| 160 | |||
| 161 | |||
def get_section_lines(file_path, file_contents, key_name):
    """
    From the given file_path and file_contents, find the lines describing the
    section key_name and return the line range (a (start, end) namedtuple) of
    the section, or None when the section is absent.

    Raises ValueError when multiple instances of the section exist, since it
    is then ambiguous which one should be modified.
    """

    section = find_section_lines(file_contents, key_name)

    if len(section) > 1:
        # Exceptions take no file= keyword argument; the previous code passed
        # file=sys.stderr here, which made this line raise TypeError instead
        # of the intended ValueError.
        raise ValueError("Multiple instances (%d) of %s in %s; refusing to modify file." %
                         (len(section), key_name, file_path))

    elif len(section) == 1:
        return section[0]

    return None
| 178 | |||
| 179 | |||
def has_duplicated_subkeys(file_path, file_contents, sections):
    """
    Check whether any of the given sections contains a duplicated subkey.
    Note that such duplicates are silently eaten by the YAML parser we use,
    so they have to be detected textually.

    Returns True (after printing a diagnostic) on the first duplicate found,
    False otherwise.
    """

    if isinstance(sections, str):
        sections = [sections]

    for section in sections:
        # Get the lines in the file which match this section. If none exists,
        # it should be safe to silently ignore it. Clearly if the section
        # exists, there are no duplicated sections.
        section_range = get_section_lines(file_path, file_contents, section)
        if not section_range:
            continue

        # Ask the YAML parser which subkeys survived parsing.
        parsed_section = parse_from_yaml(file_contents, section_range)

        parent_key = list(parsed_section.keys())[0]
        subkeys = parsed_section[parent_key].keys()

        # Count textual occurrences of each known subkey over the section
        # body; the section header line itself is excluded from the scan.
        subkey_counts = defaultdict(int)
        for line in file_contents[section_range.start + 1:section_range.end]:
            if not line:
                continue

            # We'll be lazy for the time being and check every key against
            # every line.
            for key in subkeys:
                our_key = ' ' + key + ':'
                if our_key in line:
                    subkey_counts[our_key] += 1
                    if subkey_counts[our_key] > 1:
                        print("Duplicated key " + our_key + " in " + section + " of " + file_path)
                        return True

    return False
| 224 | |||
| 225 | |||
def sort_section_keys(file_path, file_contents, sections, sort_func=None):
    """
    Sort subkeys in a YAML file's section.

    file_path: path of the file, used only for error messages.
    file_contents: list of lines of the file.
    sections: a single section name or a list of section names to sort.
    sort_func: optional key function passed to sorted() for ordering subkeys.

    Returns a new list of lines; the passed-in contents are not modified.
    Raises ValueError when a subkey matches more than one line in a section.
    """

    if isinstance(sections, str):
        sections = [sections]

    new_contents = file_contents[:]

    for section in sections:
        # Re-locate the section against new_contents, since earlier
        # iterations may already have rewritten parts of the file.
        section_range = get_section_lines(file_path, new_contents, section)
        if not section_range:
            continue

        # Start by parsing the lines as YAML.
        parsed_section = parse_from_yaml(new_contents, section_range)

        # Ignore the section header. This header is included in the start range,
        # so just increment by one; also step over any blank lines that
        # directly follow it.
        start_offset = 1
        while not new_contents[section_range.start + start_offset].strip():
            start_offset += 1

        # Ignore any trailing empty lines.
        end_offset = 0
        while not new_contents[section_range.end - end_offset].strip():
            end_offset += 1

        # Validate we only have a single section.
        assert len(parsed_section.keys()) == 1

        # Sort the parsed subkeys.
        parent_key = list(parsed_section.keys())[0]
        subkeys = sorted(parsed_section[parent_key].keys(), key=sort_func)

        # Don't bother if there are zero or one subkeys. Sorting order thus
        # doesn't matter.
        if not subkeys or len(subkeys) == 1:
            continue

        # Now we need to map sorted subkeys onto lines in the new contents,
        # so we can re-order them appropriately. We'll assume the section is
        # small so we'll do it in O(n^2).
        subkey_mapping = dict()
        for key in subkeys:
            our_line = None
            # A subkey line is indented with either a space or a tab.
            spaced_key = ' ' + key + ':'
            tabbed_key = '\t' + key + ':'
            range_start = section_range.start + start_offset
            range_end = section_range.end - end_offset + 1
            for line_num in range(range_start, range_end):
                this_line = new_contents[line_num]
                if spaced_key in this_line or tabbed_key in this_line:
                    if our_line:
                        # Not supposed to be possible to have multiple keys
                        # matching the same value in this file. We should've
                        # already fixed this with fix-rules.py's duplicate_subkeys.
                        msg = "File {0} has duplicated key {1}: {2} vs {3}"
                        msg = msg.format(file_path, key, our_line, this_line)
                        raise ValueError(msg)
                    our_line = this_line
            # Every subkey the YAML parser reported must correspond to
            # exactly one source line.
            assert our_line
            subkey_mapping[key] = our_line

        # Now we'll remove all the section's subkeys and start over. Include
        # section header but not any of the keys (or potential blank lines
        # in the interior -- but we preserve them on either end of the
        # section).
        prefix = new_contents[:section_range.start+start_offset]
        contents = list(map(lambda key: subkey_mapping[key], subkeys))
        suffix = new_contents[section_range.end+1-end_offset:]

        new_contents = prefix + contents + suffix

    return new_contents
||
| 302 |