Test Failed
Push — master ( 36ef11...81e955 )
by Jan
03:06 queued 24s
created

collect_remediations.main()   B

Complexity

Conditions 5

Size

Total Lines 32
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 0
Metric Value
cc 5
eloc 25
nop 0
dl 0
loc 32
ccs 0
cts 22
cp 0
crap 30
rs 8.8133
c 0
b 0
f 0
1
#!/usr/bin/env python
2
3
import sys
4
import os
5
import os.path
6
import argparse
7
8
import ssg.build_remediations as remediation
9
import ssg.build_yaml
10
import ssg.rules
11
import ssg.jinja
12
import ssg.environment
13
import ssg.utils
14
import ssg.xml
15
from ssg.build_cpe import ProductCPEs
16
17
18
def parse_args():
19
    p = argparse.ArgumentParser(
20
        description="This script collects all remediation scripts, both "
21
        "static and templated, processes them with Jinja and puts them to "
22
        "the given output directory.")
23
    p.add_argument(
24
        "--build-config-yaml", required=True,
25
        help="YAML file with information about the build configuration. "
26
        "e.g.: ~/scap-security-guide/build/build_config.yml"
27
    )
28
    p.add_argument(
29
        "--product-yaml", required=True,
30
        help="YAML file with information about the product we are building. "
31
        "e.g.: ~/scap-security-guide/rhel7/product.yml"
32
    )
33
    p.add_argument(
34
        "--resolved-rules-dir", required=True,
35
        help="Directory with <rule-id>.yml resolved rule YAMLs"
36
    )
37
    p.add_argument(
38
        "--remediation-type", required=True, action="append",
39
        help="language or type of the remediations we are combining."
40
        "example: ansible")
41
    p.add_argument(
42
        "--output-dir", required=True,
43
        help="output directory where all remediations will be saved"
44
    )
45
    p.add_argument(
46
        "--fixes-from-templates-dir", required=True,
47
        help="directory from which we will collect fixes generated from "
48
        "templates")
49
    p.add_argument(
50
        "--platforms-dir", required=True,
51
        help="directory from which we collect compiled platforms")
52
53
    return p.parse_args()
54
55
56
def prepare_output_dirs(output_dir, remediation_types):
57
    output_dirs = dict()
58
    for lang in remediation_types:
59
        language_output_dir = os.path.join(output_dir, lang)
60
        if not os.path.exists(language_output_dir):
61
            os.makedirs(language_output_dir)
62
        output_dirs[lang] = language_output_dir
63
    return output_dirs
64
65
66
def find_remediation(
67
        fixes_from_templates_dir, rule_dir, lang, product, expected_file_name):
68
    language_fixes_from_templates_dir = os.path.join(
69
        fixes_from_templates_dir, lang)
70
    fix_path = None
71
    # first look for a static remediation
72
    rule_dir_remediations = remediation.get_rule_dir_remediations(
73
        rule_dir, lang, product)
74
    if len(rule_dir_remediations) > 0:
75
        # first item in the list has the highest priority
76
        fix_path = rule_dir_remediations[0]
77
    if fix_path is None:
78
        # check if we have a templated remediation instead
79
        if os.path.isdir(language_fixes_from_templates_dir):
80
            templated_fix_path = os.path.join(
81
                language_fixes_from_templates_dir, expected_file_name)
82
            if os.path.exists(templated_fix_path):
83
                fix_path = templated_fix_path
84
    return fix_path
85
86
87
def process_remediation(
88
        rule, fix_path, lang, output_dirs, expected_file_name, env_yaml, cpe_platforms):
89
    remediation_cls = remediation.REMEDIATION_TO_CLASS[lang]
90
    remediation_obj = remediation_cls(fix_path)
91
    remediation_obj.associate_rule(rule)
92
    fix = remediation.process(remediation_obj, env_yaml, cpe_platforms)
93
    if fix:
94
        output_file_path = os.path.join(output_dirs[lang], expected_file_name)
95
        remediation.write_fix_to_file(fix, output_file_path)
96
97
98
def collect_remediations(
99
        rule, langs, fixes_from_templates_dir, product, output_dirs,
100
        env_yaml, cpe_platforms):
101
    rule_dir = os.path.dirname(rule.definition_location)
102
    for lang in langs:
103
        ext = remediation.REMEDIATION_TO_EXT_MAP[lang]
104
        expected_file_name = rule.id_ + ext
105
        fix_path = find_remediation(
106
            fixes_from_templates_dir, rule_dir, lang, product,
107
            expected_file_name)
108
        if fix_path is None:
109
            # neither static nor templated remediation found
110
            continue
111
        process_remediation(
112
            rule, fix_path, lang, output_dirs, expected_file_name, env_yaml, cpe_platforms)
113
114
115
def main():
116
    args = parse_args()
117
118
    env_yaml = ssg.environment.open_environment(
119
        args.build_config_yaml, args.product_yaml)
120
121
    product = ssg.utils.required_key(env_yaml, "product")
122
    output_dirs = prepare_output_dirs(args.output_dir, args.remediation_type)
123
    product_cpes = ProductCPEs(env_yaml)
124
    cpe_platforms = dict()
125
    for platform_file in os.listdir(args.platforms_dir):
126
        platform_path = os.path.join(args.platforms_dir, platform_file)
127
        try:
128
            platform = ssg.build_yaml.Platform.from_yaml(platform_path, env_yaml, product_cpes)
129
        except ssg.build_yaml.DocumentationNotComplete:
130
            # Happens on non-debug build when a platform is
131
            # "documentation-incomplete"
132
            continue
133
        cpe_platforms[platform.name] = platform
134
135
    for rule_file in os.listdir(args.resolved_rules_dir):
136
        rule_path = os.path.join(args.resolved_rules_dir, rule_file)
137
        try:
138
            rule = ssg.build_yaml.Rule.from_yaml(rule_path, env_yaml)
139
        except ssg.build_yaml.DocumentationNotComplete:
140
            # Happens on non-debug build when a rule is
141
            # "documentation-incomplete"
142
            continue
143
        collect_remediations(
144
            rule, args.remediation_type, args.fixes_from_templates_dir,
145
            product, output_dirs, env_yaml, cpe_platforms)
146
    sys.exit(0)
147
148
149
if __name__ == "__main__":
150
    main()
151