Passed
Pull Request — rhel9-branch (#159)
by Matěj
58s
created

CheckFingerprintTask.run()   A

Complexity

Conditions 3

Size

Total Lines 15
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 11
nop 1
dl 0
loc 15
rs 9.85
c 0
b 0
f 0
1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import os
20
import shutil
21
22
from pyanaconda.core import util
23
from pyanaconda.modules.common.task import Task
24
from pyanaconda.modules.common.errors.installation import NonCriticalInstallationError
25
26
from org_fedora_oscap import common, data_fetch, rule_handling, utils
27
from org_fedora_oscap.common import _, get_packages_data, set_packages_data
28
from org_fedora_oscap.content_handling import ContentCheckError
29
from org_fedora_oscap import content_discovery
30
31
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
32
33
34
# Packages that EvaluateRulesTask._evaluate_rules() always adds to the
# list of packages requested for installation.
REQUIRED_PACKAGES = ("openscap", "openscap-scanner",)
35
36
37
def _handle_error(exception):
    """Log a content initialization failure and abort through terminate().

    The user-facing message is selected from the type of ``exception``:
    a failed integrity check, a known fetch/load error (whose text is
    appended to the message), or any other, unexpected problem.

    :param exception: the exception raised while fetching or loading content
    :raises NonCriticalInstallationError: always, raised by terminate()
    """
    log.error("Failed to fetch and initialize SCAP content!")

    if isinstance(exception, ContentCheckError):
        msg = _("The integrity check of the security content failed.")
    elif isinstance(exception, (common.OSCAPaddonError, data_fetch.DataFetchError)):
        # Only the constant part is passed to gettext. Looking up a string
        # that already contains the exception text can never match a catalog
        # entry, so the message would always stay untranslated.
        msg = _("There was an error fetching and loading the security content:")
        msg += "\n" + str(exception)
    else:
        msg = _("There was an unexpected problem with the supplied content.")

    terminate(msg)
52
53
54
def terminate(message):
    """Abort by raising a non-critical installation error.

    A translated hint that the installation should be aborted is appended
    to ``message`` on a new line before the error is raised.

    :param message: text describing what went wrong
    :raises NonCriticalInstallationError: always
    """
    abort_hint = _("The installation should be aborted.")
    raise NonCriticalInstallationError(message + "\n" + abort_hint)
57
58
59
class PrepareValidContent(Task):
    """Installation task that fetches the content and checks it is usable."""

    def __init__(self, policy_data, file_path, content_path):
        """Remember the policy data and paths and create the content bringer.

        :param policy_data: policy data passed to the content bringer
        :param file_path: path checked for an already fetched file and used
                          as the fetch destination for non-SSG content
        :param content_path: path checked for already available content
        """
        super().__init__()
        self._policy_data = policy_data
        self._file_path = file_path
        self._content_path = content_path
        self.content_bringer = content_discovery.ContentBringer(policy_data)

    @property
    def name(self):
        """Human-readable name of the task."""
        return "Fetch the content, and optionally perform check or archive extraction"

    def run(self):
        """Fetch the content if it is missing and verify a preferred one exists.

        Errors reported by the fetch helpers go to _handle_error(); a failure
        to pick the preferred content is turned into terminate().
        """
        policy = self._policy_data
        bringer = self.content_bringer

        # Start a fetch only when neither the content nor the file is present.
        already_present = (
            os.path.exists(self._content_path)
            or os.path.exists(self._file_path)
        )
        thread_name = None
        if not already_present:
            thread_name = bringer.fetch_content(_handle_error, policy.certificates)

        # Content of type "scap-security-guide" gets no destination.
        if policy.content_type == "scap-security-guide":
            destination = None
        else:
            destination = self._file_path

        def report(msg):
            log.info(msg)

        content = bringer.finish_content_fetch(
            thread_name, policy.fingerprint, report, destination, _handle_error)

        if content:
            try:
                # The result is not needed, only that the lookup succeeds.
                bringer.get_preferred_content(content)
            except Exception as exc:
                terminate(str(exc))
97
98
99
class EvaluateRulesTask(Task):
    """The installation task for the evaluation of the rules."""

    def __init__(self, policy_data, content_path, tailoring_path):
        """Create a task.

        :param policy_data: policy data providing profile, datastream and
                            XCCDF identifiers and the content type
        :param content_path: path to the content the rules are read from
        :param tailoring_path: path to the tailoring file
        """
        super().__init__()
        self._policy_data = policy_data
        self._content_path = content_path
        self._tailoring_path = tailoring_path

    @property
    def name(self):
        """Human-readable name of the task."""
        return "Evaluate the rules"

    def run(self):
        """Run the task: load the rule data and evaluate it."""
        rule_data = self._initialize_rules()
        self._evaluate_rules(rule_data)

    def _initialize_rules(self):
        """Load rule data for the configured profile from the content.

        :return: the object returned by get_rule_data_from_content()
        :raises NonCriticalInstallationError: through _handle_error() when
            an OSCAPaddonError is raised while loading
        """
        try:
            return rule_handling.get_rule_data_from_content(
                self._policy_data.profile_id, self._content_path,
                self._policy_data.datastream_id, self._policy_data.xccdf_id,
                self._tailoring_path)
        except common.OSCAPaddonError as e:
            # _handle_error() ends in terminate(), which raises.
            _handle_error(e)

    @staticmethod
    def _message_text(message):
        """Return a string that represents a rule message.

        NOTE(review): presumably rule messages keep their user-visible text
        in a ``text`` attribute -- confirm against the message class. When
        there is no such string attribute, ``str(message)`` is used.
        """
        text = getattr(message, "text", None)
        return text if isinstance(text, str) else str(message)

    def _evaluate_rules(self, rule_data):
        """Evaluate the rules and request the packages needed on the target.

        :param rule_data: object providing eval_rules()
        :raises NonCriticalInstallationError: through terminate() when any
            fatal message is reported by the evaluation
        """
        # evaluate rules, do automatic fixes and stop if something that cannot
        # be fixed automatically is wrong
        all_messages = rule_data.eval_rules(None, None)
        fatal_messages = [message for message in all_messages
                          if message.type == common.MESSAGE_TYPE_FATAL]
        if fatal_messages:
            msg_lines = [_("Wrong configuration detected!")]
            # str.join() accepts only strings; the messages are objects with
            # a ``type`` attribute, so they are converted to text first.
            msg_lines.extend(self._message_text(message)
                             for message in fatal_messages)
            terminate("\n".join(msg_lines))
            # Defensive: terminate() raises, nothing below may run on failure.
            return

        # add packages needed on the target system to the list of packages
        # that are requested to be installed
        packages_data = get_packages_data()
        pkgs_to_install = list(REQUIRED_PACKAGES)

        if self._policy_data.content_type == "scap-security-guide":
            pkgs_to_install.append("scap-security-guide")

        for pkg in pkgs_to_install:
            if pkg not in packages_data.packages:
                packages_data.packages.append(pkg)

        set_packages_data(packages_data)
155
156
157
class InstallContentTask(Task):
    """Installation task that puts the content onto the target system."""

    def __init__(self, sysroot, policy_data, file_path,
                 content_path, tailoring_path, target_directory):
        """Store everything needed to install the content.

        :param sysroot: root of the target system
        :param policy_data: policy data; its content type selects what is copied
        :param file_path: path of the fetched file (copied for RPM content)
        :param content_path: path of the content (copied for datastream content)
        :param tailoring_path: path of the tailoring file, copied if it exists
        :param target_directory: content directory, relative to the sysroot
        """
        super().__init__()
        self._sysroot = sysroot
        self._policy_data = policy_data
        self._file_path = file_path
        self._content_path = content_path
        self._tailoring_path = tailoring_path
        self._target_directory = target_directory

    @property
    def name(self):
        """Human-readable name of the task."""
        return "Install the content"

    def run(self):
        """Copy the content, and the tailoring file if present, to the target.

        :raises common.ExtractionError: when installing the content RPM in
            the sysroot returns a non-zero status
        """
        destination = utils.join_paths(self._sysroot, self._target_directory)
        utils.ensure_dir_exists(destination)

        kind = self._policy_data.content_type
        if kind == "datastream":
            shutil.copy2(self._content_path, destination)
        elif kind == "rpm":
            # The RPM is copied into the target first ...
            shutil.copy2(self._file_path, destination)

            # ... and then installed with yum, using its path as seen from
            # inside the sysroot.
            rpm_name = common.get_content_name(self._policy_data)
            rpm_path = utils.join_paths(self._target_directory, rpm_name)
            status = util.execInSysroot(
                "yum", ["-y", "--nogpg", "install", rpm_path]
            )
            if status != 0:
                raise common.ExtractionError(
                    "Failed to install content RPM to the target system"
                )
        elif kind != "scap-security-guide":
            # Any other type: copy everything from the installation content
            # directory. "scap-security-guide" content needs no copying.
            everything = utils.join_paths(common.INSTALLATION_CONTENT_DIR, "*")
            utils.universal_copy(everything, destination)

        if os.path.exists(self._tailoring_path):
            shutil.copy2(self._tailoring_path, destination)
211
212
213
class RemediateSystemTask(Task):
    """Installation task that runs the remediation."""

    def __init__(self, sysroot, policy_data, target_content_path,
                 target_tailoring_path):
        """Store the sysroot, the policy data and the target paths.

        :param sysroot: passed to the remediation as its ``chroot``
        :param policy_data: policy data providing profile, datastream and
                            XCCDF identifiers
        :param target_content_path: content path handed to the remediation
        :param target_tailoring_path: tailoring path handed to the remediation
        """
        super().__init__()
        self._sysroot = sysroot
        self._policy_data = policy_data
        self._target_content_path = target_content_path
        self._target_tailoring_path = target_tailoring_path

    @property
    def name(self):
        """Human-readable name of the task."""
        return "Remediate the system"

    def run(self):
        """Call common.run_oscap_remediate() with the stored configuration."""
        policy = self._policy_data
        common.run_oscap_remediate(
            policy.profile_id,
            self._target_content_path,
            policy.datastream_id,
            policy.xccdf_id,
            self._target_tailoring_path,
            chroot=self._sysroot,
        )
239