Passed
Push — master ( 42467a...9d011f )
by Matěj
03:19 queued 11s
created

utils.fix_file_ocilclause   C

Complexity

Total Complexity 55

Size/Duplication

Total Lines 250
Duplicated Lines 65.2 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 173
dl 163
loc 250
ccs 0
cts 166
cp 0
rs 6
c 0
b 0
f 0
wmc 55

16 Functions

Rating   Name                      Duplication   Size   Complexity
A        update_key_value()        16            16     4
A        add_key_subkey()          11            11     1
A        write_file()              0             7      2
A        range_has_jinja()         0             2      1
B        get_key()                 10            10     7
A        update_subkey_value()     21            21     4
A        get_sections()            0             7      3
A        __main__()                0             8      1
A        read_file()               0             5      2
B        find_section_lines()      37            37     7
A        walk_dir()                15            15     4
A        print_file()              0             3      2
B        fix_ocil_clause()         26            26     7
A        parse_from_yaml()         0             4      1
A        parse_args()              0             5      1
B        _create_profile_cache()   27            28     8

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
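
As a rough illustration of that kind of restructuring, the sketch below pulls the "copy the list, find a line, replace it" loop into one helper. The helper name `replace_line` and its `start`/`end` parameters are hypothetical and do not exist in the project; `update_key_value` is shown only as one possible caller.

```python
def replace_line(contents, old_line, new_line, start=0, end=None):
    """Return a copy of contents with the first old_line in [start, end] replaced.

    Hypothetical shared helper; raises ValueError when old_line is absent.
    """
    if end is None:
        end = len(contents) - 1
    new_contents = contents[:]
    for line_num in range(start, end + 1):
        if new_contents[line_num] == old_line:
            new_contents[line_num] = new_line
            return new_contents
    raise ValueError("line not found: %r" % old_line)


def update_key_value(contents, key, old_value, new_value):
    # Thin wrapper: only the line formatting stays specific to this caller.
    return replace_line(contents, key + ": " + old_value, key + ": " + new_value)


if __name__ == "__main__":
    doc = ["title: example", "severity: low"]
    print(update_key_value(doc, "severity", "low", "high"))
```

A variant that works on subkeys could pass the section bounds as `start` and `end`, so the search loop would live in one place instead of several.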

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like utils.fix_file_ocilclause often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
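
For a module of free functions such as this one, the same idea can be sketched by grouping the helpers that all operate on one file's list of lines. The class name `RuleFile` and its methods are hypothetical, chosen only for illustration, and the section detection is a simplified stand-in rather than the project's own logic.

```python
class RuleFile:
    """Hypothetical wrapper around one file's contents, held as a list of lines."""

    def __init__(self, lines):
        self.lines = list(lines)

    @classmethod
    def from_text(cls, text):
        # Split on newlines and drop the empty entry a trailing newline leaves.
        lines = text.split("\n")
        if lines and lines[-1] == "":
            lines = lines[:-1]
        return cls(lines)

    def sections(self):
        """Return the set of top-level keys (letters and underscores, then ':')."""
        found = set()
        for line in self.lines:
            head, sep, _ = line.partition(":")
            if sep and head and head.replace("_", "").isalpha():
                found.add(head)
        return found

    def to_text(self):
        # Every line is written back with a trailing newline.
        return "".join(line + "\n" for line in self.lines)


if __name__ == "__main__":
    rule = RuleFile.from_text("title: demo\nocil: check it\n    sub: value\n")
    print(sorted(rule.sections()))
```

Keeping the lines on an instance means callers no longer pass `file_contents` into every helper, which is one way the cohesive component described above might show up in code.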

import sys
import os
import argparse
import subprocess
import jinja2
import yaml

import ssg


def _create_profile_cache(ssg_root):
    # Map each selected object name to the set of products whose profiles
    # select it.
    profile_cache = {}

    product_list = ['debian9', 'debian10', 'fedora', 'ol7', 'opensuse',
                    'rhel7', 'sle12', 'ubuntu1604', 'ubuntu1804',
                    'wrlinux']

    for product in product_list:
        prod_profiles_dir = os.path.join(ssg_root, product, "profiles")
        for root, _, files in os.walk(prod_profiles_dir):
            for filename in files:
                # Join with the directory being walked so that files in
                # nested directories resolve correctly.
                profile_path = os.path.join(root, filename)
                # safe_load needs no explicit Loader and does not construct
                # arbitrary Python objects.
                with open(profile_path, 'r') as profile_file:
                    parsed_profile = yaml.safe_load(profile_file)
                for _obj in parsed_profile['selections']:
                    obj = _obj
                    if '=' in obj:
                        # is a var with non-default value
                        obj = _obj[:_obj.index('=')]
                    if not obj[0].isalpha():
                        # drop a leading non-alphabetic marker character
                        obj = obj[1:]

                    if obj not in profile_cache:
                        profile_cache[obj] = set()

                    profile_cache[obj].add(product)

    return profile_cache


def read_file(path):
    # Return the file as a list of lines, dropping the empty entry that a
    # trailing newline produces.
    with open(path, 'r') as _f:
        file_contents = _f.read().split("\n")
    if file_contents[-1] == '':
        file_contents = file_contents[:-1]
    return file_contents


def write_file(path, contents):
    # Write each line followed by a newline; the context manager flushes
    # and closes the file.
    with open(path, 'w') as _f:
        for line in contents:
            _f.write(line + "\n")


def find_section_lines(file_contents, sec):
    # Hack to find a global key ("section"/sec) in a YAML-like file.
    # All indented lines until the next global key are included in the range.
    # For example:
    #
    # 0: not_it:
    # 1:     - value
    # 2: this_one:
    # 3:      - 2
    # 4:      - 5
    # 5:
    # 6: nor_this:
    #
    # for the section "this_one", the result [(2, 5)] will be returned.
    # Note that multiple sections may exist in a file and each will be
    # identified and returned.
    sec_ranges = []

    sec_id = sec + ":"
    end_num = len(file_contents)
    line_num = 0

    while line_num < end_num:
        if file_contents[line_num].startswith(sec_id):
            begin = line_num
            line_num += 1
            while line_num < end_num:
                line = file_contents[line_num]
                if len(line) > 0 and line[0] != ' ':
                    break
                line_num += 1

            end = line_num - 1
            sec_ranges.append((begin, end))
            # line_num now points at the next global key (or the end of the
            # file); re-check that line instead of skipping over it.
            continue
        line_num += 1
    return sec_ranges


def update_key_value(contents, key, old_value, new_value):
    # Return a copy of contents with the first "key: old_value" line
    # replaced by "key: new_value".
    new_contents = contents[:]
    old_line = key + ": " + old_value
    updated = False

    for line_num in range(0, len(new_contents)):
        line = new_contents[line_num]
        if line == old_line:
            new_contents[line_num] = key + ": " + new_value
            updated = True
            break

    if not updated:
        # Raise explicitly: assert statements are stripped under python -O.
        raise AssertionError("line not found: %r" % old_line)

    return new_contents


def update_subkey_value(contents, key, subkey, old_value, new_value):
    # Within the first section named key, replace every
    # "    subkey: old_value" line with "    subkey: new_value".
    new_contents = contents[:]
    old_line = "    " + subkey + ": " + old_value
    key_range = find_section_lines(contents, key)[0]
    updated = False

    for line_num in range(key_range[0], key_range[1] + 1):
        line = new_contents[line_num]
        if line == old_line:
            # Assumption: new_value is meant to be written here; without it
            # the parameter is unused and the subkey is left empty.
            new_contents[line_num] = "    " + subkey + ": " + new_value
            updated = True

    if not updated:
        # Dump the context before failing to make the mismatch easy to spot.
        print(key)
        print(subkey)
        print(old_value)
        print(new_value)
        print(contents[key_range[0]:key_range[1]+1])
        raise AssertionError("subkey line not found: %r" % old_line)

    return new_contents


def add_key_subkey(contents, key, subkey, value):
    new_line = "    " + subkey + ": " + value
    key_range = find_section_lines(contents, key)[0]

    # Since there is always at least one line in the key_range (when [0] == [1]),
    # it is always safe to add the new value right after the key header.
    start_line = key_range[0] + 1
    new_contents = contents[0:start_line]
    new_contents.append(new_line)
    new_contents.extend(contents[start_line:])
    return new_contents


def get_key(line):
    # Return the global key at the start of line (letters and underscores
    # followed by ':'), or None if the line does not start with one.
    if ':' in line and line[0].isalpha():
        char_index = 0
        _ll = len(line)
        while char_index < _ll-1 and (line[char_index].isalpha() or
                                      line[char_index] == '_'):
            char_index += 1
        if line[char_index] == ':':
            return line[0:char_index]
    return None


def get_sections(file_contents):
    global_sections = set()
    for line in file_contents:
        key = get_key(line)
        if key:
            global_sections.add(key)
    return global_sections


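def range_has_jinja(file_contents, line_range):
    # Test each delimiter separately: "'{{' and '}}' in text" only checks
    # for '}}', because the non-empty string '{{' is always truthy.
    text = "\n".join(file_contents[line_range[0]:line_range[1]+1])
    return '{{' in text and '}}' in text

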
def fix_ocil_clause(ssg_root, path, obj_name):
    # Only file_* rules about permissions, group owner or owner are rewritten.
    is_file_templated = obj_name[0:4] == 'file'
    is_permissions = '_permissions_' in obj_name
    is_groupowner = '_groupowner_' in obj_name
    is_owner = '_owner_' in obj_name

    if not is_file_templated or not (is_permissions or is_groupowner or is_owner):
        return

    loaded_file = read_file(path)
    sections = get_sections(loaded_file)
    if 'ocil_clause' not in sections:
        ocil_lines = find_section_lines(loaded_file, 'ocil')
        assert(len(ocil_lines) == 1)
        ocil_lines = ocil_lines[0]

        ocil = parse_from_yaml(loaded_file, ocil_lines)['ocil']
        if '{{{' not in ocil:
            # Report files whose ocil value does not contain '{{{'.
            print(path)

        # Derive the clause from the ocil value and insert it, followed by a
        # blank line, directly above the ocil section.
        ocil_clause_str = ocil.replace('ocil_', 'ocil_clause_')
        new_line = "ocil_clause: '%s'" % ocil_clause_str
        new_file = loaded_file[:ocil_lines[0]]
        new_file.extend([new_line, ''])
        new_file.extend(loaded_file[ocil_lines[0]:])
        write_file(path, new_file)


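def parse_from_yaml(file_contents, lines):
    # Parse only the given inclusive line range as YAML. safe_load needs no
    # explicit Loader and does not construct arbitrary Python objects.
    new_file_arr = file_contents[lines[0]:lines[1] + 1]
    new_file = "\n".join(new_file_arr)
    return yaml.safe_load(new_file)

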
def print_file(file_contents):
    for line_num in range(0, len(file_contents)):
        print("%d: %s" % (line_num, file_contents[line_num]))


def walk_dir(ssg_root, function):
    # Call function(ssg_root, path, obj_name) for every *.rule file found
    # under <ssg_root>/linux_os/guide.
    product_guide = os.path.join(ssg_root, 'linux_os', 'guide')

    for root, dirs, files in os.walk(product_guide):
        for filename in files:
            path = os.path.join(root, filename)

            if filename.endswith('.rule'):
                # The object name is the file name without the extension.
                obj_name = filename[:-5]
                function(ssg_root, path, obj_name)


def parse_args():
    parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
                                     description="Utility for finding similar guide rules")
    parser.add_argument("ssg_root", help="Path to root of ssg git directory")
    return parser.parse_args()


def __main__():
    args = parse_args()

    pc = _create_profile_cache(args.ssg_root)
    global profile_cache
    profile_cache = pc

    walk_dir(args.ssg_root, fix_ocil_clause)


if __name__ == "__main__":
    __main__()
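
One pitfall that a helper like `range_has_jinja` can run into is writing a two-delimiter check as `'{{' and '}}' in text`. The short sketch below contrasts that form with testing each delimiter separately; both function names are hypothetical and exist only for this comparison.

```python
def has_jinja_chained(text):
    # Parsed as: '{{' and ('}}' in text). The non-empty string '{{' is
    # always truthy, so only the closing delimiter is actually tested.
    return '{{' and '}}' in text


def has_jinja_explicit(text):
    # Each delimiter gets its own membership test.
    return '{{' in text and '}}' in text


if __name__ == "__main__":
    sample = "}} closing delimiter only"
    print(has_jinja_chained(sample))   # True: the opening delimiter is never checked
    print(has_jinja_explicit(sample))  # False: '{{' is missing
```

The explicit form still accepts text where `}}` comes before `{{`, so it is a coarse filter rather than a real template check.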