Passed
Pull Request — master (#133)
by Matěj
01:22
created

org_fedora_oscap.service.installation.FetchContentTask.run()   B

Complexity

Conditions 5

Size

Total Lines 34
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 22
nop 1
dl 0
loc 34
rs 8.8853
c 0
b 0
f 0
1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import os
20
import shutil
21
22
from pyanaconda.core import util
23
from pyanaconda.modules.common.task import Task
24
from pyanaconda.modules.common.errors.installation import NonCriticalInstallationError
25
26
from org_fedora_oscap import common, data_fetch, rule_handling, utils
27
from org_fedora_oscap.common import _, get_packages_data, set_packages_data
28
29
log = logging.getLogger(__name__)
30
31
32
class FetchContentTask(Task):
    """The installation task for fetching the content.

    Fetches the content referenced by the policy data and, for the
    "archive" and "rpm" content types, extracts it into
    common.INSTALLATION_CONTENT_DIR.
    """

    def __init__(self, policy_data, file_path, content_path):
        """Create a task.

        :param policy_data: the policy data; run() reads its content_url,
                            certificates, content_type and content_path
        :param file_path: a path handed to data_fetch.fetch_data() and
                          common.extract_data(); if it already exists,
                          nothing is fetched
        :param content_path: a path of the content; if it already exists,
                             nothing is fetched
        """
        super().__init__()
        self._policy_data = policy_data
        self._file_path = file_path
        self._content_path = content_path

    @property
    def name(self):
        """The name of the task."""
        return "Fetch the content"

    def run(self):
        """Run the task.

        :raise NonCriticalInstallationError: if the content cannot be
                                             fetched or extracted
        """
        # Is the content available?
        if os.path.exists(self._content_path):
            log.debug("Content is already available. Skip.")
            return

        # Is the fetched file available?
        if os.path.exists(self._file_path):
            log.debug("Content is already available. Skip.")
            return

        try:
            data_fetch.fetch_data(
                self._policy_data.content_url,
                self._file_path,
                self._policy_data.certificates
            )

            # RPM is an archive at this phase
            if self._policy_data.content_type in ("archive", "rpm"):
                # extract the content
                common.extract_data(
                    self._file_path,
                    common.INSTALLATION_CONTENT_DIR,
                    [self._policy_data.content_path]
                )

        except (common.OSCAPaddonError, data_fetch.DataFetchError) as e:
            log.error("Failed to fetch SCAP content!")

            # Chain the original error explicitly, so the traceback shows
            # it as the direct cause of the installation error.
            raise NonCriticalInstallationError(_(
                "There was an error fetching the security content:\n%s\n"
                "The installation should be aborted."
            ) % e) from e
80
81
82
class CheckFingerprintTask(Task):
    """The installation task for checking the fingerprint.

    Compares the fingerprint of a file with the fingerprint stored
    in the policy data.
    """

    def __init__(self, policy_data, file_path):
        """Create a task.

        :param policy_data: the policy data; run() reads its fingerprint
        :param file_path: a path to the file whose fingerprint is checked
        """
        super().__init__()
        self._policy_data = policy_data
        self._file_path = file_path

    @property
    def name(self):
        """The name of the task."""
        return "Check the fingerprint"

    def run(self):
        """Run the task.

        :raise NonCriticalInstallationError: if the computed digest differs
                                             from the expected fingerprint
        """
        if not self._policy_data.fingerprint:
            log.debug("No fingerprint is provided. Skip.")
            return

        # The hashing algorithm is chosen based on the expected fingerprint.
        hash_obj = utils.get_hashing_algorithm(self._policy_data.fingerprint)
        digest = utils.get_file_fingerprint(self._file_path, hash_obj)

        if digest != self._policy_data.fingerprint:
            log.error("Failed to fetch and initialize SCAP content!")

            # Adjacent literals (no "+") keep the message a single string
            # literal for the translation tooling, same as the other
            # messages in this module.
            raise NonCriticalInstallationError(_(
                "The integrity check of the security content failed.\n"
                "The installation should be aborted."
            ))
111
112
113
class EvaluateRulesTask(Task):
    """The installation task for the evaluation of the rules.

    Loads the rules from the content, evaluates them and requests the
    installation of the packages needed on the target system.
    """

    def __init__(self, policy_data, content_path, tailoring_path):
        """Create a task.

        :param policy_data: the policy data; its profile_id, datastream_id,
                            xccdf_id and content_type are read
        :param content_path: a path passed to common.get_fix_rules_pre()
        :param tailoring_path: a tailoring path passed to
                               common.get_fix_rules_pre()
        """
        super().__init__()
        self._policy_data = policy_data
        self._content_path = content_path
        self._tailoring_path = tailoring_path

    @property
    def name(self):
        """The name of the task."""
        return "Evaluate the rules"

    def run(self):
        """Run the task.

        :raise NonCriticalInstallationError: if the rules cannot be loaded
                                             or a fatal problem is reported
        """
        rule_data = self._initialize_rules()
        self._evaluate_rules(rule_data)

    def _initialize_rules(self):
        """Load the rules and store them in a new RuleData object.

        :return: an instance of rule_handling.RuleData with one rule
                 added per line returned by common.get_fix_rules_pre()
        :raise NonCriticalInstallationError: if common.OSCAPaddonError
                                             is raised while loading
        """
        try:
            rules = common.get_fix_rules_pre(
                self._policy_data.profile_id,
                self._content_path,
                self._policy_data.datastream_id,
                self._policy_data.xccdf_id,
                self._tailoring_path
            )

            # parse and store rules
            rule_data = rule_handling.RuleData()

            for rule in rules.splitlines():
                rule_data.new_rule(rule)

            return rule_data

        except common.OSCAPaddonError as e:
            log.error("Failed to load SCAP content!")

            # Chain the original error explicitly, so the traceback shows
            # it as the direct cause of the installation error.
            raise NonCriticalInstallationError(_(
                "There was an error loading the security content:\n%s\n"
                "The installation should be aborted."
            ) % e) from e

    def _evaluate_rules(self, rule_data):
        """Evaluate the rules and request the required packages.

        :param rule_data: the rule data returned by _initialize_rules()
        :raise NonCriticalInstallationError: if any fatal message is
                                             reported by the evaluation
        """
        # evaluate rules, do automatic fixes and stop if something that cannot
        # be fixed automatically is wrong
        all_messages = rule_data.eval_rules(None, None)
        fatal_messages = [m for m in all_messages if m.type == common.MESSAGE_TYPE_FATAL]

        # Test the list itself; the truthiness of the individual message
        # objects is irrelevant here.
        if fatal_messages:
            raise NonCriticalInstallationError(_(
                "There was a wrong configuration detected:\n%s\n"
                "The installation should be aborted."
            ) % "\n".join(message.text for message in fatal_messages))

        # add packages needed on the target system to the list of packages
        # that are requested to be installed
        packages_data = get_packages_data()

        # NOTE(review): REQUIRED_PACKAGES is neither defined nor imported in
        # the visible part of this module -- confirm where it should come
        # from, otherwise this line raises NameError.
        pkgs_to_install = list(REQUIRED_PACKAGES)

        if self._policy_data.content_type == "scap-security-guide":
            pkgs_to_install.append("scap-security-guide")

        # Do not request the same package twice.
        for pkg in pkgs_to_install:
            if pkg not in packages_data.packages:
                packages_data.packages.append(pkg)

        set_packages_data(packages_data)
183
184
185
class InstallContentTask(Task):
    """Installation task that puts the content into the target directory."""

    def __init__(self, sysroot, policy_data, file_path,
                 content_path, tailoring_path, target_directory):
        """Remember everything the task needs for its run.

        :param sysroot: a path joined with target_directory to get the
                        directory the content is copied to
        :param policy_data: the policy data; its content_type is read
        :param file_path: a path to the file copied for the "rpm" type
        :param content_path: a path to the file copied for the
                             "datastream" type
        :param tailoring_path: a path to a tailoring file; copied only
                               if it exists
        :param target_directory: the content directory, joined with sysroot
        """
        super().__init__()
        self._policy_data = policy_data
        self._sysroot = sysroot
        self._target_directory = target_directory
        self._file_path = file_path
        self._content_path = content_path
        self._tailoring_path = tailoring_path

    @property
    def name(self):
        """The name of the task."""
        return "Install the content"

    def run(self):
        """Run the task.

        :raise common.ExtractionError: if the content RPM fails to install
        """
        destination = utils.join_paths(self._sysroot, self._target_directory)
        utils.ensure_dir_exists(destination)

        kind = self._policy_data.content_type

        if kind == "datastream":
            shutil.copy2(self._content_path, destination)
        elif kind == "rpm":
            self._install_content_rpm(destination)
        elif kind != "scap-security-guide":
            # every other type: copy the whole installation content directory
            everything = utils.join_paths(common.INSTALLATION_CONTENT_DIR, "*")
            utils.universal_copy(everything, destination)
        # nothing is needed for "scap-security-guide"

        if os.path.exists(self._tailoring_path):
            shutil.copy2(self._tailoring_path, destination)

    def _install_content_rpm(self, destination):
        """Copy the RPM into the destination and install it with yum.

        :param destination: the directory the RPM is copied to
        :raise common.ExtractionError: if yum returns a non-zero code
        """
        # copy the RPM to the target system
        shutil.copy2(self._file_path, destination)

        # the path is built from the target directory only (no sysroot
        # prefix), because the command is executed in the sysroot
        rpm_name = common.get_content_name(self._policy_data)
        rpm_path = utils.join_paths(self._target_directory, rpm_name)

        exit_code = util.execInSysroot(
            "yum", ["-y", "--nogpg", "install", rpm_path]
        )

        if exit_code != 0:
            raise common.ExtractionError(
                "Failed to install content RPM to the target system"
            )
239
240
241
class SetupPackages(Task):
    """Presumably the installation task for setting up the packages.

    NOTE(review): this class is not implemented yet. It has neither the
    name property nor the run() method the other tasks in this module
    provide -- confirm the intended behavior before using it.
    """
242
243
244
class RemediateSystemTask(Task):
    """Installation task that runs the remediation."""

    def __init__(self, sysroot, policy_data, target_content_path,
                 target_tailoring_path):
        """Remember everything the task needs for its run.

        :param sysroot: a path passed to the remediation as chroot
        :param policy_data: the policy data; its profile_id, datastream_id
                            and xccdf_id are read
        :param target_content_path: a content path passed to the remediation
        :param target_tailoring_path: a tailoring path passed to the
                                      remediation
        """
        super().__init__()
        self._policy_data = policy_data
        self._sysroot = sysroot
        self._target_tailoring_path = target_tailoring_path
        self._target_content_path = target_content_path

    @property
    def name(self):
        """The name of the task."""
        return "Remediate the system"

    def run(self):
        """Run the task by calling common.run_oscap_remediate()."""
        policy = self._policy_data

        common.run_oscap_remediate(
            policy.profile_id,
            self._target_content_path,
            policy.datastream_id,
            policy.xccdf_id,
            self._target_tailoring_path,
            chroot=self._sysroot,
        )
270