Passed
Push — master ( 42467a...9d011f )
by Matěj
03:19 queued 11s
created

utils.rule_dir_json   A

Complexity

Total Complexity 35

Size/Duplication

Total Lines 204
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 144
dl 0
loc 204
ccs 0
cts 139
cp 0
rs 9.6
c 0
b 0
f 0
wmc 35

7 Functions

Rating   Name   Duplication   Size   Complexity  
C main() 0 48 11
A handle_ovals() 0 25 4
A parse_args() 0 8 1
B walk_products() 0 26 5
A collect_rule_ids_and_dirs() 0 3 2
A handle_rule_yaml() 0 18 3
C handle_remediations() 0 37 9
1
#!/usr/bin/env python
2
3
from __future__ import print_function

import argparse
import json
import os
import sys
from collections import defaultdict

import ssg.build_yaml
import ssg.oval
import ssg.build_remediations
import ssg.products
import ssg.rules
import ssg.utils
import ssg.yaml
18
19
20
# Absolute path of the directory one level above the one holding this script.
SSG_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir)
)
# Default location of the generated JSON file, under SSG_ROOT.
BUILD_OUTPUT = os.path.join(SSG_ROOT, "build", "rule_dirs.json")
22
23
24
def parse_args():
    """Parse the command-line options of this script.

    Two options are defined: ``-r``/``--root`` (default ``SSG_ROOT``) and
    ``-o``/``--output`` (default ``BUILD_OUTPUT``).

    Returns:
        The namespace returned by ``ArgumentParser.parse_args()``, with
        ``root`` and ``output`` attributes.
    """
    root_help = "Path to SSG root directory (defaults to %s)" % SSG_ROOT
    output_help = "File to write json output to (defaults to build/rule_dirs.json)"

    cli = argparse.ArgumentParser()
    cli.add_argument("-r", "--root", type=str, action="store",
                     default=SSG_ROOT, help=root_help)
    cli.add_argument("-o", "--output", type=str, action="store",
                     default=BUILD_OUTPUT, help=output_help)
    return cli.parse_args()
32
33
34
def walk_products(root, all_products):
    """Load every product's ``product.yml`` and list the rule dirs it exposes.

    For each product, the directory named by its ``benchmark_root`` key and
    any entries under ``additional_content_directories`` are resolved
    relative to the product directory. Each resolved directory is scanned
    only once; it is credited to the first product that reaches it.

    Args:
        root: Directory containing one sub-directory per product.
        all_products: Iterable of product names.

    Returns:
        A tuple ``(rule_entries, product_yamls)`` where ``rule_entries`` is a
        list of ``(rule_id, rule_dir, content_dir, product)`` tuples and
        ``product_yamls`` maps each product name to its loaded yaml data.
    """
    seen_dirs = set()
    rule_entries = []
    yamls_by_product = {}

    for product in all_products:
        product_dir = os.path.join(root, product)

        product_yaml = ssg.yaml.open_raw(os.path.join(product_dir, "product.yml"))
        product_yaml.update(ssg.yaml._get_implied_properties(product_yaml))
        yamls_by_product[product] = product_yaml

        # The benchmark root comes first, then any additional content dirs.
        content_dirs = [
            os.path.abspath(os.path.join(product_dir, product_yaml['benchmark_root']))
        ]
        for extra in product_yaml.get("additional_content_directories", []):
            content_dirs.append(os.path.abspath(os.path.join(product_dir, extra)))

        for content_dir in content_dirs:
            if content_dir in seen_dirs:
                continue
            for rule_id, rule_dir in collect_rule_ids_and_dirs(content_dir):
                rule_entries.append((rule_id, rule_dir, content_dir, product))
            seen_dirs.add(content_dir)

    return rule_entries, yamls_by_product
60
61
62
def collect_rule_ids_and_dirs(rules_dir):
    """Yield ``(rule_id, rule_dir)`` pairs for the rule dirs under *rules_dir*.

    Rule directories come from ``ssg.rules.find_rule_dirs`` and are visited
    in sorted order; the id of each is obtained with
    ``ssg.rules.get_rule_dir_id``.
    """
    ordered_dirs = sorted(ssg.rules.find_rule_dirs(rules_dir))
    for path in ordered_dirs:
        rule_id = ssg.rules.get_rule_dir_id(path)
        yield rule_id, path
65
66
67
def handle_rule_yaml(product_list, product_yamls, rule_id, rule_dir, guide_dir):
    """Build the base description of a rule from its yaml file.

    The rule yaml is parsed with the product yaml of the first entry of
    *product_list*; each product in *product_list* is then tested against
    the rule's ``prodtype`` with ``ssg.utils.is_applicable``.

    Args:
        product_list: Non-empty list of candidate product names.
        product_yamls: Mapping of product name to loaded product yaml.
        rule_id: Identifier of the rule.
        rule_dir: Directory of the rule.
        guide_dir: Content directory the rule was found under.

    Returns:
        A dict with the keys ``id``, ``dir``, ``guide``, ``products`` (sorted
        list of applicable products) and ``title``.
    """
    rule_file = ssg.rules.get_rule_dir_yaml(rule_dir)
    first_product_yaml = product_yamls[product_list[0]]
    rule_yaml = ssg.build_yaml.Rule.from_yaml(rule_file, first_product_yaml)

    applicable = set(
        candidate for candidate in product_list
        if ssg.utils.is_applicable(rule_yaml.prodtype, candidate)
    )

    return {
        'id': rule_id,
        'dir': rule_dir,
        'guide': guide_dir,
        'products': sorted(applicable),
        'title': rule_yaml.title,
    }
85
86
87
def handle_ovals(product_list, product_yamls, rule_obj):
    """Describe the OVAL files found in a rule directory.

    Args:
        product_list: Product names to test each OVAL file against.
        product_yamls: Unused here; kept so the signature parallels
            ``handle_remediations``.
        rule_obj: Rule description; only its ``dir`` key is read.

    Returns:
        A tuple ``(ovals_by_name, ovals_per_product)``. The first maps an
        OVAL file name to a dict with ``name``, ``product`` (file name
        without extension), ``platforms`` and ``products`` (sorted list).
        The second is a ``defaultdict(set)`` mapping a product to the names
        of the OVAL files applicable to it.
    """
    ovals_by_name = {}
    ovals_per_product = defaultdict(set)

    for oval_path in ssg.rules.get_rule_dir_ovals(rule_obj['dir']):
        file_name = os.path.basename(oval_path)
        platforms = ssg.oval.applicable_platforms(oval_path)
        # is_applicable is given the platforms as one comma-separated string.
        joined_platforms = ','.join(platforms)

        matching = set()
        for product in product_list:
            if not ssg.utils.is_applicable(joined_platforms, product):
                continue
            ovals_per_product[product].add(file_name)
            matching.add(product)

        ovals_by_name[file_name] = {
            'name': file_name,
            'product': os.path.splitext(file_name)[0],
            'platforms': platforms,
            'products': sorted(matching),
        }

    return ovals_by_name, ovals_per_product
112
113
114
def handle_remediations(product_list, product_yamls, rule_obj):
    """Describe the remediation files found in a rule directory.

    Every remediation type listed in
    ``ssg.build_remediations.REMEDIATION_TO_EXT_MAP`` is scanned. Each file
    is parsed with the product yaml of the product it is named after when
    that product is in *product_list*; otherwise (including ``shared``
    files) the yaml of the first product in *product_list* is used.

    A remediation whose parsed config has no ``platform`` value is reported
    on stderr and treated as if it declared an empty platform string, so it
    is recorded with no applicable products instead of aborting the run.

    Args:
        product_list: Non-empty list of candidate product names.
        product_yamls: Mapping of product name to loaded product yaml.
        rule_obj: Rule description; only its ``dir`` key is read.

    Returns:
        A tuple ``(rule_remediations, r_products)``. The first maps a
        remediation type to a dict of file name -> description (``type``,
        ``name``, ``product``, ``ext``, ``platforms``, ``products``). The
        second is a ``defaultdict(set)`` mapping a product to the names of
        the remediation files applicable to it.
    """
    rule_dir = rule_obj['dir']

    rule_remediations = {}
    r_products = defaultdict(set)
    for r_type in ssg.build_remediations.REMEDIATION_TO_EXT_MAP:
        rule_remediations[r_type] = {}
        r_paths = ssg.build_remediations.get_rule_dir_remediations(rule_dir, r_type)

        for r_path in r_paths:
            r_name = os.path.basename(r_path)
            r_product, r_ext = os.path.splitext(r_name)
            r_obj = {'type': r_type, 'name': r_name, 'product': r_product, 'ext': r_ext[1:]}

            # Product-specific files are rendered with their own product yaml.
            prod_type = product_list[0]
            if r_product != 'shared' and r_product in product_list:
                prod_type = r_product
            product_yaml = product_yamls[prod_type]

            _, config = ssg.build_remediations.parse_from_file_with_jinja(
                r_path, product_yaml
            )
            platforms = config['platform']
            if not platforms:
                # Name the offending file instead of echoing the empty value,
                # and keep going: a None here used to fail on .split() below.
                print("remediation declares no platform: %s" % r_path, file=sys.stderr)
                platforms = ''

            r_obj['platforms'] = sorted(p.strip() for p in platforms.split(','))
            r_obj['products'] = set()
            for product in product_list:
                if ssg.utils.is_applicable(platforms, product):
                    r_products[product].add(r_name)
                    r_obj['products'].add(product)

            r_obj['products'] = sorted(r_obj['products'])
            rule_remediations[r_type][r_name] = r_obj

    return rule_remediations, r_products
151
152
153
def main():
    """Collect per-rule metadata for all products and write it as JSON.

    For each rule directory found by ``walk_products`` the rule yaml, OVAL
    files and remediations are described and stored under the rule id. Rules
    found under a content directory whose path contains ``linux_os`` are
    evaluated against every Linux product; all others only against the
    product whose walk discovered them. The resulting mapping is written to
    the ``--output`` path.
    """
    args = parse_args()

    linux_products, other_products = ssg.products.get_all(args.root)
    # Sorted so that the product credited with a shared content directory
    # does not depend on set iteration order.
    all_products = sorted(linux_products.union(other_products))
    linux_product_list = sorted(linux_products)

    all_rule_dirs, product_yamls = walk_products(args.root, all_products)

    known_rules = {}
    for rule_id, rule_dir, guide_dir, given_product in all_rule_dirs:
        if 'linux_os' in guide_dir:
            product_list = linux_product_list
        else:
            product_list = [given_product]

        try:
            rule_obj = handle_rule_yaml(product_list, product_yamls, rule_id, rule_dir, guide_dir)
        except ssg.yaml.DocumentationNotComplete:
            # Happens on non-debug build when a rule is "documentation-incomplete"
            continue

        rule_obj['ovals'], oval_products = handle_ovals(product_list, product_yamls, rule_obj)
        rule_obj['remediations'], r_products = handle_remediations(product_list, product_yamls, rule_obj)

        _sort_and_check_oval_products(oval_products)
        rule_obj['oval_products'] = oval_products

        _sort_and_check_remediation_products(r_products)
        rule_obj['remediation_products'] = r_products

        known_rules[rule_id] = rule_obj

    # The context manager closes the file even if serialization fails.
    with open(args.output, 'w') as output_file:
        json.dump(known_rules, output_file)


def _sort_and_check_oval_products(oval_products):
    """Replace each product's set of OVAL names with a sorted list, in place,
    and warn on stderr when a product has more than one OVAL."""
    for key in oval_products:
        oval_products[key] = sorted(oval_products[key])
        if len(oval_products[key]) > 1:
            print("product has multiple ovals: %s - %s" % (key, ','.join(oval_products[key])), file=sys.stderr)


def _sort_and_check_remediation_products(r_products):
    """Replace each product's set of remediation names with a sorted list, in
    place, and warn on stderr when two of them share a file extension."""
    for key in r_products:
        r_products[key] = sorted(r_products[key])
        if len(r_products[key]) > 1:
            exts = [os.path.splitext(name)[1] for name in r_products[key]]
            if len(exts) != len(set(exts)):
                print("product has multiple remediations of the same type: %s - %s" % (key, ','.join(r_products[key])), file=sys.stderr)
201
202
# Run the generator only when executed as a script, not when imported.
if __name__ == "__main__":
    sys.exit(main())
204