Test Failed
Push — master ( f9535f...609c26 )
by Matěj
01:06 queued 15s
created

ssg.templates.audit_rules_login_events()   A

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 8
nop 2
dl 0
loc 8
ccs 0
cts 7
cp 0
crap 6
rs 10
c 0
b 0
f 0
1
from __future__ import print_function
2
3
import os
4
import sys
5
import re
6
7
import ssg.build_yaml
8
9
# Languages for which content can be generated from templates.
languages = ["anaconda", "ansible", "bash", "oval", "puppet"]

# File extension appended to the rule ID to name the generated file
# of each language.
lang_to_ext_map = dict([
    ("anaconda", ".anaconda"),
    ("ansible", ".yml"),
    ("bash", ".sh"),
    ("oval", ".xml"),
    ("puppet", ".pp"),
])
18
19
def sanitize_input(string):
    """
    Returns the given string with every character that is not a word
    character, and every underscore, replaced by an underscore.
    """
    unwanted = re.compile(r'[\W_]')
    return unwanted.sub('_', string)
21
22
# Registry of template callbacks, keyed by the name of the callback function.
templates = {}


def template(langs):
    """
    Returns a decorator that stores the given languages on the decorated
    function as its 'langs' attribute and registers the function in the
    'templates' dictionary under the function's name. The decorated
    function itself is returned unchanged.
    """
    def register(callback):
        callback.langs = langs
        templates[callback.__name__] = callback
        return callback
    return register
31
32
33
# Callback functions for processing template parameters and/or validating them
34
35
36
@template(["ansible", "bash", "oval"])
def accounts_password(data, lang):
    """
    For OVAL, sets data["sign"] to "-?" when data["variable"] ends with
    "credit" and to an empty string otherwise. For other languages the data
    is returned as received.
    """
    if lang != "oval":
        return data
    if data["variable"].endswith("credit"):
        data["sign"] = "-?"
    else:
        data["sign"] = ""
    return data
41
42
43
@template(["ansible", "bash", "oval"])
def auditd_lineinfile(data, lang):
    """
    Converts data["missing_parameter_pass"] from the strings "true" or
    "false" to the corresponding boolean. Any other value is kept as is.
    """
    value = data["missing_parameter_pass"]
    for text, boolean in (("true", True), ("false", False)):
        if value == text:
            value = boolean
            break
    data["missing_parameter_pass"] = value
    return data
52
53
54
@template(["ansible", "bash", "oval"])
def audit_rules_dac_modification(data, lang):
    """
    No preprocessing is done for this template; the data is returned as
    received.
    """
    return data
57
58
59
@template(["ansible", "bash", "oval"])
def audit_rules_file_deletion_events(data, lang):
    """
    No preprocessing is done for this template; the data is returned as
    received.
    """
    return data
62
63
64
@template(["ansible", "bash", "oval"])
def audit_rules_login_events(data, lang):
    """
    Sets data["name"] to the base name of the normalized data["path"] with
    every '-', '.' and '/' replaced by '_'. For OVAL, the slashes in
    data["path"] are additionally escaped with a backslash.
    """
    path = data["path"]
    basename = os.path.basename(os.path.normpath(path))
    data["name"] = re.sub(r'[-\./]', '_', basename)
    if lang == "oval":
        escaped_path = path.replace("/", "\\/")
        data["path"] = escaped_path
    return data
72
73
74
@template(["ansible", "bash", "oval"])
def audit_rules_path_syscall(data, lang):
    """
    For OVAL, sets data["pathid"] to data["path"] with every '-', '.' and
    '/' replaced by '_' and with the first character dropped. For other
    languages the data is returned as received.
    """
    if lang != "oval":
        return data
    underscored = re.sub(r'[-\./]', '_', data["path"])
    # drop the first character, i.e. the root slash that was made into '_'
    data["pathid"] = underscored[1:]
    return data
82
83
84
@template(["ansible", "bash", "oval"])
def audit_rules_privileged_commands(data, lang):
    """
    Sets data["name"] to the base name of data["path"] with every '-', '.'
    and '/' replaced by '_'. For OVAL, also sets data["id"] to the rule ID,
    builds data["title"] from the name and escapes the slashes in
    data["path"] with a backslash.
    """
    path = data["path"]
    basename = os.path.basename(path)
    name = re.sub(r"[-\./]", "_", basename)
    data["name"] = name
    if lang != "oval":
        return data
    data["id"] = data["_rule_id"]
    data["title"] = "Record Any Attempts to Run " + name
    data["path"] = path.replace("/", "\\/")
    return data
94
95
@template(["ansible", "bash", "oval"])
def audit_rules_rule_file(data, lang):
    """
    No preprocessing is done for this template; the data is returned as
    received.
    """
    return data
98
99
100
@template(["ansible", "bash", "oval"])
def audit_rules_unsuccessful_file_modification(data, lang):
    """
    No preprocessing is done for this template; the data is returned as
    received.
    """
    return data
103
104
105
@template(["oval"])
def audit_rules_unsuccessful_file_modification_o_creat(data, lang):
    """
    No preprocessing is done for this template; the data is returned as
    received.
    """
    return data
108
109
110
@template(["oval"])
def audit_rules_unsuccessful_file_modification_o_trunc_write(data, lang):
    """
    No preprocessing is done for this template; the data is returned as
    received.
    """
    return data
113
114
115
@template(["oval"])
def audit_rules_unsuccessful_file_modification_rule_order(data, lang):
    """
    No preprocessing is done for this template; the data is returned as
    received.
    """
    return data
118
119
120
@template(["ansible", "bash", "oval"])
def audit_rules_usergroup_modification(data, lang):
    """
    Sets data["name"] to the base name of data["path"] with every '-', '.'
    and '/' replaced by '_'. For OVAL, the slashes in data["path"] are
    additionally escaped with a backslash.
    """
    path = data["path"]
    basename = os.path.basename(path)
    data["name"] = re.sub(r'[-\./]', '_', basename)
    if lang == "oval":
        escaped_path = path.replace("/", "\\/")
        data["path"] = escaped_path
    return data
128
129
130
def _file_owner_groupowner_permissions_regex(data):
131
    data["is_directory"] = data["filepath"].endswith("/")
132
    if "missing_file_pass" not in data:
133
        data["missing_file_pass"] = False
134
    if "file_regex" in data and not data["is_directory"]:
135
        raise ValueError(
136
            "Used 'file_regex' key in rule '{0}' but filepath '{1}' does not "
137
            "specify a directory. Append '/' to the filepath or remove the "
138
            "'file_regex' key.".format(data["_rule_id"], data["filepath"]))
139
140
141
@template(["ansible", "bash", "oval"])
def file_groupowner(data, lang):
    """
    Runs the shared filepath preprocessing on the data and, for OVAL, sets
    data["fileid"] to the rule ID with "file_groupowner" removed.
    """
    _file_owner_groupowner_permissions_regex(data)
    if lang != "oval":
        return data
    rule_id = data["_rule_id"]
    data["fileid"] = rule_id.replace("file_groupowner", "")
    return data
147
148
149
@template(["ansible", "bash", "oval"])
def file_owner(data, lang):
    """
    Runs the shared filepath preprocessing on the data and, for OVAL, sets
    data["fileid"] to the rule ID with "file_owner" removed.
    """
    _file_owner_groupowner_permissions_regex(data)
    if lang != "oval":
        return data
    rule_id = data["_rule_id"]
    data["fileid"] = rule_id.replace("file_owner", "")
    return data
155
156
157
@template(["ansible", "bash", "oval"])
def file_permissions(data, lang):
    """
    Runs the shared filepath preprocessing on the data. For OVAL, also sets
    data["fileid"] to the rule ID with "file_permissions" removed and sets
    data["statemode"] to a string of <unix:...> boolean elements, one per
    permission bit of the octal data["filemode"].
    """
    _file_owner_groupowner_permissions_regex(data)
    if lang != "oval":
        return data
    data["fileid"] = data["_rule_id"].replace("file_permissions", "")
    # build the state that describes our mode
    # the result maps to STATEMODE in the template
    mode_int = int(data["filemode"], 8)
    # Listed from the least significant bit of the mode upwards
    fields = [
        'oexec', 'owrite', 'oread', 'gexec', 'gwrite', 'gread',
        'uexec', 'uwrite', 'uread', 'sticky', 'sgid', 'suid']
    elements = []
    for bit, field in enumerate(fields):
        value = "true" if (mode_int >> bit) & 0x01 == 1 else "false"
        elements.append(
            "\t<unix:" + field + " datatype=\"boolean\">" + value
            + "</unix:" + field + ">\n")
    # The element of the most significant bit comes first in the output
    data["statemode"] = "".join(reversed(elements))
    return data
182
183
184
@template(["ansible", "bash", "oval"])
def grub2_bootloader_argument(data, lang):
    """
    Sets data["arg_name_value"] to data["arg_name"] and data["arg_value"]
    joined by '='.
    """
    arg_name = data["arg_name"]
    arg_value = data["arg_value"]
    data["arg_name_value"] = arg_name + "=" + arg_value
    return data
188
189
190
@template(["ansible", "bash", "oval"])
def kernel_module_disabled(data, lang):
    """
    No preprocessing is done for this template; the data is returned as
    received.
    """
    return data
193
194
195
@template(["anaconda", "oval"])
def mount(data, lang):
    """
    Sets data["pointid"] to data["mountpoint"] with every '-', '.' and '/'
    replaced by '_'.
    """
    mountpoint = data["mountpoint"]
    data["pointid"] = re.sub(r'[-\./]', '_', mountpoint)
    return data
199
200
201
def _mount_option(data, lang):
202
    if lang == "oval":
203
        data["pointid"] = re.sub(r"[-\./]", "_", data["mountpoint"]).lstrip("_")
204
    else:
205
        data["mountoption"] = re.sub(" ", ",", data["mountoption"])
206
    return data
207
208
209
@template(["anaconda", "ansible", "bash", "oval"])
def mount_option(data, lang):
    """
    Applies the shared mount option preprocessing and returns its result.
    """
    processed = _mount_option(data, lang)
    return processed
212
213
214
@template(["ansible", "bash", "oval"])
def mount_option_remote_filesystems(data, lang):
    """
    For OVAL, sets data["mountoptionid"] to the sanitized form of
    data["mountoption"]. Then applies the shared mount option preprocessing
    and returns its result.
    """
    is_oval = lang == "oval"
    if is_oval:
        option = data["mountoption"]
        data["mountoptionid"] = sanitize_input(option)
    return _mount_option(data, lang)
219
220
221
@template(["anaconda", "ansible", "bash", "oval"])
def mount_option_removable_partitions(data, lang):
    """
    Applies the shared mount option preprocessing and returns its result.
    """
    processed = _mount_option(data, lang)
    return processed
224
225
226
@template(["anaconda", "ansible", "bash", "oval", "puppet"])
def package_installed(data, lang):
    """
    Validates the optional 'evr' key. Raises RuntimeError when a non-empty
    data["evr"] does not start with text in the epoch:version-release
    format. The data is returned as received.
    """
    if "evr" not in data:
        return data
    evr = data["evr"]
    if not evr:
        return data
    if re.match(r'\d:\d[\d\w+.]*-\d[\d\w+.]*', evr) is None:
        raise RuntimeError(
            "ERROR: input violation: evr key should be in "
            "epoch:version-release format, but package {0} has set "
            "evr to {1}".format(data["pkgname"], evr))
    return data
236
237
238
@template(["ansible", "bash", "oval"])
def sysctl(data, lang):
    """
    Sets data["sysctlid"] to data["sysctlvar"] with every '-' and '.'
    replaced by '_', replaces a missing or false-like data["sysctlval"]
    with an empty string and sets data["flags"] to "SRI" when the ID
    contains "ipv6" and to "SRP" otherwise.
    """
    sysctlid = re.sub(r'[-\.]', '_', data["sysctlvar"])
    data["sysctlid"] = sysctlid
    if not data.get("sysctlval"):
        data["sysctlval"] = ""
    if "ipv6" in sysctlid:
        data["flags"] = "SR" + "I"
    else:
        data["flags"] = "SR" + "P"
    return data
248
249
250
@template(["anaconda", "ansible", "bash", "oval", "puppet"])
def package_removed(data, lang):
    """
    No preprocessing is done for this template; the data is returned as
    received.
    """
    return data
253
254
255
@template(["ansible", "bash", "oval"])
def sebool(data, lang):
    """
    Validates the optional 'sebool_bool' key. Raises ValueError when the
    key holds a value other than None, "true" or "false". The data is
    returned as received.
    """
    value = data.get("sebool_bool", None)
    if value is None or value in ["true", "false"]:
        return data
    raise ValueError(
        "ERROR: key sebool_bool in rule {0} contains forbidden "
        "value '{1}'.".format(data["_rule_id"], value)
    )
264
265
266
@template(["ansible", "bash", "oval", "puppet"])
def service_disabled(data, lang):
    """
    Fills in defaults for keys that are absent: data["packagename"] and
    data["daemonname"] default to data["servicename"], and
    data["mask_service"] defaults to the string "true".
    """
    for key in ("packagename", "daemonname"):
        if key not in data:
            data[key] = data["servicename"]
    data.setdefault("mask_service", "true")
    return data
275
276
277
@template(["ansible", "bash", "oval", "puppet"])
def service_enabled(data, lang):
    """
    Fills in defaults for keys that are absent: data["packagename"] and
    data["daemonname"] default to data["servicename"].
    """
    for key in ("packagename", "daemonname"):
        if key not in data:
            data[key] = data["servicename"]
    return data
284
285
286
@template(["ansible", "bash", "oval"])
def sshd_lineinfile(data, lang):
    """
    Converts data["missing_parameter_pass"] from the strings "true" or
    "false" to the corresponding boolean. Any other value is kept as is.
    """
    value = data["missing_parameter_pass"]
    for text, boolean in (("true", True), ("false", False)):
        if value == text:
            value = boolean
            break
    data["missing_parameter_pass"] = value
    return data
295
296
297
@template(["ansible", "bash", "oval"])
def timer_enabled(data, lang):
    """
    Defaults data["packagename"] to data["timername"] when the key is
    absent.
    """
    if "packagename" in data:
        return data
    data["packagename"] = data["timername"]
    return data
302
303
304
class Builder(object):
    """
    Class for building all templated content for a given product.

    To generate content from templates, pass the env_yaml, path to the
    directory with resolved rule YAMLs, path to the directory that contains
    templates, path to the output directory for remediations and a path to
    the output directory for checks into the constructor. Then, call the
    method build() to perform a build.
    """
    def __init__(
            self, env_yaml, resolved_rules_dir, templates_dir,
            remediations_dir, checks_dir):
        self.env_yaml = env_yaml
        self.resolved_rules_dir = resolved_rules_dir
        self.templates_dir = templates_dir
        self.remediations_dir = remediations_dir
        self.checks_dir = checks_dir
        # Maps each language to the directory its generated files go to
        self.output_dirs = dict()
        for lang in languages:
            if lang == "oval":
                # OVAL checks need to be put to a different directory because
                # they are processed differently than remediations later in the
                # build process
                output_dir = self.checks_dir
            else:
                output_dir = self.remediations_dir
            dir_ = os.path.join(output_dir, lang)
            self.output_dirs[lang] = dir_

    def preprocess_data(self, template, lang, raw_parameters):
        """
        Processes template data using a callback before the data will be
        substituted into the Jinja template.

        The callback registered under the template name receives a copy of
        raw_parameters, so the caller's dictionary is not modified by it.
        Returns a new dictionary with all keys converted to uppercase.
        """
        template_func = templates[template]
        parameters = template_func(raw_parameters.copy(), lang)
        # TODO: Remove this right after the variables in templates are renamed
        # to lowercase
        uppercases = dict()
        for k, v in parameters.items():
            uppercases[k.upper()] = v
        return uppercases

    def build_lang(
            self, rule_id, template_name, template_vars, lang, local_env_yaml):
        """
        Builds templated content for a given rule for a given language.
        Writes the output to the correct build directories.

        Does nothing when the template does not support the language.
        Raises RuntimeError when the template file for the language does
        not exist.
        """
        template_func = templates[template_name]
        if lang not in template_func.langs:
            return
        template_file_name = "template_{0}_{1}".format(
            lang.upper(), template_name)
        template_file_path = os.path.join(
            self.templates_dir, template_file_name)
        if not os.path.exists(template_file_path):
            raise RuntimeError(
                "Rule {0} wants to generate {1} content from template {2}, "
                "but file {3} which provides this template does not "
                "exist.".format(
                    rule_id, lang, template_name, template_file_path)
            )
        ext = lang_to_ext_map[lang]
        output_file_name = rule_id + ext
        output_filepath = os.path.join(
            self.output_dirs[lang], output_file_name)
        template_parameters = self.preprocess_data(
            template_name, lang, template_vars)
        jinja_dict = ssg.utils.merge_dicts(local_env_yaml, template_parameters)
        filled_template = ssg.jinja.process_file_with_macros(
            template_file_path, jinja_dict)
        with open(output_filepath, "w") as f:
            f.write(filled_template)

    def get_langs_to_generate(self, rule):
        """
        For a given rule returns list of languages that should be generated
        from templates. This is controlled by the "backends" key of the
        rule's template: a language is generated unless its entry there is
        set to something other than "on". Without the "backends" key all
        languages are returned.

        Raises RuntimeError when "backends" mentions an unknown language.
        """
        if "backends" not in rule.template:
            # Return a copy so that callers cannot alter the module-level list
            return list(languages)
        backends = rule.template["backends"]
        for lang in backends:
            if lang not in languages:
                raise RuntimeError(
                    "Rule {0} wants to generate unknown language '{1}' "
                    "from a template.".format(rule.id_, lang)
                )
        langs_to_generate = []
        for lang in languages:
            backend = backends.get(lang, "on")
            if backend == "on":
                langs_to_generate.append(lang)
        return langs_to_generate

    def build_rule(self, rule_id, rule_title, template, langs_to_generate):
        """
        Builds templated content for a given rule for selected languages,
        writing the output to the correct build directories.

        Raises ValueError when the template has no "name" key, when the
        named template is not registered or when the "vars" key is missing.
        """
        try:
            template_name = template["name"]
        except KeyError:
            raise ValueError(
                "Rule {0} is missing template name under template key".format(
                    rule_id))
        if template_name not in templates:
            raise ValueError(
                "Rule {0} uses template {1} which does not exist.".format(
                    rule_id, template_name))
        try:
            # Work on a copy so that the caller's dictionary is left untouched
            template_vars = dict(template["vars"])
        except KeyError:
            raise ValueError(
                "Rule {0} does not contain mandatory 'vars:' key under "
                "'template:' key.".format(rule_id))
        # Add the rule ID which will be reused in OVAL templates as OVAL
        # definition ID so that the build system matches the generated
        # check with the rule.
        template_vars["_rule_id"] = rule_id
        # checks and remediations are processed with a custom YAML dict
        local_env_yaml = self.env_yaml.copy()
        local_env_yaml["rule_id"] = rule_id
        local_env_yaml["rule_title"] = rule_title
        local_env_yaml["products"] = self.env_yaml["product"]
        for lang in langs_to_generate:
            self.build_lang(
                rule_id, template_name, template_vars, lang, local_env_yaml)

    def build_extra_ovals(self):
        """
        Builds OVAL content for every entry of the extra_ovals.yml file
        located in the templates directory.
        """
        declaration_path = os.path.join(self.templates_dir, "extra_ovals.yml")
        declaration = ssg.yaml.open_raw(declaration_path)
        for oval_def_id, template in declaration.items():
            langs_to_generate = ["oval"]
            # Since OVAL definition ID in shorthand format is always the same
            # as rule ID, we can use it instead of the rule ID even if no rule
            # with that ID exists
            self.build_rule(
                oval_def_id, oval_def_id, template, langs_to_generate)

    def build_all_rules(self):
        """
        Builds templated content for every templated rule found in the
        directory with resolved rules.
        """
        for rule_file in os.listdir(self.resolved_rules_dir):
            rule_path = os.path.join(self.resolved_rules_dir, rule_file)
            try:
                rule = ssg.build_yaml.Rule.from_yaml(rule_path, self.env_yaml)
            except ssg.build_yaml.DocumentationNotComplete:
                # Happens on non-debug build when a rule is
                # "documentation-incomplete"
                continue
            if rule.template is None:
                # rule is not templated, skipping
                continue
            langs_to_generate = self.get_langs_to_generate(rule)
            self.build_rule(
                rule.id_, rule.title, rule.template, langs_to_generate)

    def build(self):
        """
        Builds all templated content for all languages, writing
        the output to the correct build directories.
        """

        for dir_ in self.output_dirs.values():
            if not os.path.exists(dir_):
                os.makedirs(dir_)

        self.build_extra_ovals()
        self.build_all_rules()
474