Passed
Push — master ( 234e2f...28d6a7 )
by Jan
02:36
created

ssg.templates.Builder.preprocess_data()   A

Complexity

Conditions 2

Size

Total Lines 13
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 4
dl 0
loc 13
ccs 0
cts 7
cp 0
crap 6
rs 10
c 0
b 0
f 0
1
from __future__ import absolute_import
2
from __future__ import print_function
3
4
import os
5
import sys
6
import re
7
from xml.sax.saxutils import unescape
8
9
import ssg.build_yaml
10
11
try:
12
    from urllib.parse import quote
13
except ImportError:
14
    from urllib import quote
15
16
languages = ["anaconda", "ansible", "bash", "oval", "puppet", "ignition", "kubernetes"]
17
18
lang_to_ext_map = {
19
    "anaconda": ".anaconda",
20
    "ansible": ".yml",
21
    "bash": ".sh",
22
    "oval": ".xml",
23
    "puppet": ".pp",
24
    "ignition": ".yml",
25
    "kubernetes": ".yml"
26
}
27
28
def sanitize_input(string):
29
    return re.sub(r'[\W_]', '_', string)
30
31
templates = dict()
32
33
34
def template(langs):
35
    def decorator_template(func):
36
        func.langs = langs
37
        templates[func.__name__] = func
38
        return func
39
    return decorator_template
40
41
42
# Callback functions for processing template parameters and/or validating them
43
44
45
@template(["ansible", "bash", "oval"])
46
def accounts_password(data, lang):
47
    if lang == "oval":
48
        data["sign"] = "-?" if data["variable"].endswith("credit") else ""
49
    return data
50
51
52
@template(["ansible", "bash", "oval"])
53
def auditd_lineinfile(data, lang):
54
    missing_parameter_pass = data["missing_parameter_pass"]
55
    if missing_parameter_pass == "true":
56
        missing_parameter_pass = True
57
    elif missing_parameter_pass == "false":
58
        missing_parameter_pass = False
59
    data["missing_parameter_pass"] = missing_parameter_pass
60
    return data
61
62
63
@template(["ansible", "bash", "oval", "kubernetes"])
64
def audit_rules_dac_modification(data, lang):
65
    return data
66
67
68
@template(["ansible", "bash", "oval"])
69
def audit_rules_file_deletion_events(data, lang):
70
    return data
71
72
73
@template(["ansible", "bash", "oval", "kubernetes"])
74
def audit_rules_login_events(data, lang):
75
    path = data["path"]
76
    name = re.sub(r'[-\./]', '_', os.path.basename(os.path.normpath(path)))
77
    data["name"] = name
78
    if lang == "oval":
79
        data["path"] = path.replace("/", "\\/")
80
    return data
81
82
83
@template(["ansible", "bash", "oval"])
84
def audit_rules_path_syscall(data, lang):
85
    if lang == "oval":
86
        pathid = re.sub(r'[-\./]', '_', data["path"])
87
        # remove root slash made into '_'
88
        pathid = pathid[1:]
89
        data["pathid"] = pathid
90
    return data
91
92
93
@template(["ansible", "bash", "oval", "kubernetes"])
94
def audit_rules_privileged_commands(data, lang):
95
    path = data["path"]
96
    name = re.sub(r"[-\./]", "_", os.path.basename(path))
97
    data["name"] = name
98
    if lang == "oval":
99
        data["id"] = data["_rule_id"]
100
        data["title"] = "Record Any Attempts to Run " + name
101
        data["path"] = path.replace("/", "\\/")
102
    elif lang == "kubernetes":
103
        npath = path.replace("/", "_")
104
        if npath[0] == '_':
105
            npath = npath[1:]
106
        data["normalized_path"] = npath
107
    return data
108
109
@template(["ansible", "bash", "oval"])
110
def audit_rules_rule_file(data, lang):
111
    return data
112
113
114
@template(["ansible", "bash", "oval"])
115
def audit_rules_unsuccessful_file_modification(data, lang):
116
    return data
117
118
119
@template(["oval"])
120
def audit_rules_unsuccessful_file_modification_o_creat(data, lang):
121
    return data
122
123
124
@template(["oval"])
125
def audit_rules_unsuccessful_file_modification_o_trunc_write(data, lang):
126
    return data
127
128
129
@template(["oval"])
130
def audit_rules_unsuccessful_file_modification_rule_order(data, lang):
131
    return data
132
133
134
@template(["ansible", "bash", "oval"])
135
def audit_rules_usergroup_modification(data, lang):
136
    path = data["path"]
137
    name = re.sub(r'[-\./]', '_', os.path.basename(path))
138
    data["name"] = name
139
    if lang == "oval":
140
        data["path"] = path.replace("/", "\\/")
141
    return data
142
143
144
@template(["ansible", "bash", "oval"])
145
def audit_file_contents(data, lang):
146
    if lang == "oval":
147
        pathid = re.sub(r'[-\./]', '_', data["filepath"])
148
        # remove root slash made into '_'
149
        pathid = pathid[1:]
150
        data["filepath_id"] = pathid
151
152
    # The build system converts "<",">" and "&" for us
153
    if lang == "bash" or lang == "ansible":
154
        data["contents"] = unescape(data["contents"])
155
    return data
156
157
158
def _file_owner_groupowner_permissions_regex(data):
159
    data["is_directory"] = data["filepath"].endswith("/")
160
    if "missing_file_pass" not in data:
161
        data["missing_file_pass"] = False
162
    if "file_regex" in data and not data["is_directory"]:
163
        raise ValueError(
164
            "Used 'file_regex' key in rule '{0}' but filepath '{1}' does not "
165
            "specify a directory. Append '/' to the filepath or remove the "
166
            "'file_regex' key.".format(data["_rule_id"], data["filepath"]))
167
168
169
@template(["ansible", "bash", "oval"])
170
def file_groupowner(data, lang):
171
    _file_owner_groupowner_permissions_regex(data)
172
    if lang == "oval":
173
        data["fileid"] = data["_rule_id"].replace("file_groupowner", "")
174
    return data
175
176
177
@template(["ansible", "bash", "oval"])
178
def file_owner(data, lang):
179
    _file_owner_groupowner_permissions_regex(data)
180
    if lang == "oval":
181
        data["fileid"] = data["_rule_id"].replace("file_owner", "")
182
    return data
183
184
185
@template(["ansible", "bash", "oval"])
186
def file_permissions(data, lang):
187
    _file_owner_groupowner_permissions_regex(data)
188
    if lang == "oval":
189
        data["fileid"] = data["_rule_id"].replace("file_permissions", "")
190
        # build the state that describes our mode
191
        # mode_str maps to STATEMODE in the template
192
        mode = data["filemode"]
193
        fields = [
194
            'oexec', 'owrite', 'oread', 'gexec', 'gwrite', 'gread',
195
            'uexec', 'uwrite', 'uread', 'sticky', 'sgid', 'suid']
196
        mode_int = int(mode, 8)
197
        mode_str = ""
198
        for field in fields:
199
            if mode_int & 0x01 == 1:
200
                mode_str = (
201
                    "	<unix:" + field + " datatype=\"boolean\">true</unix:"
202
                    + field + ">\n" + mode_str)
203
            else:
204
                mode_str = (
205
                    "	<unix:" + field + " datatype=\"boolean\">false</unix:"
206
                    + field + ">\n" + mode_str)
207
            mode_int = mode_int >> 1
208
        data["statemode"] = mode_str
209
    return data
210
211
212
@template(["ansible", "bash", "oval"])
213
def grub2_bootloader_argument(data, lang):
214
    data["arg_name_value"] = data["arg_name"] + "=" + data["arg_value"]
215
    if lang == "oval":
216
        # escape dot, this is used in oval regex
217
        data["escaped_arg_name_value"] = data["arg_name_value"].replace(".", "\\.")
218
        # replace . with _, this is used in test / object / state ids
219
        data["sanitized_arg_name"] = data["arg_name"].replace(".", "_")
220
    return data
221
222
223
@template(["ansible", "bash", "oval"])
224
def kernel_module_disabled(data, lang):
225
    return data
226
227
228
@template(["anaconda", "oval"])
229
def mount(data, lang):
230
    data["pointid"] = re.sub(r'[-\./]', '_', data["mountpoint"])
231
    return data
232
233
234
def _mount_option(data, lang):
235
    if lang == "oval":
236
        data["pointid"] = re.sub(r"[-\./]", "_", data["mountpoint"]).lstrip("_")
237
    else:
238
        data["mountoption"] = re.sub(" ", ",", data["mountoption"])
239
    return data
240
241
242
@template(["anaconda", "ansible", "bash", "oval"])
243
def mount_option(data, lang):
244
    return _mount_option(data, lang)
245
246
247
@template(["ansible", "bash", "oval"])
248
def mount_option_remote_filesystems(data, lang):
249
    if lang == "oval":
250
        data["mountoptionid"] = sanitize_input(data["mountoption"])
251
    return _mount_option(data, lang)
252
253
254
@template(["anaconda", "ansible", "bash", "oval"])
255
def mount_option_removable_partitions(data, lang):
256
    return data
257
258
259
@template(["anaconda", "ansible", "bash", "oval", "puppet"])
260
def package_installed(data, lang):
261
    if "evr" in data:
262
        evr = data["evr"]
263
        if evr and not re.match(r'\d:\d[\d\w+.]*-\d[\d\w+.]*', evr, 0):
264
            raise RuntimeError(
265
                "ERROR: input violation: evr key should be in "
266
                "epoch:version-release format, but package {0} has set "
267
                "evr to {1}".format(data["pkgname"], evr))
268
    return data
269
270
271
@template(["ansible", "bash", "oval"])
272
def sysctl(data, lang):
273
    data["sysctlid"] = re.sub(r'[-\.]', '_', data["sysctlvar"])
274
    if not data.get("sysctlval"):
275
        data["sysctlval"] = ""
276
    ipv6_flag = "P"
277
    if data["sysctlid"].find("ipv6") >= 0:
278
        ipv6_flag = "I"
279
    data["flags"] = "SR" + ipv6_flag
280
    return data
281
282
283
@template(["anaconda", "ansible", "bash", "oval", "puppet"])
284
def package_removed(data, lang):
285
    return data
286
287
288
@template(["ansible", "bash", "oval"])
289
def sebool(data, lang):
290
    sebool_bool = data.get("sebool_bool", None)
291
    if sebool_bool is not None and sebool_bool not in ["true", "false"]:
292
        raise ValueError(
293
            "ERROR: key sebool_bool in rule {0} contains forbidden "
294
            "value '{1}'.".format(data["_rule_id"], sebool_bool)
295
        )
296
    return data
297
298
299
@template(["ansible", "bash", "oval", "puppet", "ignition", "kubernetes"])
300
def service_disabled(data, lang):
301
    if "packagename" not in data:
302
        data["packagename"] = data["servicename"]
303
    if "daemonname" not in data:
304
        data["daemonname"] = data["servicename"]
305
    if "mask_service" not in data:
306
        data["mask_service"] = "true"
307
    return data
308
309
310
@template(["ansible", "bash", "oval", "puppet"])
311
def service_enabled(data, lang):
312
    if "packagename" not in data:
313
        data["packagename"] = data["servicename"]
314
    if "daemonname" not in data:
315
        data["daemonname"] = data["servicename"]
316
    return data
317
318
319
@template(["ansible", "bash", "oval", "kubernetes"])
320
def sshd_lineinfile(data, lang):
321
    missing_parameter_pass = data["missing_parameter_pass"]
322
    if missing_parameter_pass == "true":
323
        missing_parameter_pass = True
324
    elif missing_parameter_pass == "false":
325
        missing_parameter_pass = False
326
    data["missing_parameter_pass"] = missing_parameter_pass
327
    return data
328
329
330
@template(["ansible", "bash", "oval"])
331
def shell_lineinfile(data, lang):
332
    value = data["value"]
333
    if value[0] in ("'", '"') and value[0] == value[-1]:
334
        msg = (
335
            "Value >>{value}<< of shell variable '{varname}' "
336
            "has been supplied with quotes, please fix the content - "
337
            "shell quoting is handled by the check/remediation code."
338
            .format(value=value, varname=data["parameter"]))
339
        raise Exception(msg)
340
    missing_parameter_pass = data.get("missing_parameter_pass", "false")
341
    if missing_parameter_pass == "true":
342
        missing_parameter_pass = True
343
    elif missing_parameter_pass == "false":
344
        missing_parameter_pass = False
345
    data["missing_parameter_pass"] = missing_parameter_pass
346
    no_quotes = False
347
    if data["no_quotes"] == "true":
348
        no_quotes = True
349
    data["no_quotes"] = no_quotes
350
    return data
351
352
353
@template(["ansible", "bash", "oval"])
354
def timer_enabled(data, lang):
355
    if "packagename" not in data:
356
        data["packagename"] = data["timername"]
357
    return data
358
359
360
@template(["oval"])
361
def yamlfile_value(data, lang):
362
    data["negate"] = data.get("negate", "false") == "true"
363
    data["ocp_data"] = data.get("ocp_data", "false") == "true"
364
    return data
365
366
367
@template(["oval"])
368
def bls_entries_option(data, lang):
369
    data["arg_name_value"] = data["arg_name"] + "=" + data["arg_value"]
370
    if lang == "oval":
371
        # escape dot, this is used in oval regex
372
        data["escaped_arg_name_value"] = data["arg_name_value"].replace(".", "\\.")
373
        # replace . with _, this is used in test / object / state ids
374
        data["sanitized_arg_name"] = data["arg_name"].replace(".", "_")
375
    return data
376
377
378
@template(["ansible", "bash", "oval"])
379
def zipl_bls_entries_option(data, lang):
380
    return bls_entries_option(data, lang)
381
382
383
@template(["oval"])
384
def coreos_kernel_option(data, lang):
385
    return bls_entries_option(data, lang)
386
387
388
class Builder(object):
389
    """
390
    Class for building all templated content for a given product.
391
392
    To generate content from templates, pass the env_yaml, path to the
393
    directory with resolved rule YAMLs, path to the directory that contains
394
    templates, path to the output directory for checks and a path to the
395
    output directory for remediations into the constructor. Then, call the
396
    method build() to perform a build.
397
    """
398
    def __init__(
399
            self, env_yaml, resolved_rules_dir, templates_dir,
400
            remediations_dir, checks_dir):
401
        self.env_yaml = env_yaml
402
        self.resolved_rules_dir = resolved_rules_dir
403
        self.templates_dir = templates_dir
404
        self.remediations_dir = remediations_dir
405
        self.checks_dir = checks_dir
406
        self.output_dirs = dict()
407
        for lang in languages:
408
            if lang == "oval":
409
                # OVAL checks need to be put to a different directory because
410
                # they are processed differently than remediations later in the
411
                # build process
412
                output_dir = self.checks_dir
413
            else:
414
                output_dir = self.remediations_dir
415
            dir_ = os.path.join(output_dir, lang)
416
            self.output_dirs[lang] = dir_
417
418
    def preprocess_data(self, template, lang, raw_parameters):
419
        """
420
        Processes template data using a callback before the data will be
421
        substituted into the Jinja template.
422
        """
423
        template_func = templates[template]
424
        parameters = template_func(raw_parameters.copy(), lang)
425
        # TODO: Remove this right after the variables in templates are renamed
426
        # to lowercase
427
        uppercases = dict()
428
        for k, v in parameters.items():
429
            uppercases[k.upper()] = v
430
        return uppercases
431
432
    def build_lang(
433
            self, rule_id, template_name, template_vars, lang, local_env_yaml):
434
        """
435
        Builds templated content for a given rule for a given language.
436
        Writes the output to the correct build directories.
437
        """
438
        template_func = templates[template_name]
439
        if lang not in template_func.langs:
440
            return
441
        template_file_name = "template_{0}_{1}".format(
442
            lang.upper(), template_name)
443
        template_file_path = os.path.join(
444
            self.templates_dir, template_file_name)
445
        if not os.path.exists(template_file_path):
446
            raise RuntimeError(
447
                "Rule {0} wants to generate {1} content from template {2}, "
448
                "but file {3} which provides this template does not "
449
                "exist.".format(
450
                    rule_id, lang, template_name, template_file_path)
451
            )
452
        ext = lang_to_ext_map[lang]
453
        output_file_name = rule_id + ext
454
        output_filepath = os.path.join(
455
            self.output_dirs[lang], output_file_name)
456
        template_parameters = self.preprocess_data(
457
            template_name, lang, template_vars)
458
        jinja_dict = ssg.utils.merge_dicts(local_env_yaml, template_parameters)
459
        filled_template = ssg.jinja.process_file_with_macros(
460
            template_file_path, jinja_dict)
461
        with open(output_filepath, "w") as f:
462
            f.write(filled_template)
463
464
    def get_langs_to_generate(self, rule):
465
        """
466
        For a given rule returns list of languages that should be generated
467
        from templates. This is controlled by "template_backends" in rule.yml.
468
        """
469
        if "backends" in rule.template:
470
            backends = rule.template["backends"]
471
            for lang in backends:
472
                if lang not in languages:
473
                    raise RuntimeError(
474
                        "Rule {0} wants to generate unknown language '{1}"
475
                        "from a template.".format(rule.id_, lang)
476
                    )
477
            langs_to_generate = []
478
            for lang in languages:
479
                backend = backends.get(lang, "on")
480
                if backend == "on":
481
                    langs_to_generate.append(lang)
482
            return langs_to_generate
483
        else:
484
            return languages
485
486
    def build_rule(self, rule_id, rule_title, template, langs_to_generate):
487
        """
488
        Builds templated content for a given rule for selected languages,
489
        writing the output to the correct build directories.
490
        """
491
        try:
492
            template_name = template["name"]
493
        except KeyError:
494
            raise ValueError(
495
                "Rule {0} is missing template name under template key".format(
496
                    rule_id))
497
        if template_name not in templates:
498
            raise ValueError(
499
                "Rule {0} uses template {1} which does not exist.".format(
500
                    rule_id, template_name))
501
        try:
502
            template_vars = template["vars"]
503
        except KeyError:
504
            raise ValueError(
505
                "Rule {0} does not contain mandatory 'vars:' key under "
506
                "'template:' key.".format(rule_id))
507
        # Add the rule ID which will be reused in OVAL templates as OVAL
508
        # definition ID so that the build system matches the generated
509
        # check with the rule.
510
        template_vars["_rule_id"] = rule_id
511
        # checks and remediations are processed with a custom YAML dict
512
        local_env_yaml = self.env_yaml.copy()
513
        local_env_yaml["rule_id"] = rule_id
514
        local_env_yaml["rule_title"] = rule_title
515
        local_env_yaml["products"] = self.env_yaml["product"]
516
        for lang in langs_to_generate:
517
            self.build_lang(
518
                rule_id, template_name, template_vars, lang, local_env_yaml)
519
520
    def build_extra_ovals(self):
521
        declaration_path = os.path.join(self.templates_dir, "extra_ovals.yml")
522
        declaration = ssg.yaml.open_raw(declaration_path)
523
        for oval_def_id, template in declaration.items():
524
            langs_to_generate = ["oval"]
525
            # Since OVAL definition ID in shorthand format is always the same
526
            # as rule ID, we can use it instead of the rule ID even if no rule
527
            # with that ID exists
528
            self.build_rule(
529
                oval_def_id, oval_def_id, template, langs_to_generate)
530
531
    def build_all_rules(self):
532
        for rule_file in os.listdir(self.resolved_rules_dir):
533
            rule_path = os.path.join(self.resolved_rules_dir, rule_file)
534
            try:
535
                rule = ssg.build_yaml.Rule.from_yaml(rule_path, self.env_yaml)
536
            except ssg.build_yaml.DocumentationNotComplete:
537
                # Happens on non-debug build when a rule is "documentation-incomplete"
538
                continue
539
            if rule.template is None:
540
                # rule is not templated, skipping
541
                continue
542
            langs_to_generate = self.get_langs_to_generate(rule)
543
            self.build_rule(
544
                rule.id_, rule.title, rule.template, langs_to_generate)
545
546
    def build(self):
547
        """
548
        Builds all templated content for all languages, writing
549
        the output to the correct build directories.
550
        """
551
552
        for dir_ in self.output_dirs.values():
553
            if not os.path.exists(dir_):
554
                os.makedirs(dir_)
555
556
        self.build_extra_ovals()
557
        self.build_all_rules()
558