Passed
Pull Request — rhel9-branch (#159)
by Matěj
59s
created

org_fedora_oscap.service.installation.terminate()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import os
20
import shutil
21
22
from pyanaconda.core import util
23
from pyanaconda.modules.common.task import Task
24
from pyanaconda.modules.common.errors.installation import NonCriticalInstallationError
25
26
from org_fedora_oscap import common, data_fetch, rule_handling, utils
27
from org_fedora_oscap.common import _, get_packages_data, set_packages_data
28
from org_fedora_oscap.content_handling import ContentCheckError
29
from org_fedora_oscap import content_discovery
30
31
log = logging.getLogger(__name__)
32
33
34
REQUIRED_PACKAGES = ("openscap", "openscap-scanner",)
35
36
37
def _handle_error(exception):
38
    log.error("Failed to fetch and initialize SCAP content!")
39
40
    if isinstance(exception, ContentCheckError):
41
        msg = _("The integrity check of the security content failed.")
42
        terminate(msg)
43
    elif (isinstance(exception, common.OSCAPaddonError)
44
        or isinstance(exception, data_fetch.DataFetchError)):
45
        msg = _("There was an error fetching and loading the security content:\n" +
46
                f"{str(exception)}")
47
        terminate(msg)
48
49
    else:
50
        msg = _("There was an unexpected problem with the supplied content.")
51
        terminate(msg)
52
53
54
def terminate(message):
55
    message += "\n" + _("The installation should be aborted.")
56
    raise NonCriticalInstallationError(message)
57
58
59
class PrepareValidContent(Task):
60
    """The installation task for fetching the content."""
61
62
    def __init__(self, policy_data, file_path, content_path):
63
        """Create a task."""
64
        super().__init__()
65
        self._policy_data = policy_data
66
        self._file_path = file_path
67
        self._content_path = content_path
68
        self.content_bringer = content_discovery.ContentBringer(policy_data)
69
70
    @property
71
    def name(self):
72
        return "Fetch the content, and optionally perform check or archive extraction"
73
74
    def run(self):
75
        """Run the task."""
76
        # Is the content available?
77
        fetching_thread_name = None
78
        if not os.path.exists(self._content_path):
79
            # content not available/fetched yet
80
            fetching_thread_name = self.content_bringer.fetch_content(_handle_error, self._policy_data.certificates)
81
82
        content_dest = None
83
        if self._policy_data.content_type != "scap-security-guide":
84
            content_dest = self._file_path
85
86
        content = self.content_bringer.finish_content_fetch(
87
            fetching_thread_name, self._policy_data.fingerprint, lambda msg: log.info(msg), content_dest, _handle_error)
88
89
        if not content:
90
            # this shouldn't happen because error handling is supposed to
91
            # terminate the addon before finish_content_fetch returns
92
            _handle_error(Exception())
93
94
        remote_content_was_present = (
95
            not fetching_thread_name
96
            and self._policy_data.content_type != "scap-security-guide")
97
        if remote_content_was_present:
98
            content.add_file(self._content_path)
99
100
        try:
101
            # just check that preferred content exists
102
            _ = self.content_bringer.get_preferred_content(content)
103
        except Exception as exc:
104
            terminate(str(exc))
105
106
107
class EvaluateRulesTask(Task):
108
    """The installation task for the evaluation of the rules."""
109
110
    def __init__(self, policy_data, content_path, tailoring_path):
111
        """Create a task."""
112
        super().__init__()
113
        self._policy_data = policy_data
114
        self._content_path = content_path
115
        self._tailoring_path = tailoring_path
116
117
    @property
118
    def name(self):
119
        return "Evaluate the rules"
120
121
    def run(self):
122
        """Run the task."""
123
        rule_data = self._initialize_rules()
124
        self._evaluate_rules(rule_data)
125
126
    def _initialize_rules(self):
127
        try:
128
            rule_data = rule_handling.get_rule_data_from_content(
129
                self._policy_data.profile_id, self._content_path,
130
                self._policy_data.datastream_id, self._policy_data.xccdf_id,
131
                self._tailoring_path)
132
            return rule_data
133
134
        except common.OSCAPaddonError as e:
135
            _handle_error(e)
136
137
138
    def _evaluate_rules(self, rule_data):
139
        # evaluate rules, do automatic fixes and stop if something that cannot
140
        # be fixed automatically is wrong
141
        all_messages = rule_data.eval_rules(None, None)
142
        fatal_messages = [message for message in all_messages
143
                          if message.type == common.MESSAGE_TYPE_FATAL]
144
        if any(fatal_messages):
145
            msg_lines = [_("Wrong configuration detected!")]
146
            msg_lines.extend([m.text for m in fatal_messages])
147
            terminate("\n".join(msg_lines))
148
            return
149
150
        # add packages needed on the target system to the list of packages
151
        # that are requested to be installed
152
        packages_data = get_packages_data()
153
        pkgs_to_install = list(REQUIRED_PACKAGES)
154
155
        if self._policy_data.content_type == "scap-security-guide":
156
            pkgs_to_install.append("scap-security-guide")
157
158
        for pkg in pkgs_to_install:
159
            if pkg not in packages_data.packages:
160
                packages_data.packages.append(pkg)
161
162
        set_packages_data(packages_data)
163
164
165
class InstallContentTask(Task):
166
    """The installation task for installation of the content."""
167
168
    def __init__(self, sysroot, policy_data, file_path,
169
                 content_path, tailoring_path, target_directory):
170
        """Create a task."""
171
        super().__init__()
172
        self._sysroot = sysroot
173
        self._policy_data = policy_data
174
        self._file_path = file_path
175
        self._content_path = content_path
176
        self._tailoring_path = tailoring_path
177
        self._target_directory = target_directory
178
179
    @property
180
    def name(self):
181
        return "Install the content"
182
183
    def run(self):
184
        """Run the task."""
185
        target_content_dir = utils.join_paths(
186
            self._sysroot,
187
            self._target_directory
188
        )
189
190
        utils.ensure_dir_exists(target_content_dir)
191
192
        if self._policy_data.content_type == "scap-security-guide":
193
            pass  # nothing needed
194
        elif self._policy_data.content_type == "datastream":
195
            shutil.copy2(self._content_path, target_content_dir)
196
        elif self._policy_data.content_type == "rpm":
197
            # copy the RPM to the target system
198
            shutil.copy2(self._file_path, target_content_dir)
199
200
            # get the path of the RPM
201
            content_name = common.get_content_name(self._policy_data)
202
            package_path = utils.join_paths(self._target_directory, content_name)
203
204
            # and install it with yum
205
            ret = util.execInSysroot(
206
                "yum", ["-y", "--nogpg", "install", package_path]
207
            )
208
209
            if ret != 0:
210
                raise common.ExtractionError(
211
                    "Failed to install content RPM to the target system"
212
                )
213
        else:
214
            pattern = utils.join_paths(common.INSTALLATION_CONTENT_DIR, "*")
215
            utils.universal_copy(pattern, target_content_dir)
216
217
        if os.path.exists(self._tailoring_path):
218
            shutil.copy2(self._tailoring_path, target_content_dir)
219
220
221
class RemediateSystemTask(Task):
222
    """The installation task for running the remediation."""
223
224
    def __init__(self, sysroot, policy_data, target_content_path,
225
                 target_tailoring_path):
226
        """Create a task."""
227
        super().__init__()
228
        self._sysroot = sysroot
229
        self._policy_data = policy_data
230
        self._target_content_path = target_content_path
231
        self._target_tailoring_path = target_tailoring_path
232
233
    @property
234
    def name(self):
235
        return "Remediate the system"
236
237
    def run(self):
238
        """Run the task."""
239
        common.run_oscap_remediate(
240
            self._policy_data.profile_id,
241
            self._target_content_path,
242
            self._policy_data.datastream_id,
243
            self._policy_data.xccdf_id,
244
            self._target_tailoring_path,
245
            chroot=self._sysroot
246
        )
247