Passed
Pull Request — master (#136)
by
unknown
01:36
created

org_fedora_oscap.service.installation   A

Complexity

Total Complexity 41

Size/Duplication

Total Lines 301
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 185
dl 0
loc 301
rs 9.1199
c 0
b 0
f 0
wmc 41

17 Methods

Rating   Name   Duplication   Size   Complexity  
A CheckFingerprintTask.name() 0 3 1
A EvaluateRulesTask.run() 0 12 3
A FetchContentTask.__init__() 0 7 1
A EvaluateRulesTask.__init__() 0 7 1
A EvaluateRulesTask.name() 0 3 1
B InstallContentTask.run() 0 44 8
A CheckFingerprintTask.run() 0 24 5
A EvaluateRulesTask._evaluate_rules() 0 13 2
A InstallContentTask.name() 0 3 1
A FetchContentTask.name() 0 3 1
B FetchContentTask.run() 0 43 7
A EvaluateRulesTask._initialize_rules() 0 26 3
A RemediateSystemTask.run() 0 17 3
A RemediateSystemTask.name() 0 3 1
A CheckFingerprintTask.__init__() 0 6 1
A RemediateSystemTask.__init__() 0 9 1
A InstallContentTask.__init__() 0 11 1

How to fix   Complexity   

Complexity

Complex classes like org_fedora_oscap.service.installation often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import os
20
import shutil
21
22
from pyanaconda.core import util
23
from pyanaconda.modules.common.task import Task
24
25
from org_fedora_oscap import common, data_fetch, rule_handling, utils
26
from org_fedora_oscap.common import _
27
28
log = logging.getLogger(__name__)
29
30
31
class FetchContentTask(Task):
32
    """The installation task for fetching the content."""
33
34
    def __init__(self, policy_enabled, policy_data, file_path, content_path):
35
        """Create a task."""
36
        super().__init__()
37
        self._policy_enabled = policy_enabled
38
        self._policy_data = policy_data
39
        self._file_path = file_path
40
        self._content_path = content_path
41
42
    @property
43
    def name(self):
44
        return "Fetch the content"
45
46
    def run(self):
47
        """Run the task."""
48
        if not self._policy_enabled:
49
            log.debug("The security policy is disabled. Skip.")
50
            return
51
52
        if not self._policy_data.profile_id:
53
            log.debug("No profile is selected. Skip.")
54
            return
55
56
        # Is the content available?
57
        if os.path.exists(self._content_path):
58
            log.debug("Content is already available. Skip.")
59
            return
60
61
        if os.path.exists(self._file_path):
62
            log.debug("Content is already available. Skip.")
63
            return
64
65
        try:
66
            data_fetch.fetch_data(
67
                self._policy_data.content_url,
68
                self._file_path,
69
                self._policy_data.certificates
70
            )
71
72
            # RPM is an archive at this phase
73
            if self._policy_data.content_type in ("archive", "rpm"):
74
                # extract the content
75
                common.extract_data(
76
                    self._file_path,
77
                    common.INSTALLATION_CONTENT_DIR,
78
                    [self._policy_data.content_path]
79
                )
80
81
        except (common.OSCAPaddonError, data_fetch.DataFetchError) as e:
82
            log.error("Failed to fetch SCAP content!")
83
84
            # TODO: Raise non-critical error. Make sure that anaconda handles it!
85
            raise Exception(_(
86
                "There was an error fetching the security content:\n%s\n"
87
                "The installation should be aborted. Do you wish to continue anyway?"
88
            ) % e)
89
90
91
class CheckFingerprintTask(Task):
92
    """The installation task for checking the fingerprint."""
93
94
    def __init__(self, policy_enabled, policy_data, file_path):
95
        """Create a task."""
96
        super().__init__()
97
        self._policy_enabled = policy_enabled
98
        self._policy_data = policy_data
99
        self._file_path = file_path
100
101
    @property
102
    def name(self):
103
        return "Check the fingerprint"
104
105
    def run(self):
106
        """Run the task."""
107
        if not self._policy_enabled:
108
            log.debug("The security policy is disabled. Skip.")
109
            return
110
111
        if not self._policy_data.profile_id:
112
            log.debug("No profile is selected. Skip.")
113
            return
114
115
        if not self._policy_data.fingerprint:
116
            log.debug("No fingerprint is provided. Skip.")
117
            return
118
119
        hash_obj = utils.get_hashing_algorithm(self._policy_data.fingerprint)
120
        digest = utils.get_file_fingerprint(self._file_path, hash_obj)
121
122
        if digest != self._policy_data.fingerprint:
123
            log.error("Failed to fetch and initialize SCAP content!")
124
125
            # TODO: Raise non-critical error. Make sure that anaconda handles it!
126
            raise Exception(_(
127
                "The integrity check of the security content failed.\n" +
128
                "The installation should be aborted. Do you wish to continue anyway?"
129
            ))
130
131
132
class EvaluateRulesTask(Task):
133
    """The installation task for the evaluation of the rules."""
134
135
    def __init__(self, policy_enabled, policy_data, content_path, tailoring_path):
136
        """Create a task."""
137
        super().__init__()
138
        self._policy_enabled = policy_enabled
139
        self._policy_data = policy_data
140
        self._content_path = content_path
141
        self._tailoring_path = tailoring_path
142
143
    @property
144
    def name(self):
145
        return "Evaluate the rules"
146
147
    def run(self):
148
        """Run the task."""
149
        if not self._policy_enabled:
150
            log.debug("The security policy is disabled. Skip.")
151
            return
152
153
        if not self._policy_data.profile_id:
154
            log.debug("No profile is selected. Skip.")
155
            return
156
157
        rule_data = self._initialize_rules()
158
        self._evaluate_rules(rule_data)
159
160
    def _initialize_rules(self):
161
        try:
162
            rules = common.get_fix_rules_pre(
163
                self._policy_data.profile_id,
164
                self._content_path,
165
                self._policy_data.datastream_id,
166
                self._policy_data.xccdf_id,
167
                self._tailoring_path
168
            )
169
170
            # parse and store rules
171
            rule_data = rule_handling.RuleData()
172
173
            for rule in rules.splitlines():
174
                rule_data.new_rule(rule)
175
176
            return rule_data
177
178
        except common.OSCAPaddonError as e:
179
            log.error("Failed to load SCAP content!")
180
181
            # TODO: Raise non-critical error. Make sure that anaconda handles it!
182
            raise Exception(_(
183
                "There was an error loading the security content:\n%s\n"
184
                "The installation should be aborted. Do you wish to continue anyway?"
185
            ) % e)
186
187
    def _evaluate_rules(self, rule_data):
188
        # evaluate rules, do automatic fixes and stop if something that cannot
189
        # be fixed automatically is wrong
190
        all_messages = rule_data.eval_rules(None, None)
191
        fatal_messages = [m for m in all_messages if m.type == common.MESSAGE_TYPE_FATAL]
192
193
        if any(fatal_messages):
194
            msg = "Wrong configuration detected!\n"
195
            msg += "\n".join(message.text for message in fatal_messages)
196
            msg += "\nThe installation should be aborted. Do you wish to continue anyway?"
197
198
            # TODO: Raise non-critical error. Make sure that anaconda handles it!
199
            raise Exception(msg)
200
201
202
class InstallContentTask(Task):
203
    """The installation task for installation of the content."""
204
205
    def __init__(self, sysroot, policy_enabled, policy_data, file_path,
206
                 content_path, tailoring_path, target_directory):
207
        """Create a task."""
208
        super().__init__()
209
        self._sysroot = sysroot
210
        self._policy_enabled = policy_enabled
211
        self._policy_data = policy_data
212
        self._file_path = file_path
213
        self._content_path = content_path
214
        self._tailoring_path = tailoring_path
215
        self._target_directory = target_directory
216
217
    @property
218
    def name(self):
219
        return "Install the content"
220
221
    def run(self):
222
        """Run the task."""
223
        if not self._policy_enabled:
224
            log.debug("The security policy is disabled. Skip.")
225
            return
226
227
        if not self._policy_data.profile_id:
228
            log.debug("No profile is selected. Skip.")
229
            return
230
231
        target_content_dir = utils.join_paths(
232
            self._sysroot,
233
            self._target_directory
234
        )
235
236
        utils.ensure_dir_exists(target_content_dir)
237
238
        if self._policy_data.content_type == "scap-security-guide":
239
            pass  # nothing needed
240
        elif self._policy_data.content_type == "datastream":
241
            shutil.copy2(self._content_path, target_content_dir)
242
        elif self._policy_data.content_type == "rpm":
243
            # copy the RPM to the target system
244
            shutil.copy2(self._file_path, target_content_dir)
245
246
            # get the path of the RPM
247
            content_name = common.get_content_name(self._policy_data)
248
            package_path = utils.join_paths(self._target_directory, content_name)
249
250
            # and install it with yum
251
            ret = util.execInSysroot(
252
                "yum", ["-y", "--nogpg", "install", package_path]
253
            )
254
255
            if ret != 0:
256
                raise common.ExtractionError(
257
                    "Failed to install content RPM to the target system"
258
                )
259
        else:
260
            pattern = utils.join_paths(common.INSTALLATION_CONTENT_DIR, "*")
261
            utils.universal_copy(pattern, target_content_dir)
262
263
        if os.path.exists(self._tailoring_path):
264
            shutil.copy2(self._tailoring_path, target_content_dir)
265
266
267
class RemediateSystemTask(Task):
268
    """The installation task for running the remediation."""
269
270
    def __init__(self, sysroot, policy_enabled, policy_data, target_content_path,
271
                 target_tailoring_path):
272
        """Create a task."""
273
        super().__init__()
274
        self._sysroot = sysroot
275
        self._policy_enabled = policy_enabled
276
        self._policy_data = policy_data
277
        self._target_content_path = target_content_path
278
        self._target_tailoring_path = target_tailoring_path
279
280
    @property
281
    def name(self):
282
        return "Remediate the system"
283
284
    def run(self):
285
        """Run the task."""
286
        if not self._policy_enabled:
287
            log.debug("The security policy is disabled. Skip.")
288
            return
289
290
        if not self._policy_data.profile_id:
291
            log.debug("No profile is selected. Skip.")
292
            return
293
294
        common.run_oscap_remediate(
295
            self._policy_data.profile_id,
296
            self._target_content_path,
297
            self._policy_data.datastream_id,
298
            self._policy_data.xccdf_id,
299
            self._target_tailoring_path,
300
            chroot=self._sysroot
301
        )
302