ssg.rule_yaml.find_section_lines()   B
last analyzed

Complexity

Conditions 7

Size

Total Lines 48
Code Lines 21

Duplication

Lines 14
Ratio 29.17 %

Code Coverage

Tests 0
CRAP Score 56

Importance

Changes 0
Metric Value
cc 7
eloc 21
nop 2
dl 14
loc 48
ccs 0
cts 21
cp 0
crap 56
rs 7.9759
c 0
b 0
f 0
1
"""
2
The rule_yaml module provides various utility functions for handling YAML files
3
containing Jinja macros, without having to parse the macros.
4
"""
5
6
from __future__ import absolute_import
7
from __future__ import print_function
8
9
import os
10
import sys
11
from collections import namedtuple, defaultdict
12
import yaml
13
14
from .rules import get_rule_dir_yaml
15
from .utils import read_file_list
16
17
18
def find_section_lines(file_contents, sec):
    """
    Scan the given lines for every top-level occurrence of the key ``sec``.

    Note that this does not call into the yaml library and thus correctly
    handles jinja macros, at the expense of not being a strictly valid yaml
    parsing.

    Args:
        file_contents: list of the file's lines. Blank lines are expected to be
            empty strings; any non-empty line that does not start with a space
            terminates the current section.
        sec: name of the global key to look for, without the trailing colon.

    Returns:
        A list of namedtuples (start, end), one per occurrence of the section.
        Both indices are zero-based line numbers and both are inclusive.
    """

    # Hack to find a global key ("section"/sec) in a YAML-like file.
    # All indented (or empty) lines until the next global key are included in
    # the range. For example:
    #
    # 0: not_it:
    # 1:     - value
    # 2: this_one:
    # 3:      - 2
    # 4:      - 5
    # 5:
    # 6: nor_this:
    #
    # for the section "this_one", the result [(2, 5)] will be returned.
    # Note that multiple sections may exist in a file and each will be
    # identified and returned.
    section = namedtuple('section', ['start', 'end'])

    sec_ranges = []
    sec_id = sec + ":"
    end_num = len(file_contents)
    line_num = 0

    while line_num < end_num:
        if not file_contents[line_num].startswith(sec_id):
            line_num += 1
            continue

        # Found a section header; consume every following line that is
        # either empty or indented with a space.
        begin = line_num
        line_num += 1
        while line_num < end_num:
            line = file_contents[line_num]
            if line and line[0] != ' ':
                break
            line_num += 1

        sec_ranges.append(section(begin, line_num - 1))

        # Deliberately do not advance here: line_num now points at the line
        # that terminated this section, and that line may itself be the
        # header of another instance of the same section. Skipping it would
        # hide adjacent duplicates from callers.

    return sec_ranges
66
67
68
def add_key_value(contents, key, start_line, new_value):
    """
    Return a copy of contents with a "key: new_value" line, followed by a
    blank line, inserted at index start_line (that is, ahead of whatever line
    currently sits at that index).

    Does not modify the value of contents.
    """

    inserted = ["%s: %s" % (key, new_value), ""]
    return contents[:start_line] + inserted + contents[start_line:]
82
83
84
def update_key_value(contents, key, old_value, new_value):
    """
    Find key in the contents of a file and replace its value with the new value,
    returning the resulting file. This validates that the old value is constant and
    hasn't changed since parsing its value.

    Only the first line that is exactly "key: old_value" is replaced; the match
    is on the whole line, so indented or otherwise decorated lines do not count.

    Raises a ValueError when the key cannot be found in the given contents.

    Does not modify the value of contents.
    """

    old_line = key + ": " + old_value
    if old_line not in contents:
        raise ValueError("For key:%s, cannot find the old value (%s) in the given "
                         "contents." % (key, old_value))

    # Work on a copy so the caller's list stays untouched.
    new_contents = contents[:]
    new_contents[new_contents.index(old_line)] = key + ": " + new_value

    return new_contents
111
112
113
def remove_lines(contents, lines):
    """
    Return a copy of contents without the lines in the inclusive range
    lines.start through lines.end.

    Does not modify the passed in contents.
    """

    return contents[:lines.start] + contents[lines.end + 1:]
123
124
125
def parse_from_yaml(file_contents, lines):
    """
    Join the lines in the inclusive range lines.start through lines.end with
    newlines and parse the result as YAML, returning the parsed object.
    """

    snippet = "\n".join(file_contents[lines.start:lines.end + 1])

    # NOTE(review): yaml.Loader is not a safe loader; if these files can ever
    # come from an untrusted source, confirm whether yaml.SafeLoader would do.
    return yaml.load(snippet, Loader=yaml.Loader)
133
134
135
def get_yaml_contents(rule_obj):
    """
    From a rule_obj description, return a namedtuple of (path, contents); where
    path is the path to the rule YAML and contents is the list of lines in
    the file.

    rule_obj is indexed with the keys 'dir' (used to locate the YAML file) and
    'id' (used only in the error message).

    Raises a ValueError when the rule's YAML file does not exist.
    """

    file_description = namedtuple('file_description', ('path', 'contents'))

    yaml_file = get_rule_dir_yaml(rule_obj['dir'])
    if not os.path.exists(yaml_file):
        # Exception constructors take no keyword arguments; passing
        # file=sys.stderr here (as print() would accept) raised a TypeError
        # instead of the intended ValueError.
        raise ValueError("Error: yaml file does not exist for rule_id:%s" %
                         rule_obj['id'])

    yaml_contents = read_file_list(yaml_file)

    return file_description(yaml_file, yaml_contents)
152
153
154
def parse_prodtype(prodtype):
    """
    Split a comma-separated prodtype line and return the set of entries, each
    stripped of surrounding whitespace.
    """

    return {product.strip() for product in prodtype.split(',')}
160
161
162
def get_section_lines(file_path, file_contents, key_name):
    """
    From the given file_path and file_contents, find the lines describing the section
    key_name and returns the line range of the section.

    Returns the single (start, end) range found by find_section_lines, or None
    when the section is absent. file_path is only used in the error message.

    Raises a ValueError when the section occurs more than once.
    """

    section = find_section_lines(file_contents, key_name)

    if len(section) > 1:
        # Exception constructors take no keyword arguments; the former
        # file=sys.stderr argument turned this into a TypeError.
        raise ValueError("Multiple instances (%d) of %s in %s; refusing to modify file." %
                         (len(section), key_name, file_path))

    if section:
        return section[0]

    return None
178
179
180
def has_duplicated_subkeys(file_path, file_contents, sections):
    """
    Checks whether a section has duplicated keys. Note that these are silently
    eaten by the YAML parser we use.

    sections may be a single section name or a list of names. Returns True
    (after printing a message naming the key, section and file) as soon as a
    subkey is seen on more than one line of its section; returns False
    otherwise. Matching is a plain substring test for " <subkey>:" on each
    line, so it is a heuristic rather than a real parse.
    """

    if isinstance(sections, str):
        sections = [sections]

    for section in sections:
        # Get the lines in the file which match this section. If none exists,
        # it should be safe to silently ignore it.
        section_range = get_section_lines(file_path, file_contents, section)
        if not section_range:
            continue

        # Get the YAML parser's version of events. :-)
        parsed_section = parse_from_yaml(file_contents, section_range)

        parent_key = list(parsed_section.keys())[0]
        subsection = parsed_section[parent_key]

        # A section whose value is not a mapping (for example an empty
        # section, which parses to None) has no subkeys to duplicate.
        if not isinstance(subsection, dict):
            continue

        # Pre-build the text each subkey is searched for.
        wanted = [' ' + key + ':' for key in subsection]

        # Create a dictionary for counting them.
        subkey_counts = defaultdict(int)

        # Iterate over the lines, see if they match a known key. Ignore the
        # first line (as it is the section header). section_range.end is
        # inclusive, so the slice has to run through end + 1; stopping at end
        # would miss a duplicate sitting on the section's last line.
        for line in file_contents[section_range.start + 1:section_range.end + 1]:
            if not line:
                continue

            # We'll be lazy for the time being. Iterate over all keys.
            for our_key in wanted:
                if our_key in line:
                    subkey_counts[our_key] += 1
                    if subkey_counts[our_key] > 1:
                        print("Duplicated key " + our_key + " in " + section + " of " + file_path)
                        return True

    return False
224
225
226
def sort_section_keys(file_path, file_contents, sections, sort_func=None):
    """
    Sort subkeys in a YAML file's section.

    sections may be a single section name or a list of names; sort_func is
    passed as the key= argument to sorted(). Returns the re-ordered list of
    lines; file_contents itself is not modified.

    Each subkey is represented by the one line that contains " <subkey>:" or
    "\\t<subkey>:". Only those lines are written back between the section
    header and its trailing blank lines, so any other interior line of a
    section with two or more subkeys is dropped. Raises a ValueError when a
    subkey matches more than one line.
    """

    if isinstance(sections, str):
        sections = [sections]

    new_contents = file_contents[:]

    for section in sections:
        section_range = get_section_lines(file_path, new_contents, section)
        if not section_range:
            continue

        # Start by parsing the lines as YAML.
        parsed_section = parse_from_yaml(new_contents, section_range)

        # Skip the section header (it sits on the first line of the range)
        # and any blank lines that directly follow it.
        start_offset = 1
        while not new_contents[section_range.start + start_offset].strip():
            start_offset += 1

        # Skip any trailing empty lines.
        end_offset = 0
        while not new_contents[section_range.end - end_offset].strip():
            end_offset += 1

        # Bounds of the subkey lines: first_line inclusive, stop_line exclusive.
        first_line = section_range.start + start_offset
        stop_line = section_range.end - end_offset + 1

        # Validate we only have a single section.
        assert len(parsed_section.keys()) == 1

        # Sort the parsed subkeys.
        parent_key = next(iter(parsed_section.keys()))
        subkeys = sorted(parsed_section[parent_key].keys(), key=sort_func)

        # With zero or one subkeys there is nothing to re-order.
        if len(subkeys) <= 1:
            continue

        # Map every sorted subkey onto its line in the new contents. The
        # section is assumed to be small, so a nested scan is acceptable.
        subkey_mapping = {}
        for key in subkeys:
            spaced_key = ' ' + key + ':'
            tabbed_key = '\t' + key + ':'
            our_line = None

            for this_line in new_contents[first_line:stop_line]:
                if spaced_key not in this_line and tabbed_key not in this_line:
                    continue

                if our_line:
                    # Not supposed to be possible to have multiple keys
                    # matching the same value in this file. We should've
                    # already fixed this with fix-rules.py's duplicate_subkeys.
                    msg = "File {0} has duplicated key {1}: {2} vs {3}"
                    msg = msg.format(file_path, key, our_line, this_line)
                    raise ValueError(msg)
                our_line = this_line

            assert our_line
            subkey_mapping[key] = our_line

        # Rebuild the file: everything up to and including the header (and
        # leading blanks), the subkey lines in sorted order, then everything
        # from the section's trailing blank lines onwards.
        prefix = new_contents[:first_line]
        contents = [subkey_mapping[key] for key in subkeys]
        suffix = new_contents[stop_line:]

        new_contents = prefix + contents + suffix

    return new_contents
302