Completed
Push — master ( c97ece...6b05e2 )
by Jan
27s queued 17s
created

RemediateSystemTask.run()   A

Complexity

Conditions 3

Size

Total Lines 26
Code Lines 23

Duplication

Lines 26
Ratio 100 %

Importance

Changes 0
Metric Value
cc 3
eloc 23
nop 1
dl 26
loc 26
rs 9.328
c 0
b 0
f 0
1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import os
20
import shutil
21
22
from pyanaconda.core import util
23
from pyanaconda.modules.common.task import Task
24
from pyanaconda.modules.common.errors.installation import NonCriticalInstallationError
25
26
from org_fedora_oscap import common, data_fetch, rule_handling, utils
27
from org_fedora_oscap.common import _, get_packages_data, set_packages_data
28
from org_fedora_oscap.content_handling import ContentCheckError
29
from org_fedora_oscap import content_discovery
30
31
log = logging.getLogger("anaconda")
32
33
34
# openscap-scanner is needed for the post-installation remediation,
# openscap-utils for the first-boot remediation
35
REQUIRED_PACKAGES = ("openscap", "openscap-scanner", "openscap-utils",)
36
37
38
def _handle_error(exception):
    """Log a content failure and abort through terminate().

    The user-facing message depends on the type of the exception:
    ContentCheckError yields an integrity-check message,
    common.OSCAPaddonError and data_fetch.DataFetchError yield a
    fetch/load message followed by the exception text, and anything
    else yields a generic message.

    :param exception: the exception that describes the failure
    :raise: whatever terminate() raises for the composed message
    """
    log.error("OSCAP Addon: Failed to fetch and initialize SCAP content!")

    if isinstance(exception, ContentCheckError):
        msg = _("The integrity check of the security content failed.")
    elif isinstance(exception, (common.OSCAPaddonError, data_fetch.DataFetchError)):
        # Only the constant part is passed to the translation function.
        # Merging the exception text into the string before the lookup
        # produces a message id that no catalog can contain.
        msg = _("There was an error fetching and loading the security content:\n")
        msg += str(exception)
    else:
        msg = _("There was an unexpected problem with the supplied content.")

    terminate(msg)
54
55
56
def terminate(message):
    """Abort by raising a non-critical installation error.

    :param message: text describing the problem; the advice to abort the
                    installation is appended to it on a new line
    :raise: NonCriticalInstallationError carrying the extended message
    """
    abort_advice = _("The installation should be aborted.")
    raise NonCriticalInstallationError(message + "\n" + abort_advice)
59
60
61
class PrepareValidContent(Task):
    """The installation task for fetching the content."""

    def __init__(self, policy_data, file_path, content_path):
        """Create a task.

        :param policy_data: the policy data; the task reads its certificates,
                            content_type and fingerprint attributes
        :param file_path: destination handed to the content fetch for content
                          types other than "scap-security-guide"
        :param content_path: path whose existence decides whether the content
                             still has to be fetched
        """
        super().__init__()
        self._policy_data = policy_data
        self._file_path = file_path
        self._content_path = content_path
        self.content_bringer = content_discovery.ContentBringer(policy_data)

    @property
    def name(self):
        """Return the name of the task."""
        return "Fetch the content, and optionally perform check or archive extraction"

    def run(self):
        """Run the task.

        Start a fetch when nothing exists at the content path, finish the
        fetch and check that preferred content can be obtained from the
        result. Failures are reported through _handle_error() or terminate().
        """
        # Is the content available?
        fetching_thread_name = None
        if not os.path.exists(self._content_path):
            # content not available/fetched yet
            fetching_thread_name = self.content_bringer.fetch_content(
                _handle_error, self._policy_data.certificates)

        content_dest = None
        if self._policy_data.content_type != "scap-security-guide":
            content_dest = self._file_path

        content = self.content_bringer.finish_content_fetch(
            fetching_thread_name, self._policy_data.fingerprint,
            lambda msg: log.info("OSCAP Addon: " + msg), content_dest, _handle_error)

        if not content:
            # this shouldn't happen because error handling is supposed to
            # terminate the addon before finish_content_fetch returns
            _handle_error(Exception())

        # No fetch was started although the content is not the
        # scap-security-guide one, so the file at the content path is
        # added to the content explicitly.
        remote_content_was_present = (
            not fetching_thread_name
            and self._policy_data.content_type != "scap-security-guide")
        if remote_content_was_present:
            content.add_file(self._content_path)

        try:
            # Just check that preferred content exists. The result is not
            # bound to "_" on purpose: such an assignment would make "_" a
            # local name and hide the translation function in this method.
            self.content_bringer.get_preferred_content(content)
        except Exception as exc:
            terminate(str(exc))
109
110
111
class EvaluateRulesTask(Task):
    """The installation task for the evaluation of the rules."""

    def __init__(self, policy_data, content_path, tailoring_path):
        """Create a task.

        :param policy_data: the policy data; the task reads its profile_id,
                            datastream_id, xccdf_id and content_type attributes
        :param content_path: content path passed on when loading the rule data
        :param tailoring_path: tailoring path passed on when loading the rule data
        """
        super().__init__()
        self._policy_data = policy_data
        self._content_path = content_path
        self._tailoring_path = tailoring_path

    @property
    def name(self):
        """Return the name of the task."""
        return "Evaluate the rules"

    def run(self):
        """Run the task."""
        rule_data = self._initialize_rules()
        self._evaluate_rules(rule_data)

    def _initialize_rules(self):
        """Load the rule data for the configured profile.

        :return: the result of rule_handling.get_rule_data_from_content()
        :raise: whatever _handle_error() raises when loading fails with
                common.OSCAPaddonError
        """
        try:
            rule_data = rule_handling.get_rule_data_from_content(
                self._policy_data.profile_id, self._content_path,
                self._policy_data.datastream_id, self._policy_data.xccdf_id,
                self._tailoring_path)
            return rule_data

        except common.OSCAPaddonError as e:
            _handle_error(e)

    def _evaluate_rules(self, rule_data):
        """Evaluate the rules and request the packages needed on the target.

        :param rule_data: object providing eval_rules(), as returned by
                          _initialize_rules()
        """
        # evaluate rules, do automatic fixes and stop if something that cannot
        # be fixed automatically is wrong
        all_messages = rule_data.eval_rules(None, None)
        fatal_messages = [message for message in all_messages
                          if message.type == common.MESSAGE_TYPE_FATAL]

        # Test the list itself. any() would inspect the truthiness of the
        # individual message objects instead of asking whether a fatal
        # message was reported at all.
        if fatal_messages:
            msg_lines = [_("Wrong configuration detected!")]
            msg_lines.extend([m.text for m in fatal_messages])
            terminate("\n".join(msg_lines))
            # not reached as long as terminate() raises
            return

        # add packages needed on the target system to the list of packages
        # that are requested to be installed
        packages_data = get_packages_data()
        pkgs_to_install = list(REQUIRED_PACKAGES)

        if self._policy_data.content_type == "scap-security-guide":
            pkgs_to_install.append("scap-security-guide")

        for pkg in pkgs_to_install:
            if pkg not in packages_data.packages:
                packages_data.packages.append(pkg)

        set_packages_data(packages_data)
167
168
169
class InstallContentTask(Task):
    """The installation task for installation of the content."""

    def __init__(self, sysroot, policy_data, file_path,
                 content_path, tailoring_path, target_directory):
        """Create a task.

        :param sysroot: root of the target system; joined with
                        target_directory to get the destination directory
        :param policy_data: the policy data; its content_type selects how the
                            content is installed
        :param file_path: file copied to the target for the "rpm" content type
        :param content_path: file copied to the target for the "datastream"
                             content type
        :param tailoring_path: tailoring file copied to the target if it exists
        :param target_directory: content directory inside of the sysroot
        """
        super().__init__()
        self._sysroot = sysroot
        self._policy_data = policy_data
        self._file_path = file_path
        self._content_path = content_path
        self._tailoring_path = tailoring_path
        self._target_directory = target_directory

    @property
    def name(self):
        """Return the name of the task."""
        return "Install the content"

    def run(self):
        """Run the task.

        Create the destination directory under the sysroot and put the
        content there in the way the content type requires. For the "rpm"
        type the package is also installed with yum in the sysroot and a
        non-zero exit code is reported through terminate().
        """
        target_content_dir = utils.join_paths(
            self._sysroot,
            self._target_directory
        )

        utils.ensure_dir_exists(target_content_dir)

        if self._policy_data.content_type == "scap-security-guide":
            pass  # nothing needed
        elif self._policy_data.content_type == "datastream":
            shutil.copy2(self._content_path, target_content_dir)
        elif self._policy_data.content_type == "rpm":
            # copy the RPM to the target system
            shutil.copy2(self._file_path, target_content_dir)

            # get the path of the RPM; it is built from the target directory
            # without the sysroot because yum is executed inside the sysroot
            content_name = common.get_content_name(self._policy_data)
            package_path = utils.join_paths(self._target_directory, content_name)

            # and install it with yum
            ret = util.execInSysroot(
                "yum", ["-y", "--nogpg", "install", package_path]
            )

            if ret != 0:
                # a plain literal: the message has no placeholders, so an
                # f-string prefix would serve no purpose
                msg = _("Failed to install content RPM to the target system.")
                terminate(msg)
                # not reached as long as terminate() raises
                return
        else:
            # any other content type: copy everything from the installation
            # content directory
            pattern = utils.join_paths(common.INSTALLATION_CONTENT_DIR, "*")
            utils.universal_copy(pattern, target_content_dir)

        if os.path.exists(self._tailoring_path):
            shutil.copy2(self._tailoring_path, target_content_dir)
223
224
225 View Code Duplication
class RemediateSystemTask(Task):
    """The installation task for running the remediation."""

    def __init__(self, sysroot, policy_data, target_content_path,
                 target_tailoring_path):
        """Create a task.

        :param sysroot: root of the installed system; used as the chroot for
                        the scanner check and for the remediation
        :param policy_data: the policy data; the task reads its profile_id,
                            datastream_id and xccdf_id attributes
        :param target_content_path: content path passed to the remediation
        :param target_tailoring_path: tailoring path passed to the remediation
        """
        super().__init__()
        self._sysroot = sysroot
        self._policy_data = policy_data
        self._target_content_path = target_content_path
        self._target_tailoring_path = target_tailoring_path

    @property
    def name(self):
        """Return the name of the task."""
        return "Remediate the system"

    def run(self):
        """Run the task.

        Check that the 'oscap' scanner works in the sysroot and run the
        remediation there. Any exception from either step is reported
        through terminate().
        """
        try:
            common.assert_scanner_works(
                chroot=self._sysroot, executable="oscap")
        except Exception as exc:
            # Translate the template first and fill in the error afterwards.
            # Formatting before the lookup yields a message id that no
            # catalog can contain.
            msg_lines = [
                _("The 'oscap' scanner doesn't work in the installed system: {error}")
                .format(error=str(exc))
            ]
            msg_lines.append(_("As a result, the installed system can't be hardened."))
            terminate("\n".join(msg_lines))
            # not reached as long as terminate() raises
            return

        try:
            common.run_oscap_remediate(
                self._policy_data.profile_id,
                self._target_content_path,
                self._policy_data.datastream_id,
                self._policy_data.xccdf_id,
                self._target_tailoring_path,
                chroot=self._sysroot
            )
        except Exception as exc:
            # same order here: translation lookup, then substitution
            msg = _(
                "Something went wrong during the final hardening: {error}."
            ).format(error=str(exc))
            terminate(msg)
            # not reached as long as terminate() raises
            return
267
268
269 View Code Duplication
class ScheduleFirstbootRemediationTask(Task):
    """The installation task for scheduling the first-boot remediation."""

    def __init__(self, sysroot, policy_data, target_content_path,
                 target_tailoring_path):
        """Create a task.

        :param sysroot: root of the installed system; used as the chroot for
                        the scanner check and passed to the scheduling call
        :param policy_data: the policy data; the task reads its profile_id,
                            datastream_id and xccdf_id attributes
        :param target_content_path: content path passed to the scheduling call
        :param target_tailoring_path: tailoring path passed to the scheduling call
        """
        super().__init__()
        self._sysroot = sysroot
        self._policy_data = policy_data
        self._target_content_path = target_content_path
        self._target_tailoring_path = target_tailoring_path

    @property
    def name(self):
        """Return the name of the task."""
        return "Schedule first-boot remediation"

    def run(self):
        """Run the task.

        Check that the 'oscap' scanner works in the sysroot and schedule
        the first-boot remediation. Any exception from either step is
        reported through terminate().
        """
        try:
            common.assert_scanner_works(
                chroot=self._sysroot, executable="oscap")
        except Exception as exc:
            # Translate the template first and fill in the error afterwards.
            # Formatting before the lookup yields a message id that no
            # catalog can contain.
            msg_lines = [
                _("The 'oscap' scanner doesn't work in the installed system: {error}")
                .format(error=str(exc))
            ]
            msg_lines.append(_("As a result, the installed system can't be hardened."))
            terminate("\n".join(msg_lines))
            # not reached as long as terminate() raises
            return

        try:
            common.schedule_firstboot_remediation(
                self._sysroot,
                self._policy_data.profile_id,
                self._target_content_path,
                self._policy_data.datastream_id,
                self._policy_data.xccdf_id,
                self._target_tailoring_path,
            )
        except Exception as exc:
            # same order here: translation lookup, then substitution
            msg = _(
                "Something went wrong when scheduling the first-boot remediation: {exc}."
            ).format(exc=str(exc))
            terminate(msg)
            # not reached as long as terminate() raises
            return
313