Passed
Pull Request — master (#3193)
by Alexander
02:07
created

rule_dir_json.main()   C

Complexity

Conditions 10

Size

Total Lines 43
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 110

Importance

Changes 0
Metric Value
cc 10
eloc 31
nop 0
dl 0
loc 43
ccs 0
cts 31
cp 0
crap 110
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like rule_dir_json.main() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
3
from __future__ import print_function
4
5
import argparse
6
import os
7
import sys
8
from collections import defaultdict
9
10
import json
11
12
import ssg.build_yaml
13
import ssg.oval
14
import ssg.build_remediations
15
import ssg.rules
16
import ssg.yaml
17
18
19
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
20
21
22
def parse_args():
23
    parser = argparse.ArgumentParser()
24
    parser.add_argument("-r", "--root", type=str, action="store", default=SSG_ROOT,
25
                   help="Path to SSG root directory (defaults to %s)" % SSG_ROOT)
26
    parser.add_argument("-o", "--output", type=str, action="store", default="build/rule_dirs.json",
27
                   help="File to write json output to (defaults to build/rule_dirs.json)")
28
29
    return parser.parse_args()
30
31
32
def walk_products(root, all_products):
33
    visited_guide_dirs = set()
34
35
    all_rule_dirs = []
36
    product_yamls = {}
37
    known_rules = set()
38
39
    for product in all_products:
40
        product_dir = os.path.join(root, product)
41
        product_yaml_path = os.path.join(product_dir, "product.yml")
42
        product_yaml = ssg.yaml.open_raw(product_yaml_path)
43
        product_yaml.update(ssg.yaml._get_implied_properties(product_yaml))
44
        product_yamls[product] = product_yaml
45
46
        guide_dir = os.path.join(product_dir, product_yaml['benchmark_root'])
47
        guide_dir = os.path.abspath(guide_dir)
48
49
        if guide_dir in visited_guide_dirs:
50
            continue
51
52
        new_rule_dirs = sorted(ssg.rules.find_rule_dirs(guide_dir))
53
        for rule_dir in new_rule_dirs:
54
            rule_id = ssg.rules.get_rule_dir_id(rule_dir)
55
            all_rule_dirs.append((rule_id, rule_dir, guide_dir, product))
56
57
            if rule_id in known_rules:
58
                exp = "Multiple rules with same rule_id: %s and %s" % \
59
                      (known_rules[rule_id]['rule_dir'], rule_dir)
60
                raise ValueError(exp)
61
            known_rules.add(rule_id)
62
63
        visited_guide_dirs.add(guide_dir)
64
65
    return all_rule_dirs, product_yamls
66
67
68
def handle_rule_yaml(product_list, product_yamls, rule_id, rule_dir, guide_dir):
69
    rule_obj = {'id': rule_id, 'dir': rule_dir, 'guide': guide_dir}
70
    rule_file = ssg.rules.get_rule_dir_yaml(rule_dir)
71
72
    prod_type = product_list[0]
73
    product_yaml = product_yamls[prod_type]
74
75
    rule_yaml = ssg.build_yaml.Rule.from_yaml(rule_file, product_yaml)
76
    rule_products = set()
77
    for product in product_list:
78
        if ssg.rules.is_applicable(rule_yaml.prodtype, product):
79
            rule_products.add(product)
80
81
    rule_products = sorted(rule_products)
82
    rule_obj['products'] = rule_products
83
    rule_obj['title'] = rule_yaml.title
84
85
    return rule_obj
86
87
88
def handle_ovals(product_list, product_yamls, rule_obj):
89
    rule_dir = rule_obj['dir']
90
91
    rule_ovals = {}
92
    oval_products = defaultdict(set)
93
94
    for oval_path in ssg.rules.get_rule_dir_ovals(rule_dir):
95
        oval_name = os.path.basename(oval_path)
96
        oval_product, _ = os.path.splitext(oval_name)
97
        oval_obj = {'name': oval_name, 'product': oval_product}
98
99
        platforms = ssg.oval.applicable_platforms(oval_path)
100
        cs_platforms = ','.join(platforms)
101
102
        oval_obj['platforms'] = platforms
103
        oval_obj['products'] = set()
104
        for product in product_list:
105
            if ssg.rules.is_applicable(cs_platforms, product):
106
                oval_products[product].add(oval_name)
107
                oval_obj['products'].add(product)
108
109
        oval_obj['products'] = sorted(oval_obj['products'])
110
        rule_ovals[oval_name] = oval_obj
111
112
    return rule_ovals, oval_products
113
114
115
def handle_remediations(product_list, product_yamls, rule_obj):
116
    rule_dir = rule_obj['dir']
117
118
    rule_remediations = {}
119
    r_products = defaultdict(set)
120
    for r_type in ssg.build_remediations.REMEDIATION_TO_EXT_MAP:
121
        rule_remediations[r_type] = {}
122
        r_paths = ssg.rules.get_rule_dir_remediations(rule_dir, r_type)
123
124
        for r_path in r_paths:
125
            r_name = os.path.basename(r_path)
126
            r_product, r_ext = os.path.splitext(r_name)
127
            r_obj = {'type': r_type, 'name': r_name, 'product': r_product, 'ext': r_ext[1:]}
128
129
            prod_type = product_list[0]
130
            if r_product != 'shared' and r_product in product_list:
131
                prod_type = r_product
132
            product_yaml = product_yamls[prod_type]
133
134
            _, config = ssg.build_remediations.parse_from_file(r_path, product_yaml)
135
            platforms = config['platform']
136
            if not platforms:
137
                print(config['platform'])
138
139
            r_obj['platforms'] = sorted(map(lambda x: x.strip(), platforms.split(',')))
140
            r_obj['products'] = set()
141
            for product in product_list:
142
                if ssg.rules.is_applicable(platforms, product):
143
                    r_products[product].add(r_name)
144
                    r_obj['products'].add(product)
145
146
            r_obj['products'] = sorted(r_obj['products'])
147
            rule_remediations[r_type][r_name] = r_obj
148
149
    return rule_remediations, r_products
150
151
152
def main():
153
    args = parse_args()
154
155
    linux_products, other_products = ssg.products.get_all(args.root)
156
    all_products = linux_products.union(other_products)
157
158
    all_rule_dirs, product_yamls = walk_products(args.root, all_products)
159
160
    known_rules = {}
161
    for rule_id, rule_dir, guide_dir, given_product in all_rule_dirs:
162
        product_list = sorted(linux_products)
163
        if 'linux_os' not in guide_dir:
164
            product_list = [given_product]
165
166
        rule_obj = handle_rule_yaml(product_list, product_yamls, rule_id, rule_dir, guide_dir)
167
        rule_obj['ovals'], oval_products = handle_ovals(product_list, product_yamls, rule_obj)
168
        rule_obj['remediations'], r_products = handle_remediations(product_list, product_yamls, rule_obj)
169
170
        # Validate oval products
171
        for key in oval_products:
172
            oval_products[key] = sorted(oval_products[key])
173
            if len(oval_products[key]) > 1:
174
                print("product has multiple ovals: %s - %s" % (key, ','.join(oval_products[key])), file=sys.stderr)
175
176
        rule_obj['oval_products'] = oval_products
177
178
        # Validate remediation products
179
        for key in r_products:
180
            r_products[key] = sorted(r_products[key])
181
            if len(r_products[key]) > 1:
182
                exts = sorted(map(lambda x: os.path.splitext(x)[1], r_products[key]))
183
                if len(exts) != len(set(exts)):
184
                    print("product has multiple remediations of the same type: %s - %s" % (key, ','.join(r_products[key])), file=sys.stderr)
185
186
        rule_obj['remediation_products'] = r_products
187
188
        known_rules[rule_id] = rule_obj
189
190
    f = open(args.output, 'w')
191
    j = json.dump(known_rules, f)
192
    if not f.closed:
193
        f.flush()
194
        f.close()
195
196
if __name__ == "__main__":
197
    main()
198