Passed
Push — master (42467a...9d011f)
by Matěj
03:19 queued 11s

utils.migrate_template_csv_to_rule (rating: F)

Complexity

Total Complexity 119

Size/Duplication

Total Lines 904
Duplicated Lines 0%

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 621
dl 0
loc 904
ccs 0
cts 585
cp 0
rs 1.979
c 0
b 0
f 0
wmc 119

30 Functions

Rating   Name   Duplication   Size   Complexity  
A timers_enabled_csv_to_dict() 0 17 2
A audit_rules_execution_csv_to_dict() 0 16 1
A audit_rules_login_events_csv_to_dict() 0 14 1
A ocp_service_runtime_config_csv_to_dict() 0 23 2
A grub2_bootloader_argument_csv_to_dict() 0 16 1
C permissions_csv_to_dict() 0 86 10
A sysctl_values_csv_to_dict() 0 22 2
A accounts_password_csv_to_dict() 0 15 1
A audit_rules_privileged_commands_csv_to_dict() 0 16 1
A lineinfile_csv_to_dict() 0 21 2
A arufm_csv_to_dict() 0 13 1
A sshd_lineinfile_csv_to_dict() 0 2 1
B selinux_booleans_csv_to_dict() 0 24 6
A escape_path() 0 2 1
A packages_installed_csv_to_dict() 0 19 2
A mounts_csv_to_dict() 0 14 1
A audit_rules_path_syscall_csv_to_dict() 0 19 1
A services_enabled_csv_to_dict() 0 22 3
B walk_benchmarks() 0 32 7
A auditd_lineinfile_csv_to_dict() 0 2 1
A audit_rules_file_deletion_events_csv_to_dict() 0 13 1
A services_disabled_csv_to_dict() 0 18 1
A packages_removed_csv_to_dict() 0 17 1
A parse_args() 0 8 1
A arum_csv_to_dict() 0 14 1
A mount_options_csv_to_dict() 0 43 4
A arufm_detailed_csv_to_dict() 0 24 2
A kernel_modules_disabled_csv_to_dict() 0 13 1
D main() 0 41 12
A audit_rules_dac_modification_csv_to_dict() 0 13 1

6 Methods

Rating   Name   Duplication   Size   Complexity  
C ProductCSVData._load_csv() 0 37 10
A ProductCSVData.__init__() 0 8 1
A ProductCSVData._identify_csv_files() 0 11 3
A ProductCSVData._load_csv_files() 0 5 2
C ProductCSVData.merge_product_csv_data() 0 54 9
F ProductCSVData.resolve_csv_data() 0 75 22

How to fix: Complexity

Complex classes like utils.migrate_template_csv_to_rule often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
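
As an illustration only (VarValueResolver and its methods are hypothetical names, not part of the module), the per-variable bookkeeping that merge_product_csv_data() and resolve_csv_data() do inline is the kind of cohesive component the advice above points at, and could be pulled into a small class of its own:

from collections import defaultdict


class VarValueResolver:
    """Tracks which products use which value of a template variable (hypothetical helper)."""

    def __init__(self):
        self._products_by_value = defaultdict(list)

    def add(self, value, product):
        self._products_by_value[value].append(product)

    def resolve(self, var):
        # The value backed by the most products becomes the shared one;
        # the other values are emitted with the 'param@product' notation.
        shared = max(self._products_by_value,
                     key=lambda value: len(self._products_by_value[value]))
        resolved = {} if shared == "" else {var: shared}
        for value, products in self._products_by_value.items():
            if value == shared:
                continue
            for product in products:
                resolved[f"{var}@{product}"] = value
        return resolved


resolver = VarValueResolver()
resolver.add("15", "rhel7")
resolver.add("15", "rhel8")
resolver.add("30", "ol7")
print(resolver.resolve("timeout"))  # {'timeout': '15', 'timeout@ol7': '30'}

With a helper like this, resolve_csv_data() would mostly shrink to building one resolver per template variable and merging the returned dictionaries back into the rule's vars. The full module source analyzed by this report follows.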

import argparse
import csv
from collections import defaultdict, OrderedDict
import os
import pprint
import re

from ssg.constants import product_directories
import ssg.utils
import ssg.rule_yaml
import ssg.yaml


def escape_path(path):
    return re.sub(r'[-\./]', '_', path)


def accounts_password_csv_to_dict(csv_line, csv_data):
    accounts_password = OrderedDict()
    data_accounts_password = {}
    accounts_password["name"] = "accounts_password"

    variable = csv_line[0]
    rule_id = f"accounts_password_pam_{variable}"

    operation = csv_line[1]

    data_accounts_password["variable"] = variable
    data_accounts_password["operation"] = operation
    accounts_password["vars"] = data_accounts_password
    csv_data[rule_id] = accounts_password
    return accounts_password


def audit_rules_execution_csv_to_dict(csv_line, csv_data):
    audit_rules_execution = OrderedDict()
    data_audit_rules_execution = {}
    audit_rules_execution["name"] = "audit_rules_privileged_commands"

    path = csv_line[0]
    name = escape_path(os.path.basename(path))
    rule_id = f"audit_rules_execution_{name}"

    # create_audit_rules_execution.py escapes the '/' when generating the OVAL
    # This is not actually necessary
    data_audit_rules_execution["path"] = path
    audit_rules_execution["vars"] = data_audit_rules_execution

    csv_data[rule_id] = audit_rules_execution
    return audit_rules_execution


def audit_rules_privileged_commands_csv_to_dict(csv_line, csv_data):
    audit_rules_privileged_commands = OrderedDict()
    data_audit_rules_privileged_commands = {}
    audit_rules_privileged_commands["name"] = "audit_rules_privileged_commands"

    path = csv_line[0]
    name = escape_path(os.path.basename(path))
    rule_id = f"audit_rules_privileged_commands_{name}"

    # create_audit_rules_privileged_commands.py escapes the '/' when generating the OVAL
    # This is not actually necessary
    data_audit_rules_privileged_commands["path"] = path
    audit_rules_privileged_commands["vars"] = data_audit_rules_privileged_commands

    csv_data[rule_id] = audit_rules_privileged_commands
    return audit_rules_privileged_commands


def audit_rules_dac_modification_csv_to_dict(csv_line, csv_data):
    audit_rules_dac_modification = OrderedDict()
    data_audit_rules_dac_modification = {}
    audit_rules_dac_modification["name"] = "audit_rules_dac_modification"

    attr = csv_line[0]
    rule_id = f"audit_rules_dac_modification_{attr}"

    data_audit_rules_dac_modification["attr"] = attr
    audit_rules_dac_modification["vars"] = data_audit_rules_dac_modification

    csv_data[rule_id] = audit_rules_dac_modification
    return audit_rules_dac_modification


def audit_rules_file_deletion_events_csv_to_dict(csv_line, csv_data):
    audit_rules_file_deletion_events = OrderedDict()
    data_audit_rules_file_deletion_events = {}
    audit_rules_file_deletion_events["name"] = "audit_rules_file_deletion_events"

    name = csv_line[0]
    rule_id = f"audit_rules_file_deletion_events_{name}"

    data_audit_rules_file_deletion_events["name"] = name
    audit_rules_file_deletion_events["vars"] = data_audit_rules_file_deletion_events

    csv_data[rule_id] = audit_rules_file_deletion_events
    return audit_rules_file_deletion_events


def audit_rules_login_events_csv_to_dict(csv_line, csv_data):
    audit_rules_login_events = OrderedDict()
    data_audit_rules_login_events = {}
    audit_rules_login_events["name"] = "audit_rules_login_events"

    path = csv_line[0]
    name = escape_path(os.path.basename(path))
    rule_id = f"audit_rules_login_events_{name}"

    data_audit_rules_login_events["path"] = path
    audit_rules_login_events["vars"] = data_audit_rules_login_events

    csv_data[rule_id] = audit_rules_login_events
    return audit_rules_login_events


def audit_rules_path_syscall_csv_to_dict(csv_line, csv_data):
    audit_rules_path_syscall = OrderedDict()
    data_audit_rules_path_syscall = {}
    audit_rules_path_syscall["name"] = "audit_rules_path_syscall"

    path = csv_line[0]
    syscall = csv_line[1]
    arg_pos = csv_line[2]
    # remove root slash made into '_'
    path_id = escape_path(path)[1:]
    rule_id = f"audit_rules_{path_id}_{syscall}"

    data_audit_rules_path_syscall["path"] = path
    data_audit_rules_path_syscall["syscall"] = syscall
    data_audit_rules_path_syscall["pos"] = arg_pos
    audit_rules_path_syscall["vars"] = data_audit_rules_path_syscall

    csv_data[rule_id] = audit_rules_path_syscall
    return audit_rules_path_syscall


def arufm_csv_to_dict(csv_line, csv_data):
    arufm = OrderedDict()
    data_arufm = {}
    arufm["name"] = "audit_rules_unsuccessful_file_modification"

    name = csv_line[0]
    rule_id = f"audit_rules_unsuccessful_file_modification_{name}"

    data_arufm["name"] = name
    arufm["vars"] = data_arufm

    csv_data[rule_id] = arufm
    return arufm


def arufm_detailed_csv_to_dict(csv_line, csv_data):
    arufm_detailed = OrderedDict()
    data_arufm_detailed = {}

    syscall = csv_line[0]
    arg_pos = csv_line[1]

    template_base = "audit_rules_unsuccessful_file_modification_"
    template_suffixes = ["o_creat",
                         "o_trunc_write",
                         "rule_order",
                         ]

    data_arufm_detailed["syscall"] = syscall
    data_arufm_detailed["pos"] = arg_pos
    arufm_detailed["vars"] = data_arufm_detailed

    for suffix in template_suffixes:
        arufm_detailed["name"] = f"{template_base}{suffix}"
        rule_id = f"{template_base}{syscall}_{suffix}"
        # If a csv line has except-for, it won't be handled correctly
        csv_data[rule_id] = arufm_detailed.copy()

    return arufm_detailed


def arum_csv_to_dict(csv_line, csv_data):
    user_group_modification = OrderedDict()
    data_user_group_modification = {}
    user_group_modification["name"] = "audit_rules_usergroup_modification"

    path = csv_line[0]
    name = escape_path(os.path.basename(path))
    rule_id = f"audit_rules_usergroup_modification_{name}"

    data_user_group_modification["path"] = path
    user_group_modification["vars"] = data_user_group_modification

    csv_data[rule_id] = user_group_modification
    return user_group_modification


def grub2_bootloader_argument_csv_to_dict(csv_line, csv_data):
    grub2_bootloader_argument = OrderedDict()
    data_grub2_bootloader_argument = {}
    grub2_bootloader_argument["name"] = "grub2_bootloader_argument"

    arg_name = csv_line[0]
    arg_value = csv_line[1]
    rule_id = f"grub2_{arg_name}_argument"

    arg_name_value = f"{arg_name}={arg_value}"
    data_grub2_bootloader_argument["arg_name"] = arg_name
    data_grub2_bootloader_argument["arg_value"] = arg_value
    grub2_bootloader_argument["vars"] = data_grub2_bootloader_argument

    csv_data[rule_id] = grub2_bootloader_argument
    return grub2_bootloader_argument


def kernel_modules_disabled_csv_to_dict(csv_line, csv_data):
    kernel_modules_disabled = OrderedDict()
    data_kernel_modules_disabled = {}
    kernel_modules_disabled["name"] = "kernel_module_disabled"

    kernmod = csv_line[0]
    rule_id = f"kernel_module_{kernmod}_disabled"

    data_kernel_modules_disabled["kernmodule"] = kernmod
    kernel_modules_disabled["vars"] = data_kernel_modules_disabled

    csv_data[rule_id] = kernel_modules_disabled
    return kernel_modules_disabled


def lineinfile_csv_to_dict(csv_line, csv_data, _type):
    lineinfile = OrderedDict()
    data_lineinfile = {}
    lineinfile["name"] = f"{_type}_lineinfile"

    rule_id = csv_line[0]
    parameter = csv_line[1]
    value = csv_line[2]
    if len(csv_line) == 4:
        missing_parameter_pass = csv_line[3]
    else:
        missing_parameter_pass = "false"

    data_lineinfile["rule_id"] = rule_id
    data_lineinfile["parameter"] = parameter
    data_lineinfile["value"] = value
    data_lineinfile["missing_parameter_pass"] = missing_parameter_pass
    lineinfile["vars"] = data_lineinfile

    csv_data[rule_id] = lineinfile
    return lineinfile


def auditd_lineinfile_csv_to_dict(csv_line, csv_data):
    return lineinfile_csv_to_dict(csv_line, csv_data, "auditd")


def sshd_lineinfile_csv_to_dict(csv_line, csv_data):
    return lineinfile_csv_to_dict(csv_line, csv_data, "sshd")


def mount_options_csv_to_dict(csv_line, csv_data):
    mount_options = OrderedDict()
    data_mount_options = {}

    mount_point = csv_line[0]
    mount_option = csv_line[1].strip()

    template_base = "mount_option"
    mount_has_to_exist = "yes"
    filesystem = ""
    mount_point_type = ""
    if len(csv_line) > 2:
        # When create_fstab_entry_if_needed is in CSV file, load next two values
        mount_has_to_exist = "no"
        filesystem = csv_line[3]
        mount_point_type = csv_line[4]

    point_id = f"{mount_point}"
    if mount_point.startswith("var_"):
        # var_removable_partition -> removable_partitions
        point_id = re.sub(r"^var_(.*)", r"\1s", mount_point)
        rule_id = f"mount_option_{mount_option}_{point_id}"
        mount_options["name"] = f"{template_base}_{point_id}"
    elif mount_point.startswith("/"):
        point_id = escape_path(mount_point)[1:]
        rule_id = f"mount_option_{point_id}_{mount_option}"
        mount_options["name"] = template_base
    else:
        point_id = mount_point
        rule_id = f"mount_option_{mount_option}_{point_id}"
        mount_options["name"] = f"{template_base}_{point_id}"

    # Not all fields will be used by all templates, this is fine,
    # they will just be ignored
    data_mount_options["mountpoint"] = mount_point
    data_mount_options["mountoption"] = mount_option
    data_mount_options["mount_has_to_exist"] = mount_has_to_exist
    data_mount_options["filesystem"] = filesystem
    data_mount_options["type"] = mount_point_type
    mount_options["vars"] = data_mount_options

    csv_data[rule_id] = mount_options
    return mount_options


def mounts_csv_to_dict(csv_line, csv_data):
    mounts = OrderedDict()
    data_mounts = {}
    mounts["name"] = "mount"

    mountpoint = csv_line[0]
    point_id = escape_path(mountpoint)
    rule_id = f"partition_for{point_id}"

    data_mounts["mountpoint"] = mountpoint
    mounts["vars"] = data_mounts

    csv_data[rule_id] = mounts
    return mounts


# It seems there are no rules for this templated content
def ocp_service_runtime_config_csv_to_dict(csv_line, csv_data):
    ocp_service = OrderedDict()
    data_ocp_service = {}
    ocp_service["name"] = "ocp_service_config"

    process_cmd = csv_line[0]
    process_cmd_option = csv_line[1]
    process_cmd_val = csv_line[2]

    ocp_proc_id = re.sub(r'[-._]', '_', process_cmd_option.strip("--="))
    if len(csv_line) == 4:
        ocp_proc_id = csv_line[3]

    rule_id = f"ocp_service_runtime_config_{ocp_proc_id}"

    data_ocp_service["ocpcmdoptionid"] = process_cmd
    data_ocp_service["ocpprocess"] = process_cmd
    data_ocp_service["ocpcmdoption"] = process_cmd_option
    data_ocp_service["ocpcmdval"] = process_cmd_val
    ocp_service["vars"] = data_ocp_service

    csv_data[rule_id] = ocp_service
    return ocp_service


def packages_installed_csv_to_dict(csv_line, csv_data):
    package_installed = OrderedDict()
    data_package_installed = {}
    package_installed["name"] = "package_installed"

    pkgname = csv_line[0]
    rule_id = f"package_{pkgname}_installed"

    if len(csv_line) == 2:
        evr = csv_line[1]
    else:
        evr = ""

    data_package_installed["pkgname"] = pkgname
    data_package_installed["evr"] = evr
    package_installed["vars"] = data_package_installed

    csv_data[rule_id] = package_installed
    return package_installed


def packages_removed_csv_to_dict(csv_line, csv_data):
    package_removed = OrderedDict()
    data_package_removed = {}
    package_removed["name"] = "package_removed"

    pkgname = csv_line[0]
    rule_id = f"package_{pkgname}_removed"

    # Some CSVs have two fields for packages_removed, but
    # create_package_removed.py doesn't use the second field.
    # So just ignore it as well

    data_package_removed["pkgname"] = pkgname
    package_removed["vars"] = data_package_removed

    csv_data[rule_id] = package_removed
    return package_removed


def permissions_csv_to_dict(csv_line, csv_data):
    permissions = OrderedDict()
    data_permissions = {}
    owner = OrderedDict()
    data_owner = {}
    groupowner = OrderedDict()
    data_groupowner = {}
    file_permissions = OrderedDict()
    data_file_permissions = {}

    dir_path = csv_line[0]
    file_name = csv_line[1]
    uid = csv_line[2]
    gid = csv_line[3]
    mode = csv_line[4]

    template_list = []

    # The following few lines were extracted from create_permissions.py
    if len(csv_line) == 6:
        path_id = f"_{csv_line[5]}"
    elif file_name == '[NULL]':
        path_id = re.sub(r'[-\./]', '_', dir_path)
    elif re.match(r'\^.*\$', file_name, 0):
        path_id = re.sub(r'[-\./]', '_', dir_path) + '_' + re.sub(r'[-\\\./^$*(){}|]',
                                                                  '_', file_name)
        # collapse repeated underscores, drop a trailing one, and make sure the id is lowercase
        path_id = re.sub(r'_+', '_', path_id)
        path_id = re.sub(r'_$', '', path_id)
        path_id = path_id.lower()
    else:
        path_id = re.sub(r'[-\./]', '_', dir_path) + '_' + re.sub(r'[-\./]',
                                                                  '_', file_name)
        path_id = path_id.lower()

    # build a string that contains the full path to the file
    # full_path maps to FILEPATH in the template
    if file_name == '[NULL]' or file_name == '':
        full_path = dir_path + '/'
    else:
        full_path = dir_path + '/' + file_name

    if not re.match(r'\^.*\$', file_name, 0):
        if mode:
            rule_id = f"file_permissions{path_id}"
            file_permissions["name"] = "file_permissions"
            data_file_permissions["filepath"] = full_path
            data_file_permissions["filemode"] = mode
            file_permissions["vars"] = data_file_permissions
            csv_data[rule_id] = file_permissions
        if uid:
            rule_id = f"file_owner{path_id}"
            owner["name"] = "file_owner"
            data_owner["filepath"] = full_path
            data_owner["fileuid"] = uid
            owner["vars"] = data_owner
            csv_data[rule_id] = owner
        if gid:
            rule_id = f"file_groupowner{path_id}"
            groupowner["name"] = "file_groupowner"
            data_groupowner["filepath"] = full_path
            data_groupowner["filegid"] = gid
            groupowner["vars"] = data_groupowner
            csv_data[rule_id] = groupowner

        rule_id = f"permissions{path_id}"
        permissions["name"] = "permissions"
        data_permissions["filepath"] = full_path
        data_permissions["filemode"] = mode
        data_permissions["fileuid"] = uid
        data_permissions["filegid"] = gid
        permissions["vars"] = data_permissions
        csv_data[rule_id] = permissions
    else:
        rule_id = f"file_permissions{path_id}"
        file_permissions["name"] = "file_regex_permissions"
        data_file_permissions["path"] = dir_path
        data_file_permissions["filename"] = file_name
        data_file_permissions["filemode"] = mode
        file_permissions["vars"] = data_file_permissions
        csv_data[rule_id] = file_permissions

    # Fields FILEID, STATEMODE, UNIX_DIR, UNIX_FILENAME will be translated into rule.yml
    # They will be generated from data above during templated content generation

    return permissions


def selinux_booleans_csv_to_dict(csv_line, csv_data):
    selinux_boolean = OrderedDict()
    data_selinux_boolean = {}

    sebool_name = csv_line[0]
    sebool_id = escape_path(sebool_name)
    rule_id = f"sebool_{sebool_id}"

    sebool_state = csv_line[1]
    # Default to None so the variable is defined even for states that don't map
    # to a boolean (e.g. "use_var"), which don't use it below.
    sebool_bool = None
    if sebool_state == "on" or sebool_state == "enable":
        sebool_bool = "true"
    elif sebool_state == "off" or sebool_state == "disable":
        sebool_bool = "false"

    data_selinux_boolean["seboolid"] = sebool_id
    if sebool_state == "use_var":
        selinux_boolean["name"] = "sebool_var"
    else:
        selinux_boolean["name"] = "sebool"
        data_selinux_boolean["sebool_bool"] = sebool_bool
    selinux_boolean["vars"] = data_selinux_boolean

    csv_data[rule_id] = selinux_boolean
    return selinux_boolean


def services_disabled_csv_to_dict(csv_line, csv_data):
    service_disabled = OrderedDict()
    data_service_disabled = {}
    service_disabled["name"] = "service_disabled"

    service_name = csv_line[0]
    package_name = csv_line[1]
    daemon_name = csv_line[2]

    rule_id = f"service_{service_name}_disabled"

    data_service_disabled["servicename"] = service_name
    data_service_disabled["packagename"] = package_name
    data_service_disabled["daemonname"] = daemon_name
    service_disabled["vars"] = data_service_disabled

    csv_data[rule_id] = service_disabled
    return service_disabled


def services_enabled_csv_to_dict(csv_line, csv_data):
    service_enabled = OrderedDict()
    data_service_enabled = {}
    service_enabled["name"] = "service_enabled"

    service_name = csv_line[0]
    package_name = csv_line[1]
    if not package_name:
        package_name = service_name
    daemon_name = csv_line[2]
    if not daemon_name:
        daemon_name = service_name

    rule_id = f"service_{service_name}_enabled"

    data_service_enabled["servicename"] = service_name
    data_service_enabled["packagename"] = package_name
    data_service_enabled["daemonname"] = daemon_name
    service_enabled["vars"] = data_service_enabled

    csv_data[rule_id] = service_enabled
    return service_enabled


def sysctl_values_csv_to_dict(csv_line, csv_data):
    sysctl_value = OrderedDict()
    data_sysctl_value = {}

    sysctl_var = csv_line[0]
    sysctl_val = csv_line[1]
    # Default data type for sysctl is int
    data_type = "int"
    if len(csv_line) == 3:
        data_type = csv_line[2]
    sysctl_var_id = escape_path(sysctl_var)
    rule_id = f"sysctl_{sysctl_var_id}"

    sysctl_value["name"] = "sysctl"

    data_sysctl_value["sysctlvar"] = sysctl_var
    data_sysctl_value["sysctlval"] = sysctl_val
    data_sysctl_value["datatype"] = data_type
    sysctl_value["vars"] = data_sysctl_value

    csv_data[rule_id] = sysctl_value
    return sysctl_value


def timers_enabled_csv_to_dict(csv_line, csv_data):
    timer_enabled = OrderedDict()
    data_timer_enabled = {}
    timer_enabled["name"] = "timer_enabled"

    timer_name = csv_line[0]
    package_name = csv_line[1]
    if not package_name:
        package_name = timer_name
    rule_id = f"timer_{timer_name}_enabled"

    data_timer_enabled["timername"] = timer_name
    data_timer_enabled["packagename"] = package_name
    timer_enabled["vars"] = data_timer_enabled

    csv_data[rule_id] = timer_enabled
    return timer_enabled


class ProductCSVData(object):
    TEMPLATE_TO_CSV_FORMAT_MAP = {
            "accounts_password.csv": accounts_password_csv_to_dict,
            "audit_rules_execution.csv": audit_rules_execution_csv_to_dict,
            "audit_rules_privileged_commands.csv": audit_rules_privileged_commands_csv_to_dict,
            "audit_rules_dac_modification.csv": audit_rules_dac_modification_csv_to_dict,
            "audit_rules_file_deletion_events.csv": audit_rules_file_deletion_events_csv_to_dict,
            "audit_rules_login_events.csv": audit_rules_login_events_csv_to_dict,
            "audit_rules_path_syscall.csv": audit_rules_path_syscall_csv_to_dict,
            # arufm means audit_rules_unsuccessful_file_modification
            "audit_rules_unsuccessful_file_modification.csv": arufm_csv_to_dict,
            "audit_rules_unsuccessful_file_modification_detailed.csv": arufm_detailed_csv_to_dict,
            # arum means audit_rules_usergroup_modification
            "audit_rules_usergroup_modification.csv": arum_csv_to_dict,
            "grub2_bootloader_argument.csv": grub2_bootloader_argument_csv_to_dict,
            "kernel_modules_disabled.csv": kernel_modules_disabled_csv_to_dict,
            "auditd_lineinfile.csv": auditd_lineinfile_csv_to_dict,
            "sshd_lineinfile.csv": sshd_lineinfile_csv_to_dict,
            "mount_options.csv": mount_options_csv_to_dict,
            "mounts.csv": mounts_csv_to_dict,
            "ocp_service_runtime_config.csv": ocp_service_runtime_config_csv_to_dict,
            "packages_installed.csv": packages_installed_csv_to_dict,
            "packages_removed.csv": packages_removed_csv_to_dict,
            "file_dir_permissions.csv": permissions_csv_to_dict,
            "selinux_booleans.csv": selinux_booleans_csv_to_dict,
            "services_disabled.csv": services_disabled_csv_to_dict,
            "services_enabled.csv": services_enabled_csv_to_dict,
            "sysctl_values.csv": sysctl_values_csv_to_dict,
            "timers_enabled.csv": timers_enabled_csv_to_dict,
            }

    def __init__(self, product, ssg_root):
        self.product = product
        self.ssg_root = ssg_root  # Needed?

        self.csv_dir = os.path.join(ssg_root, product, "templates/csv")
        self.csv_files = self._identify_csv_files(self.csv_dir)

        self.csv_data = self._load_csv_files(self.csv_files)

    def _identify_csv_files(self, csv_dir):
        try:
            # get all CSV files
            product_csvs = [csv_filename for csv_filename in os.listdir(csv_dir)
                            if csv_filename.endswith(".csv")]
        except FileNotFoundError as not_found:
            product_csvs = []
            # double check that exception is on templates/csv directory
            if not_found.filename != csv_dir:
                raise not_found
        return product_csvs

    def _load_csv_files(self, csv_files):
        csv_data = {}
        for csv_filename in csv_files:
            self._load_csv(csv_filename, csv_data)
        return csv_data

    def _load_csv(self, csv_filename, csv_data):
        # Only load CSV for which we know the format
        csv_parser = self.TEMPLATE_TO_CSV_FORMAT_MAP.get(csv_filename, None)
        if not csv_parser:
            return

        with open(os.path.join(self.csv_dir, csv_filename), "r") as csv_f:
            for line in csv.reader(csv_f):
                # Skip empty lines
                if len(line) == 0:
                    continue

                # Skip all comment lines
                if len(line) >= 1 and line[0].startswith('#'):
                    continue

                except_for_language = None
                if "#except-for:" in line[-1]:
                    line[-1], except_for_clause = line[-1].split('#')
                    line[-1] = line[-1].strip()
                    # There are no cases of except-for for multiple languages
                    _, except_for_language = except_for_clause.split(':')

                try:
                    # Each CSV file is particular to its template, as a single CSV line can:
                    # - contain data for multiple rules in different templates
                    #   (audit_rules_unsuccessful_file_modification_detailed);
                    # A single CSV file can:
                    # - contain data for varying templates (mount_options).
                    # We let the CSV-specific parser add the data
                    line_data_dict = csv_parser(line, csv_data)

                    if except_for_language:
                        line_data_dict["backends"] = {except_for_language: "off"}
                except IndexError as e:
                    print(f"line: {line} in file: {csv_filename}")
                    raise e

    def merge_product_csv_data(self, product):
        """
        Incorporate each rule present in 'product' into this instance's csv_data.

        The added rule's CSV values are changed from strings to dictionaries of lists.
        The dictionary counts the occurrences of a value, while keeping track of the
        products that use the value.
        """

        # Readability variables
        product_b = product.product
        data_a = self.csv_data
        data_b = product.csv_data

        for rule_id in data_b:
            rule_b = data_b[rule_id]
            rule_b_vars = rule_b["vars"]

            if rule_id in data_a:
                rule_a = data_a[rule_id]
                rule_a_vars = rule_a["vars"]
                for var in rule_b_vars:
                    new_value = rule_b_vars[var]
                    if type(rule_a_vars[var]) == defaultdict:
                        value_counter = rule_a_vars[var]
                        value_counter[new_value].append(product_b)
                    else:
                        # We substitute the string value for a dict where
                        # each 'key' is the template value, and
                        # each 'value' is a list of products that have it
                        value_counter = defaultdict(list)
                        value_counter[new_value].append(product_b)
                        rule_a_vars[var] = value_counter
                if "backends" in rule_b:
                    if "backends" in rule_a:
                        # As backends are turned on/off content-wide, independently of product,
                        # just merge them together
                        rule_a["backends"] = ssg.utils.merge_dicts(
                                rule_a["backends"],
                                rule_b["backends"])
                    else:
                        rule_a["backends"] = rule_b["backends"]
            else:
                # Rule is new in the product
                # Add the rule with its values already in dictionary
                data_a[rule_id] = OrderedDict({"name": rule_b["name"]})
                data_a[rule_id]["vars"] = OrderedDict()
                for var in rule_b_vars:
                    value_counter = defaultdict(list)
                    new_value = rule_b_vars[var]
                    value_counter[new_value].append(product_b)
                    data_a[rule_id]["vars"][var] = value_counter
                if "backends" in rule_b:
                    data_a[rule_id]["backends"] = rule_b["backends"]

    def resolve_csv_data(self):
        """
        Go over its own rules, resolving the rules' CSV data.

        For each rule that has a dictionary instead of a string as the value of a
        template parameter, it determines the most popular value and makes it the shared one.
        The other values are made product specific with 'param@product' notation.
        """
        for rule_id in self.csv_data:
            rule = self.csv_data[rule_id]
            rule_vars = rule["vars"]
            # We need a list to be able to iterate over the keys and change the dictionary
            for var in list(rule_vars):
                value_counter = rule_vars[var]
                if type(value_counter) == defaultdict:
                    if len(value_counter) == 1:
                        # there was only one value
                        rule_vars[var] = list(value_counter.keys())[0]
                        if rule_vars[var] == '':
                            rule_vars.pop(var)
                    else:

                        # Determine which value has most products backing it
                        most_popular = 0
                        most_popular_value = ""
                        for value in value_counter.keys():
                            count = len(value_counter[value])
                            if count > most_popular:
                                most_popular = count
                                most_popular_value = value

                        for value in list(value_counter.keys()):
                            if value == most_popular_value:
                                # The value with more products will be the shared one
                                if value == '':
                                    rule_vars.pop(var)
                                else:
                                    rule_vars[var] = most_popular_value
                            else:
                                # other values are added with @product
                                for product in value_counter[value]:
                                    product_var = f"{var}@{product}"
                                    rule_vars[product_var] = value
                else:
                    if rule_vars[var] == '':
                        rule_vars.pop(var)

        # after merging all product specific vars, consolidate some of the data
        for rule_id in self.csv_data:
            if re.match(r"^service_.*abled$", rule_id):
                rule_vars = self.csv_data[rule_id]["vars"]
                # - services_enabled templates can have redundant data
                # - services_disabled templates can have redundant data
                rule = self.csv_data[rule_id]
                service_name = rule_vars.get("servicename")
                package_name = rule_vars.get("packagename", None)
                daemon_name = rule_vars.get("daemonname", None)
                if package_name == service_name:
                    rule_vars.pop("packagename")
                if daemon_name == service_name:
                    rule_vars.pop("daemonname")

                # if, after cleanup, any product specific key refers to data
                # that was removed, also remove it
                for var in list(rule_vars.keys()):
                    if '@' in var:
                        v, product = var.split('@')
                        # When the shared var doesn't exist, there is no need for an
                        # empty product specific var, nor for a
                        # product specific var equal to another var
                        if v not in rule_vars:
                            if rule_vars[var] == "":
                                rule_vars.pop(var)
                            elif rule_vars[var] == rule_vars.get("servicename"):
                                rule_vars.pop(var)


def walk_benchmarks(benchmark_dir, product, override_template=False):
    csv_data = product.csv_data

    for root, dirs, files in os.walk(benchmark_dir):
        rule_id = os.path.basename(root)
        if rule_id in ["oval", "bash", "ansible", "tests"]:
            continue
        if rule_id in csv_data:
            rule_path = os.path.join(root, "rule.yml")
            rule_contents = ssg.utils.read_file_list(rule_path)

            # Check if the rule already has a template key (or section)
            template_key = ssg.rule_yaml.get_section_lines(rule_path, rule_contents, "template")
            if template_key is not None:
                if override_template:
                    # Erase the current template key (or section)
                    rule_contents = ssg.rule_yaml.remove_lines(rule_contents, template_key)
                else:
                    continue

            # make sure there is a blank line at the end, so that template_data is appended nicely
            if rule_contents[-1] != "":
                rule_contents.extend([""])

            # Add template key
            template_dict = {}
            template_dict["template"] = csv_data[rule_id]

            template_contents = ssg.utils.split_string_content(
                    ssg.yaml.ordered_dump(template_dict, indent=4, default_flow_style=False)
                    )
            ssg.utils.write_list_file(rule_path, rule_contents + template_contents)


def parse_args():
    p = argparse.ArgumentParser()
    p.add_argument("ssg_root", help="Path to root of ssg git directory")
    p.add_argument("--dump", help="Directory to dump collected CSV data")
    p.add_argument("--override", action="store_true",
                   help="If set, template data in the rules will be overridden")

    return p.parse_args()


def main():
    args = parse_args()

    if args.dump:
        try:
            os.mkdir(args.dump)
        except FileExistsError:
            pass

    show_data = {}
    templated_content = {}

    # Load each product's CSV data
    for product_name in product_directories:
        product = ProductCSVData(product_name, args.ssg_root)
        if args.dump:
            with open(os.path.join(args.dump, f"{product_name}.dump"), "w") as dump_f:
                pprint.pprint(product.csv_data, dump_f)
        templated_content[product_name] = product

    # Load the shared CSV data as if it were a product
    product_name = "shared"
    shared_product = ProductCSVData(product_name, args.ssg_root)
    if args.dump:
        with open(os.path.join(args.dump, "shared.dump"), "w") as dump_f:
            pprint.pprint(shared_product.csv_data, dump_f)

    # Resolve the loaded CSV data
    # Use the shared "product" as the base reference
    for product in templated_content:
        shared_product.merge_product_csv_data(templated_content[product])

    shared_product.resolve_csv_data()
    if args.dump:
        with open(os.path.join(args.dump, "shared_resolved.dump"), "w") as dump_f:
            pprint.pprint(shared_product.csv_data, dump_f)

    # Walk through the benchmarks and add data into rule.yml
    benchmarks_list = ["linux_os", "applications"]
    for benchmark_name in benchmarks_list:
        walk_benchmarks(os.path.join(args.ssg_root, benchmark_name), shared_product, args.override)


if __name__ == "__main__":
    main()
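
For reference, an illustrative invocation derived from parse_args() above (the paths are placeholders, and the script path is inferred from the module name utils.migrate_template_csv_to_rule):

    python utils/migrate_template_csv_to_rule.py /path/to/ssg-root --dump /tmp/csv_dump --override

--dump writes each product's collected CSV data to <dump>/<product>.dump, along with shared.dump and shared_resolved.dump, and --override replaces any template section already present in the affected rule.yml files.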