Passed
Push — master ( 42467a...9d011f )
by Matěj
03:19 queued 11s
created

utils.fix-rules.fix_int_identifier()   A

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 2
dl 0
loc 9
ccs 0
cts 7
cp 0
crap 12
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python2
2
3
import sys
4
import os
5
import jinja2
6
import argparse
7
8
from ssg import yaml, checks
9
from ssg.shims import input_func
10
import ssg
11
12
13 View Code Duplication
def has_empty_identifier(yaml_file, product_yaml=None):
    """Tell whether a rule's 'identifiers' section is empty or has a blank value.

    The rule is loaded with yaml.open_and_macro_expand(yaml_file, product_yaml).

    Returns True when the 'identifiers' key is present but None, or when any
    of its values is blank after being converted to a string and stripped.
    Returns False otherwise, including when the key is absent.
    """
    rule = yaml.open_and_macro_expand(yaml_file, product_yaml)
    if 'identifiers' not in rule:
        return False

    identifiers = rule['identifiers']
    if identifiers is None:
        # The section header exists but has no entries at all.
        return True

    return any(str(value).strip() == "" for value in identifiers.values())
23
24
25 View Code Duplication
def has_empty_references(yaml_file, product_yaml=None):
    """Tell whether a rule's 'references' section is empty or has a blank value.

    The rule is loaded with yaml.open_and_macro_expand(yaml_file, product_yaml).

    Returns True when the 'references' key is present but None, or when any
    of its values is blank after being converted to a string and stripped.
    Returns False otherwise, including when the key is absent.
    """
    rule = yaml.open_and_macro_expand(yaml_file, product_yaml)
    if 'references' not in rule:
        return False

    references = rule['references']
    if references is None:
        # The section header exists but has no entries at all.
        return True

    return any(str(value).strip() == "" for value in references.values())
35
36
37
def has_prefix_cce(yaml_file, product_yaml=None):
    """Tell whether any 'cce*' identifier of a rule carries a "CCE" prefix.

    The rule is loaded with yaml.open_and_macro_expand(yaml_file, product_yaml).
    An identifier counts as prefixed when its key starts with 'cce', its value
    starts with "CCE" (case-insensitive), and the value with the first three or
    four characters dropped is accepted by checks.is_cce_format_valid once
    "CCE-" is put back in front.

    Every 'cce*' key is inspected, so a prefixed value is found even when an
    earlier 'cce*' key in the same rule is fine.

    Returns True if at least one such identifier exists, False otherwise.
    """
    rule = yaml.open_and_macro_expand(yaml_file, product_yaml)
    if 'identifiers' not in rule or rule['identifiers'] is None:
        return False

    for i_type, i_value in rule['identifiers'].items():
        if i_type[0:3] != 'cce':
            continue

        # Identifier values are not guaranteed to be strings (see
        # has_int_identifier), so normalise before slicing.
        value = str(i_value)
        if value[0:3].upper() != 'CCE':
            continue

        # The prefix may be "CCE" (3 chars) or "CCE-" (4 chars).
        if (checks.is_cce_format_valid("CCE-" + value[3:])
                or checks.is_cce_format_valid("CCE-" + value[4:])):
            return True

    return False
47
48
49
def has_invalid_cce(yaml_file, product_yaml=None):
    """Tell whether a rule has a 'cce*' identifier rejected by the CCE check.

    The rule is loaded with yaml.open_and_macro_expand(yaml_file, product_yaml).
    For every identifier whose key starts with 'cce', the value is converted to
    a string, prefixed with "CCE-" and handed to checks.is_cce_value_valid.

    Returns True as soon as one value is rejected, False otherwise (including
    when the 'identifiers' section is absent or None).
    """
    rule = yaml.open_and_macro_expand(yaml_file, product_yaml)
    if 'identifiers' not in rule or rule['identifiers'] is None:
        return False

    return any(
        i_type[0:3] == 'cce' and not checks.is_cce_value_valid("CCE-" + str(i_value))
        for i_type, i_value in rule['identifiers'].items()
    )
57
58
59
def has_int_identifier(yaml_file, product_yaml=None):
    """Tell whether any identifier value of a rule is not a string.

    The rule is loaded with yaml.open_and_macro_expand(yaml_file, product_yaml).
    Any value that is not a str instance is flagged; this covers integers, but
    also every other non-string type the loader may produce.

    Returns True if such a value exists, False otherwise (including when the
    'identifiers' section is absent or None).
    """
    rule = yaml.open_and_macro_expand(yaml_file, product_yaml)
    if 'identifiers' not in rule or rule['identifiers'] is None:
        return False

    return any(not isinstance(value, str) for value in rule['identifiers'].values())
66
67
68
def has_int_reference(yaml_file, product_yaml=None):
    """Tell whether any reference value of a rule is not a string.

    The rule is loaded with yaml.open_and_macro_expand(yaml_file, product_yaml).
    Any value that is not a str instance is flagged; this covers integers, but
    also every other non-string type the loader may produce.

    Returns True if such a value exists, False otherwise (including when the
    'references' section is absent or None).
    """
    rule = yaml.open_and_macro_expand(yaml_file, product_yaml)
    if 'references' not in rule or rule['references'] is None:
        return False

    return any(not isinstance(value, str) for value in rule['references'].values())
75
76
77
def find_rules(directory, func):
    """Collect the rule files under directory for which func returns true.

    The tree is walked with os.walk. Whenever a directory contains a
    "product.yml", it is loaded with yaml.open_raw and becomes the current
    product; the most recently seen product.yml takes precedence over any
    previous one, e.g.:

        a/product.yml
        a/b/product.yml       -- will be selected for the following rule:
        a/b/c/something.rule

    NOTE(review): the current product is kept until another product.yml is
    met, so directories visited later that have no product.yml of their own
    still inherit the last one seen.

    Every file whose path ends in "rule.yml" and does not contain "tests/" is
    passed to func(path, product_yaml_contents). If the result evaluates to
    true, the tuple (path, path_to_product_yml) is recorded; both the contents
    and the product path are None while no product.yml has been seen.

    Files for which func raises jinja2.exceptions.UndefinedError are reported
    on stdout and skipped.

    Returns the list of recorded (rule_path, product_yaml_path) tuples.
    """
    rule_suffix = 'rule.yml'
    results = []
    product_yaml = None
    product_yaml_path = None

    for root, _, files in os.walk(directory):
        if "product.yml" in files:
            product_yaml_path = os.path.join(root, "product.yml")
            product_yaml = yaml.open_raw(product_yaml_path)

        for filename in files:
            path = os.path.join(root, filename)
            if not path.endswith(rule_suffix) or "tests/" in path:
                continue

            try:
                if func(path, product_yaml):
                    results.append((path, product_yaml_path))
            except jinja2.exceptions.UndefinedError:
                print("Failed to parse file %s (with product.yaml: %s). Skipping"
                      % (path, product_yaml_path))

    return results
134
135
136
def print_file(file_contents):
    """Print every entry of file_contents prefixed by its zero-based index."""
    for line_num, text in enumerate(file_contents):
        print("%d: %s" % (line_num, text))
139
140
141 View Code Duplication
def find_section_lines(file_contents, sec):
    """Locate every occurrence of a global key ("section") in a YAML-like file.

    This is a textual hack, not a YAML parse: a section starts at a line that
    begins with sec + ":" and extends over all following lines that are empty
    or start with a space, i.e. until the next line that begins with any other
    character. For example:

        0: not_it:
        1:     - value
        2: this_one:
        3:      - 2
        4:      - 5
        5:
        6: nor_this:

    for the section "this_one", the result [(2, 5)] is returned.

    The line that terminates a section is examined again as a possible section
    start, so two sections of the same name that directly follow each other
    are both reported.

    file_contents is a list of lines (without line terminators); sec is the
    section name without the trailing colon.

    Returns a list of (begin, end) tuples of inclusive, zero-based line numbers,
    one per section found, in file order.
    """
    sec_id = sec + ":"
    sec_ranges = []
    end_num = len(file_contents)
    line_num = 0

    while line_num < end_num:
        if not file_contents[line_num].startswith(sec_id):
            line_num += 1
            continue

        begin = line_num
        line_num += 1
        # Consume the section body: blank lines and space-indented lines.
        while line_num < end_num:
            line = file_contents[line_num]
            if line and line[0] != ' ':
                break
            line_num += 1

        sec_ranges.append((begin, line_num - 1))
        # Do not advance here: line_num already points at the line that ended
        # the section, which may itself open another matching section.

    return sec_ranges
178
179
180
def remove_lines(file_contents, lines):
    """Return a new list holding file_contents minus the given line numbers.

    lines is a container of zero-based indices to drop; file_contents itself
    is left untouched.
    """
    return [text for index, text in enumerate(file_contents) if index not in lines]
188
189
190
def remove_section_keys(file_contents, yaml_contents, section, removed_keys):
    """Remove a series of keys from a section of a rule file.

    file_contents is the file as a list of lines, yaml_contents the parsed
    rule, section the name of the global key to edit and removed_keys the keys
    to drop from it.

    If the parsed section is None, or it has as many keys as removed_keys, the
    whole section (header included) is removed. Otherwise only the lines inside
    the section that start with one of the keys followed by ":" are removed;
    all instances of a key are removed if it appears more than once.

    Returns a new list of lines; file_contents is not modified.

    Raises RuntimeError if the section does not appear exactly once in the
    file.
    """
    sec_ranges = find_section_lines(file_contents, section)
    if len(sec_ranges) != 1:
        raise RuntimeError("Refusing to fix file -- expected exactly one '%s' section, found %d"
                           % (section, len(sec_ranges)))

    begin, end = sec_ranges[0]
    section_data = yaml_contents[section]

    if section_data is None or len(section_data) == len(removed_keys):
        print("Removing entire section since all keys are empty")
        return remove_lines(file_contents, set(range(begin, end + 1)))

    # str.startswith accepts a tuple, and an empty tuple never matches.
    key_prefixes = tuple(key + ":" for key in removed_keys)

    # Don't include section header
    r_lines = set()
    for line_num in range(begin + 1, end + 1):
        if file_contents[line_num].strip().startswith(key_prefixes):
            r_lines.add(line_num)

    return remove_lines(file_contents, r_lines)
221
222
223
def rewrite_value_int_str(line):
    """Rewrite a "key: value" line so the value is wrapped in double quotes.

    Everything before the first ':' (indentation included) is kept as the key;
    the rest is stripped and quoted. Intended for values that would otherwise
    be read as integers. Raises ValueError if the line contains no ':'.
    """
    colon_at = line.index(':')
    name = line[:colon_at]
    quoted = '"%s"' % line[colon_at + 1:].strip()
    return "%s: %s" % (name, quoted)
231
232
233
def rewrite_value_remove_prefix(line):
    """Rewrite a "key: value" line so the value loses its "CCE" prefix.

    The value is everything after the first ':' with surrounding whitespace
    stripped. Dropping three characters is tried first, then four; the first
    remainder accepted by checks.is_cce_format_valid (with "CCE-" prepended)
    becomes the new value. If neither is accepted the value is kept as is.
    Raises ValueError if the line contains no ':'.
    """
    colon_at = line.index(':')
    name = line[:colon_at]
    value = line[colon_at + 1:].strip()

    for skip in (3, 4):
        remainder = value[skip:]
        if checks.is_cce_format_valid("CCE-" + remainder):
            return name + ": " + remainder

    return name + ": " + value
244
245
246
def rewrite_section_value(file_contents, yaml_contents, section, keys, transform):
    """Rewrite the lines of selected keys inside one section of a rule file.

    file_contents is the file as a list of lines, section the name of the
    global key to edit, keys the keys whose lines must be rewritten and
    transform a callable taking the original line and returning its
    replacement. yaml_contents is accepted for symmetry with the other fixers
    and is not used here.

    Every line inside the section (header excluded) that, once stripped, starts
    with one of the keys followed by ":" is replaced by transform(line); all
    instances of a key are updated.

    Returns a new list of lines; file_contents is not modified.

    Raises RuntimeError if the section does not appear exactly once in the
    file.
    """
    sec_ranges = find_section_lines(file_contents, section)
    if len(sec_ranges) != 1:
        raise RuntimeError("Refusing to fix file -- expected exactly one '%s' section, found %d"
                           % (section, len(sec_ranges)))

    begin, end = sec_ranges[0]

    # str.startswith accepts a tuple, and an empty tuple never matches.
    key_prefixes = tuple(key + ":" for key in keys)

    new_contents = file_contents[:]

    # Don't include section header
    for line_num in range(begin + 1, end + 1):
        line = file_contents[line_num]
        if line.strip().startswith(key_prefixes):
            new_contents[line_num] = transform(line)

    return new_contents
274
275
276
def rewrite_section_value_int_str(file_contents, yaml_contents, section, int_keys):
    """Quote the values of int_keys in section via rewrite_section_value."""
    transform = rewrite_value_int_str
    return rewrite_section_value(file_contents, yaml_contents, section, int_keys, transform)
279
280
281 View Code Duplication
def fix_empty_identifier(file_contents, yaml_contents):
    """Drop identifiers whose value is blank from a rule file.

    Collects the keys of yaml_contents['identifiers'] whose value is blank
    after str() and strip() (none when the section is None) and delegates the
    removal to remove_section_keys. Returns the new list of lines.
    """
    section = 'identifiers'
    entries = yaml_contents[section]

    if entries is None:
        blank_keys = []
    else:
        blank_keys = [name for name, value in entries.items()
                      if str(value).strip() == ""]

    return remove_section_keys(file_contents, yaml_contents, section, blank_keys)
291
292
293 View Code Duplication
def fix_empty_reference(file_contents, yaml_contents):
    """Drop references whose value is blank from a rule file.

    Collects the keys of yaml_contents['references'] whose value is blank
    after str() and strip() (none when the section is None) and delegates the
    removal to remove_section_keys. Returns the new list of lines.
    """
    section = 'references'
    entries = yaml_contents[section]

    if entries is None:
        blank_keys = []
    else:
        blank_keys = [name for name, value in entries.items()
                      if str(value).strip() == ""]

    return remove_section_keys(file_contents, yaml_contents, section, blank_keys)
304
305
306
def fix_prefix_cce(file_contents, yaml_contents):
    """Strip the "CCE" prefix from prefixed 'cce*' identifiers of a rule file.

    An identifier is selected when its key starts with 'cce', its value starts
    with "CCE" (case-insensitive) and the value with the first three or four
    characters dropped is accepted by checks.is_cce_format_valid once "CCE-"
    is put back in front. The selected lines are rewritten with
    rewrite_value_remove_prefix through rewrite_section_value.

    Returns the new list of lines.
    """
    section = 'identifiers'

    prefixed_identifiers = []

    if yaml_contents[section] is not None:
        for i_type, i_value in yaml_contents[section].items():
            if i_type[0:3] != 'cce':
                continue

            # Identifier values are not guaranteed to be strings (see
            # has_int_identifier), so normalise before slicing.
            value = str(i_value)
            if value[0:3].upper() != 'CCE':
                continue

            # The prefix may be "CCE" (3 chars) or "CCE-" (4 chars).
            if (checks.is_cce_format_valid("CCE-" + value[3:])
                    or checks.is_cce_format_valid("CCE-" + value[4:])):
                prefixed_identifiers.append(i_type)

    return rewrite_section_value(file_contents, yaml_contents, section, prefixed_identifiers,
                                 rewrite_value_remove_prefix)
322
323
324
def fix_invalid_cce(file_contents, yaml_contents):
    """Drop 'cce*' identifiers rejected by the CCE value check from a rule file.

    Collects the keys of yaml_contents['identifiers'] that start with 'cce'
    and whose value, converted to a string and prefixed with "CCE-", is
    rejected by checks.is_cce_value_valid (none when the section is None),
    then delegates the removal to remove_section_keys.

    Returns the new list of lines.
    """
    section = 'identifiers'
    entries = yaml_contents[section]

    if entries is None:
        invalid_keys = []
    else:
        invalid_keys = [
            name for name, value in entries.items()
            if name[0:3] == 'cce' and not checks.is_cce_value_valid("CCE-" + str(value))
        ]

    return remove_section_keys(file_contents, yaml_contents, section, invalid_keys)
336
337
338
def fix_int_identifier(file_contents, yaml_contents):
    """Quote the identifier values of a rule file that are not strings.

    Collects the keys of yaml_contents['identifiers'] whose value is not a str
    instance and rewrites their lines with rewrite_section_value_int_str so
    the value is wrapped in double quotes. A None section yields no keys, like
    in the other fixers.

    Returns the new list of lines.
    """
    section = 'identifiers'
    entries = yaml_contents[section]

    int_identifiers = []
    if entries is not None:
        int_identifiers = [i_type for i_type, i_value in entries.items()
                           if not isinstance(i_value, str)]

    return rewrite_section_value_int_str(file_contents, yaml_contents, section, int_identifiers)
347
348
349
def fix_int_reference(file_contents, yaml_contents):
    """Quote the reference values of a rule file that are not strings.

    Collects the keys of yaml_contents['references'] whose value is not a str
    instance and rewrites their lines with rewrite_section_value_int_str so
    the value is wrapped in double quotes. A None section yields no keys, like
    in the other fixers.

    Returns the new list of lines.
    """
    section = 'references'
    entries = yaml_contents[section]

    int_references = []
    if entries is not None:
        int_references = [i_type for i_type, i_value in entries.items()
                          if not isinstance(i_value, str)]

    return rewrite_section_value_int_str(file_contents, yaml_contents, section, int_references)
358
359
360
def fix_file(path, product_yaml, func):
    """Apply a fixer to one rule file after interactive confirmation.

    path is the rule file, product_yaml the product data handed to
    yaml.open_and_macro_expand, and func a fixer called as
    func(file_lines, parsed_rule) that returns the new list of lines.

    The file is shown before and after the fix with print_file, then the user
    is asked through input_func whether to write the result. Only the answer
    'y' (ignoring surrounding whitespace) overwrites the file; each line is
    written followed by a newline.
    """
    with open(path, 'r') as rule_file:
        file_contents = rule_file.read().split("\n")
    # A trailing newline produces one empty last element; drop it so it is
    # neither printed nor written back as an extra blank line.
    if file_contents[-1] == '':
        file_contents = file_contents[:-1]

    yaml_contents = yaml.open_and_macro_expand(path, product_yaml)

    print("====BEGIN BEFORE====")
    print_file(file_contents)
    print("====END BEFORE====")

    file_contents = func(file_contents, yaml_contents)

    print("====BEGIN AFTER====")
    print_file(file_contents)
    print("====END AFTER====")
    response = input_func("Confirm writing output to %s: (y/n): " % path)
    if response.strip() == 'y':
        with open(path, 'w') as rule_file:
            rule_file.writelines(line + "\n" for line in file_contents)
384
385
386
def fix_empty_identifiers(directory):
    """Find rules under directory with empty identifiers and fix each one.

    Rules are selected with find_rules and has_empty_identifier; every match
    is passed to fix_file with fix_empty_identifier, using the product.yml
    recorded for it (reloaded with yaml.open_raw) or None when there is none.
    """
    results = find_rules(directory, has_empty_identifier)
    print("Number of rules with empty identifiers: %d" % len(results))

    for rule_path, product_yaml_path in results:
        if product_yaml_path is None:
            product_yaml = None
        else:
            product_yaml = yaml.open_raw(product_yaml_path)

        fix_file(rule_path, product_yaml, fix_empty_identifier)
399
400
401
def fix_empty_references(directory):
    """Find rules under directory with empty references and fix each one.

    Rules are selected with find_rules and has_empty_references; every match
    is passed to fix_file with fix_empty_reference, using the product.yml
    recorded for it (reloaded with yaml.open_raw) or None when there is none.
    """
    results = find_rules(directory, has_empty_references)
    print("Number of rules with empty references: %d" % len(results))

    for rule_path, product_yaml_path in results:
        if product_yaml_path is None:
            product_yaml = None
        else:
            product_yaml = yaml.open_raw(product_yaml_path)

        fix_file(rule_path, product_yaml, fix_empty_reference)
414
415
416
def find_prefix_cce(directory):
    """Find rules under directory with prefixed CCEs and fix each one.

    Rules are selected with find_rules and has_prefix_cce; every match is
    passed to fix_file with fix_prefix_cce, using the product.yml recorded for
    it (reloaded with yaml.open_raw) or None when there is none.
    """
    results = find_rules(directory, has_prefix_cce)
    print("Number of rules with prefixed CCEs: %d" % len(results))

    for rule_path, product_yaml_path in results:
        if product_yaml_path is None:
            product_yaml = None
        else:
            product_yaml = yaml.open_raw(product_yaml_path)

        fix_file(rule_path, product_yaml, fix_prefix_cce)
429
430
431
def find_invalid_cce(directory):
    """Find rules under directory with invalid CCEs and fix each one.

    Rules are selected with find_rules and has_invalid_cce; every match is
    passed to fix_file with fix_invalid_cce, using the product.yml recorded
    for it (reloaded with yaml.open_raw) or None when there is none.
    """
    results = find_rules(directory, has_invalid_cce)
    print("Number of rules with invalid CCEs: %d" % len(results))

    for rule_path, product_yaml_path in results:
        if product_yaml_path is None:
            product_yaml = None
        else:
            product_yaml = yaml.open_raw(product_yaml_path)

        fix_file(rule_path, product_yaml, fix_invalid_cce)
444
445
446
def find_int_identifiers(directory):
    """Find rules under directory with integer identifiers and fix each one.

    Rules are selected with find_rules and has_int_identifier; every match is
    passed to fix_file with fix_int_identifier, using the product.yml recorded
    for it (reloaded with yaml.open_raw) or None when there is none.
    """
    results = find_rules(directory, has_int_identifier)
    print("Number of rules with integer identifiers: %d" % len(results))

    for rule_path, product_yaml_path in results:
        if product_yaml_path is None:
            product_yaml = None
        else:
            product_yaml = yaml.open_raw(product_yaml_path)

        fix_file(rule_path, product_yaml, fix_int_identifier)
459
460
461
def find_int_references(directory):
    """Find rules under directory with integer references and fix each one.

    Rules are selected with find_rules and has_int_reference; every match is
    passed to fix_file with fix_int_reference, using the product.yml recorded
    for it (reloaded with yaml.open_raw) or None when there is none.
    """
    results = find_rules(directory, has_int_reference)
    print("Number of rules with integer references: %d" % len(results))

    for rule_path, product_yaml_path in results:
        if product_yaml_path is None:
            product_yaml = None
        else:
            product_yaml = yaml.open_raw(product_yaml_path)

        fix_file(rule_path, product_yaml, fix_int_reference)
474
475
476
def parse_args():
    """Parse the command line and return the argparse namespace.

    Two positional arguments are expected: "command", restricted to the six
    supported fixes, and "ssg_root", the path to the root of the ssg git
    directory. The epilog lists each command with a one-line description.
    """
    # One table feeds both the epilog text and the accepted choices.
    commands = (
        ("empty_identifiers", "check and fix rules with empty identifiers"),
        ("prefixed_identifiers", "check and fix rules with prefixed (CCE-) identifiers"),
        ("invalid_identifiers", "check and fix rules with invalid identifiers"),
        ("int_identifiers", "check and fix rules with pseudo-integer identifiers"),
        ("empty_references", "check and fix rules with empty references"),
        ("int_references", "check and fix rules with pseudo-integer references"),
    )

    # The epilog ends with the 37 spaces of indentation that preceded the
    # closing quotes of the original literal.
    epilog = "\nCommands:\n"
    epilog += "".join("\t%s - %s\n" % entry for entry in commands)
    epilog += " " * 37

    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Utility for fixing mistakes in .rule files",
        epilog=epilog)
    parser.add_argument("command", help="Which fix to perform.",
                        choices=[name for name, _ in commands])
    parser.add_argument("ssg_root", help="Path to root of ssg git directory")
    return parser.parse_args()
494
495
496
def __main__():
    """Parse the command line and run the fixer selected by "command".

    The chosen function is called with the ssg root directory. A command with
    no registered handler terminates the process with exit status 1.
    """
    args = parse_args()

    handlers = {
        'empty_identifiers': fix_empty_identifiers,
        'prefixed_identifiers': find_prefix_cce,
        'invalid_identifiers': find_invalid_cce,
        'int_identifiers': find_int_identifiers,
        'empty_references': fix_empty_references,
        'int_references': find_int_references,
    }

    handler = handlers.get(args.command)
    if handler is None:
        sys.exit(1)

    handler(args.ssg_root)
513
514
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    __main__()
516