Test Failed
Pull Request — master (#5004)
by Jan
02:09
created

ssg.templates.sanitize_input()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
ccs 0
cts 2
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
from __future__ import print_function
2
3
import os
4
import sys
5
import re
6
7
import ssg.build_yaml
8
9
languages = ["anaconda", "ansible", "bash", "oval", "puppet"]
10
11
lang_to_ext_map = {
12
    "anaconda": ".anaconda",
13
    "ansible": ".yml",
14
    "bash": ".sh",
15
    "oval": ".xml",
16
    "puppet": ".pp"
17
}
18
19
def sanitize_input(string):
20
    return re.sub(r'[\W_]', '_', string)
21
22
templates = dict()
23
24
25
def template(langs):
26
    def decorator_template(func):
27
        func.langs = langs
28
        templates[func.__name__] = func
29
        return func
30
    return decorator_template
31
32
33
# Callback functions for processing template parameters and/or validating them
34
35
36
@template(["ansible", "bash", "oval"])
37
def accounts_password(data, lang):
38
    if lang == "oval":
39
        data["sign"] = "-?" if data["variable"].endswith("credit") else ""
40
    return data
41
42
43
@template(["ansible", "bash", "oval"])
44
def auditd_lineinfile(data, lang):
45
    missing_parameter_pass = data["missing_parameter_pass"]
46
    if missing_parameter_pass == "true":
47
        missing_parameter_pass = True
48
    elif missing_parameter_pass == "false":
49
        missing_parameter_pass = False
50
    data["missing_parameter_pass"] = missing_parameter_pass
51
    return data
52
53
54
@template(["ansible", "bash", "oval"])
55
def audit_rules_dac_modification(data, lang):
56
    return data
57
58
59
@template(["ansible", "bash", "oval"])
60
def audit_rules_file_deletion_events(data, lang):
61
    return data
62
63
64
@template(["ansible", "bash", "oval"])
65
def audit_rules_login_events(data, lang):
66
    path = data["path"]
67
    name = re.sub(r'[-\./]', '_', os.path.basename(os.path.normpath(path)))
68
    data["name"] = name
69
    if lang == "oval":
70
        data["path"] = path.replace("/", "\\/")
71
    return data
72
73
74
@template(["ansible", "bash", "oval"])
75
def audit_rules_path_syscall(data, lang):
76
    if lang == "oval":
77
        pathid = re.sub(r'[-\./]', '_', data["path"])
78
        # remove root slash made into '_'
79
        pathid = pathid[1:]
80
        data["pathid"] = pathid
81
    return data
82
83
84
@template(["ansible", "bash", "oval"])
85
def audit_rules_privileged_commands(data, lang):
86
    path = data["path"]
87
    name = re.sub(r"[-\./]", "_", os.path.basename(path))
88
    data["name"] = name
89
    if lang == "oval":
90
        data["id"] = data["_rule_id"]
91
        data["title"] = "Record Any Attempts to Run " + name
92
        data["path"] = path.replace("/", "\\/")
93
    return data
94
95
96
@template(["ansible", "bash", "oval"])
97
def audit_rules_unsuccessful_file_modification(data, lang):
98
    return data
99
100
101
@template(["oval"])
102
def audit_rules_unsuccessful_file_modification_o_creat(data, lang):
103
    return data
104
105
106
@template(["oval"])
107
def audit_rules_unsuccessful_file_modification_o_trunc_write(data, lang):
108
    return data
109
110
111
@template(["oval"])
112
def audit_rules_unsuccessful_file_modification_rule_order(data, lang):
113
    return data
114
115
116
@template(["ansible", "bash", "oval"])
117
def audit_rules_usergroup_modification(data, lang):
118
    path = data["path"]
119
    name = re.sub(r'[-\./]', '_', os.path.basename(path))
120
    data["name"] = name
121
    if lang == "oval":
122
        data["path"] = path.replace("/", "\\/")
123
    return data
124
125
126
def _file_owner_groupowner_permissions_regex(data):
127
    data["is_directory"] = data["filepath"].endswith("/")
128
    if "missing_file_pass" not in data:
129
        data["missing_file_pass"] = False
130
    if "file_regex" in data and not data["is_directory"]:
131
        raise ValueError(
132
            "Used 'file_regex' key in rule '{0}' but filepath '{1}' does not "
133
            "specify a directory. Append '/' to the filepath or remove the "
134
            "'file_regex' key.".format(data["_rule_id"], data["filepath"]))
135
136
137
@template(["ansible", "bash", "oval"])
138
def file_groupowner(data, lang):
139
    _file_owner_groupowner_permissions_regex(data)
140
    if lang == "oval":
141
        data["fileid"] = data["_rule_id"].replace("file_groupowner", "")
142
    return data
143
144
145
@template(["ansible", "bash", "oval"])
146
def file_owner(data, lang):
147
    _file_owner_groupowner_permissions_regex(data)
148
    if lang == "oval":
149
        data["fileid"] = data["_rule_id"].replace("file_owner", "")
150
    return data
151
152
153
@template(["ansible", "bash", "oval"])
154
def file_permissions(data, lang):
155
    _file_owner_groupowner_permissions_regex(data)
156
    if lang == "oval":
157
        data["fileid"] = data["_rule_id"].replace("file_permissions", "")
158
        # build the state that describes our mode
159
        # mode_str maps to STATEMODE in the template
160
        mode = data["filemode"]
161
        fields = [
162
            'oexec', 'owrite', 'oread', 'gexec', 'gwrite', 'gread',
163
            'uexec', 'uwrite', 'uread', 'sticky', 'sgid', 'suid']
164
        mode_int = int(mode, 8)
165
        mode_str = ""
166
        for field in fields:
167
            if mode_int & 0x01 == 1:
168
                mode_str = (
169
                    "	<unix:" + field + " datatype=\"boolean\">true</unix:"
170
                    + field + ">\n" + mode_str)
171
            else:
172
                mode_str = (
173
                    "	<unix:" + field + " datatype=\"boolean\">false</unix:"
174
                    + field + ">\n" + mode_str)
175
            mode_int = mode_int >> 1
176
        data["statemode"] = mode_str
177
    return data
178
179
180
@template(["ansible", "bash", "oval"])
181
def grub2_bootloader_argument(data, lang):
182
    data["arg_name_value"] = data["arg_name"] + "=" + data["arg_value"]
183
    return data
184
185
186
@template(["ansible", "bash", "oval"])
187
def kernel_module_disabled(data, lang):
188
    return data
189
190
191
@template(["anaconda", "oval"])
192
def mount(data, lang):
193
    data["pointid"] = re.sub(r'[-\./]', '_', data["mountpoint"])
194
    return data
195
196
197
def _mount_option(data, lang):
198
    if lang == "oval":
199
        data["pointid"] = re.sub(r"[-\./]", "_", data["mountpoint"]).lstrip("_")
200
    else:
201
        data["mountoption"] = re.sub(" ", ",", data["mountoption"])
202
    return data
203
204
205
@template(["anaconda", "ansible", "bash", "oval"])
206
def mount_option(data, lang):
207
    return _mount_option(data, lang)
208
209
210
@template(["ansible", "bash", "oval"])
211
def mount_option_remote_filesystems(data, lang):
212
    if lang == "oval":
213
        data["mountoptionid"] = sanitize_input(data["mountoption"])
214
    return _mount_option(data, lang)
215
216
217
@template(["anaconda", "ansible", "bash", "oval"])
218
def mount_option_removable_partitions(data, lang):
219
    return _mount_option(data, lang)
220
221
222
@template(["anaconda", "ansible", "bash", "oval", "puppet"])
223
def package_installed(data, lang):
224
    if "evr" in data:
225
        evr = data["evr"]
226
        if evr and not re.match(r'\d:\d[\d\w+.]*-\d[\d\w+.]*', evr, 0):
227
            raise RuntimeError(
228
                "ERROR: input violation: evr key should be in "
229
                "epoch:version-release format, but package {0} has set "
230
                "evr to {1}".format(data["pkgname"], evr))
231
    return data
232
233
234
@template(["ansible", "bash", "oval"])
235
def sysctl(data, lang):
236
    data["sysctlid"] = re.sub(r'[-\.]', '_', data["sysctlvar"])
237
    if not data.get("sysctlval"):
238
        data["sysctlval"] = ""
239
    ipv6_flag = "P"
240
    if data["sysctlid"].find("ipv6") >= 0:
241
        ipv6_flag = "I"
242
    data["flags"] = "SR" + ipv6_flag
243
    return data
244
245
246
@template(["anaconda", "ansible", "bash", "oval", "puppet"])
247
def package_removed(data, lang):
248
    return data
249
250
251
@template(["ansible", "bash", "oval"])
252
def sebool(data, lang):
253
    sebool_bool = data.get("sebool_bool", None)
254
    if sebool_bool is not None and sebool_bool not in ["true", "false"]:
255
        raise ValueError(
256
            "ERROR: key sebool_bool in rule {0} contains forbidden "
257
            "value '{1}'.".format(data["_rule_id"], sebool_bool)
258
        )
259
    return data
260
261
262
@template(["ansible", "bash", "oval", "puppet"])
263
def service_disabled(data, lang):
264
    if "packagename" not in data:
265
        data["packagename"] = data["servicename"]
266
    if "daemonname" not in data:
267
        data["daemonname"] = data["servicename"]
268
    if "mask_service" not in data:
269
        data["mask_service"] = "true"
270
    return data
271
272
273
@template(["ansible", "bash", "oval", "puppet"])
274
def service_enabled(data, lang):
275
    if "packagename" not in data:
276
        data["packagename"] = data["servicename"]
277
    if "daemonname" not in data:
278
        data["daemonname"] = data["servicename"]
279
    return data
280
281
282
@template(["ansible", "bash", "oval"])
283
def sshd_lineinfile(data, lang):
284
    missing_parameter_pass = data["missing_parameter_pass"]
285
    if missing_parameter_pass == "true":
286
        missing_parameter_pass = True
287
    elif missing_parameter_pass == "false":
288
        missing_parameter_pass = False
289
    data["missing_parameter_pass"] = missing_parameter_pass
290
    return data
291
292
293
@template(["ansible", "bash", "oval"])
294
def timer_enabled(data, lang):
295
    if "packagename" not in data:
296
        data["packagename"] = data["timername"]
297
    return data
298
299
300
class Builder(object):
301
    """
302
    Class for building all templated content for a given product.
303
304
    To generate content from templates, pass the env_yaml, path to the
305
    directory with resolved rule YAMLs, path to the directory that contains
306
    templates, path to the output directory for checks and a path to the
307
    output directory for remediations into the constructor. Then, call the
308
    method build() to perform a build.
309
    """
310
    def __init__(
311
            self, env_yaml, resolved_rules_dir, templates_dir,
312
            remediations_dir, checks_dir):
313
        self.env_yaml = env_yaml
314
        self.resolved_rules_dir = resolved_rules_dir
315
        self.templates_dir = templates_dir
316
        self.remediations_dir = remediations_dir
317
        self.checks_dir = checks_dir
318
        self.output_dirs = dict()
319
        for lang in languages:
320
            if lang == "oval":
321
                # OVAL checks need to be put to a different directory because
322
                # they are processed differently than remediations later in the
323
                # build process
324
                output_dir = self.checks_dir
325
            else:
326
                output_dir = self.remediations_dir
327
            dir_ = os.path.join(output_dir, lang)
328
            self.output_dirs[lang] = dir_
329
330
    def preprocess_data(self, template, lang, raw_parameters):
331
        """
332
        Processes template data using a callback before the data will be
333
        substituted into the Jinja template.
334
        """
335
        template_func = templates[template]
336
        parameters = template_func(raw_parameters.copy(), lang)
337
        # TODO: Remove this right after the variables in templates are renamed
338
        # to lowercase
339
        uppercases = dict()
340
        for k, v in parameters.items():
341
            uppercases[k.upper()] = v
342
        return uppercases
343
344
    def build_lang(
345
            self, rule_id, template_name, template_vars, lang, local_env_yaml):
346
        """
347
        Builds templated content for a given rule for a given language.
348
        Writes the output to the correct build directories.
349
        """
350
        template_func = templates[template_name]
351
        if lang not in template_func.langs:
352
            return
353
        template_file_name = "template_{0}_{1}".format(
354
            lang.upper(), template_name)
355
        template_file_path = os.path.join(
356
            self.templates_dir, template_file_name)
357
        if not os.path.exists(template_file_path):
358
            raise RuntimeError(
359
                "Rule {0} wants to generate {1} content from template {2}, "
360
                "but file {3} which provides this template does not "
361
                "exist.".format(
362
                    rule_id, lang, template_name, template_file_path)
363
            )
364
        ext = lang_to_ext_map[lang]
365
        output_file_name = rule_id + ext
366
        output_filepath = os.path.join(
367
            self.output_dirs[lang], output_file_name)
368
        template_parameters = self.preprocess_data(
369
            template_name, lang, template_vars)
370
        jinja_dict = ssg.utils.merge_dicts(local_env_yaml, template_parameters)
371
        filled_template = ssg.jinja.process_file_with_macros(
372
            template_file_path, jinja_dict)
373
        with open(output_filepath, "w") as f:
374
            f.write(filled_template)
375
376
    def get_langs_to_generate(self, rule):
377
        """
378
        For a given rule returns list of languages that should be generated
379
        from templates. This is controlled by "template_backends" in rule.yml.
380
        """
381
        if "backends" in rule.template:
382
            backends = rule.template["backends"]
383
            for lang in backends:
384
                if lang not in languages:
385
                    raise RuntimeError(
386
                        "Rule {0} wants to generate unknown language '{1}"
387
                        "from a template.".format(rule.id_, lang)
388
                    )
389
            langs_to_generate = []
390
            for lang in languages:
391
                backend = backends.get(lang, "on")
392
                if backend == "on":
393
                    langs_to_generate.append(lang)
394
            return langs_to_generate
395
        else:
396
            return languages
397
398
    def build_rule(self, rule_id, rule_title, template, langs_to_generate):
399
        """
400
        Builds templated content for a given rule for selected languages,
401
        writing the output to the correct build directories.
402
        """
403
        try:
404
            template_name = template["name"]
405
        except KeyError:
406
            raise ValueError(
407
                "Rule {0} is missing template name under template key".format(
408
                    rule_id))
409
        if template_name not in templates:
410
            raise ValueError(
411
                "Rule {0} uses template {1} which does not exist.".format(
412
                    rule_id, template_name))
413
        try:
414
            template_vars = template["vars"]
415
        except KeyError:
416
            raise ValueError(
417
                "Rule {0} does not contain mandatory 'vars:' key under "
418
                "'template:' key.".format(rule_id))
419
        # Add the rule ID which will be reused in OVAL templates as OVAL
420
        # definition ID so that the build system matches the generated
421
        # check with the rule.
422
        template_vars["_rule_id"] = rule_id
423
        # checks and remediations are processed with a custom YAML dict
424
        local_env_yaml = self.env_yaml.copy()
425
        local_env_yaml["rule_id"] = rule_id
426
        local_env_yaml["rule_title"] = rule_title
427
        local_env_yaml["products"] = self.env_yaml["product"]
428
        for lang in langs_to_generate:
429
            self.build_lang(
430
                rule_id, template_name, template_vars, lang, local_env_yaml)
431
432
    def build_extra_ovals(self):
433
        declaration_path = os.path.join(self.templates_dir, "extra_ovals.yml")
434
        declaration = ssg.yaml.open_raw(declaration_path)
435
        for oval_def_id, template in declaration.items():
436
            langs_to_generate = ["oval"]
437
            # Since OVAL definition ID in shorthand format is always the same
438
            # as rule ID, we can use it instead of the rule ID even if no rule
439
            # with that ID exists
440
            self.build_rule(
441
                oval_def_id, oval_def_id, template, langs_to_generate)
442
443
    def build_all_rules(self):
444
        for rule_file in os.listdir(self.resolved_rules_dir):
445
            rule_path = os.path.join(self.resolved_rules_dir, rule_file)
446
            try:
447
                rule = ssg.build_yaml.Rule.from_yaml(rule_path, self.env_yaml)
448
            except ssg.build_yaml.DocumentationNotComplete:
449
                # Happens on non-debug build when a rule is "documentation-incomplete"
450
                continue
451
            if rule.template is None:
452
                # rule is not templated, skipping
453
                continue
454
            langs_to_generate = self.get_langs_to_generate(rule)
455
            self.build_rule(
456
                rule.id_, rule.title, rule.template, langs_to_generate)
457
458
    def build(self):
459
        """
460
        Builds all templated content for all languages, writing
461
        the output to the correct build directories.
462
        """
463
464
        for dir_ in self.output_dirs.values():
465
            if not os.path.exists(dir_):
466
                os.makedirs(dir_)
467
468
        self.build_extra_ovals()
469
        self.build_all_rules()
470