Passed
Pull Request — rhel9-branch (#159)
by Matěj
01:11
created

CheckFingerprintTask.run()   A

Complexity

Conditions 3

Size

Total Lines 15
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 11
nop 1
dl 0
loc 15
rs 9.85
c 0
b 0
f 0
1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import os
20
import shutil
21
22
from pyanaconda.core import util
23
from pyanaconda.modules.common.task import Task
24
from pyanaconda.modules.common.errors.installation import NonCriticalInstallationError
25
26
from org_fedora_oscap import common, data_fetch, rule_handling, utils
27
from org_fedora_oscap.common import _, get_packages_data, set_packages_data
28
from org_fedora_oscap.content_handling import ContentCheckError
29
from org_fedora_oscap import content_discovery
30
31
log = logging.getLogger(__name__)
32
33
34
REQUIRED_PACKAGES = ("openscap", "openscap-scanner",)
35
36
37
def _handle_error(exception):
38
    log.error("Failed to fetch and initialize SCAP content!")
39
40
    if isinstance(exception, ContentCheckError):
41
        msg = _("The integrity check of the security content failed.")
42
        terminate(msg)
43
    elif (isinstance(exception, common.OSCAPaddonError)
44
        or isinstance(exception, data_fetch.DataFetchError)):
45
        msg = _("There was an error fetching and loading the security content:\n" +
46
                f"{str(exception)}")
47
        terminate(msg)
48
49
    else:
50
        msg = _("There was an unexpected problem with the supplied content.")
51
        terminate(msg)
52
53
54
def terminate(message):
55
    message += "\n" + _("The installation should be aborted.")
56
    raise NonCriticalInstallationError(message)
57
58
59
class PrepareValidContent(Task):
60
    """The installation task for fetching the content."""
61
62
    def __init__(self, policy_data, file_path, content_path):
63
        """Create a task."""
64
        super().__init__()
65
        self._policy_data = policy_data
66
        self._file_path = file_path
67
        self._content_path = content_path
68
        self.content_bringer = content_discovery.ContentBringer(policy_data)
69
70
    @property
71
    def name(self):
72
        return "Fetch the content, and optionally perform check or archive extraction"
73
74
    def run(self):
75
        """Run the task."""
76
        # Is the content available?
77
        fetching_thread_name = None
78
        if not os.path.exists(self._content_path) and not os.path.exists(self._file_path):
79
            # content not available/fetched yet
80
            fetching_thread_name = self.content_bringer.fetch_content(_handle_error, self._policy_data.certificates)
81
        else:
82
            # Content is already there, so we are either using stock content,
83
            # or it has already been downloaded by previous trusted procedures.
84
            return
85
86
        content_dest = None
87
        if self._policy_data.content_type != "scap-security-guide":
88
            content_dest = self._file_path
89
90
        content = self.content_bringer.finish_content_fetch(
91
            fetching_thread_name, self._policy_data.fingerprint, lambda msg: log.info(msg), content_dest, _handle_error)
92
93
        if not content:
94
            return
95
96
        try:
97
            # just check that preferred content exists
98
            _ = self.content_bringer.get_preferred_content(content)
99
        except Exception as exc:
100
            terminate(str(exc))
101
102
103
class EvaluateRulesTask(Task):
104
    """The installation task for the evaluation of the rules."""
105
106
    def __init__(self, policy_data, content_path, tailoring_path):
107
        """Create a task."""
108
        super().__init__()
109
        self._policy_data = policy_data
110
        self._content_path = content_path
111
        self._tailoring_path = tailoring_path
112
113
    @property
114
    def name(self):
115
        return "Evaluate the rules"
116
117
    def run(self):
118
        """Run the task."""
119
        rule_data = self._initialize_rules()
120
        self._evaluate_rules(rule_data)
121
122
    def _initialize_rules(self):
123
        try:
124
            rule_data = rule_handling.get_rule_data_from_content(
125
                self._policy_data.profile_id, self._content_path,
126
                self._policy_data.datastream_id, self._policy_data.xccdf_id,
127
                self._tailoring_path)
128
            return rule_data
129
130
        except common.OSCAPaddonError as e:
131
            _handle_error(e)
132
133
134
    def _evaluate_rules(self, rule_data):
135
        # evaluate rules, do automatic fixes and stop if something that cannot
136
        # be fixed automatically is wrong
137
        all_messages = rule_data.eval_rules(None, None)
138
        fatal_messages = [message for message in all_messages
139
                          if message.type == common.MESSAGE_TYPE_FATAL]
140
        if any(fatal_messages):
141
            msg_lines = [_("Wrong configuration detected!")]
142
            msg_lines.extend([m.text for m in fatal_messages])
143
            terminate("\n".join(msg_lines))
144
            return
145
146
        # add packages needed on the target system to the list of packages
147
        # that are requested to be installed
148
        packages_data = get_packages_data()
149
        pkgs_to_install = list(REQUIRED_PACKAGES)
150
151
        if self._policy_data.content_type == "scap-security-guide":
152
            pkgs_to_install.append("scap-security-guide")
153
154
        for pkg in pkgs_to_install:
155
            if pkg not in packages_data.packages:
156
                packages_data.packages.append(pkg)
157
158
        set_packages_data(packages_data)
159
160
161
class InstallContentTask(Task):
162
    """The installation task for installation of the content."""
163
164
    def __init__(self, sysroot, policy_data, file_path,
165
                 content_path, tailoring_path, target_directory):
166
        """Create a task."""
167
        super().__init__()
168
        self._sysroot = sysroot
169
        self._policy_data = policy_data
170
        self._file_path = file_path
171
        self._content_path = content_path
172
        self._tailoring_path = tailoring_path
173
        self._target_directory = target_directory
174
175
    @property
176
    def name(self):
177
        return "Install the content"
178
179
    def run(self):
180
        """Run the task."""
181
        target_content_dir = utils.join_paths(
182
            self._sysroot,
183
            self._target_directory
184
        )
185
186
        utils.ensure_dir_exists(target_content_dir)
187
188
        if self._policy_data.content_type == "scap-security-guide":
189
            pass  # nothing needed
190
        elif self._policy_data.content_type == "datastream":
191
            shutil.copy2(self._content_path, target_content_dir)
192
        elif self._policy_data.content_type == "rpm":
193
            # copy the RPM to the target system
194
            shutil.copy2(self._file_path, target_content_dir)
195
196
            # get the path of the RPM
197
            content_name = common.get_content_name(self._policy_data)
198
            package_path = utils.join_paths(self._target_directory, content_name)
199
200
            # and install it with yum
201
            ret = util.execInSysroot(
202
                "yum", ["-y", "--nogpg", "install", package_path]
203
            )
204
205
            if ret != 0:
206
                raise common.ExtractionError(
207
                    "Failed to install content RPM to the target system"
208
                )
209
        else:
210
            pattern = utils.join_paths(common.INSTALLATION_CONTENT_DIR, "*")
211
            utils.universal_copy(pattern, target_content_dir)
212
213
        if os.path.exists(self._tailoring_path):
214
            shutil.copy2(self._tailoring_path, target_content_dir)
215
216
217
class RemediateSystemTask(Task):
218
    """The installation task for running the remediation."""
219
220
    def __init__(self, sysroot, policy_data, target_content_path,
221
                 target_tailoring_path):
222
        """Create a task."""
223
        super().__init__()
224
        self._sysroot = sysroot
225
        self._policy_data = policy_data
226
        self._target_content_path = target_content_path
227
        self._target_tailoring_path = target_tailoring_path
228
229
    @property
230
    def name(self):
231
        return "Remediate the system"
232
233
    def run(self):
234
        """Run the task."""
235
        common.run_oscap_remediate(
236
            self._policy_data.profile_id,
237
            self._target_content_path,
238
            self._policy_data.datastream_id,
239
            self._policy_data.xccdf_id,
240
            self._target_tailoring_path,
241
            chroot=self._sysroot
242
        )
243