Completed
Push — master ( f4924a...c72d79 )
by Matěj
16s queued 13s
created

org_fedora_oscap.service.installation   A

Complexity

Total Complexity 31

Size/Duplication

Total Lines 252
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 150
dl 0
loc 252
rs 9.92
c 0
b 0
f 0
wmc 31

17 Methods

Rating   Name   Duplication   Size   Complexity  
A FetchContentTask.name() 0 3 1
A CheckFingerprintTask.name() 0 3 1
A EvaluateRulesTask.run() 0 4 1
A FetchContentTask.__init__() 0 6 1
A EvaluateRulesTask.__init__() 0 6 1
A EvaluateRulesTask.name() 0 3 1
B InstallContentTask.run() 0 36 6
A CheckFingerprintTask.run() 0 15 3
A EvaluateRulesTask._evaluate_rules() 0 11 2
A InstallContentTask.name() 0 3 1
B FetchContentTask.run() 0 34 5
A EvaluateRulesTask._initialize_rules() 0 25 3
A RemediateSystemTask.run() 0 9 1
A RemediateSystemTask.name() 0 3 1
A CheckFingerprintTask.__init__() 0 5 1
A RemediateSystemTask.__init__() 0 8 1
A InstallContentTask.__init__() 0 10 1
1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import os
20
import shutil
21
22
from pyanaconda.core import util
23
from pyanaconda.modules.common.task import Task
24
from pyanaconda.modules.common.errors.installation import NonCriticalInstallationError
25
26
from org_fedora_oscap import common, data_fetch, rule_handling, utils
27
from org_fedora_oscap.common import _
28
29
log = logging.getLogger(__name__)
30
31
32
class FetchContentTask(Task):
33
    """The installation task for fetching the content."""
34
35
    def __init__(self, policy_data, file_path, content_path):
36
        """Create a task."""
37
        super().__init__()
38
        self._policy_data = policy_data
39
        self._file_path = file_path
40
        self._content_path = content_path
41
42
    @property
43
    def name(self):
44
        return "Fetch the content"
45
46
    def run(self):
47
        """Run the task."""
48
        # Is the content available?
49
        if os.path.exists(self._content_path):
50
            log.debug("Content is already available. Skip.")
51
            return
52
53
        if os.path.exists(self._file_path):
54
            log.debug("Content is already available. Skip.")
55
            return
56
57
        try:
58
            data_fetch.fetch_data(
59
                self._policy_data.content_url,
60
                self._file_path,
61
                self._policy_data.certificates
62
            )
63
64
            # RPM is an archive at this phase
65
            if self._policy_data.content_type in ("archive", "rpm"):
66
                # extract the content
67
                common.extract_data(
68
                    self._file_path,
69
                    common.INSTALLATION_CONTENT_DIR,
70
                    [self._policy_data.content_path]
71
                )
72
73
        except (common.OSCAPaddonError, data_fetch.DataFetchError) as e:
74
            log.error("Failed to fetch SCAP content!")
75
76
            raise NonCriticalInstallationError(_(
77
                "There was an error fetching the security content:\n%s\n"
78
                "The installation should be aborted."
79
            ) % e)
80
81
82
class CheckFingerprintTask(Task):
83
    """The installation task for checking the fingerprint."""
84
85
    def __init__(self, policy_data, file_path):
86
        """Create a task."""
87
        super().__init__()
88
        self._policy_data = policy_data
89
        self._file_path = file_path
90
91
    @property
92
    def name(self):
93
        return "Check the fingerprint"
94
95
    def run(self):
96
        """Run the task."""
97
        if not self._policy_data.fingerprint:
98
            log.debug("No fingerprint is provided. Skip.")
99
            return
100
101
        hash_obj = utils.get_hashing_algorithm(self._policy_data.fingerprint)
102
        digest = utils.get_file_fingerprint(self._file_path, hash_obj)
103
104
        if digest != self._policy_data.fingerprint:
105
            log.error("Failed to fetch and initialize SCAP content!")
106
107
            raise NonCriticalInstallationError(_(
108
                "The integrity check of the security content failed.\n" +
109
                "The installation should be aborted."
110
            ))
111
112
113
class EvaluateRulesTask(Task):
114
    """The installation task for the evaluation of the rules."""
115
116
    def __init__(self, policy_data, content_path, tailoring_path):
117
        """Create a task."""
118
        super().__init__()
119
        self._policy_data = policy_data
120
        self._content_path = content_path
121
        self._tailoring_path = tailoring_path
122
123
    @property
124
    def name(self):
125
        return "Evaluate the rules"
126
127
    def run(self):
128
        """Run the task."""
129
        rule_data = self._initialize_rules()
130
        self._evaluate_rules(rule_data)
131
132
    def _initialize_rules(self):
133
        try:
134
            rules = common.get_fix_rules_pre(
135
                self._policy_data.profile_id,
136
                self._content_path,
137
                self._policy_data.datastream_id,
138
                self._policy_data.xccdf_id,
139
                self._tailoring_path
140
            )
141
142
            # parse and store rules
143
            rule_data = rule_handling.RuleData()
144
145
            for rule in rules.splitlines():
146
                rule_data.new_rule(rule)
147
148
            return rule_data
149
150
        except common.OSCAPaddonError as e:
151
            log.error("Failed to load SCAP content!")
152
153
            raise NonCriticalInstallationError(_(
154
                "There was an error loading the security content:\n%s\n"
155
                "The installation should be aborted."
156
            ) % e)
157
158
    def _evaluate_rules(self, rule_data):
159
        # evaluate rules, do automatic fixes and stop if something that cannot
160
        # be fixed automatically is wrong
161
        all_messages = rule_data.eval_rules(None, None)
162
        fatal_messages = [m for m in all_messages if m.type == common.MESSAGE_TYPE_FATAL]
163
164
        if any(fatal_messages):
165
            raise NonCriticalInstallationError(_(
166
                "There was a wrong configuration detected:\n%s\n"
167
                "The installation should be aborted."
168
            ) % "\n".join(message.text for message in fatal_messages))
169
170
171
class InstallContentTask(Task):
172
    """The installation task for installation of the content."""
173
174
    def __init__(self, sysroot, policy_data, file_path,
175
                 content_path, tailoring_path, target_directory):
176
        """Create a task."""
177
        super().__init__()
178
        self._sysroot = sysroot
179
        self._policy_data = policy_data
180
        self._file_path = file_path
181
        self._content_path = content_path
182
        self._tailoring_path = tailoring_path
183
        self._target_directory = target_directory
184
185
    @property
186
    def name(self):
187
        return "Install the content"
188
189
    def run(self):
190
        """Run the task."""
191
        target_content_dir = utils.join_paths(
192
            self._sysroot,
193
            self._target_directory
194
        )
195
196
        utils.ensure_dir_exists(target_content_dir)
197
198
        if self._policy_data.content_type == "scap-security-guide":
199
            pass  # nothing needed
200
        elif self._policy_data.content_type == "datastream":
201
            shutil.copy2(self._content_path, target_content_dir)
202
        elif self._policy_data.content_type == "rpm":
203
            # copy the RPM to the target system
204
            shutil.copy2(self._file_path, target_content_dir)
205
206
            # get the path of the RPM
207
            content_name = common.get_content_name(self._policy_data)
208
            package_path = utils.join_paths(self._target_directory, content_name)
209
210
            # and install it with yum
211
            ret = util.execInSysroot(
212
                "yum", ["-y", "--nogpg", "install", package_path]
213
            )
214
215
            if ret != 0:
216
                raise common.ExtractionError(
217
                    "Failed to install content RPM to the target system"
218
                )
219
        else:
220
            pattern = utils.join_paths(common.INSTALLATION_CONTENT_DIR, "*")
221
            utils.universal_copy(pattern, target_content_dir)
222
223
        if os.path.exists(self._tailoring_path):
224
            shutil.copy2(self._tailoring_path, target_content_dir)
225
226
227
class RemediateSystemTask(Task):
228
    """The installation task for running the remediation."""
229
230
    def __init__(self, sysroot, policy_data, target_content_path,
231
                 target_tailoring_path):
232
        """Create a task."""
233
        super().__init__()
234
        self._sysroot = sysroot
235
        self._policy_data = policy_data
236
        self._target_content_path = target_content_path
237
        self._target_tailoring_path = target_tailoring_path
238
239
    @property
240
    def name(self):
241
        return "Remediate the system"
242
243
    def run(self):
244
        """Run the task."""
245
        common.run_oscap_remediate(
246
            self._policy_data.profile_id,
247
            self._target_content_path,
248
            self._policy_data.datastream_id,
249
            self._policy_data.xccdf_id,
250
            self._target_tailoring_path,
251
            chroot=self._sysroot
252
        )
253