Passed
Pull Request — rhel8-branch (#134)
by Matěj
01:08
created

org_fedora_oscap.service.installation   A

Complexity

Total Complexity 34

Size/Duplication

Total Lines 269
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 34
eloc 159
dl 0
loc 269
rs 9.68
c 0
b 0
f 0

17 Methods

Rating   Name   Duplication   Size   Complexity  
A CheckFingerprintTask.name() 0 3 1
A EvaluateRulesTask.run() 0 4 1
A FetchContentTask.__init__() 0 6 1
A EvaluateRulesTask.__init__() 0 6 1
A EvaluateRulesTask.name() 0 3 1
B InstallContentTask.run() 0 36 6
A CheckFingerprintTask.run() 0 15 3
A EvaluateRulesTask._evaluate_rules() 0 25 5
A InstallContentTask.name() 0 3 1
A FetchContentTask.name() 0 3 1
B FetchContentTask.run() 0 34 5
A EvaluateRulesTask._initialize_rules() 0 25 3
A RemediateSystemTask.run() 0 9 1
A RemediateSystemTask.name() 0 3 1
A CheckFingerprintTask.__init__() 0 5 1
A RemediateSystemTask.__init__() 0 8 1
A InstallContentTask.__init__() 0 10 1
1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import os
20
import shutil
21
22
from pyanaconda.core import util
23
from pyanaconda.modules.common.task import Task
24
from pyanaconda.modules.common.errors.installation import NonCriticalInstallationError
25
26
from org_fedora_oscap import common, data_fetch, rule_handling, utils
27
from org_fedora_oscap.common import _, get_packages_data, set_packages_data
28
29
log = logging.getLogger(__name__)
30
31
32
REQUIRED_PACKAGES = ("openscap", "openscap-scanner",)
33
34
35
class FetchContentTask(Task):
36
    """The installation task for fetching the content."""
37
38
    def __init__(self, policy_data, file_path, content_path):
39
        """Create a task."""
40
        super().__init__()
41
        self._policy_data = policy_data
42
        self._file_path = file_path
43
        self._content_path = content_path
44
45
    @property
46
    def name(self):
47
        return "Fetch the content"
48
49
    def run(self):
50
        """Run the task."""
51
        # Is the content available?
52
        if os.path.exists(self._content_path):
53
            log.debug("Content is already available. Skip.")
54
            return
55
56
        if os.path.exists(self._file_path):
57
            log.debug("Content is already available. Skip.")
58
            return
59
60
        try:
61
            data_fetch.fetch_data(
62
                self._policy_data.content_url,
63
                self._file_path,
64
                self._policy_data.certificates
65
            )
66
67
            # RPM is an archive at this phase
68
            if self._policy_data.content_type in ("archive", "rpm"):
69
                # extract the content
70
                common.extract_data(
71
                    self._file_path,
72
                    common.INSTALLATION_CONTENT_DIR,
73
                    [self._policy_data.content_path]
74
                )
75
76
        except (common.OSCAPaddonError, data_fetch.DataFetchError) as e:
77
            log.error("Failed to fetch SCAP content!")
78
79
            raise NonCriticalInstallationError(_(
80
                "There was an error fetching the security content:\n%s\n"
81
                "The installation should be aborted."
82
            ) % e)
83
84
85
class CheckFingerprintTask(Task):
86
    """The installation task for checking the fingerprint."""
87
88
    def __init__(self, policy_data, file_path):
89
        """Create a task."""
90
        super().__init__()
91
        self._policy_data = policy_data
92
        self._file_path = file_path
93
94
    @property
95
    def name(self):
96
        return "Check the fingerprint"
97
98
    def run(self):
99
        """Run the task."""
100
        if not self._policy_data.fingerprint:
101
            log.debug("No fingerprint is provided. Skip.")
102
            return
103
104
        hash_obj = utils.get_hashing_algorithm(self._policy_data.fingerprint)
105
        digest = utils.get_file_fingerprint(self._file_path, hash_obj)
106
107
        if digest != self._policy_data.fingerprint:
108
            log.error("Failed to fetch and initialize SCAP content!")
109
110
            raise NonCriticalInstallationError(_(
111
                "The integrity check of the security content failed.\n" +
112
                "The installation should be aborted."
113
            ))
114
115
116
class EvaluateRulesTask(Task):
117
    """The installation task for the evaluation of the rules."""
118
119
    def __init__(self, policy_data, content_path, tailoring_path):
120
        """Create a task."""
121
        super().__init__()
122
        self._policy_data = policy_data
123
        self._content_path = content_path
124
        self._tailoring_path = tailoring_path
125
126
    @property
127
    def name(self):
128
        return "Evaluate the rules"
129
130
    def run(self):
131
        """Run the task."""
132
        rule_data = self._initialize_rules()
133
        self._evaluate_rules(rule_data)
134
135
    def _initialize_rules(self):
136
        try:
137
            rules = common.get_fix_rules_pre(
138
                self._policy_data.profile_id,
139
                self._content_path,
140
                self._policy_data.datastream_id,
141
                self._policy_data.xccdf_id,
142
                self._tailoring_path
143
            )
144
145
            # parse and store rules
146
            rule_data = rule_handling.RuleData()
147
148
            for rule in rules.splitlines():
149
                rule_data.new_rule(rule)
150
151
            return rule_data
152
153
        except common.OSCAPaddonError as e:
154
            log.error("Failed to load SCAP content!")
155
156
            raise NonCriticalInstallationError(_(
157
                "There was an error loading the security content:\n%s\n"
158
                "The installation should be aborted."
159
            ) % e)
160
161
    def _evaluate_rules(self, rule_data):
162
        # evaluate rules, do automatic fixes and stop if something that cannot
163
        # be fixed automatically is wrong
164
        all_messages = rule_data.eval_rules(None, None)
165
        fatal_messages = [m for m in all_messages if m.type == common.MESSAGE_TYPE_FATAL]
166
167
        if any(fatal_messages):
168
            raise NonCriticalInstallationError(_(
169
                "There was a wrong configuration detected:\n%s\n"
170
                "The installation should be aborted."
171
            ) % "\n".join(message.text for message in fatal_messages))
172
173
        # add packages needed on the target system to the list of packages
174
        # that are requested to be installed
175
        packages_data = get_packages_data()
176
        pkgs_to_install = list(REQUIRED_PACKAGES)
177
178
        if self._policy_data.content_type == "scap-security-guide":
179
            pkgs_to_install.append("scap-security-guide")
180
181
        for pkg in pkgs_to_install:
182
            if pkg not in packages_data.packages:
183
                packages_data.packages.append(pkg)
184
185
        set_packages_data(packages_data)
186
187
188
class InstallContentTask(Task):
189
    """The installation task for installation of the content."""
190
191
    def __init__(self, sysroot, policy_data, file_path,
192
                 content_path, tailoring_path, target_directory):
193
        """Create a task."""
194
        super().__init__()
195
        self._sysroot = sysroot
196
        self._policy_data = policy_data
197
        self._file_path = file_path
198
        self._content_path = content_path
199
        self._tailoring_path = tailoring_path
200
        self._target_directory = target_directory
201
202
    @property
203
    def name(self):
204
        return "Install the content"
205
206
    def run(self):
207
        """Run the task."""
208
        target_content_dir = utils.join_paths(
209
            self._sysroot,
210
            self._target_directory
211
        )
212
213
        utils.ensure_dir_exists(target_content_dir)
214
215
        if self._policy_data.content_type == "scap-security-guide":
216
            pass  # nothing needed
217
        elif self._policy_data.content_type == "datastream":
218
            shutil.copy2(self._content_path, target_content_dir)
219
        elif self._policy_data.content_type == "rpm":
220
            # copy the RPM to the target system
221
            shutil.copy2(self._file_path, target_content_dir)
222
223
            # get the path of the RPM
224
            content_name = common.get_content_name(self._policy_data)
225
            package_path = utils.join_paths(self._target_directory, content_name)
226
227
            # and install it with yum
228
            ret = util.execInSysroot(
229
                "yum", ["-y", "--nogpg", "install", package_path]
230
            )
231
232
            if ret != 0:
233
                raise common.ExtractionError(
234
                    "Failed to install content RPM to the target system"
235
                )
236
        else:
237
            pattern = utils.join_paths(common.INSTALLATION_CONTENT_DIR, "*")
238
            utils.universal_copy(pattern, target_content_dir)
239
240
        if os.path.exists(self._tailoring_path):
241
            shutil.copy2(self._tailoring_path, target_content_dir)
242
243
244
class RemediateSystemTask(Task):
245
    """The installation task for running the remediation."""
246
247
    def __init__(self, sysroot, policy_data, target_content_path,
248
                 target_tailoring_path):
249
        """Create a task."""
250
        super().__init__()
251
        self._sysroot = sysroot
252
        self._policy_data = policy_data
253
        self._target_content_path = target_content_path
254
        self._target_tailoring_path = target_tailoring_path
255
256
    @property
257
    def name(self):
258
        return "Remediate the system"
259
260
    def run(self):
261
        """Run the task."""
262
        common.run_oscap_remediate(
263
            self._policy_data.profile_id,
264
            self._target_content_path,
265
            self._policy_data.datastream_id,
266
            self._policy_data.xccdf_id,
267
            self._target_tailoring_path,
268
            chroot=self._sysroot
269
        )
270