Completed
Push — master ( 8229ad...0db8a8 )
by Jan
17s queued 13s
created

InstallContentTask.run()   B

Complexity

Conditions 6

Size

Total Lines 36
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 22
nop 1
dl 0
loc 36
rs 8.4186
c 0
b 0
f 0
1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import os
20
import shutil
21
22
from pyanaconda.core import util
23
from pyanaconda.modules.common.task import Task
24
from pyanaconda.modules.common.errors.installation import NonCriticalInstallationError
25
26
from org_fedora_oscap import common, data_fetch, rule_handling, utils
27
from org_fedora_oscap.common import _, get_packages_data, set_packages_data
28
from org_fedora_oscap.content_handling import ContentCheckError
29
from org_fedora_oscap import content_discovery
30
31
log = logging.getLogger("anaconda")
32
33
34
REQUIRED_PACKAGES = ("openscap", "openscap-scanner",)
35
36
37
def _handle_error(exception):
38
    log.error("OSCAP Addon: Failed to fetch and initialize SCAP content!")
39
40
    if isinstance(exception, ContentCheckError):
41
        msg = _("The integrity check of the security content failed.")
42
        terminate(msg)
43
    elif (
44
            isinstance(exception, common.OSCAPaddonError)
45
            or isinstance(exception, data_fetch.DataFetchError)):
46
        msg = _("There was an error fetching and loading the security content:\n" +
47
                f"{str(exception)}")
48
        terminate(msg)
49
50
    else:
51
        msg = _("There was an unexpected problem with the supplied content.")
52
        terminate(msg)
53
54
55
def terminate(message):
56
    message += "\n" + _("The installation should be aborted.")
57
    raise NonCriticalInstallationError(message)
58
59
60
class PrepareValidContent(Task):
61
    """The installation task for fetching the content."""
62
63
    def __init__(self, policy_data, file_path, content_path):
64
        """Create a task."""
65
        super().__init__()
66
        self._policy_data = policy_data
67
        self._file_path = file_path
68
        self._content_path = content_path
69
        self.content_bringer = content_discovery.ContentBringer(policy_data)
70
71
    @property
72
    def name(self):
73
        return "Fetch the content, and optionally perform check or archive extraction"
74
75
    def run(self):
76
        """Run the task."""
77
        # Is the content available?
78
        fetching_thread_name = None
79
        if not os.path.exists(self._content_path):
80
            # content not available/fetched yet
81
            fetching_thread_name = self.content_bringer.fetch_content(
82
                _handle_error, self._policy_data.certificates)
83
84
        content_dest = None
85
        if self._policy_data.content_type != "scap-security-guide":
86
            content_dest = self._file_path
87
88
        content = self.content_bringer.finish_content_fetch(
89
            fetching_thread_name, self._policy_data.fingerprint,
90
            lambda msg: log.info("OSCAP Addon: " + msg), content_dest, _handle_error)
91
92
        if not content:
93
            # this shouldn't happen because error handling is supposed to
94
            # terminate the addon before finish_content_fetch returns
95
            _handle_error(Exception())
96
97
        remote_content_was_present = (
98
            not fetching_thread_name
99
            and self._policy_data.content_type != "scap-security-guide")
100
        if remote_content_was_present:
101
            content.add_file(self._content_path)
102
103
        try:
104
            # just check that preferred content exists
105
            _ = self.content_bringer.get_preferred_content(content)
106
        except Exception as exc:
107
            terminate(str(exc))
108
109
110
class EvaluateRulesTask(Task):
111
    """The installation task for the evaluation of the rules."""
112
113
    def __init__(self, policy_data, content_path, tailoring_path):
114
        """Create a task."""
115
        super().__init__()
116
        self._policy_data = policy_data
117
        self._content_path = content_path
118
        self._tailoring_path = tailoring_path
119
120
    @property
121
    def name(self):
122
        return "Evaluate the rules"
123
124
    def run(self):
125
        """Run the task."""
126
        rule_data = self._initialize_rules()
127
        self._evaluate_rules(rule_data)
128
129
    def _initialize_rules(self):
130
        try:
131
            rule_data = rule_handling.get_rule_data_from_content(
132
                self._policy_data.profile_id, self._content_path,
133
                self._policy_data.datastream_id, self._policy_data.xccdf_id,
134
                self._tailoring_path)
135
            return rule_data
136
137
        except common.OSCAPaddonError as e:
138
            _handle_error(e)
139
140
141
    def _evaluate_rules(self, rule_data):
142
        # evaluate rules, do automatic fixes and stop if something that cannot
143
        # be fixed automatically is wrong
144
        all_messages = rule_data.eval_rules(None, None)
145
        fatal_messages = [message for message in all_messages
146
                          if message.type == common.MESSAGE_TYPE_FATAL]
147
        if any(fatal_messages):
148
            msg_lines = [_("Wrong configuration detected!")]
149
            msg_lines.extend([m.text for m in fatal_messages])
150
            terminate("\n".join(msg_lines))
151
            return
152
153
        # add packages needed on the target system to the list of packages
154
        # that are requested to be installed
155
        packages_data = get_packages_data()
156
        pkgs_to_install = list(REQUIRED_PACKAGES)
157
158
        if self._policy_data.content_type == "scap-security-guide":
159
            pkgs_to_install.append("scap-security-guide")
160
161
        for pkg in pkgs_to_install:
162
            if pkg not in packages_data.packages:
163
                packages_data.packages.append(pkg)
164
165
        set_packages_data(packages_data)
166
167
168
class InstallContentTask(Task):
169
    """The installation task for installation of the content."""
170
171
    def __init__(self, sysroot, policy_data, file_path,
172
                 content_path, tailoring_path, target_directory):
173
        """Create a task."""
174
        super().__init__()
175
        self._sysroot = sysroot
176
        self._policy_data = policy_data
177
        self._file_path = file_path
178
        self._content_path = content_path
179
        self._tailoring_path = tailoring_path
180
        self._target_directory = target_directory
181
182
    @property
183
    def name(self):
184
        return "Install the content"
185
186
    def run(self):
187
        """Run the task."""
188
        target_content_dir = utils.join_paths(
189
            self._sysroot,
190
            self._target_directory
191
        )
192
193
        utils.ensure_dir_exists(target_content_dir)
194
195
        if self._policy_data.content_type == "scap-security-guide":
196
            pass  # nothing needed
197
        elif self._policy_data.content_type == "datastream":
198
            shutil.copy2(self._content_path, target_content_dir)
199
        elif self._policy_data.content_type == "rpm":
200
            # copy the RPM to the target system
201
            shutil.copy2(self._file_path, target_content_dir)
202
203
            # get the path of the RPM
204
            content_name = common.get_content_name(self._policy_data)
205
            package_path = utils.join_paths(self._target_directory, content_name)
206
207
            # and install it with yum
208
            ret = util.execInSysroot(
209
                "yum", ["-y", "--nogpg", "install", package_path]
210
            )
211
212
            if ret != 0:
213
                raise common.ExtractionError(
214
                    "Failed to install content RPM to the target system"
215
                )
216
        else:
217
            pattern = utils.join_paths(common.INSTALLATION_CONTENT_DIR, "*")
218
            utils.universal_copy(pattern, target_content_dir)
219
220
        if os.path.exists(self._tailoring_path):
221
            shutil.copy2(self._tailoring_path, target_content_dir)
222
223
224
class RemediateSystemTask(Task):
225
    """The installation task for running the remediation."""
226
227
    def __init__(self, sysroot, policy_data, target_content_path,
228
                 target_tailoring_path):
229
        """Create a task."""
230
        super().__init__()
231
        self._sysroot = sysroot
232
        self._policy_data = policy_data
233
        self._target_content_path = target_content_path
234
        self._target_tailoring_path = target_tailoring_path
235
236
    @property
237
    def name(self):
238
        return "Remediate the system"
239
240
    def run(self):
241
        """Run the task."""
242
        common.run_oscap_remediate(
243
            self._policy_data.profile_id,
244
            self._target_content_path,
245
            self._policy_data.datastream_id,
246
            self._policy_data.xccdf_id,
247
            self._target_tailoring_path,
248
            chroot=self._sysroot
249
        )
250