get_messages_for_partition_rules()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 53
Code Lines 35

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 35
dl 0
loc 53
rs 8.1066
c 0
b 0
f 0
cc 6
nop 8

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

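Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.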

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists, such as Replace Parameter with Method Call, Introduce Parameter Object, and Preserve Whole Object.

1
import pytest
2
from unittest import mock
3
from collections import defaultdict
4
5
from pyanaconda.core.constants import PAYLOAD_TYPE_DNF
6
from pyanaconda.core.constants import PASSWORD_POLICY_ROOT
7
from pyanaconda.modules.common.constants.objects import FIREWALL, DEVICE_TREE, BOOTLOADER
8
from pyanaconda.modules.common.constants.objects import USER_INTERFACE
9
from pyanaconda.modules.common.constants.services import NETWORK, STORAGE, USERS, BOSS, PAYLOADS
10
from pyanaconda.modules.common.structures.packages import PackagesSelectionData
11
from pyanaconda.modules.common.structures.policy import PasswordPolicy
12
13
# Every import from the add-on lives inside the guard: if any of them fails,
# the module is skipped instead of breaking test collection.
try:
    from org_fedora_oscap import rule_handling, common
    from org_fedora_oscap.common import KDUMP
except ImportError as exc:
    pytestmark = pytest.mark.skip(
        "Unable to import modules, possibly due to bad version of Anaconda: {error}"
        .format(error=str(exc)))
22
23
24
@pytest.fixture()
def part_rules():
    """Provide a ``PartRules`` collection with ``/tmp`` already ensured."""
    collection = rule_handling.PartRules()
    collection.ensure_mount_point("/tmp")
    return collection
29
30
31
# simple tests, shouldn't raise exceptions
32
def test_part_rules_getitem(part_rules):
    """Item lookup of the ensured mount point must not raise."""
    _ = part_rules["/tmp"]
34
35
36
def test_part_rules_setitem(part_rules):
    """Item assignment of a new ``PartRule`` must not raise."""
    new_rule = rule_handling.PartRule("/var/log")
    part_rules["/var/log"] = new_rule
39
40
41
def test_part_rules_len(part_rules):
    """The fixture ensures exactly one mount point, so the length is 1."""
    rule_count = len(part_rules)
    assert rule_count == 1
43
44
45
def test_part_rules_contains(part_rules):
    """Membership test must find the mount point ensured by the fixture."""
    ensured_mount_point = "/tmp"
    assert ensured_mount_point in part_rules
47
48
49
def test_part_rules_delitem(part_rules):
    """Deleting a mount point removes it from the collection."""
    mount_point = "/tmp"
    del part_rules[mount_point]
    assert mount_point not in part_rules
52
53
54
@pytest.fixture()
def rule_data():
    """Provide a newly constructed ``RuleData`` instance."""
    fresh_rule_data = rule_handling.RuleData()
    return fresh_rule_data
57
58
59
def test_rule_data_artificial(rule_data):
    """Feed hand-written rule lines (with stray whitespace) and check the parsed state."""
    raw_rules = (
        "  part /tmp --mountoptions=nodev,noauto",
        "part /var/log  ",
        " passwd   --minlen=14 ",
        "package --add=iptables",
        " package --add=firewalld --remove=telnet",
        "package --remove=rlogin --remove=sshd",
        "bootloader --passwd",
    )
    for raw_rule in raw_rules:
        rule_data.new_rule(raw_rule)

    # both partitions should appear in rule_data._part_rules
    for mount_point in ("/tmp", "/var/log"):
        assert mount_point in rule_data._part_rules

    # mount options should be parsed
    for option in ("nodev", "noauto"):
        assert option in rule_data._part_rules["/tmp"]._mount_options

    # no mount options for /var/log
    assert not rule_data._part_rules["/var/log"]._mount_options

    # minimal password length should be parsed and stored correctly
    assert rule_data._passwd_rules._minlen == 14

    # packages should be parsed correctly
    for added in ("iptables", "firewalld"):
        assert added in rule_data._package_rules._add_pkgs
    for removed in ("telnet", "rlogin", "sshd"):
        assert removed in rule_data._package_rules._remove_pkgs

    # bootloader should require password
    assert rule_data._bootloader_rules._require_password
91
92
93
def test_rule_data_quoted_opt_values(rule_data):
    """Quotes around an option value must not end up in the parsed mount options."""
    rule_data.new_rule('part /tmp --mountoptions="nodev,noauto"')

    for option in ("nodev", "noauto"):
        assert option in rule_data._part_rules["/tmp"]._mount_options

    # the quote character itself must not be stored
    assert '"' not in rule_data._part_rules["/tmp"]._mount_options
99
100
101
def test_rule_data_real_output(rule_data):
    """Feed an indented multi-line output with a blank line, one rule per line."""
    output = """
    part /tmp

    part /tmp --mountoptions=nodev
    """
    output_lines = output.splitlines()
    for output_line in output_lines:
        rule_data.new_rule(output_line)

    parsed_parts = rule_data._part_rules
    assert "/tmp" in parsed_parts
    assert "nodev" in parsed_parts["/tmp"]._mount_options

    # should be stripped and merged
    expected_text = "part /tmp --mountoptions=nodev"
    assert str(parsed_parts) == expected_text
115
116
117
@pytest.fixture()
def ksdata_mock():
    """Provide a plain ``Mock`` used as the kickstart data argument."""
    ksdata = mock.Mock()
    return ksdata
120
121
122
@pytest.fixture()
def storage_mock():
    """Provide ``None`` as the storage argument passed to the rule methods."""
    placeholder = None
    return placeholder
125
126
127
# monkeypatch is a predefined fixture that is used to perform per-test case changes in the test env.
128
# Here, it mocks the Anaconda interface on the module level. For more info, see:
129
# https://docs.pytest.org/en/latest/monkeypatch.html#monkeypatching-mocking-modules-and-environments
130
@pytest.fixture()
def proxy_getter(monkeypatch):
    """Replace ``DBus.get_proxy`` with a getter that hands out cached mocks.

    Each (service name, object path) pair maps to its own ``Mock``. The very
    first request additionally triggers ``set_dbus_defaults()``.
    """
    proxy_cache = defaultdict(mock.Mock)

    def mock_get(service_name, object_path, *args, **kwargs):
        # Must be evaluated before the lookup below, which populates the cache.
        is_first_request = len(proxy_cache) == 0
        requested_proxy = proxy_cache[(service_name, object_path)]

        # The cache is non-empty by now, so the get_proxy calls made inside
        # set_dbus_defaults() do not re-enter this branch.
        if is_first_request:
            set_dbus_defaults()

        return requested_proxy

    monkeypatch.setattr("pyanaconda.core.dbus.DBus.get_proxy", mock_get)
    return mock_get
145
146
147
def set_dbus_defaults():
    """Give the mocked DBus proxies the baseline values the tests start from."""
    # Boss: the only reported module is Kdump.
    boss_proxy = BOSS.get_proxy()
    boss_proxy.GetModules.return_value = [KDUMP.service_name]

    kdump_proxy = KDUMP.get_proxy()
    kdump_proxy.KdumpEnabled = True

    ui_proxy = BOSS.get_proxy(USER_INTERFACE)
    ui_proxy.PasswordPolicies = {}

    # Network and firewall: connected, nothing configured.
    network_proxy = NETWORK.get_proxy()
    network_proxy.Connected.return_value = True

    firewall_proxy = NETWORK.get_proxy(FIREWALL)
    firewall_proxy.EnabledServices = []
    firewall_proxy.DisabledServices = []
    firewall_proxy.EnabledPorts = []
    firewall_proxy.Trusts = []

    # Storage: no mount points, default mount options, no bootloader password.
    device_tree_proxy = STORAGE.get_proxy(DEVICE_TREE)
    device_tree_proxy.GetDeviceMountOptions.return_value = "defaults"
    device_tree_proxy.GetMountPoints.return_value = {}

    bootloader_proxy = STORAGE.get_proxy(BOOTLOADER)
    bootloader_proxy.IsPasswordSet = False

    # Users: a plain-text root password is set.
    users_proxy = USERS.get_proxy()
    users_proxy.IsRootPasswordSet = True
    users_proxy.IsRootPasswordCrypted = False
    users_proxy.RootPassword = "anaconda"

    # Payloads: one active DNF payload with a default package selection.
    payload_path = "/fake/payload/1"
    payloads_proxy = PAYLOADS.get_proxy()
    payloads_proxy.ActivePayload = payload_path

    dnf_proxy = PAYLOADS.get_proxy(payload_path)
    dnf_proxy.Type = PAYLOAD_TYPE_DNF
    dnf_proxy.PackagesSelection = PackagesSelectionData.to_structure(
        PackagesSelectionData()
    )
188
189
190
def test_evaluation_existing_part_must_exist_rules(
        proxy_getter, rule_data, ksdata_mock, storage_mock):
    """Rules for mount points that are all reported as existing yield no messages."""
    for rule_line in ("part /tmp", "part /"):
        rule_data.new_rule(rule_line)

    reported_mount_points = {
        "/tmp": "/dev/sda1",
        "/": "/dev/sda2",
    }
    device_tree = STORAGE.get_proxy(DEVICE_TREE)
    device_tree.GetDeviceMountOptions.return_value = "defaults"
    device_tree.GetMountPoints.return_value = reported_mount_points

    messages = rule_data.eval_rules(ksdata_mock, storage_mock)

    # partitions exist --> no errors, warnings or additional info
    assert not messages

    # no additional mount options specified
    expected_calls = [
        mock.call("/dev/sda1", "defaults"),
        mock.call("/dev/sda2", "defaults"),
    ]
    device_tree.SetDeviceMountOptions.assert_has_calls(expected_calls)
216
217
218
def test_evaluation_nonexisting_part_must_exist(
        proxy_getter, rule_data, ksdata_mock, storage_mock):
    """A rule for a mount point that is not reported yields one fatal message."""
    for rule_line in ("part /tmp", "part /"):
        rule_data.new_rule(rule_line)

    # only /tmp is reported; / is missing
    device_tree = STORAGE.get_proxy(DEVICE_TREE)
    device_tree.GetDeviceMountOptions.return_value = "defaults"
    device_tree.GetMountPoints.return_value = {"/tmp": "/dev/sda1"}

    messages = rule_data.eval_rules(ksdata_mock, storage_mock)

    # / mount point missing --> one error
    assert len(messages) == 1
    only_message = messages[0]
    assert only_message.type == common.MESSAGE_TYPE_FATAL

    # error has to mention the mount point
    assert "/" in only_message.text
241
242
243
def get_messages_for_partition_rules(
        rule_data, ksdata_mock, storage_mock,
        rules,
        messages_evaluation_count=1,
        actual_mountpoints=("/tmp", "/"),
        mount_options=None,
        report_only=False,
        ):
    """Register two partition rules, mock the device tree and evaluate the rules.

    :param rules: exactly two rule lines (temp partition and root)
    :param messages_evaluation_count: how many times ``eval_rules`` is run
    :param actual_mountpoints: mount points the mocked device tree reports;
        only "/tmp" and "/" have a device assigned here
    :param mount_options: mount point -> initial option string; defaults to
        "defaults" for both "/" and "/tmp"
    :param report_only: passed through to ``eval_rules``
    :return: messages from the last evaluation (empty list if none was run)
    """
    assert len(rules) == 2, \
        "We need rules for temp partition and root."

    if mount_options is None:
        mount_options = {
            "/": "defaults",
            "/tmp": "defaults",
        }

    for rule_line in rules:
        rule_data.new_rule(rule_line)

    # Map mount points to device names.
    known_devices = (("/tmp", "/dev/sda1"), ("/", "/dev/sda2"))
    mount_points = {
        mount_point: device
        for mount_point, device in known_devices
        if mount_point in actual_mountpoints
    }

    # Map device names to mount options.
    options_by_device = {}
    for mount_point in actual_mountpoints:
        options_by_device[mount_points[mount_point]] = mount_options[mount_point]

    # Mock the device tree proxy: reads and writes go through options_by_device.
    def get_device_mount_options(device_name):
        return options_by_device[device_name]

    def set_device_mount_options(device_name, options):
        options_by_device[device_name] = options

    device_tree = STORAGE.get_proxy(DEVICE_TREE)
    device_tree.GetMountPoints.return_value = mount_points
    device_tree.GetDeviceMountOptions.side_effect = get_device_mount_options
    device_tree.SetDeviceMountOptions.side_effect = set_device_mount_options

    # Only the result of the last evaluation is returned.
    messages = []
    for _ in range(messages_evaluation_count):
        messages = rule_data.eval_rules(ksdata_mock, storage_mock, report_only)

    return messages
296
297
298
def evaluation_add_mount_options(
        rule_data, ksdata_mock, storage_mock,
        messages_evaluation_count):
    """Evaluate two mount-option rules the given number of times and check the result."""
    messages = get_messages_for_partition_rules(
        rule_data, ksdata_mock, storage_mock,
        [
            "part /tmp --mountoptions=defaults,nodev",
            "part / --mountoptions=noauto",
        ],
        messages_evaluation_count)

    # two mount options added --> two info messages
    assert len(messages) == 2
    for message in messages:
        assert message.type == common.MESSAGE_TYPE_INFO

    # newly added mount options should be mentioned in the messages
    # together with their mount points
    nodev_found = noauto_found = False
    for text in (message.text for message in messages):
        if "'nodev'" in text:
            assert "/tmp" in text
            nodev_found = True
        elif "'noauto'" in text:
            assert "/" in text
            noauto_found = True

    assert nodev_found and noauto_found

    expected_calls = [
        mock.call("/dev/sda1", "defaults,nodev"),
        mock.call("/dev/sda2", "defaults,noauto"),
    ]
    device_tree = STORAGE.get_proxy(DEVICE_TREE)
    device_tree.SetDeviceMountOptions.assert_has_calls(expected_calls)
334
335
336
def test_evaluation_add_mount_options(
        proxy_getter, rule_data, ksdata_mock, storage_mock):
    """Run the mount-option scenario with a single evaluation."""
    evaluation_count = 1
    evaluation_add_mount_options(
        rule_data, ksdata_mock, storage_mock, evaluation_count)
339
340
341
def test_evaluation_add_mount_options_no_duplicates(
        proxy_getter, rule_data, ksdata_mock, storage_mock):
    """Run the mount-option scenario with two evaluations; the checks stay the same."""
    evaluation_count = 2
    evaluation_add_mount_options(
        rule_data, ksdata_mock, storage_mock, evaluation_count)
344
345
346
def test_evaluation_add_mount_options_report_only(
        proxy_getter, rule_data, ksdata_mock, storage_mock):
    """With ``report_only=True`` the options are reported but never set."""
    messages = get_messages_for_partition_rules(
        rule_data, ksdata_mock, storage_mock,
        [
            "part /tmp --mountoptions=nodev",
            "part / --mountoptions=noauto",
        ],
        1, report_only=True)

    # two mount options added --> two info messages
    assert len(messages) == 2
    first_message, second_message = messages[0], messages[1]
    assert first_message.type == common.MESSAGE_TYPE_INFO
    assert second_message.type == common.MESSAGE_TYPE_INFO

    # newly added mount options should be mentioned in the messages
    # together with their mount points
    nodev_found = noauto_found = False
    for text in (message.text for message in messages):
        if "'nodev'" in text:
            assert "/tmp" in text
            nodev_found = True
        elif "'noauto'" in text:
            assert "/" in text
            noauto_found = True

    assert nodev_found and noauto_found

    # no changes should be made
    device_tree = STORAGE.get_proxy(DEVICE_TREE)
    device_tree.SetDeviceMountOptions.assert_not_called()
379
380
381
def test_evaluation_add_mount_option_prefix(
        proxy_getter, rule_data, ksdata_mock, storage_mock):
    """``nodev`` must be added even when ``nodevice`` is already present."""
    initial_options = {
        "/": "defaults",
        "/tmp": "defaults,nodevice",
    }
    messages = get_messages_for_partition_rules(
        rule_data, ksdata_mock, storage_mock,
        [
            "part /tmp --mountoptions=nodev",
            "part / --mountoptions=noauto",
        ],
        mount_options=initial_options)

    # two mount options added (even though it is a prefix of another one)
    #   --> two info messages
    assert len(messages) == 2
    first_message, second_message = messages[0], messages[1]
    assert first_message.type == common.MESSAGE_TYPE_INFO
    assert second_message.type == common.MESSAGE_TYPE_INFO

    # the option should be added even though it is a prefix of another one
    expected_calls = [mock.call("/dev/sda1", "defaults,nodevice,nodev")]
    device_tree = STORAGE.get_proxy(DEVICE_TREE)
    device_tree.SetDeviceMountOptions.assert_has_calls(expected_calls)
406
407
408
def test_evaluation_add_mount_options_nonexisting_part(
        proxy_getter, rule_data, ksdata_mock, storage_mock):
    """Only ``/`` exists: its option is reported, the missing ``/tmp`` is an error."""
    messages = get_messages_for_partition_rules(
        rule_data, ksdata_mock, storage_mock,
        [
            "part /tmp --mountoptions=nodev",
            "part / --mountoptions=noauto",
        ],
        actual_mountpoints=["/"])

    # one mount option added, one mount point missing (mount options
    # cannot be added) --> one info, one error
    assert len(messages) == 2
    message_types = [message.type for message in messages]
    assert common.MESSAGE_TYPE_INFO in message_types
    assert common.MESSAGE_TYPE_FATAL in message_types

    # the info message should report mount options added to the existing
    # mount point, the error message should contain the missing mount point
    # and not the mount option
    for message in messages:
        text = message.text
        if message.type == common.MESSAGE_TYPE_INFO:
            assert "/" in text
            assert "'noauto'" in text
        elif message.type == common.MESSAGE_TYPE_FATAL:
            assert "/tmp" in text
            assert "'nodev'" not in text
434
435
436
def test_evaluation_passwd_minlen_no_passwd(
        proxy_getter, rule_data, ksdata_mock, storage_mock):
    """With no root password set, check the warning for three successive lengths."""
    users_proxy = USERS.get_proxy()
    users_proxy.IsRootPasswordSet = False

    # (minimal length to require, lengths that must not show up in the warning)
    scenarios = (
        (8, (10, 11)),
        (10, (8, 11)),
        (11, (8, 10)),
    )
    for min_length, unwanted_lengths in scenarios:
        evaluation_passwd_minlen_no_passwd(
            rule_data, ksdata_mock, storage_mock, min_length, unwanted_lengths)
443
444
445
def evaluation_passwd_minlen_no_passwd(
        rule_data, ksdata_mock, storage_mock, min_password_length, check_against=tuple()):
    """Add a ``passwd --minlen`` rule and check the single resulting warning.

    :param min_password_length: length put into the rule; must appear in the warning
    :param check_against: values whose string form must NOT appear in the warning
    """
    rule_line = "passwd --minlen={0}".format(min_password_length)
    rule_data.new_rule(rule_line)

    messages = rule_data.eval_rules(ksdata_mock, storage_mock)

    # minimal password length required --> one warning
    assert len(messages) == 1
    warning = messages[0]
    assert warning.type == common.MESSAGE_TYPE_WARNING

    # warning has to mention the length
    assert str(min_password_length) in warning.text

    # ...and none of the other lengths
    for not_wanted in check_against:
        assert str(not_wanted) not in warning.text
460
461
462 View Code Duplication
def test_evaluation_passwd_minlen_short_passwd(
        proxy_getter, rule_data, ksdata_mock, storage_mock):
    """A plain-text root password shorter than the minimum yields one fatal message."""
    short_password = "aaaa"
    users_proxy = USERS.get_proxy()
    users_proxy.IsRootPasswordCrypted = False
    users_proxy.RootPassword = short_password

    rule_data.new_rule("passwd --minlen=8")

    messages = rule_data.eval_rules(ksdata_mock, storage_mock, report_only=False)

    # minimal password length greater than actual length --> one fatal message
    assert len(messages) == 1
    message = messages[0]
    assert message.type == common.MESSAGE_TYPE_FATAL

    # the message has to mention the length
    assert "8" in message.text

    # the message should mention that something is wrong with the old password
    assert "is" in message.text

    # doing changes --> password should not be cleared
    assert users_proxy.RootPassword == short_password
484
485
486 View Code Duplication
def test_evaluation_passwd_minlen_short_passwd_report_only(
        proxy_getter, rule_data, ksdata_mock, storage_mock):
    """Same as the short-password test, but evaluated with ``report_only=True``."""
    short_password = "aaaa"
    users_proxy = USERS.get_proxy()
    users_proxy.IsRootPasswordCrypted = False
    users_proxy.RootPassword = short_password

    rule_data.new_rule("passwd --minlen=8")

    messages = rule_data.eval_rules(ksdata_mock, storage_mock, report_only=True)

    # minimal password length greater than actual length --> one fatal message
    assert len(messages) == 1
    message = messages[0]
    assert message.type == common.MESSAGE_TYPE_FATAL

    # the message has to mention the length
    assert "8" in message.text

    # the message should mention that something is wrong with the old password
    assert "is" in message.text

    # the password should not be cleared
    assert users_proxy.RootPassword == short_password
508
509
510
def test_evaluation_passwd_minlen_crypted_passwd(
        proxy_getter, rule_data, ksdata_mock, storage_mock):
    """A crypted root password yields a single "cannot check" warning."""
    users_proxy = USERS.get_proxy()
    users_proxy.IsRootPasswordCrypted = True
    users_proxy.RootPassword = "aaaa"

    rule_data.new_rule("passwd --minlen=8")

    messages = rule_data.eval_rules(ksdata_mock, storage_mock, report_only=False)

    # crypted password --> one warning
    assert len(messages) == 1
    warning = messages[0]
    assert warning.type == common.MESSAGE_TYPE_WARNING

    # warning has to mention that the password cannot be checked
    assert "cannot check" in warning.text
526
527
528
def test_evaluation_passwd_minlen_good_passwd(proxy_getter, rule_data, ksdata_mock, storage_mock):
    """A plain-text root password longer than the minimum yields no messages."""
    users_proxy = USERS.get_proxy()
    users_proxy.IsRootPasswordCrypted = False
    users_proxy.RootPassword = "aaaaaaaaaaaaaaaaa"

    rule_data.new_rule("passwd --minlen=8")

    messages = rule_data.eval_rules(ksdata_mock, storage_mock, report_only=False)

    # minimal password length less than actual length --> no warning
    assert not messages
539
540
541
def test_evaluation_passwd_minlen_report_only_not_ignored(
        proxy_getter, rule_data, ksdata_mock, storage_mock):
    """A report-only evaluation after a real one must leave the password policy as is."""
    users_proxy = USERS.get_proxy()
    users_proxy.IsRootPasswordCrypted = False
    users_proxy.RootPassword = "aaaaaaaaaaaaaaaaa"

    rule_data.new_rule("passwd --minlen=8")

    # Expected root policy after the rule was applied.
    expected_policy = PasswordPolicy.from_defaults(PASSWORD_POLICY_ROOT)
    expected_policy.min_length = 8
    expected_policy.is_strict = True
    expected_structure = PasswordPolicy.to_structure_dict(
        {PASSWORD_POLICY_ROOT: expected_policy}
    )

    def assert_rule_state():
        passwd_rules = rule_data._passwd_rules
        assert passwd_rules._orig_minlen == 6
        assert not passwd_rules._orig_strict
        assert passwd_rules._minlen == 8

    # call eval_rules with report_only=False
    # should set password minimal length to 8
    messages = rule_data.eval_rules(ksdata_mock, storage_mock, report_only=False)

    # Password Policy changed --> no warnings
    assert not messages
    assert_rule_state()

    ui_mock = BOSS.get_proxy(USER_INTERFACE)
    assert ui_mock.PasswordPolicies == expected_structure

    # call of eval_rules with report_only=True
    # should not change anything
    messages = rule_data.eval_rules(ksdata_mock, storage_mock, report_only=True)

    # Password Policy stayed the same --> no warnings
    assert not messages
    assert_rule_state()
    assert ui_mock.PasswordPolicies == expected_structure
581
582
583
def _occurences_not_seen_in_strings(seeked, strings):
584
    found = set(seeked)
585
    for string in strings:
586
        for might_have_seen in seeked:
587
            if might_have_seen in string:
588
                found.add(string)
589
                break
590
    return set(seeked).difference(found)
591
592
593
def _quoted_keywords_not_seen_in_messages(keywords, messages):
    """Single-quote each keyword and look for it in the messages' ``text``.

    :return: whatever ``_occurences_not_seen_in_strings`` returns for the
        quoted keywords and the message texts
    """
    quoted_keywords = set()
    for keyword in keywords:
        quoted_keywords.add("'{}'".format(keyword))

    message_texts = [message.text for message in messages]
    return _occurences_not_seen_in_strings(quoted_keywords, message_texts)
598
599
600
# Problem with this test: Package order in lists can lead to false positives
601
def test_evaluation_package_rules(proxy_getter, rule_data, ksdata_mock, storage_mock):
    """Package rules update the payload's selection; ``vim`` is selected up front."""
    rule_data.new_rule("package --add=firewalld --remove=telnet --add=vim")

    initial_selection = PackagesSelectionData()
    initial_selection.packages = ["vim"]

    dnf_payload = PAYLOADS.get_proxy("/fake/payload/1")
    dnf_payload.PackagesSelection = PackagesSelectionData.to_structure(initial_selection)

    messages = rule_data.eval_rules(ksdata_mock, storage_mock)

    # one info message for each (really) added/removed package
    assert len(messages) == 2
    for message in messages:
        assert message.type == common.MESSAGE_TYPE_INFO

    # all packages should appear in the messages
    not_seen = _quoted_keywords_not_seen_in_messages({"firewalld", "telnet"}, messages)
    assert not not_seen

    expected_selection = PackagesSelectionData()
    expected_selection.packages = ["vim", "firewalld"]
    expected_selection.excluded_packages = ["telnet"]
    expected_structure = PackagesSelectionData.to_structure(expected_selection)

    assert dnf_payload.PackagesSelection == expected_structure
629
630
631
def test_evaluation_package_rules_report_only(proxy_getter, rule_data, ksdata_mock, storage_mock):
    """With ``report_only=True`` package changes are reported but not applied."""
    rule_data.new_rule("package --add=firewalld --remove=telnet --add=iptables")

    messages = rule_data.eval_rules(ksdata_mock, storage_mock, report_only=True)

    # one info message for each added/removed package
    assert len(messages) == 3
    for message in messages:
        assert message.type == common.MESSAGE_TYPE_INFO

    not_seen = _quoted_keywords_not_seen_in_messages(
        {"firewalld", "telnet", "iptables"}, messages)
    assert not not_seen

    # report_only --> no packages should be added or excluded
    untouched_structure = PackagesSelectionData.to_structure(PackagesSelectionData())
    dnf_payload = PAYLOADS.get_proxy("/fake/payload/1")

    assert dnf_payload.PackagesSelection == untouched_structure
653
654
655
def test_evaluation_bootloader_passwd_not_set(proxy_getter, rule_data, ksdata_mock, storage_mock):
    """A bootloader rule with no bootloader password set yields one warning."""
    bootloader_proxy = STORAGE.get_proxy(BOOTLOADER)
    bootloader_proxy.IsPasswordSet = False

    rule_data.new_rule("bootloader --passwd")

    messages = rule_data.eval_rules(ksdata_mock, storage_mock)

    # bootloader password not set --> one warning
    assert len(messages) == 1
    warning = messages[0]
    assert warning.type == common.MESSAGE_TYPE_WARNING
666
667
668
def test_evaluation_bootloader_passwd_set(proxy_getter, rule_data, ksdata_mock, storage_mock):
    """A bootloader rule with the bootloader password set yields no messages."""
    bootloader_proxy = STORAGE.get_proxy(BOOTLOADER)
    bootloader_proxy.IsPasswordSet = True

    rule_data.new_rule("bootloader --passwd")

    messages = rule_data.eval_rules(ksdata_mock, storage_mock)

    # bootloader password set --> no warnings
    no_messages = []
    assert messages == no_messages
678
679
680
def test_evaluation_various_rules(proxy_getter, rule_data, ksdata_mock, storage_mock):
    """Four rules of mixed kinds evaluated against the defaults yield four messages."""
    mixed_rules = [
        "part /tmp",
        "part /",
        "passwd --minlen=14",
        "package --add=firewalld",
    ]
    for rule_line in mixed_rules:
        rule_data.new_rule(rule_line)

    ksdata_mock.packages.packageList = []
    ksdata_mock.packages.excludedList = []

    messages = rule_data.eval_rules(ksdata_mock, storage_mock)

    # four rules, all fail --> four messages
    assert len(messages) == len(mixed_rules) == 4
692
693
694
def test_revert_mount_options_nonexistent(proxy_getter, rule_data, ksdata_mock, storage_mock):
    """Neither evaluation nor revert may set options for a missing mount point."""
    rule_data.new_rule("part /tmp --mountoptions=nodev")

    messages = rule_data.eval_rules(ksdata_mock, storage_mock)

    # mount point doesn't exist -> one message, nothing done
    assert len(messages) == 1

    set_mount_options = STORAGE.get_proxy(DEVICE_TREE).SetDeviceMountOptions
    set_mount_options.assert_not_called()

    # mount point doesn't exist -> shouldn't do anything
    rule_data.revert_changes(ksdata_mock, storage_mock)

    set_mount_options.assert_not_called()
709
710
711
def test_revert_mount_options(proxy_getter, rule_data, ksdata_mock, storage_mock):
    """Evaluate and revert a mount-option rule twice, checking each set call."""
    device = "/dev/sda1"
    original_options = "defaults"
    extended_options = "defaults,nodev"

    rule_data.new_rule("part /tmp --mountoptions=nodev")

    device_tree = STORAGE.get_proxy(DEVICE_TREE)
    device_tree.GetMountPoints.return_value = {"/tmp": device}

    def remember_mount_options(device_name, mount_options):
        # Whatever gets set is what the next GetDeviceMountOptions call returns.
        assert device_name == "/dev/sda1"
        device_tree.GetDeviceMountOptions.return_value = mount_options

    device_tree.SetDeviceMountOptions.side_effect = remember_mount_options
    device_tree.GetDeviceMountOptions.return_value = original_options

    # --- first cycle ---
    messages = rule_data.eval_rules(ksdata_mock, storage_mock)

    # mount option added --> one message
    assert len(messages) == 1

    # "nodev" option should be added
    device_tree.SetDeviceMountOptions.assert_called_once_with(device, extended_options)

    rule_data.revert_changes(ksdata_mock, storage_mock)

    # should be reverted to the original value
    device_tree.SetDeviceMountOptions.assert_called_with(device, original_options)

    # --- second cycle of the same ---
    messages = rule_data.eval_rules(ksdata_mock, storage_mock)

    # mount option added --> one message
    assert len(messages) == 1

    # "nodev" option should be added
    device_tree.SetDeviceMountOptions.assert_called_with(device, extended_options)

    rule_data.revert_changes(ksdata_mock, storage_mock)

    # should be reverted to the original value
    device_tree.SetDeviceMountOptions.assert_called_with(device, original_options)
760
761
762
def test_revert_password_policy_changes(proxy_getter, rule_data, ksdata_mock, storage_mock):
    """After a revert, a long-enough root password evaluates without messages."""
    users_proxy = USERS.get_proxy()
    users_proxy.IsRootPasswordCrypted = False
    users_proxy.RootPassword = "aaaa"

    # FIXME: Add password policy changes to this test. It only checks
    # password length right now outside of policy changes.
    rule_data.new_rule("passwd --minlen=8")

    first_messages = rule_data.eval_rules(ksdata_mock, storage_mock)

    # password error --> one message
    assert len(first_messages) == 1

    rule_data.revert_changes(ksdata_mock, storage_mock)

    # with long enough password this time
    users_proxy.RootPassword = "aaaaaaaaaaaaa"

    second_messages = rule_data.eval_rules(ksdata_mock, storage_mock)

    # long enough password entered --> no message
    assert second_messages == []
786
787
788
def test_revert_package_rules(proxy_getter, rule_data, ksdata_mock, storage_mock):
    """Reverting package rules restores the payload's initial package selection."""
    rule_data.new_rule("package --add=firewalld --remove=telnet --add=iptables --add=vim")

    initial_selection = PackagesSelectionData()
    initial_selection.packages = ["vim"]

    dnf_payload = PAYLOADS.get_proxy("/fake/payload/1")
    dnf_payload.PackagesSelection = PackagesSelectionData.to_structure(initial_selection)

    # run twice --> nothing should be different in the second run
    rule_data.eval_rules(ksdata_mock, storage_mock)
    messages = rule_data.eval_rules(ksdata_mock, storage_mock)

    # one info message for each added/removed package
    assert len(messages) == 3

    rule_data.revert_changes(ksdata_mock, storage_mock)

    # (only) added and excluded packages should have been removed from the
    # list
    assert dnf_payload.PackagesSelection == \
        PackagesSelectionData.to_structure(initial_selection)

    # now do the same again
    messages = rule_data.eval_rules(ksdata_mock, storage_mock)

    # one info message for each added/removed package
    assert len(messages) == 3

    rule_data.revert_changes(ksdata_mock, storage_mock)

    # (only) added and excluded packages should have been removed from the
    # list
    assert dnf_payload.PackagesSelection == \
        PackagesSelectionData.to_structure(initial_selection)
823