Total Complexity | 45 |
Total Lines | 302 |
Duplicated Lines | 4.64 % |
Coverage | 0% |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and their corresponding solutions, are:
Complex classes like ssg.rule_yaml often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | """ |
||
2 | The rule_yaml module provides various utility functions for handling YAML files |
||
3 | containing Jinja macros, without having to parse the macros. |
||
4 | """ |
||
5 | |||
6 | from __future__ import absolute_import |
||
7 | from __future__ import print_function |
||
8 | |||
9 | import os |
||
10 | import sys |
||
11 | from collections import namedtuple, defaultdict |
||
12 | import yaml |
||
13 | |||
14 | from .rules import get_rule_dir_yaml |
||
15 | from .utils import read_file_list |
||
16 | |||
17 | |||
def find_section_lines(file_contents, sec):
    """
    Parses the given file_contents as YAML to find the section with the given identifier.
    Note that this does not call into the yaml library and thus correctly handles jinja
    macros at the expense of not being a strictly valid yaml parsing.

    Returns a list of namedtuples (start, end) of the lines where section exists.
    """

    # Hack to find a global key ("section"/sec) in a YAML-like file.
    # All indented lines until the next global key are included in the range.
    # For example:
    #
    # 0: not_it:
    # 1:   - value
    # 2: this_one:
    # 3:   - 2
    # 4:   - 5
    # 5:
    # 6: nor_this:
    #
    # for the section "this_one", the result [(2, 5)] will be returned.
    # Note that multiple sections may exist in a file and each will be
    # identified and returned.
    section = namedtuple('section', ['start', 'end'])

    marker = sec + ":"
    total = len(file_contents)
    found = []

    idx = 0
    while idx < total:
        if file_contents[idx].startswith(marker):
            first = idx
            idx += 1
            # Consume every line until the next global (unindented,
            # non-empty) key or end of file.
            while idx < total:
                current = file_contents[idx]
                if current and not current.startswith(' '):
                    break
                idx += 1
            found.append(section(first, idx - 1))
        idx += 1

    return found
||
66 | |||
67 | |||
def add_key_value(contents, key, start_line, new_value):
    """
    Adds a new key to contents with the given value after line start_line, returning
    the result. Also adds a blank line afterwards.

    Does not modify the value of contents.
    """

    # Build a fresh list: everything before the insertion point, the new
    # "key: value" line plus a blank separator, then the remainder.
    result = list(contents[:start_line])
    result.append("%s: %s" % (key, new_value))
    result.append("")
    result += contents[start_line:]
    return result
||
82 | |||
83 | |||
def update_key_value(contents, key, old_value, new_value):
    """
    Find key in the contents of a file and replace its value with the new value,
    returning the resulting file. This validates that the old value is constant and
    hasn't changed since parsing its value.

    Raises a ValueError when the key cannot be found in the given contents.

    Does not modify the value of contents.
    """

    target = key + ": " + old_value
    replacement = key + ": " + new_value
    result = contents[:]

    # Only the first exact-match line is rewritten.
    for index, line in enumerate(result):
        if line == target:
            result[index] = replacement
            return result

    raise ValueError("For key:%s, cannot find the old value (%s) in the given "
                     "contents." % (key, old_value))
||
111 | |||
112 | |||
def remove_lines(contents, lines):
    """
    Remove the lines of the section from the parsed file, returning the new contents.

    `lines` is a (start, end) namedtuple with an inclusive end index.

    Does not modify the passed in contents.
    """

    return contents[:lines.start] + contents[lines.end + 1:]
||
123 | |||
124 | |||
def parse_from_yaml(file_contents, lines):
    """
    Parse the given line range (inclusive of lines.end) as YAML, returning
    the parsed object.
    """

    snippet = "\n".join(file_contents[lines.start:lines.end + 1])
    # NOTE: yaml.Loader can construct arbitrary Python objects. This is
    # acceptable for trusted in-repo rule files, but must never be used on
    # untrusted input (yaml.safe_load would be required there).
    return yaml.load(snippet, Loader=yaml.Loader)
||
133 | |||
134 | |||
def get_yaml_contents(rule_obj):
    """
    From a rule_obj description, return a namedtuple of (path, contents); where
    path is the path to the rule YAML and contents is the list of lines in
    the file.

    Raises ValueError when the rule directory has no YAML file.
    """

    file_description = namedtuple('file_description', ('path', 'contents'))

    yaml_file = get_rule_dir_yaml(rule_obj['dir'])
    if not os.path.exists(yaml_file):
        # Bug fix: the original passed file=sys.stderr to ValueError(), which
        # is not a valid keyword for exceptions and raised TypeError instead
        # of the intended ValueError.
        raise ValueError("Error: yaml file does not exist for rule_id:%s" %
                         rule_obj['id'])

    yaml_contents = read_file_list(yaml_file)

    return file_description(yaml_file, yaml_contents)
||
152 | |||
153 | |||
def parse_prodtype(prodtype):
    """
    From a prodtype line, returns the set of products listed.
    """

    # Comma-separated values; surrounding whitespace is not significant.
    return {product.strip() for product in prodtype.split(',')}
||
160 | |||
161 | |||
def get_section_lines(file_path, file_contents, key_name):
    """
    From the given file_path and file_contents, find the lines describing the section
    key_name and return the (start, end) line range of the section, or None when
    the section is absent.

    Raises ValueError when the section appears more than once, since modifying
    an ambiguous file would be unsafe.
    """

    section = find_section_lines(file_contents, key_name)

    if len(section) > 1:
        # Bug fix: the original passed file=sys.stderr to ValueError(), which
        # is not a valid keyword for exceptions and raised TypeError instead
        # of the intended ValueError.
        raise ValueError("Multiple instances (%d) of %s in %s; refusing to modify file." %
                         (len(section), key_name, file_path))

    elif len(section) == 1:
        return section[0]

    return None
||
178 | |||
179 | |||
def has_duplicated_subkeys(file_path, file_contents, sections):
    """
    Checks whether any of the given sections has duplicated subkeys. Note that
    these are silently eaten by the YAML parser we use, so they can only be
    detected on the raw file lines.

    Prints a diagnostic and returns True on the first duplicate found;
    returns False otherwise.
    """

    if isinstance(sections, str):
        sections = [sections]

    for section in sections:
        # Get the lines in the file which match this section. If none exists,
        # it should be safe to silently ignore it. Clearly if the section
        # doesn't exist, it cannot contain duplicated subkeys.
        section_range = get_section_lines(file_path, file_contents, section)
        if not section_range:
            continue

        # Get the YAML parser's version of events: duplicates have already
        # been collapsed here, so its keys are the set of unique subkeys.
        parsed_section = parse_from_yaml(file_contents, section_range)

        parent_key = list(parsed_section.keys())[0]
        subkeys = parsed_section[parent_key].keys()

        # Count occurrences of each subkey on the raw lines.
        # Idiom fix: defaultdict(int) instead of defaultdict(lambda: 0).
        subkey_counts = defaultdict(int)

        # Iterate over the lines, see if they match a known key. Ignore the
        # first line (as it is the section header).
        # NOTE(review): the final line of the section (section_range.end) is
        # excluded from this scan -- presumably a trailing blank/boundary
        # line; confirm against find_section_lines' ranges.
        for line_num in range(section_range.start+1, section_range.end):
            line = file_contents[line_num]
            if not line:
                continue

            # We'll be lazy for the time being. Iterate over all keys.
            # NOTE: this is a substring test, so a subkey occurring inside a
            # value or a longer key name is also counted.
            for key in subkeys:
                our_key = ' ' + key + ':'
                if our_key in line:
                    subkey_counts[our_key] += 1
                    if subkey_counts[our_key] > 1:
                        print("Duplicated key " + our_key + " in " + section + " of " + file_path)
                        return True

    return False
||
224 | |||
225 | |||
def sort_section_keys(file_path, file_contents, sections, sort_func=None):
    """
    Sort subkeys in a YAML file's section.

    For each named section, reorders the raw lines holding its subkeys
    according to sorted(..., key=sort_func) and returns the resulting list
    of lines. Sections that are absent, or have fewer than two subkeys, are
    left untouched.

    Raises ValueError when a subkey matches more than one line in a section.

    Does not modify the passed-in file_contents.
    """

    if isinstance(sections, str):
        sections = [sections]

    new_contents = file_contents[:]

    # NOTE: new_contents is rebuilt at the bottom of each iteration, so each
    # section's range is recomputed against the already-modified lines.
    for section in sections:
        section_range = get_section_lines(file_path, new_contents, section)
        if not section_range:
            continue

        # Start by parsing the lines as YAML.
        parsed_section = parse_from_yaml(new_contents, section_range)

        # Ignore the section header. This header is included in the start range,
        # so just increment by one. Also skip any blank lines directly after
        # the header.
        start_offset = 1
        while not new_contents[section_range.start + start_offset].strip():
            start_offset += 1

        # Ignore any trailing empty lines.
        end_offset = 0
        while not new_contents[section_range.end - end_offset].strip():
            end_offset += 1

        # Validate we only have a single section.
        assert len(parsed_section.keys()) == 1

        # Sort the parsed subkeys.
        parent_key = list(parsed_section.keys())[0]
        subkeys = sorted(parsed_section[parent_key].keys(), key=sort_func)

        # Don't bother if there are zero or one subkeys. Sorting order thus
        # doesn't matter.
        if not subkeys or len(subkeys) == 1:
            continue

        # Now we need to map sorted subkeys onto lines in the new contents,
        # so we can re-order them appropriately. We'll assume the section is
        # small so we'll do it in O(n^2).
        # NOTE(review): matching is by substring (' key:' or '\t' + 'key:'),
        # so a subkey appearing inside a value would also match -- presumably
        # impossible in these rule files; confirm.
        subkey_mapping = dict()
        for key in subkeys:
            our_line = None
            spaced_key = ' ' + key + ':'
            tabbed_key = '\t' + key + ':'
            range_start = section_range.start + start_offset
            range_end = section_range.end - end_offset + 1
            for line_num in range(range_start, range_end):
                this_line = new_contents[line_num]
                if spaced_key in this_line or tabbed_key in this_line:
                    if our_line:
                        # Not supposed to be possible to have multiple keys
                        # matching the same value in this file. We should've
                        # already fixed this with fix-rules.py's duplicate_subkeys.
                        msg = "File {0} has duplicated key {1}: {2} vs {3}"
                        msg = msg.format(file_path, key, our_line, this_line)
                        raise ValueError(msg)
                    our_line = this_line
            # Every parsed subkey must have come from some raw line.
            assert our_line
            subkey_mapping[key] = our_line

        # Now we'll remove all the section's subkeys and start over. Include
        # section header but not any of the keys (or potential blank lines
        # in the interior -- but we preserve them on either end of the
        # section).
        prefix = new_contents[:section_range.start+start_offset]
        contents = list(map(lambda key: subkey_mapping[key], subkeys))
        suffix = new_contents[section_range.end+1-end_offset:]

        new_contents = prefix + contents + suffix

    return new_contents
||
302 |