Passed
Push — rhel9-branch ( 6fbab8...4bb28d )
by Matěj
01:36 queued 12s
created

org_fedora_oscap.service.installation   B

Complexity

Total Complexity 43

Size/Duplication

Total Lines 305
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 210
dl 0
loc 305
rs 8.96
c 0
b 0
f 0
wmc 43

16 Methods

Rating   Name   Duplication   Size   Complexity  
A InstallContentTask.name() 0 3 1
A InstallContentTask.__init__() 0 10 1
A PrepareValidContent.name() 0 3 1
B PrepareValidContent.run() 0 44 8
A EvaluateRulesTask.run() 0 4 1
A EvaluateRulesTask.__init__() 0 6 1
A EvaluateRulesTask.name() 0 3 1
B InstallContentTask.run() 0 26 6
A EvaluateRulesTask._evaluate_rules() 0 25 5
A InstallContentTask._copy_rpm_to_target_and_install() 0 5 1
A EvaluateRulesTask._initialize_rules() 0 10 2
A InstallContentTask._attempt_rpm_installation() 0 22 3
A PrepareValidContent.__init__() 0 8 1
A RemediateSystemTask.run() 0 26 3
A RemediateSystemTask.name() 0 3 1
A RemediateSystemTask.__init__() 0 8 1

2 Functions

Rating   Name   Duplication   Size   Complexity  
A _handle_error() 0 20 5
A terminate() 0 3 1

How to fix   Complexity   

Complexity

Complex classes like org_fedora_oscap.service.installation often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import os
20
import shutil
21
import io
22
from org_fedora_oscap.data_handling import PolicyDataHandler
23
24
from pyanaconda.core import util
25
from pyanaconda.modules.common.task import Task
26
from pyanaconda.modules.common.errors.installation import NonCriticalInstallationError
27
from pykickstart.errors import KickstartValueError
28
29
from org_fedora_oscap import common, data_fetch, rule_handling, utils
30
from org_fedora_oscap.common import _, get_packages_data, set_packages_data
31
from org_fedora_oscap.content_handling import ContentCheckError, ContentHandlingError
32
from org_fedora_oscap import content_discovery
33
34
# Module-level logger; everything logged in this module goes to the "anaconda" logger.
log = logging.getLogger("anaconda")
35
36
37
# scanner is useful for the post remediation
38
# Packages that EvaluateRulesTask adds to the list of packages requested for installation.
REQUIRED_PACKAGES = ("openscap", "openscap-scanner")
39
40
41
def _handle_error(exception):
    """Log a content-loading failure and abort with a user-facing message.

    The message is chosen by the type of ``exception``. Every branch ends in
    ``terminate()``, so this function never returns normally.

    :param exception: the error raised while fetching, checking or loading
        the security content
    :raises NonCriticalInstallationError: always, raised by ``terminate()``
    """
    log.error("OSCAP Addon: Failed to fetch and initialize SCAP content!")

    if isinstance(exception, ContentCheckError):
        msg = _("The integrity check of the security content failed.")
    elif isinstance(exception, (common.OSCAPaddonError, data_fetch.DataFetchError)):
        # Only the constant prefix is passed to gettext. Translating the
        # string after the exception text was interpolated could never
        # match a catalog entry.
        msg = _("There was an error fetching and loading the security content:\n")
        msg += str(exception)
    elif isinstance(exception, ContentHandlingError):
        msg = _("There was a problem with the supplied security content:\n")
        msg += str(exception)
    else:
        msg = _("There was an unexpected problem with the supplied content.")

    terminate(msg)
61
62
63
def terminate(message):
    """Abort the current task by raising a non-critical installation error.

    :param message: description of the problem; a translated note that the
        installation should be aborted is appended on a new line
    :raises NonCriticalInstallationError: always
    """
    abort_hint = _("The installation should be aborted.")
    full_message = message + ("\n" + abort_hint)
    raise NonCriticalInstallationError(full_message)
66
67
68
class PrepareValidContent(Task):
    """Installation task that obtains the security content and validates it."""

    def __init__(self, policy_data, file_path, content_path):
        """Remember the inputs and create the content helpers.

        :param policy_data: the policy data of the addon
        :param file_path: handed to the content analyzer as the content
            destination unless the content type is "scap-security-guide"
        :param content_path: path whose existence decides whether the
            content still has to be fetched
        """
        super().__init__()
        self._policy_data = policy_data
        self._file_path = file_path
        self._content_path = content_path
        # Failures reported by the bringer are routed to _handle_error.
        self.content_bringer = content_discovery.ContentBringer(_handle_error)
        self.data_handler = PolicyDataHandler(self._policy_data)

    @property
    def name(self):
        """Human-readable name of the task."""
        return "Fetch the content, and optionally perform check or archive extraction"

    def run(self):
        """Fetch the content if needed, analyze it and check the preferred content.

        :raises NonCriticalInstallationError: raised through ``terminate()``
            or ``_handle_error()`` when the content cannot be prepared
        """
        policy = self._policy_data

        # Start a fetch only when the content is fetchable and not on disk yet.
        needs_fetch = (
            self.data_handler.content_is_fetchable()
            and not os.path.exists(self._content_path)
        )
        thread_name = None
        if needs_fetch:
            thread_name = self.content_bringer.fetch_content(
                policy.content_url, policy.certificates)

        if policy.content_type == "scap-security-guide":
            destination, checksum = None, None
        else:
            destination, checksum = self._file_path, policy.fingerprint

        preinst_content = common.get_preinst_content_path(policy)
        preinst_tailoring = common.get_preinst_tailoring_path(policy)
        cpe_path = policy.cpe_path

        if thread_name is not None:
            self.content_bringer.finish_content_fetch(thread_name, checksum)

        content = content_discovery.ContentAnalyzer.analyze(
            thread_name, policy.fingerprint, destination, _handle_error,
            preinst_content, preinst_tailoring, cpe_path)

        if not content:
            # this shouldn't happen because error handling is supposed to
            # terminate the addon before finish_content_fetch returns
            _handle_error(Exception())

        # Nothing was fetched in this run and the content is not the
        # scap-security-guide one, so register the existing content file.
        if not thread_name and policy.content_type != "scap-security-guide":
            content.add_file(self._content_path)

        try:
            # The result is not needed; the call only verifies that the
            # preferred content exists.
            content.get_preferred_content(policy.content_path)
        except Exception as exc:
            terminate(str(exc))
128
129
130
class EvaluateRulesTask(Task):
    """The installation task for the evaluation of the rules."""

    def __init__(self, policy_data, content_path, tailoring_path):
        """Create a task.

        :param policy_data: the policy data of the addon
        :param content_path: path of the content the rules are read from
        :param tailoring_path: path of the tailoring passed to the rule loader
        """
        super().__init__()
        self._policy_data = policy_data
        self._content_path = content_path
        self._tailoring_path = tailoring_path

    @property
    def name(self):
        """Human-readable name of the task."""
        return "Evaluate the rules"

    def run(self):
        """Run the task: load the rules and evaluate them."""
        rule_data = self._initialize_rules()
        self._evaluate_rules(rule_data)

    def _initialize_rules(self):
        """Load the rule data for the selected profile from the content.

        :return: the rule data returned by
            ``rule_handling.get_rule_data_from_content()``
        :raises NonCriticalInstallationError: raised by ``_handle_error()``
            when loading fails with ``common.OSCAPaddonError``
        """
        try:
            return rule_handling.get_rule_data_from_content(
                self._policy_data.profile_id, self._content_path,
                self._policy_data.datastream_id, self._policy_data.xccdf_id,
                self._tailoring_path)
        except common.OSCAPaddonError as e:
            _handle_error(e)

    def _evaluate_rules(self, rule_data):
        """Evaluate the rules and request the packages needed on the target.

        :param rule_data: the object returned by ``_initialize_rules()``
        :raises NonCriticalInstallationError: raised by ``terminate()`` when
            the evaluation produces any fatal message
        """
        # evaluate rules, do automatic fixes and stop if something that cannot
        # be fixed automatically is wrong
        all_messages = rule_data.eval_rules(None, None)
        fatal_messages = [message for message in all_messages
                          if message.type == common.MESSAGE_TYPE_FATAL]
        # Test for a non-empty list; any() would look at the truthiness of
        # the individual message objects instead.
        if fatal_messages:
            msg_lines = [_("Wrong configuration detected!")]
            msg_lines.extend(m.text for m in fatal_messages)
            terminate("\n".join(msg_lines))

        # add packages needed on the target system to the list of packages
        # that are requested to be installed
        packages_data = get_packages_data()
        pkgs_to_install = list(REQUIRED_PACKAGES)

        if self._policy_data.content_type == "scap-security-guide":
            pkgs_to_install.append("scap-security-guide")

        for pkg in pkgs_to_install:
            if pkg not in packages_data.packages:
                packages_data.packages.append(pkg)

        set_packages_data(packages_data)
186
187
188
class InstallContentTask(Task):
    """The installation task for installation of the content."""

    def __init__(self, sysroot, policy_data, file_path,
                 content_path, tailoring_path, target_directory):
        """Create a task.

        :param sysroot: root of the installed system
        :param policy_data: the policy data of the addon
        :param file_path: path of the content file copied for "rpm" content
        :param content_path: path of the content copied for "datastream" content
        :param tailoring_path: path of the tailoring file, copied if it exists
        :param target_directory: content directory, joined with ``sysroot``
            to get the copy destination
        """
        super().__init__()
        self._sysroot = sysroot
        self._policy_data = policy_data
        self._file_path = file_path
        self._content_path = content_path
        self._tailoring_path = tailoring_path
        self._target_directory = target_directory

    @property
    def name(self):
        """Human-readable name of the task."""
        return "Install the content"

    def run(self):
        """Run the task: copy (and for RPMs install) the content to the target.

        :raises NonCriticalInstallationError: raised by ``terminate()`` when
            copying or installing the content RPM fails
        """
        target_content_dir = utils.join_paths(
            self._sysroot,
            self._target_directory
        )

        utils.ensure_dir_exists(target_content_dir)

        content_type = self._policy_data.content_type
        if content_type == "scap-security-guide":
            pass  # nothing needed
        elif content_type == "datastream":
            shutil.copy2(self._content_path, target_content_dir)
        elif content_type == "rpm":
            try:
                self._copy_rpm_to_target_and_install(target_content_dir)
            except Exception as exc:
                # terminate() always raises, so nothing below runs on failure.
                terminate(str(exc))
        else:
            pattern = utils.join_paths(common.INSTALLATION_CONTENT_DIR, "*")
            utils.universal_copy(pattern, target_content_dir)

        if os.path.exists(self._tailoring_path):
            shutil.copy2(self._tailoring_path, target_content_dir)

    def _attempt_rpm_installation(self, chroot_package_path):
        """Install the content RPM with dnf, falling back to plain rpm.

        :param chroot_package_path: package path handed to the commands,
            which are executed with ``root=self._sysroot``
        :raises RuntimeError: when both dnf and rpm exit with a non-zero code
        """
        log.info("OSCAP addon: Installing the security content RPM to the installed system.")
        stdout = io.StringIO()
        ret = util.execWithRedirect(
            "dnf", ["-y", "--nogpg", "install", chroot_package_path],
            stdout=stdout, root=self._sysroot)
        if ret == 0:
            return

        # Logging does %-style formatting of its arguments, and getvalue()
        # returns the whole buffer regardless of the stream position.
        log.error(
            "OSCAP addon: Error installing security content RPM using dnf: %s",
            stdout.getvalue())

        stdout = io.StringIO()
        ret = util.execWithRedirect(
            "rpm", ["--install", "--nodeps", chroot_package_path],
            stdout=stdout, root=self._sysroot)
        if ret != 0:
            log.error(
                "OSCAP addon: Error installing security content RPM using rpm: %s",
                stdout.getvalue())
            msg = _("Failed to install content RPM to the target system.")
            raise RuntimeError(msg)

    def _copy_rpm_to_target_and_install(self, target_content_dir):
        """Copy the content RPM into the target directory and install it.

        :param target_content_dir: directory the RPM file is copied to
        :raises RuntimeError: propagated from ``_attempt_rpm_installation()``
        """
        shutil.copy2(self._file_path, target_content_dir)
        content_name = common.get_content_name(self._policy_data)
        chroot_package_path = utils.join_paths(self._target_directory, content_name)
        self._attempt_rpm_installation(chroot_package_path)
261
262
263
class RemediateSystemTask(Task):
    """The installation task for running the remediation."""

    def __init__(self, sysroot, policy_data, target_content_path,
                 target_tailoring_path):
        """Create a task.

        :param sysroot: root of the installed system, used as the chroot
        :param policy_data: the policy data of the addon
        :param target_content_path: content path passed to the remediation
        :param target_tailoring_path: tailoring path passed to the remediation
        """
        super().__init__()
        self._sysroot = sysroot
        self._policy_data = policy_data
        self._target_content_path = target_content_path
        self._target_tailoring_path = target_tailoring_path

    @property
    def name(self):
        """Human-readable name of the task."""
        return "Remediate the system"

    def run(self):
        """Run the task: check the scanner, then run the remediation.

        :raises NonCriticalInstallationError: raised by ``terminate()`` when
            the scanner check or the remediation raises an exception
        """
        try:
            common.assert_scanner_works(
                chroot=self._sysroot, executable="oscap")
        except Exception as exc:
            # Translate the template first and interpolate afterwards, so
            # the gettext lookup is done on the constant message.
            msg_lines = [
                _("The 'oscap' scanner doesn't work in the installed system: {error}")
                .format(error=str(exc)),
                _("As a result, the installed system can't be hardened."),
            ]
            terminate("\n".join(msg_lines))

        try:
            common.run_oscap_remediate(
                self._policy_data.profile_id,
                self._target_content_path,
                self._policy_data.datastream_id,
                self._policy_data.xccdf_id,
                self._target_tailoring_path,
                chroot=self._sysroot
            )
        except Exception as exc:
            msg = _(
                "Something went wrong during the final hardening: {error}."
            ).format(error=str(exc))
            terminate(msg)
305