Passed
Push — master ( 42467a...9d011f )
by Matěj
03:19 queued 11s
created

utils.move_rules.add_key_subkey()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 8

Duplication

Lines 11
Ratio 100 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 4
dl 11
loc 11
ccs 0
cts 8
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
import sys
2
import os
3
import argparse
4
import subprocess
5
import jinja2
6
import yaml
7
8
import ssg.products
9
10
11
# Product names collected for the shared "linux_os" guide.  The set starts
# empty, is filled in by __main__() from ssg.products.get_all(), and is read
# by move_rule_linux_os() to build its list of candidate source directories.
linux_os = set()
12
13
14
def abs_join(a, *p):
    """Join ``a`` with the path components ``p`` and return the absolute path."""
    joined = os.path.join(a, *p)
    return os.path.abspath(joined)
16
17
def read_file(path):
    """Return the contents of the text file at ``path`` as a list of lines.

    Lines are split on "\\n" and carry no terminator.  A single trailing
    empty element (produced by a final newline, or by an empty file) is
    dropped, so a file containing "a\\nb\\n" yields ["a", "b"] and an empty
    file yields [].
    """
    # The context manager guarantees the handle is closed even if read() fails.
    with open(path, 'r') as handle:
        file_contents = handle.read().split("\n")

    if file_contents[-1] == '':
        file_contents = file_contents[:-1]
    return file_contents
22
23
24
def write_file(path, contents):
    """Write ``contents`` (an iterable of lines) to ``path``.

    The file is truncated first and every line is written followed by "\\n".
    """
    # The context manager flushes and closes the handle, also when a write
    # raises part-way through.
    with open(path, 'w') as handle:
        handle.writelines(line + "\n" for line in contents)
31
32
33 View Code Duplication
def find_section_lines(file_contents, sec):
    """Locate every occurrence of the global key ``sec`` in a YAML-like file.

    This is a hack that works on raw lines rather than parsed YAML.  A
    section starts on a line beginning (at column 0) with ``sec + ":"`` and
    runs until the line before the next line that is non-empty and does not
    start with a space.  For example:

        0: not_it:
        1:     - value
        2: this_one:
        3:      - 2
        4:      - 5
        5:
        6: nor_this:

    for the section "this_one", the result [(2, 5)] will be returned.

    Returns a list of inclusive ``(begin, end)`` line-index tuples, one per
    occurrence, in file order; the list is empty when the key is absent.
    """
    sec_ranges = []
    sec_id = sec + ":"
    end_num = len(file_contents)
    line_num = 0

    while line_num < end_num:
        if not file_contents[line_num].startswith(sec_id):
            line_num += 1
            continue

        begin = line_num
        line_num += 1
        while line_num < end_num:
            line = file_contents[line_num]
            if line and line[0] != ' ':
                break
            line_num += 1

        sec_ranges.append((begin, line_num - 1))
        # Do not advance here: the line that terminated this section may
        # itself open another section with the same key and must be examined
        # by the next iteration.

    return sec_ranges
70
71
72 View Code Duplication
def update_key_value(contents, key, old_value, new_value):
    """Return a copy of ``contents`` with one ``key: old_value`` line rewritten.

    Only the first line that is exactly ``key + ": " + old_value`` is
    replaced, by ``key + ": " + new_value``.  ``contents`` is not modified.

    Raises:
        AssertionError: if no line matches.  It is raised explicitly rather
            than through an ``assert`` statement so the check is still
            enforced when Python runs with ``-O``.
    """
    new_contents = contents[:]
    old_line = key + ": " + old_value

    for line_num, line in enumerate(new_contents):
        if line == old_line:
            new_contents[line_num] = key + ": " + new_value
            break
    else:
        raise AssertionError("line %r not found; cannot update key %r"
                             % (old_line, key))

    return new_contents
88
89
90 View Code Duplication
def update_subkey_value(contents, key, subkey, old_value, new_value):
    """Return a copy of ``contents`` with a sub-key value replaced.

    Within the first section named ``key`` (as located by
    find_section_lines), every line that is exactly
    ``"    " + subkey + ": " + old_value`` is rewritten to carry
    ``new_value`` instead.  ``contents`` is not modified.

    Raises:
        IndexError: if ``key`` has no section in ``contents``.
        AssertionError: if no line in the section matches.  It is raised
            explicitly so the check survives ``python -O``; the message
            carries the arguments and the section's lines for diagnosis.
    """
    new_contents = contents[:]
    prefix = "    " + subkey + ": "
    old_line = prefix + old_value
    key_range = find_section_lines(contents, key)[0]
    updated = False

    for line_num in range(key_range[0], key_range[1] + 1):
        if new_contents[line_num] == old_line:
            # Write the requested value; previously the line was left with an
            # empty value and ``new_value`` was ignored.
            new_contents[line_num] = prefix + new_value
            updated = True

    if not updated:
        raise AssertionError(
            "no line %r in section %r (subkey=%r, new_value=%r): %r"
            % (old_line, key, subkey, new_value,
               contents[key_range[0]:key_range[1] + 1]))

    return new_contents
111
112
113 View Code Duplication
def add_key_subkey(contents, key, subkey, value):
    """Return a copy of ``contents`` with ``subkey: value`` added under ``key``.

    The new line, indented by four spaces, is placed directly below the
    header line of the first section named ``key``.  ``contents`` itself is
    left untouched.
    """
    entry = "    " + subkey + ": " + value
    header_index = find_section_lines(contents, key)[0][0]

    # A section always includes at least its own header line, so the slot
    # right after the header is a valid insertion point.
    new_contents = contents[:]
    new_contents.insert(header_index + 1, entry)
    return new_contents
124
125
126 View Code Duplication
def get_key(line):
    """Return the global key that starts ``line``, or None if there is none.

    A key is a run of letters and underscores that begins at column 0 with a
    letter and is immediately followed by a colon.
    """
    if ':' not in line or not line[0].isalpha():
        return None

    last_index = len(line) - 1
    stop = 0
    # Advance over the identifier characters, never past the final character.
    while stop < last_index:
        char = line[stop]
        if not (char.isalpha() or char == '_'):
            break
        stop += 1

    return line[:stop] if line[stop] == ':' else None
136
137
138
def get_sections(file_contents):
    """Return the set of global keys found in the lines of ``file_contents``."""
    candidates = (get_key(line) for line in file_contents)
    return {key for key in candidates if key}
145
146
147
def range_has_jinja(file_contents, range):
    """Tell whether the given line range contains Jinja-style delimiters.

    ``range`` is an inclusive ``(begin, end)`` pair of line indices (the
    parameter name shadows the builtin but is kept for compatibility with
    keyword callers).  Returns True only when both "{{" and "}}" occur
    somewhere in the joined lines.
    """
    text = "\n".join(file_contents[range[0]:range[1]+1])
    # Each delimiter needs its own membership test: the expression
    # "'{{' and '}}' in text" only ever checks for '}}'.
    return '{{' in text and '}}' in text
149
150
151 View Code Duplication
def move_rule_other(ssg_root, current_product, path, obj_name):
    """Print shell commands relocating one rule of a single product.

    Nothing is moved by this function; it only prints ``mkdir``/``mv``
    commands to standard output.  The rule file ``path`` is moved to
    ``<dir of path>/<obj_name>/rule.yml`` and any existing check or fix file
    of ``current_product`` named after ``obj_name`` is moved into the
    matching sub-directory, renamed after the product.

    The target rule directory must not exist yet (checked with ``assert``).
    """
    rule_dir = abs_join(os.path.dirname(path), obj_name)
    rule_file = abs_join(rule_dir, "rule.yml")

    assert not os.path.exists(rule_dir)

    # (sub-directory of the new rule directory,
    #  source directory template (filled with the product),
    #  file name template (object name at the source, product at the target))
    artifact_kinds = [
        ('oval', "%s/checks/oval", "%s.xml"),
        ('bash', "%s/fixes/bash", "%s.sh"),
        ('ansible', "%s/fixes/ansible", "%s.yml"),
        ('anaconda', "%s/fixes/anaconda", "%s.anaconda"),
        ('puppet', "%s/fixes/puppet", "%s.pp"),
    ]

    # Build every possible artifact move; the candidates are filtered for
    # existence only when the commands are printed below.
    target_dirs = []
    candidates = []
    for sub_dir, source_dir_tpl, name_tpl in artifact_kinds:
        target_dir = abs_join(rule_dir, sub_dir)
        target_dirs.append(target_dir)

        origin = abs_join(ssg_root, source_dir_tpl % current_product,
                          name_tpl % obj_name)
        target = abs_join(target_dir, name_tpl % current_product)
        candidates.append((origin, target))

    print("mkdir -p '%s'" % rule_dir)
    for target_dir in target_dirs:
        print("mkdir -p '%s'" % target_dir)
    print("mv '%s' -v '%s'" % (path, rule_file))
    for origin, target in candidates:
        if os.path.exists(origin):
            print("mv '%s' -v '%s'" % (origin, target))

    print()
219
220
221 View Code Duplication
def move_rule_linux_os(ssg_root, current_product, path, obj_name):
    """Print shell commands relocating one rule of the linux_os guide.

    Nothing is moved by this function; it only prints ``mkdir``/``mv``
    commands to standard output.  The rule file ``path`` is moved to
    ``<dir of path>/<obj_name>/rule.yml``.  Check and fix files named after
    ``obj_name`` are looked up under "shared" and under every product in the
    module-level ``linux_os`` set; each one that exists is moved into the
    matching sub-directory, renamed after the product it came from.

    ``current_product`` is accepted for signature compatibility with
    move_rule_other but is not used.  The target rule directory must not
    exist yet (checked with ``assert``).
    """
    rule_dir = abs_join(os.path.dirname(path), obj_name)
    rule_file = abs_join(rule_dir, "rule.yml")

    assert not os.path.exists(rule_dir)

    products = set(['shared']).union(linux_os)

    # (sub-directory of the new rule directory,
    #  source directory template (filled with the product),
    #  file name template (object name at the source, product at the target))
    artifact_kinds = [
        ('oval', "%s/checks/oval", "%s.xml"),
        ('bash', "%s/fixes/bash", "%s.sh"),
        ('ansible', "%s/fixes/ansible", "%s.yml"),
        ('anaconda', "%s/fixes/anaconda", "%s.anaconda"),
        ('puppet', "%s/fixes/puppet", "%s.pp"),
    ]

    # Build every possible artifact move; the candidates are filtered for
    # existence only when the commands are printed below.
    target_dirs = []
    candidates = []
    for sub_dir, source_dir_tpl, name_tpl in artifact_kinds:
        target_dir = abs_join(rule_dir, sub_dir)
        target_dirs.append(target_dir)

        for product in products:
            origin = abs_join(ssg_root, source_dir_tpl % product,
                              name_tpl % obj_name)
            target = abs_join(target_dir, name_tpl % product)
            candidates.append((origin, target))

    # NOTE: relocating the rule's test data (tests/data/group_*/rule_*) was
    # sketched here in disabled code and is not performed.

    print("mkdir -p '%s'" % rule_dir)
    for target_dir in target_dirs:
        print("mkdir -p '%s'" % target_dir)
    print("mv '%s' -v '%s'" % (path, rule_file))
    for origin, target in candidates:
        if os.path.exists(origin):
            print("mv '%s' -v '%s'" % (origin, target))

    print()
310
311
312 View Code Duplication
def fix_ocil_clause(ssg_root, path, obj_name):
    """Add a missing ``ocil_clause`` key to a templated file rule.

    Only rules whose name starts with "file" and contains "_permissions_",
    "_groupowner_" or "_owner_" are considered.  When the rule file at
    ``path`` has no ``ocil_clause`` section, one is derived from the ``ocil``
    value (every "ocil_" replaced by "ocil_clause_") and inserted, followed
    by an empty line, directly above the ``ocil`` section; the file is then
    rewritten in place.

    ``ssg_root`` is not used.  The file must contain exactly one ``ocil``
    section (checked with ``assert``).
    """
    if obj_name[0:4] != 'file':
        return
    markers = ('_permissions_', '_groupowner_', '_owner_')
    if not any(marker in obj_name for marker in markers):
        return

    loaded_file = read_file(path)
    if 'ocil_clause' in get_sections(loaded_file):
        return

    ocil_ranges = find_section_lines(loaded_file, 'ocil')
    assert len(ocil_ranges) == 1
    ocil_range = ocil_ranges[0]

    ocil = parse_from_yaml(loaded_file, ocil_range)['ocil']
    if '{{{' not in ocil:
        # Report files whose ocil text lacks a "{{{" marker
        # (presumably not macro-based -- TODO confirm).
        print(path)

    clause_line = "ocil_clause: '%s'" % ocil.replace('ocil_', 'ocil_clause_')
    insert_at = ocil_range[0]
    new_file = loaded_file[:insert_at] + [clause_line, ''] + loaded_file[insert_at:]
    write_file(path, new_file)
338
339
340
def parse_from_yaml(file_contents, lines):
    """Parse a slice of ``file_contents`` as YAML and return the result.

    ``lines`` is an inclusive ``(begin, end)`` pair of line indices; the
    selected lines are joined with newlines before parsing.
    """
    snippet = "\n".join(file_contents[lines[0]:lines[1] + 1])
    # safe_load replaces the bare yaml.load(): calling load() without a
    # Loader is deprecated since PyYAML 5.1 and raises TypeError on 6.x, and
    # the full loader can construct arbitrary Python objects from file data.
    return yaml.safe_load(snippet)
344
345
346
def print_file(file_contents):
    """Print each line of ``file_contents`` prefixed with its 0-based index."""
    for line_num, line in enumerate(file_contents):
        print("%d: %s" % (line_num, line))
349
350
351 View Code Duplication
def walk_dir(ssg_root, product, function):
    """Call ``function`` for every ``*.rule`` file in a product's guide.

    The tree ``<ssg_root>/<product>/guide`` is walked recursively.  For each
    file whose path ends in ".rule", ``function(ssg_root, product, path,
    obj_name)`` is invoked, where ``obj_name`` is the file name without the
    ".rule" suffix.
    """
    guide_root = os.path.join(ssg_root, product, 'guide')

    for current_dir, _subdirs, filenames in os.walk(guide_root):
        for filename in filenames:
            path = os.path.join(current_dir, filename)
            if not path.endswith('.rule'):
                continue
            function(ssg_root, product, path, filename[:-5])
366
367
368
def parse_args():
    """Parse the command line and return the resulting namespace.

    The only argument is the positional ``ssg_root``, the path to the root of
    the ssg git checkout.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        # The previous description belonged to a different utility; this
        # script prints a shell script of mkdir/mv commands.
        description="Utility printing a shell script that moves guide rules "
                    "into per-rule directories",
    )
    parser.add_argument("ssg_root", help="Path to root of ssg git directory")
    return parser.parse_args()
373
374
375
def __main__():
    """Print a bash script that moves every guide rule into its own directory.

    The product lists come from ``ssg.products.get_all``; the first one is
    merged into the module-level ``linux_os`` set before the linux_os guide
    is walked, then each remaining product's guide is walked in turn.
    """
    args = parse_args()
    linux_os_products, other_products = ssg.products.get_all(args.ssg_root)
    linux_os.update(linux_os_products)

    # Script preamble: abort the generated script on the first failing command.
    for header_line in ("#!/bin/bash", "set -e"):
        print(header_line)
    print()

    walk_dir(args.ssg_root, "linux_os", move_rule_linux_os)
    for product in other_products:
        walk_dir(args.ssg_root, product, move_rule_other)


if __name__ == "__main__":
    __main__()
394