Test Failed
Push — master ( f76445...3e6819 )
by Matěj
01:11 queued 15s
created

audit_rules_unsuccessful_file_modification()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 2
ccs 0
cts 2
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
from __future__ import print_function
2
3
import os
4
import sys
5
import re
6
7
import ssg.build_yaml
8
9
languages = ["anaconda", "ansible", "bash", "oval", "puppet"]
10
11
lang_to_ext_map = {
12
    "anaconda": ".anaconda",
13
    "ansible": ".yml",
14
    "bash": ".sh",
15
    "oval": ".xml",
16
    "puppet": ".pp"
17
}
18
19
# Callback functions for processing template parameters and/or validating them
20
21
22
def accounts_password(data, lang):
23
    if lang == "oval":
24
        data["sign"] = "-?" if data["variable"].endswith("credit") else ""
25
    return data
26
27
28
def auditd_lineinfile(data, lang):
29
    missing_parameter_pass = data["missing_parameter_pass"]
30
    if missing_parameter_pass == "true":
31
        missing_parameter_pass = True
32
    elif missing_parameter_pass == "false":
33
        missing_parameter_pass = False
34
    data["missing_parameter_pass"] = missing_parameter_pass
35
    return data
36
37
38
def audit_rules_dac_modification(data, lang):
39
    return data
40
41
42
def audit_rules_file_deletion_events(data, lang):
43
    return data
44
45
46
def audit_rules_login_events(data, lang):
47
    path = data["path"]
48
    name = re.sub(r'[-\./]', '_', os.path.basename(os.path.normpath(path)))
49
    data["name"] = name
50
    if lang == "oval":
51
        data["path"] = path.replace("/", "\\/")
52
    return data
53
54
55
def audit_rules_path_syscall(data, lang):
56
    if lang == "oval":
57
        pathid = re.sub(r'[-\./]', '_', data["path"])
58
        # remove root slash made into '_'
59
        pathid = pathid[1:]
60
        data["pathid"] = pathid
61
    return data
62
63
64
def audit_rules_privileged_commands(data, lang):
65
    path = data["path"]
66
    name = re.sub(r"[-\./]", "_", os.path.basename(path))
67
    data["name"] = name
68
    if lang == "oval":
69
        data["id"] = data["_rule_id"]
70
        data["title"] = "Record Any Attempts to Run " + name
71
        data["path"] = path.replace("/", "\\/")
72
    return data
73
74
75
def audit_rules_unsuccessful_file_modification(data, lang):
76
    return data
77
78
79
def audit_rules_unsuccessful_file_modification_o_creat(data, lang):
80
    return data
81
82
83
def audit_rules_unsuccessful_file_modification_o_trunc_write(data, lang):
84
    return data
85
86
87
def audit_rules_unsuccessful_file_modification_rule_order(data, lang):
88
    return data
89
90
91
def audit_rules_usergroup_modification(data, lang):
92
    path = data["path"]
93
    name = re.sub(r'[-\./]', '_', os.path.basename(path))
94
    data["name"] = name
95
    if lang == "oval":
96
        data["path"] = path.replace("/", "\\/")
97
    return data
98
99
100
def _file_owner_groupowner_permissions_regex(data):
101
    data["is_directory"] = data["filepath"].endswith("/")
102
    if "file_regex" in data and not data["is_directory"]:
103
        raise ValueError(
104
            "Used 'file_regex' key in rule '{0}' but filepath '{1}' does not "
105
            "specify a directory. Append '/' to the filepath or remove the "
106
            "'file_regex' key.".format(data["_rule_id"], data["filepath"]))
107
108
109
def file_groupowner(data, lang):
110
    _file_owner_groupowner_permissions_regex(data)
111
    if lang == "oval":
112
        data["fileid"] = data["_rule_id"].replace("file_groupowner", "")
113
    return data
114
115
116
def file_owner(data, lang):
117
    _file_owner_groupowner_permissions_regex(data)
118
    if lang == "oval":
119
        data["fileid"] = data["_rule_id"].replace("file_owner", "")
120
    return data
121
122
123
def file_permissions(data, lang):
124
    _file_owner_groupowner_permissions_regex(data)
125
    if lang == "oval":
126
        data["fileid"] = data["_rule_id"].replace("file_permissions", "")
127
        # build the state that describes our mode
128
        # mode_str maps to STATEMODE in the template
129
        mode = data["filemode"]
130
        fields = [
131
            'oexec', 'owrite', 'oread', 'gexec', 'gwrite', 'gread',
132
            'uexec', 'uwrite', 'uread', 'sticky', 'sgid', 'suid']
133
        mode_int = int(mode, 8)
134
        mode_str = ""
135
        for field in fields:
136
            if mode_int & 0x01 == 1:
137
                mode_str = (
138
                    "	<unix:" + field + " datatype=\"boolean\">true</unix:"
139
                    + field + ">\n" + mode_str)
140
            else:
141
                mode_str = (
142
                    "	<unix:" + field + " datatype=\"boolean\">false</unix:"
143
                    + field + ">\n" + mode_str)
144
            mode_int = mode_int >> 1
145
        data["statemode"] = mode_str
146
    return data
147
148
149
def grub2_bootloader_argument(data, lang):
150
    data["arg_name_value"] = data["arg_name"] + "=" + data["arg_value"]
151
    return data
152
153
154
def kernel_module_disabled(data, lang):
155
    return data
156
157
158
def mount(data, lang):
159
    data["pointid"] = re.sub(r'[-\./]', '_', data["mountpoint"])
160
    return data
161
162
163
def mount_option(data, lang):
164
    if lang == "oval":
165
        data["pointid"] = re.sub(r"[-\./]", "_", data["mountpoint"]).lstrip("_")
166
    else:
167
        data["mountoption"] = re.sub(" ", ",", data["mountoption"])
168
    return data
169
170
171
def package_installed(data, lang):
172
    if "evr" in data:
173
        evr = data["evr"]
174
        if evr and not re.match(r'\d:\d[\d\w+.]*-\d[\d\w+.]*', evr, 0):
175
            raise RuntimeError(
176
                "ERROR: input violation: evr key should be in "
177
                "epoch:version-release format, but package {0} has set "
178
                "evr to {1}".format(data["pkgname"], evr))
179
    return data
180
181
182
def sysctl(data, lang):
183
    data["sysctlid"] = re.sub(r'[-\.]', '_', data["sysctlvar"])
184
    if not data.get("sysctlval"):
185
        data["sysctlval"] = ""
186
    ipv6_flag = "P"
187
    if data["sysctlid"].find("ipv6") >= 0:
188
        ipv6_flag = "I"
189
    data["flags"] = "SR" + ipv6_flag
190
    return data
191
192
193
def package_removed(data, lang):
194
    return data
195
196
197
def sebool(data, lang):
198
    sebool_bool = data.get("sebool_bool", None)
199
    if sebool_bool is not None and sebool_bool not in ["true", "false"]:
200
        raise ValueError(
201
            "ERROR: key sebool_bool in rule {0} contains forbidden "
202
            "value '{1}'.".format(data["_rule_id"], sebool_bool)
203
        )
204
    return data
205
206
207
def service_disabled(data, lang):
208
    if "packagename" not in data:
209
        data["packagename"] = data["servicename"]
210
    if "daemonname" not in data:
211
        data["daemonname"] = data["servicename"]
212
    if "mask_service" not in data:
213
        data["mask_service"] = "true"
214
    return data
215
216
217
def service_enabled(data, lang):
218
    if "packagename" not in data:
219
        data["packagename"] = data["servicename"]
220
    if "daemonname" not in data:
221
        data["daemonname"] = data["servicename"]
222
    return data
223
224
225
def sshd_lineinfile(data, lang):
226
    missing_parameter_pass = data["missing_parameter_pass"]
227
    if missing_parameter_pass == "true":
228
        missing_parameter_pass = True
229
    elif missing_parameter_pass == "false":
230
        missing_parameter_pass = False
231
    data["missing_parameter_pass"] = missing_parameter_pass
232
    return data
233
234
235
def timer_enabled(data, lang):
236
    if "packagename" not in data:
237
        data["packagename"] = data["timername"]
238
    return data
239
240
241
templates = {
242
    "accounts_password": accounts_password,
243
    "auditd_lineinfile": auditd_lineinfile,
244
    "audit_rules_dac_modification": audit_rules_dac_modification,
245
    "audit_rules_file_deletion_events": audit_rules_file_deletion_events,
246
    "audit_rules_login_events": audit_rules_login_events,
247
    "audit_rules_path_syscall": audit_rules_path_syscall,
248
    "audit_rules_privileged_commands": audit_rules_privileged_commands,
249
    "audit_rules_unsuccessful_file_modification":
250
        audit_rules_unsuccessful_file_modification,
251
    "audit_rules_unsuccessful_file_modification_o_creat":
252
        audit_rules_unsuccessful_file_modification_o_creat,
253
    "audit_rules_unsuccessful_file_modification_o_trunc_write":
254
        audit_rules_unsuccessful_file_modification_o_trunc_write,
255
    "audit_rules_unsuccessful_file_modification_rule_order":
256
        audit_rules_unsuccessful_file_modification_rule_order,
257
    "audit_rules_usergroup_modification": audit_rules_usergroup_modification,
258
    "file_groupowner": file_groupowner,
259
    "file_owner": file_owner,
260
    "file_permissions": file_permissions,
261
    "grub2_bootloader_argument": grub2_bootloader_argument,
262
    "kernel_module_disabled": kernel_module_disabled,
263
    "mount": mount,
264
    "mount_option": mount_option,
265
    "mount_option_remote_filesystems": mount_option,
266
    "mount_option_removable_partitions": mount_option,
267
    "package_installed": package_installed,
268
    "package_removed": package_removed,
269
    "sebool": sebool,
270
    "service_disabled": service_disabled,
271
    "service_enabled": service_enabled,
272
    "sshd_lineinfile": sshd_lineinfile,
273
    "sysctl": sysctl,
274
    "timer_enabled": timer_enabled,
275
}
276
277
278
class Builder(object):
279
    """
280
    Class for building all templated content for a given product.
281
282
    To generate content from templates, pass the env_yaml, path to the
283
    directory with resolved rule YAMLs, path to the directory that contains
284
    templates, path to the output directory for checks and a path to the
285
    output directory for remediations into the constructor. Then, call the
286
    method build() to perform a build.
287
    """
288
    def __init__(
289
            self, env_yaml, resolved_rules_dir, templates_dir,
290
            remediations_dir, checks_dir):
291
        self.env_yaml = env_yaml
292
        self.resolved_rules_dir = resolved_rules_dir
293
        self.templates_dir = templates_dir
294
        self.remediations_dir = remediations_dir
295
        self.checks_dir = checks_dir
296
        self.output_dirs = dict()
297
        for lang in languages:
298
            if lang == "oval":
299
                # OVAL checks need to be put to a different directory because
300
                # they are processed differently than remediations later in the
301
                # build process
302
                output_dir = self.checks_dir
303
            else:
304
                output_dir = self.remediations_dir
305
            dir_ = os.path.join(output_dir, lang)
306
            self.output_dirs[lang] = dir_
307
308
    def preprocess_data(self, template, lang, raw_parameters):
309
        """
310
        Processes template data using a callback before the data will be
311
        substituted into the Jinja template.
312
        """
313
        template_func = templates[template]
314
        if template_func is not None:
315
            parameters = template_func(raw_parameters.copy(), lang)
316
        else:
317
            parameters = raw_parameters.copy()
318
        # TODO: Remove this right after the variables in templates are renamed
319
        # to lowercase
320
        parameters = {k.upper(): v for k, v in parameters.items()}
321
        return parameters
322
323
    def build_lang(self, rule, template_name, lang, local_env_yaml):
324
        """
325
        Builds templated content for a given rule for a given language.
326
        Writes the output to the correct build directories.
327
        """
328
        template_file_name = "template_{0}_{1}".format(
329
            lang.upper(), template_name)
330
        template_file_path = os.path.join(
331
            self.templates_dir, template_file_name)
332
        if not os.path.exists(template_file_path):
333
            return
334
        ext = lang_to_ext_map[lang]
335
        output_file_name = rule.id_ + ext
336
        output_filepath = os.path.join(
337
            self.output_dirs[lang], output_file_name)
338
339
        try:
340
            template_vars = rule.template["vars"]
341
        except KeyError:
342
            raise ValueError(
343
                "Rule {0} does not contain mandatory 'vars:' key under "
344
                "'template:' key.".format(rule.id_))
345
        # Add the rule ID which will be reused in OVAL templates as OVAL
346
        # definition ID so that the build system matches the generated
347
        # check with the rule.
348
        template_vars["_rule_id"] = rule.id_
349
        template_parameters = self.preprocess_data(
350
            template_name, lang, template_vars)
351
        jinja_dict = ssg.utils.merge_dicts(local_env_yaml, template_parameters)
352
        filled_template = ssg.jinja.process_file_with_macros(
353
            template_file_path, jinja_dict)
354
        with open(output_filepath, "w") as f:
355
            f.write(filled_template)
356
357
    def get_langs_to_generate(self, rule):
358
        """
359
        For a given rule returns list of languages that should be generated
360
        from templates. This is controlled by "template_backends" in rule.yml.
361
        """
362
        if "backends" in rule.template:
363
            backends = rule.template["backends"]
364
            for lang in backends:
365
                if lang not in languages:
366
                    raise RuntimeError(
367
                        "Rule {0} wants to generate unknown language '{1}"
368
                        "from a template.".format(rule.id_, lang)
369
                    )
370
            langs_to_generate = []
371
            for lang in languages:
372
                backend = backends.get(lang, "on")
373
                if backend == "on":
374
                    langs_to_generate.append(lang)
375
            return langs_to_generate
376
        else:
377
            return languages
378
379
    def build_rule(self, rule):
380
        """
381
        Builds templated content for a given rule for all languages, writing
382
        the output to the correct build directories.
383
        """
384
        if rule.template is None:
385
            # rule is not templated, skipping
386
            return
387
        try:
388
            template_name = rule.template["name"]
389
        except KeyError:
390
            raise ValueError(
391
                "Rule {0} is missing template name under template key".format(
392
                    rule.id_))
393
        if template_name not in templates:
394
            raise ValueError(
395
                "Rule {0} uses template {1} which does not exist.".format(
396
                    rule.id_, template_name))
397
        if templates[template_name] is None:
398
            sys.stderr.write(
399
                "The template {0} has not been completely implemented, no "
400
                "content will be generated for rule {1}.\n".format(
401
                    template_name, rule.id_))
402
            return
403
        langs_to_generate = self.get_langs_to_generate(rule)
404
        # checks and remediations are processed with a custom YAML dict
405
        local_env_yaml = self.env_yaml.copy()
406
        local_env_yaml["rule_id"] = rule.id_
407
        local_env_yaml["rule_title"] = rule.title
408
        local_env_yaml["products"] = self.env_yaml["product"]
409
        for lang in langs_to_generate:
410
            self.build_lang(rule, template_name, lang, local_env_yaml)
411
412
    def build(self):
413
        """
414
        Builds all templated content for all languages, writing
415
        the output to the correct build directories.
416
        """
417
418
        for dir_ in self.output_dirs.values():
419
            if not os.path.exists(dir_):
420
                os.makedirs(dir_)
421
422
        for rule_file in os.listdir(self.resolved_rules_dir):
423
            rule_path = os.path.join(self.resolved_rules_dir, rule_file)
424
            rule = ssg.build_yaml.Rule.from_yaml(rule_path, self.env_yaml)
425
            self.build_rule(rule)
426