Passed
Pull Request — master (#5109)
by Matěj
02:52
created

ssg.templates.mount()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 2
dl 0
loc 4
ccs 0
cts 3
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
from __future__ import print_function
2
3
import os
4
import sys
5
import re
6
7
import ssg.build_yaml
8
9
# Every language for which templated content can be generated.
languages = [
    "anaconda",
    "ansible",
    "bash",
    "oval",
    "puppet",
]

# File extension appended to the rule ID when writing generated content
# for the given language.
lang_to_ext_map = dict(
    anaconda=".anaconda",
    ansible=".yml",
    bash=".sh",
    oval=".xml",
    puppet=".pp",
)
18
19
def sanitize_input(string):
    """
    Returns a copy of the string in which every character that is not a
    letter or a digit (underscores included) is replaced by an underscore.
    """
    unsafe_chars = re.compile(r'[\W_]')
    return unsafe_chars.sub('_', string)
21
22
# Registry of template callbacks, keyed by the callback function name.
templates = {}


def template(langs):
    """
    Decorator factory that registers a template callback.

    The decorated function gets the given list of languages stored in its
    "langs" attribute and is added to the "templates" registry under its
    own name. The function itself is returned unchanged.
    """
    def register(func):
        func.langs = langs
        templates[func.__name__] = func
        return func

    return register
31
32
33
# Callback functions for processing template parameters and/or validating them
34
35
36
@template(["ansible", "bash", "oval"])
def accounts_password(data, lang):
    """
    For OVAL, sets data["sign"] to "-?" when the variable name ends with
    "credit" and to an empty string otherwise. Other languages get the
    data back untouched.
    """
    if lang != "oval":
        return data
    if data["variable"].endswith("credit"):
        data["sign"] = "-?"
    else:
        data["sign"] = ""
    return data
41
42
43
@template(["ansible", "bash", "oval"])
def auditd_lineinfile(data, lang):
    """
    Converts the mandatory "missing_parameter_pass" value from the strings
    "true" / "false" to the corresponding booleans. Any other value is
    stored back as it is.
    """
    flag = data["missing_parameter_pass"]
    data["missing_parameter_pass"] = (
        True if flag == "true" else False if flag == "false" else flag)
    return data
52
53
54
@template(["ansible", "bash", "oval"])
def audit_rules_dac_modification(data, lang):
    """
    No preprocessing is needed for this template; the parameters are
    returned as they were passed in.
    """
    return data
57
58
59
@template(["ansible", "bash", "oval"])
def audit_rules_file_deletion_events(data, lang):
    """
    No preprocessing is needed for this template; the parameters are
    returned as they were passed in.
    """
    return data
62
63
64
@template(["ansible", "bash", "oval"])
def audit_rules_login_events(data, lang):
    """
    Derives data["name"] from the last component of the normalized path,
    with dashes, dots and slashes turned into underscores. For OVAL, the
    slashes in data["path"] are additionally escaped with a backslash.
    """
    watched_path = data["path"]
    last_component = os.path.basename(os.path.normpath(watched_path))
    data["name"] = re.sub(r'[-\./]', '_', last_component)
    if lang == "oval":
        data["path"] = watched_path.replace("/", "\\/")
    return data
72
73
74
@template(["ansible", "bash", "oval"])
def audit_rules_path_syscall(data, lang):
    """
    For OVAL, computes data["pathid"] from data["path"]: dashes, dots and
    slashes become underscores and the first character of the result is
    dropped. Other languages get the data back untouched.
    """
    if lang != "oval":
        return data
    underscored = re.sub(r'[-\./]', '_', data["path"])
    # The first character is the root slash turned into '_', drop it.
    data["pathid"] = underscored[1:]
    return data
82
83
84
@template(["ansible", "bash", "oval"])
def audit_rules_privileged_commands(data, lang):
    """
    Derives data["name"] from the base name of data["path"], with dashes,
    dots and slashes turned into underscores. For OVAL, it also fills in
    the "id" and "title" keys and escapes slashes in data["path"].
    """
    command_path = data["path"]
    command_name = re.sub(r"[-\./]", "_", os.path.basename(command_path))
    data["name"] = command_name
    if lang != "oval":
        return data
    data["id"] = data["_rule_id"]
    data["title"] = "Record Any Attempts to Run " + command_name
    data["path"] = command_path.replace("/", "\\/")
    return data
94
95
96
@template(["ansible", "bash", "oval"])
def audit_rules_unsuccessful_file_modification(data, lang):
    """
    No preprocessing is needed for this template; the parameters are
    returned as they were passed in.
    """
    return data
99
100
101
@template(["oval"])
def audit_rules_unsuccessful_file_modification_o_creat(data, lang):
    """
    No preprocessing is needed for this OVAL-only template; the parameters
    are returned as they were passed in.
    """
    return data
104
105
106
@template(["oval"])
def audit_rules_unsuccessful_file_modification_o_trunc_write(data, lang):
    """
    No preprocessing is needed for this OVAL-only template; the parameters
    are returned as they were passed in.
    """
    return data
109
110
111
@template(["oval"])
def audit_rules_unsuccessful_file_modification_rule_order(data, lang):
    """
    No preprocessing is needed for this OVAL-only template; the parameters
    are returned as they were passed in.
    """
    return data
114
115
116
@template(["ansible", "bash", "oval"])
def audit_rules_usergroup_modification(data, lang):
    """
    Derives data["name"] from the base name of data["path"], with dashes,
    dots and slashes turned into underscores. For OVAL, the slashes in
    data["path"] are additionally escaped with a backslash.
    """
    watched_path = data["path"]
    base_name = os.path.basename(watched_path)
    data["name"] = re.sub(r'[-\./]', '_', base_name)
    if lang == "oval":
        data["path"] = watched_path.replace("/", "\\/")
    return data
124
125
126
def _file_owner_groupowner_permissions_regex(data):
127
    data["is_directory"] = data["filepath"].endswith("/")
128
    if "missing_file_pass" not in data:
129
        data["missing_file_pass"] = False
130
    if "file_regex" in data and not data["is_directory"]:
131
        raise ValueError(
132
            "Used 'file_regex' key in rule '{0}' but filepath '{1}' does not "
133
            "specify a directory. Append '/' to the filepath or remove the "
134
            "'file_regex' key.".format(data["_rule_id"], data["filepath"]))
135
136
137
@template(["ansible", "bash", "oval"])
def file_groupowner(data, lang):
    """
    Runs the shared file template preprocessing and, for OVAL, sets
    data["fileid"] to the rule ID with "file_groupowner" removed.
    """
    _file_owner_groupowner_permissions_regex(data)
    if lang != "oval":
        return data
    rule_id = data["_rule_id"]
    data["fileid"] = rule_id.replace("file_groupowner", "")
    return data
143
144
145
@template(["ansible", "bash", "oval"])
def file_owner(data, lang):
    """
    Runs the shared file template preprocessing and, for OVAL, sets
    data["fileid"] to the rule ID with "file_owner" removed.
    """
    _file_owner_groupowner_permissions_regex(data)
    if lang != "oval":
        return data
    rule_id = data["_rule_id"]
    data["fileid"] = rule_id.replace("file_owner", "")
    return data
151
152
153
@template(["ansible", "bash", "oval"])
def file_permissions(data, lang):
    """
    Runs the shared file template preprocessing and, for OVAL, sets
    data["fileid"] to the rule ID with "file_permissions" removed and
    builds data["statemode"].

    data["statemode"] is a string of <unix:...> boolean elements, one per
    permission bit of the octal data["filemode"], ordered from suid down
    to oexec.
    """
    _file_owner_groupowner_permissions_regex(data)
    if lang != "oval":
        return data
    data["fileid"] = data["_rule_id"].replace("file_permissions", "")
    # Permission bits, listed from the least significant bit upwards.
    bit_names = (
        'oexec', 'owrite', 'oread', 'gexec', 'gwrite', 'gread',
        'uexec', 'uwrite', 'uread', 'sticky', 'sgid', 'suid')
    remaining_bits = int(data["filemode"], 8)
    entries = []
    for bit_name in bit_names:
        flag = "true" if remaining_bits & 0x01 == 1 else "false"
        entries.append(
            "\t<unix:" + bit_name + " datatype=\"boolean\">" + flag
            + "</unix:" + bit_name + ">\n")
        remaining_bits >>= 1
    # The state lists the most significant bit first.
    entries.reverse()
    data["statemode"] = "".join(entries)
    return data
178
179
180
@template(["ansible", "bash", "oval"])
def grub2_bootloader_argument(data, lang):
    """
    Stores the "name=value" form of the bootloader argument in
    data["arg_name_value"].
    """
    arg_name = data["arg_name"]
    data["arg_name_value"] = arg_name + "=" + data["arg_value"]
    return data
184
185
186
@template(["ansible", "bash", "oval"])
def kernel_module_disabled(data, lang):
    """
    No preprocessing is needed for this template; the parameters are
    returned as they were passed in.
    """
    return data
189
190
191
@template(["anaconda", "oval"])
def mount(data, lang):
    """
    Stores in data["pointid"] the mount point with dashes, dots and
    slashes replaced by underscores.
    """
    mountpoint = data["mountpoint"]
    data["pointid"] = re.sub(r'[-\./]', '_', mountpoint)
    return data
195
196
197
def _mount_option(data, lang):
198
    if lang == "oval":
199
        data["pointid"] = re.sub(r"[-\./]", "_", data["mountpoint"]).lstrip("_")
200
    else:
201
        data["mountoption"] = re.sub(" ", ",", data["mountoption"])
202
    return data
203
204
205
@template(["anaconda", "ansible", "bash", "oval"])
def mount_option(data, lang):
    """
    Applies the shared mount option preprocessing and returns its result.
    """
    processed = _mount_option(data, lang)
    return processed
208
209
210
@template(["ansible", "bash", "oval"])
def mount_option_remote_filesystems(data, lang):
    """
    For OVAL, stores the sanitized mount option in data["mountoptionid"].
    Afterwards, applies the shared mount option preprocessing and returns
    its result.
    """
    generating_oval = (lang == "oval")
    if generating_oval:
        option_id = sanitize_input(data["mountoption"])
        data["mountoptionid"] = option_id
    return _mount_option(data, lang)
215
216
217
@template(["anaconda", "ansible", "bash", "oval"])
def mount_option_removable_partitions(data, lang):
    """
    Applies the shared mount option preprocessing and returns its result.
    """
    processed = _mount_option(data, lang)
    return processed
220
221
222
@template(["anaconda", "ansible", "bash", "oval", "puppet"])
def package_installed(data, lang):
    """
    Validates the optional "evr" parameter.

    A non-empty "evr" value has to start with the epoch:version-release
    format, otherwise RuntimeError is raised. The data is returned
    unchanged.
    """
    evr = data.get("evr")
    if not evr:
        # Key is absent or its value is empty, there is nothing to check.
        return data
    if re.match(r'\d:\d[\d\w+.]*-\d[\d\w+.]*', evr, 0) is None:
        raise RuntimeError(
            "ERROR: input violation: evr key should be in "
            "epoch:version-release format, but package {0} has set "
            "evr to {1}".format(data["pkgname"], evr))
    return data
232
233
234
@template(["ansible", "bash", "oval"])
def sysctl(data, lang):
    """
    Prepares parameters of the sysctl template.

    Sets data["sysctlid"] to the sysctl variable name with dashes and dots
    replaced by underscores, replaces a missing or empty "sysctlval" with
    an empty string, and sets data["flags"] to "SRI" when the ID contains
    "ipv6" and to "SRP" otherwise.
    """
    sysctl_id = re.sub(r'[-\.]', '_', data["sysctlvar"])
    data["sysctlid"] = sysctl_id
    if not data.get("sysctlval"):
        data["sysctlval"] = ""
    data["flags"] = "SR" + ("I" if "ipv6" in sysctl_id else "P")
    return data
244
245
246
@template(["anaconda", "ansible", "bash", "oval", "puppet"])
def package_removed(data, lang):
    """
    No preprocessing is needed for this template; the parameters are
    returned as they were passed in.
    """
    return data
249
250
251
@template(["ansible", "bash", "oval"])
def sebool(data, lang):
    """
    Validates the optional "sebool_bool" parameter, which may only be
    "true" or "false" when it is set. Raises ValueError for any other
    value. The data is returned unchanged.
    """
    requested_state = data.get("sebool_bool", None)
    if requested_state is None or requested_state in ["true", "false"]:
        return data
    raise ValueError(
        "ERROR: key sebool_bool in rule {0} contains forbidden "
        "value '{1}'.".format(data["_rule_id"], requested_state)
    )
260
261
262
@template(["ansible", "bash", "oval", "puppet"])
def service_disabled(data, lang):
    """
    Fills in defaults of the service_disabled template: "packagename" and
    "daemonname" default to the service name and "mask_service" defaults
    to "true".
    """
    for optional_key in ("packagename", "daemonname"):
        if optional_key not in data:
            data[optional_key] = data["servicename"]
    data.setdefault("mask_service", "true")
    return data
271
272
273
@template(["ansible", "bash", "oval", "puppet"])
def service_enabled(data, lang):
    """
    Fills in defaults of the service_enabled template: "packagename" and
    "daemonname" default to the service name.
    """
    for optional_key in ("packagename", "daemonname"):
        if optional_key not in data:
            data[optional_key] = data["servicename"]
    return data
280
281
282
@template(["ansible", "bash", "oval"])
def sshd_lineinfile(data, lang):
    """
    Converts the mandatory "missing_parameter_pass" value from the strings
    "true" / "false" to the corresponding booleans. Any other value is
    stored back as it is.
    """
    flag = data["missing_parameter_pass"]
    data["missing_parameter_pass"] = (
        True if flag == "true" else False if flag == "false" else flag)
    return data
291
292
293
@template(["ansible", "bash", "oval"])
def shell_lineinfile(data, lang):
    """
    Validates and normalizes parameters of the shell_lineinfile template.

    The mandatory "value" must not be wrapped in a pair of matching single
    or double quotes, otherwise ValueError is raised. An empty value is
    accepted.

    The optional "missing_parameter_pass" (default "false") is converted
    from the strings "true" / "false" to booleans, any other value is kept
    as it is. The optional "no_quotes" becomes True only if it was the
    string "true", and False in all other cases including when the key is
    missing.
    """
    value = data["value"]
    # Guard against an empty value, indexing it would raise IndexError.
    if value and value[0] in ("'", '"') and value[0] == value[-1]:
        msg = (
            "Value >>{value}<< of shell variable '{varname}' "
            "has been supplied with quotes, please fix the content - "
            "shell quoting is handled by the check/remediation code."
            .format(value=value, varname=data["parameter"]))
        # ValueError is still an Exception, so existing handlers keep
        # working, and it matches the other validating callbacks.
        raise ValueError(msg)
    missing_parameter_pass = data.get("missing_parameter_pass", "false")
    if missing_parameter_pass == "true":
        missing_parameter_pass = True
    elif missing_parameter_pass == "false":
        missing_parameter_pass = False
    data["missing_parameter_pass"] = missing_parameter_pass
    # "no_quotes" is optional, the same way "missing_parameter_pass" is.
    data["no_quotes"] = data.get("no_quotes") == "true"
    return data
314
315
316
@template(["ansible", "bash", "oval"])
def timer_enabled(data, lang):
    """
    Defaults "packagename" to the timer name when it has not been set.
    """
    if "packagename" in data:
        return data
    data["packagename"] = data["timername"]
    return data
321
322
323
class Builder(object):
    """
    Class for building all templated content for a given product.

    To generate content from templates, pass the env_yaml, path to the
    directory with resolved rule YAMLs, path to the directory that contains
    templates, path to the output directory for remediations and a path to
    the output directory for checks into the constructor. Then, call the
    method build() to perform a build.
    """
    def __init__(
            self, env_yaml, resolved_rules_dir, templates_dir,
            remediations_dir, checks_dir):
        self.env_yaml = env_yaml
        self.resolved_rules_dir = resolved_rules_dir
        self.templates_dir = templates_dir
        self.remediations_dir = remediations_dir
        self.checks_dir = checks_dir
        # Maps each language to the directory its generated files go to.
        self.output_dirs = dict()
        for lang in languages:
            if lang == "oval":
                # OVAL checks need to be put to a different directory because
                # they are processed differently than remediations later in the
                # build process
                output_dir = self.checks_dir
            else:
                output_dir = self.remediations_dir
            dir_ = os.path.join(output_dir, lang)
            self.output_dirs[lang] = dir_

    def preprocess_data(self, template, lang, raw_parameters):
        """
        Processes template data using a callback before the data will be
        substituted into the Jinja template.

        The callback registered under the template name receives a shallow
        copy of raw_parameters. Returns a new dictionary with the processed
        parameters whose keys are converted to uppercase.
        """
        template_func = templates[template]
        parameters = template_func(raw_parameters.copy(), lang)
        # TODO: Remove this right after the variables in templates are renamed
        # to lowercase
        uppercases = dict()
        for k, v in parameters.items():
            uppercases[k.upper()] = v
        return uppercases

    def build_lang(
            self, rule_id, template_name, template_vars, lang, local_env_yaml):
        """
        Builds templated content for a given rule for a given language.
        Writes the output to the correct build directories.

        Does nothing if the template does not support the language. Raises
        RuntimeError if the template file for the language does not exist.
        """
        # NOTE(review): only ssg.build_yaml is imported explicitly by this
        # file; ssg.utils and ssg.jinja used below are presumably loaded as
        # a side effect of that import - confirm.
        template_func = templates[template_name]
        if lang not in template_func.langs:
            return
        template_file_name = "template_{0}_{1}".format(
            lang.upper(), template_name)
        template_file_path = os.path.join(
            self.templates_dir, template_file_name)
        if not os.path.exists(template_file_path):
            raise RuntimeError(
                "Rule {0} wants to generate {1} content from template {2}, "
                "but file {3} which provides this template does not "
                "exist.".format(
                    rule_id, lang, template_name, template_file_path)
            )
        ext = lang_to_ext_map[lang]
        output_file_name = rule_id + ext
        output_filepath = os.path.join(
            self.output_dirs[lang], output_file_name)
        template_parameters = self.preprocess_data(
            template_name, lang, template_vars)
        jinja_dict = ssg.utils.merge_dicts(local_env_yaml, template_parameters)
        filled_template = ssg.jinja.process_file_with_macros(
            template_file_path, jinja_dict)
        with open(output_filepath, "w") as f:
            f.write(filled_template)

    def get_langs_to_generate(self, rule):
        """
        For a given rule returns list of languages that should be generated
        from templates. This is controlled by the "backends" key of the
        rule's template.

        A language is generated unless its backend is set to a value other
        than "on". Raises RuntimeError if "backends" mentions an unknown
        language.
        """
        if "backends" not in rule.template:
            # Return a copy so that callers cannot modify the module-level
            # list of languages by accident.
            return list(languages)
        backends = rule.template["backends"]
        for lang in backends:
            if lang not in languages:
                raise RuntimeError(
                    "Rule {0} wants to generate unknown language '{1}' "
                    "from a template.".format(rule.id_, lang)
                )
        return [
            lang for lang in languages if backends.get(lang, "on") == "on"]

    def build_rule(self, rule_id, rule_title, template, langs_to_generate):
        """
        Builds templated content for a given rule for selected languages,
        writing the output to the correct build directories.

        Raises ValueError if the template has no "name" or "vars" key or if
        the template name is not registered.
        """
        try:
            template_name = template["name"]
        except KeyError:
            raise ValueError(
                "Rule {0} is missing template name under template key".format(
                    rule_id))
        if template_name not in templates:
            raise ValueError(
                "Rule {0} uses template {1} which does not exist.".format(
                    rule_id, template_name))
        try:
            # Work on a copy, the caller's template must not be modified.
            template_vars = dict(template["vars"])
        except KeyError:
            raise ValueError(
                "Rule {0} does not contain mandatory 'vars:' key under "
                "'template:' key.".format(rule_id))
        # Add the rule ID which will be reused in OVAL templates as OVAL
        # definition ID so that the build system matches the generated
        # check with the rule.
        template_vars["_rule_id"] = rule_id
        # checks and remediations are processed with a custom YAML dict
        local_env_yaml = self.env_yaml.copy()
        local_env_yaml["rule_id"] = rule_id
        local_env_yaml["rule_title"] = rule_title
        local_env_yaml["products"] = self.env_yaml["product"]
        for lang in langs_to_generate:
            self.build_lang(
                rule_id, template_name, template_vars, lang, local_env_yaml)

    def build_extra_ovals(self):
        """
        Builds OVAL content for every entry of the "extra_ovals.yml" file
        located in the templates directory.
        """
        declaration_path = os.path.join(self.templates_dir, "extra_ovals.yml")
        declaration = ssg.yaml.open_raw(declaration_path)
        for oval_def_id, template in declaration.items():
            langs_to_generate = ["oval"]
            # Since OVAL definition ID in shorthand format is always the same
            # as rule ID, we can use it instead of the rule ID even if no rule
            # with that ID exists
            self.build_rule(
                oval_def_id, oval_def_id, template, langs_to_generate)

    def build_all_rules(self):
        """
        Builds templated content for every templated rule found in the
        directory with resolved rules.
        """
        for rule_file in os.listdir(self.resolved_rules_dir):
            rule_path = os.path.join(self.resolved_rules_dir, rule_file)
            try:
                rule = ssg.build_yaml.Rule.from_yaml(rule_path, self.env_yaml)
            except ssg.build_yaml.DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                continue
            if rule.template is None:
                # rule is not templated, skipping
                continue
            langs_to_generate = self.get_langs_to_generate(rule)
            self.build_rule(
                rule.id_, rule.title, rule.template, langs_to_generate)

    def build(self):
        """
        Builds all templated content for all languages, writing
        the output to the correct build directories.
        """

        for dir_ in self.output_dirs.values():
            if not os.path.exists(dir_):
                os.makedirs(dir_)

        self.build_extra_ovals()
        self.build_all_rules()
493