Passed
Pull Request — rhel9-branch (#159)
by Matěj
01:02
created

org_fedora_oscap.service.installation.terminate()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import os
20
import shutil
21
22
from pyanaconda.core import util
23
from pyanaconda.modules.common.task import Task
24
from pyanaconda.modules.common.errors.installation import NonCriticalInstallationError
25
26
from org_fedora_oscap import common, data_fetch, rule_handling, utils
27
from org_fedora_oscap.common import _, get_packages_data, set_packages_data
28
from org_fedora_oscap.content_handling import ContentCheckError
29
from org_fedora_oscap import content_discovery
30
31
log = logging.getLogger(__name__)
32
33
34
REQUIRED_PACKAGES = ("openscap", "openscap-scanner",)
35
36
37
def _handle_error(exception):
38
    log.error("Failed to fetch and initialize SCAP content!")
39
40
    if isinstance(exception, ContentCheckError):
41
        msg = _("The integrity check of the security content failed.")
42
        terminate(msg)
43
    elif (isinstance(exception, common.OSCAPaddonError)
44
        or isinstance(exception, data_fetch.DataFetchError)):
45
        msg = _("There was an error fetching and loading the security content:\n" +
46
                f"{str(exception)}")
47
        terminate(msg)
48
49
    else:
50
        msg = _("There was an unexpected problem with the supplied content.")
51
        terminate(msg)
52
53
54
def terminate(message):
55
    message += "\n" + _("The installation should be aborted.")
56
    message += " " + _("Do you wish to continue anyway?")
57
    raise NonCriticalInstallationError(message)
58
59
60
class PrepareValidContent(Task):
61
    """The installation task for fetching the content."""
62
63
    def __init__(self, policy_data, file_path, content_path):
64
        """Create a task."""
65
        super().__init__()
66
        self._policy_data = policy_data
67
        self._file_path = file_path
68
        self._content_path = content_path
69
        self.content_bringer = content_discovery.ContentBringer(policy_data)
70
71
    @property
72
    def name(self):
73
        return "Fetch the content, and optionally perform check or archive extraction"
74
75
    def run(self):
76
        """Run the task."""
77
        # Is the content available?
78
        fetching_thread_name = None
79
        if not os.path.exists(self._content_path) and not os.path.exists(self._file_path):
80
            # content not available/fetched yet
81
            fetching_thread_name = self.content_bringer.fetch_content(_handle_error, self.certificates)
82
83
        content_dest = None
84
        if self._policy_data.content_type != "scap-security-guide":
85
            content_dest = self._file_path
86
87
        content = self.content_bringer.finish_content_fetch(
88
            fetching_thread_name, self._policy_data.fingerprint, lambda msg: log.info(msg), content_dest, _handle_error)
89
90
        if not content:
91
            return
92
93
        try:
94
            # just check that preferred content exists
95
            _ = self.content_bringer.get_preferred_content(content)
96
        except Exception as exc:
97
            terminate(str(exc))
98
99
100
class EvaluateRulesTask(Task):
101
    """The installation task for the evaluation of the rules."""
102
103
    def __init__(self, policy_data, content_path, tailoring_path):
104
        """Create a task."""
105
        super().__init__()
106
        self._policy_data = policy_data
107
        self._content_path = content_path
108
        self._tailoring_path = tailoring_path
109
110
    @property
111
    def name(self):
112
        return "Evaluate the rules"
113
114
    def run(self):
115
        """Run the task."""
116
        rule_data = self._initialize_rules()
117
        self._evaluate_rules(rule_data)
118
119
    def _initialize_rules(self):
120
        try:
121
            rule_data = rule_handling.get_rule_data_from_content(
122
                self._policy_data.profile_id, self._content_path,
123
                self._policy_data.datastream_id, self._policy_data.xccdf_id,
124
                self._tailoring_path)
125
            return rule_data
126
127
        except common.OSCAPaddonError as e:
128
            _handle_error(e)
129
130
131
    def _evaluate_rules(self, rule_data):
132
        # evaluate rules, do automatic fixes and stop if something that cannot
133
        # be fixed automatically is wrong
134
        all_messages = rule_data.eval_rules(None, None)
135
        fatal_messages = [message for message in all_messages
136
                          if message.type == common.MESSAGE_TYPE_FATAL]
137
        if any(fatal_messages):
138
            msg_lines = [_("Wrong configuration detected!")]
139
            msg_lines.extend(fatal_messages)
140
            terminate("\n".join(msg_lines))
141
            return
142
143
        # add packages needed on the target system to the list of packages
144
        # that are requested to be installed
145
        packages_data = get_packages_data()
146
        pkgs_to_install = list(REQUIRED_PACKAGES)
147
148
        if self._policy_data.content_type == "scap-security-guide":
149
            pkgs_to_install.append("scap-security-guide")
150
151
        for pkg in pkgs_to_install:
152
            if pkg not in packages_data.packages:
153
                packages_data.packages.append(pkg)
154
155
        set_packages_data(packages_data)
156
157
158
class InstallContentTask(Task):
159
    """The installation task for installation of the content."""
160
161
    def __init__(self, sysroot, policy_data, file_path,
162
                 content_path, tailoring_path, target_directory):
163
        """Create a task."""
164
        super().__init__()
165
        self._sysroot = sysroot
166
        self._policy_data = policy_data
167
        self._file_path = file_path
168
        self._content_path = content_path
169
        self._tailoring_path = tailoring_path
170
        self._target_directory = target_directory
171
172
    @property
173
    def name(self):
174
        return "Install the content"
175
176
    def run(self):
177
        """Run the task."""
178
        target_content_dir = utils.join_paths(
179
            self._sysroot,
180
            self._target_directory
181
        )
182
183
        utils.ensure_dir_exists(target_content_dir)
184
185
        if self._policy_data.content_type == "scap-security-guide":
186
            pass  # nothing needed
187
        elif self._policy_data.content_type == "datastream":
188
            shutil.copy2(self._content_path, target_content_dir)
189
        elif self._policy_data.content_type == "rpm":
190
            # copy the RPM to the target system
191
            shutil.copy2(self._file_path, target_content_dir)
192
193
            # get the path of the RPM
194
            content_name = common.get_content_name(self._policy_data)
195
            package_path = utils.join_paths(self._target_directory, content_name)
196
197
            # and install it with yum
198
            ret = util.execInSysroot(
199
                "yum", ["-y", "--nogpg", "install", package_path]
200
            )
201
202
            if ret != 0:
203
                raise common.ExtractionError(
204
                    "Failed to install content RPM to the target system"
205
                )
206
        else:
207
            pattern = utils.join_paths(common.INSTALLATION_CONTENT_DIR, "*")
208
            utils.universal_copy(pattern, target_content_dir)
209
210
        if os.path.exists(self._tailoring_path):
211
            shutil.copy2(self._tailoring_path, target_content_dir)
212
213
214
class RemediateSystemTask(Task):
215
    """The installation task for running the remediation."""
216
217
    def __init__(self, sysroot, policy_data, target_content_path,
218
                 target_tailoring_path):
219
        """Create a task."""
220
        super().__init__()
221
        self._sysroot = sysroot
222
        self._policy_data = policy_data
223
        self._target_content_path = target_content_path
224
        self._target_tailoring_path = target_tailoring_path
225
226
    @property
227
    def name(self):
228
        return "Remediate the system"
229
230
    def run(self):
231
        """Run the task."""
232
        common.run_oscap_remediate(
233
            self._policy_data.profile_id,
234
            self._target_content_path,
235
            self._policy_data.datastream_id,
236
            self._policy_data.xccdf_id,
237
            self._target_tailoring_path,
238
            chroot=self._sysroot
239
        )
240