Passed
Pull Request — rhel9-branch (#158)
by Matěj
01:18
created

oscap   F

Complexity

Total Complexity 98

Size/Duplication

Total Lines 552
Duplicated Lines 26.63 %

Importance

Changes 0
Metric Value
eloc 307
dl 147
loc 552
rs 2
c 0
b 0
f 0
wmc 98

26 Methods

Rating   Name   Duplication   Size   Complexity  
A OSCAPdata.preinst_tailoring_path() 0 9 2
A OSCAPdata._parse_xccdf_id() 0 3 1
D OSCAPdata.__str__() 41 41 12
A OSCAPdata.postinst_tailoring_path() 0 13 3
A OSCAPdata._parse_datastream_id() 0 3 1
A OSCAPdata.preinst_content_path() 0 13 3
A OSCAPdata.content_name() 18 18 5
A OSCAPdata._parse_profile_id() 0 3 1
A OSCAPdata._parse_tailoring_path() 0 3 1
A OSCAPdata.clear_all() 0 4 1
A OSCAPdata._parse_content_type() 0 8 2
D OSCAPdata.finalize() 44 44 13
A OSCAPdata._parse_certificates() 0 2 1
A OSCAPdata.raw_preinst_content_path() 0 6 1
A OSCAPdata._parse_fingerprint() 10 10 3
A OSCAPdata.raw_postinst_content_path() 0 6 1
A OSCAPdata._parse_content_url() 0 7 2
A OSCAPdata._fetch_content_and_initialize() 0 21 3
A OSCAPdata._parse_cpe_path() 0 3 1
A OSCAPdata._parse_content_path() 0 3 1
A OSCAPdata.__init__() 0 30 2
A OSCAPdata.postinst_content_path() 0 13 3
B OSCAPdata.execute() 0 46 8
F OSCAPdata.setup() 0 108 24
A OSCAPdata.content_defined() 0 3 1
A OSCAPdata.handle_line() 34 34 2

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
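One case visible in this file is the "ask the user or abort" sequence, which appears three times in setup(). The following is a minimal sketch of pulling it into a single helper, not the project's actual fix. The names Prompter, NonInteractiveError, AbortRequested and the ERROR_CONTINUE value are hypothetical stand-ins for the Anaconda objects used in the listing.

```python
from dataclasses import dataclass
from typing import Callable

# Hypothetical stand-in for errors.ERROR_CONTINUE used in the listing.
ERROR_CONTINUE = "continue"


class NonInteractiveError(Exception):
    """Hypothetical stand-in for errors.CmdlineError."""


class AbortRequested(Exception):
    """Hypothetical signal replacing the send_quit/sleep-forever branch."""


@dataclass
class Prompter:
    """Bundles the two things every copy of the block depends on."""

    interactive: bool          # False for an unattended kickstart install
    ask: Callable[[str], str]  # shows a yes/no question, returns the answer

    def continue_despite(self, msg: str) -> bool:
        """Return True if the user chose to continue, raise otherwise."""
        if not self.interactive:
            # cannot ask in a non-interactive installation
            raise NonInteractiveError(msg)
        if self.ask(msg) == ERROR_CONTINUE:
            return True
        raise AbortRequested(msg)
```

With a helper of this shape, each of the three branches would shrink to building the message, calling continue_despite(msg), and switching to dry-run mode when it returns True.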

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like oscap often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields or methods that share the same prefix or suffix. In this class, the _parse_* methods and the preinst_*/postinst_* path properties are examples of such groups.
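As a rough illustration of the prefix heuristic, the sketch below groups a subset of the method names from the table above by their first name component. The function group_by_prefix is a hypothetical helper written for this example, not part of any analysis tool.

```python
from collections import defaultdict

# A subset of the OSCAPdata method names listed in the table above.
METHODS = [
    "_parse_xccdf_id", "_parse_profile_id", "_parse_content_url",
    "_parse_fingerprint", "preinst_content_path", "preinst_tailoring_path",
    "postinst_content_path", "postinst_tailoring_path",
    "raw_preinst_content_path", "raw_postinst_content_path",
    "content_name", "content_defined", "setup", "execute",
]


def group_by_prefix(names):
    """Group names by their first underscore-separated component.

    Leading and trailing underscores are ignored, and groups with a
    single member are dropped because they suggest no shared component.
    """
    groups = defaultdict(list)
    for name in names:
        prefix = name.strip("_").split("_", 1)[0]
        groups[prefix].append(name)
    return {prefix: members for prefix, members in groups.items()
            if len(members) > 1}


candidates = group_by_prefix(METHODS)
for prefix, members in sorted(candidates.items()):
    print(prefix, len(members))
```

Groups found this way are only candidates: whether "parse" or "preinst" really forms a cohesive component still has to be judged by looking at which fields the methods touch.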

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
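A minimal sketch of Extract Class applied to the content-path properties could look like the following. It mirrors the branching of preinst_content_path and postinst_content_path from the listing, but the class name ContentPaths, the _join helper, and both directory constants are assumptions made for this example (the real values live in org_fedora_oscap.common and are not shown on this page).

```python
import posixpath

# Hypothetical placeholders for common.INSTALLATION_CONTENT_DIR and
# common.TARGET_CONTENT_DIR; the real values are not shown in the listing.
INSTALLATION_CONTENT_DIR = "/tmp/preinst_content"
TARGET_CONTENT_DIR = "/tmp/target_content"


def _join(base, rel):
    """Assumed behaviour of utils.join_paths: append rel below base."""
    return posixpath.join(base, rel.lstrip("/"))


class ContentPaths:
    """Extracted component holding only the path-resolution rules."""

    def __init__(self, content_type, content_name, content_path):
        self.content_type = content_type
        self.content_name = content_name
        self.content_path = content_path

    @property
    def preinst(self):
        if self.content_type == "datastream":
            return _join(INSTALLATION_CONTENT_DIR, self.content_name)
        if self.content_type == "scap-security-guide":
            # SSG is not copied to the standard place
            return self.content_path
        return _join(INSTALLATION_CONTENT_DIR, self.content_path)

    @property
    def postinst(self):
        if self.content_type == "datastream":
            return _join(TARGET_CONTENT_DIR, self.content_name)
        if self.content_type in ("rpm", "scap-security-guide"):
            # installed as an RPM, so the path is used unchanged
            return self.content_path
        return _join(TARGET_CONTENT_DIR, self.content_path)
```

OSCAPdata would then hold one ContentPaths instance and delegate to it, which moves four branches out of the large class without changing what callers see.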

#
# Copyright (C) 2013  Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.  You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
# Red Hat Author(s): Vratislav Podzimek <[email protected]>
#

"""Module with the OSCAPdata class."""

import shutil
import re
import os
import time
import logging

from pyanaconda.addons import AddonData
from pyanaconda.core.configuration.anaconda import conf
from pyanaconda.progress import progressQ
from pyanaconda import errors
from pyanaconda.core import util
from pyanaconda import flags
from pykickstart.errors import KickstartParseError, KickstartValueError
from org_fedora_oscap import utils, common, rule_handling, data_fetch
from org_fedora_oscap.common import SUPPORTED_ARCHIVES, _

log = logging.getLogger("anaconda")

# export OSCAPdata class to prevent Anaconda's collect method from taking
# AddonData class instead of the OSCAPdata class
# @see: pyanaconda.kickstart.AnacondaKSHandler.__init__
__all__ = ["OSCAPdata"]

SUPPORTED_CONTENT_TYPES = ("datastream", "rpm", "archive",
                           "scap-security-guide",
                           )

SUPPORTED_URL_PREFIXES = ("http://", "https://", "ftp://"
                          # LABEL:?, hdaX:?,
                          )

REQUIRED_PACKAGES = ("openscap", "openscap-scanner", )

FINGERPRINT_REGEX = re.compile(r'^[a-z0-9]+$')


class MisconfigurationError(common.OSCAPaddonError):
    """Exception for reporting misconfiguration."""

    pass


class OSCAPdata(AddonData):
    """
    Class parsing and storing data for the OSCAP addon.

    :see: pyanaconda.addons.AddonData

    """

    def __init__(self, name, just_clear=False):
        """
        :param name: name of the addon
        :type name: str
        :param just_clear: only reset the stored values and skip the parent's
                           __init__ (used by clear_all)
        :type just_clear: bool

        """

        if not just_clear:
            # do not call the parent's __init__ more than once
            AddonData.__init__(self, name)

        # values specifying the content
        self.content_type = ""
        self.content_url = ""
        self.datastream_id = ""
        self.xccdf_id = ""
        self.profile_id = ""
        self.content_path = ""
        self.cpe_path = ""
        self.tailoring_path = ""

        # additional values
        self.fingerprint = ""

        # certificate to verify HTTPS connection or signed data
        self.certificates = ""

        # internal values
        self.rule_data = rule_handling.RuleData()
        self.dry_run = False

    # Analysis note: this code seems to be duplicated in your project.
    def __str__(self):
        """
        What should end up in the resulting kickstart file, i.e. string
        representation of the stored data.

        """

        if self.dry_run or not self.profile_id:
            # the addon was run in the dry run mode, omit it from the kickstart
            return ""

        def key_value_pair(key, value, indent=4):
            return "%s%s = %s" % (indent * " ", key, value)

        ret = "%%addon %s" % self.name
        ret += "\n%s" % key_value_pair("content-type", self.content_type)

        if self.content_url:
            ret += "\n%s" % key_value_pair("content-url", self.content_url)
        if self.datastream_id:
            ret += "\n%s" % key_value_pair("datastream-id", self.datastream_id)
        if self.xccdf_id:
            ret += "\n%s" % key_value_pair("xccdf-id", self.xccdf_id)
        if self.content_path and self.content_type != "scap-security-guide":
            ret += "\n%s" % key_value_pair("content-path", self.content_path)
        if self.cpe_path:
            ret += "\n%s" % key_value_pair("cpe-path", self.cpe_path)
        if self.tailoring_path:
            ret += "\n%s" % key_value_pair("tailoring-path",
                                           self.tailoring_path)

        ret += "\n%s" % key_value_pair("profile", self.profile_id)

        if self.fingerprint:
            ret += "\n%s" % key_value_pair("fingerprint", self.fingerprint)

        if self.certificates:
            ret += "\n%s" % key_value_pair("certificates", self.certificates)

        ret += "\n%end\n\n"
        return ret

    def _parse_content_type(self, value):
        value_low = value.lower()
        if value_low in SUPPORTED_CONTENT_TYPES:
            self.content_type = value_low
        else:
            msg = "Unsupported content type '%s' in the %s addon" % (value,
                                                                     self.name)
            raise KickstartValueError(msg)

    def _parse_content_url(self, value):
        if any(value.startswith(prefix)
               for prefix in SUPPORTED_URL_PREFIXES):
            self.content_url = value
        else:
            msg = "Unsupported url '%s' in the %s addon" % (value, self.name)
            raise KickstartValueError(msg)

    def _parse_datastream_id(self, value):
        # need to be checked?
        self.datastream_id = value

    def _parse_xccdf_id(self, value):
        # need to be checked?
        self.xccdf_id = value

    def _parse_profile_id(self, value):
        # need to be checked?
        self.profile_id = value

    def _parse_content_path(self, value):
        # need to be checked?
        self.content_path = value

    def _parse_cpe_path(self, value):
        # need to be checked?
        self.cpe_path = value

    def _parse_tailoring_path(self, value):
        # need to be checked?
        self.tailoring_path = value

    # Analysis note: this code seems to be duplicated in your project.
    def _parse_fingerprint(self, value):
        if FINGERPRINT_REGEX.match(value) is None:
            msg = "Unsupported or invalid fingerprint"
            raise KickstartValueError(msg)

        if utils.get_hashing_algorithm(value) is None:
            msg = "Unsupported fingerprint"
            raise KickstartValueError(msg)

        self.fingerprint = value

    def _parse_certificates(self, value):
        self.certificates = value

    # Analysis note: this code seems to be duplicated in your project.
    def handle_line(self, line):
        """
        The handle_line method that is called with every line from this addon's
        %addon section of the kickstart file.

        :param line: a single line from the %addon section
        :type line: str

        """

        actions = {"content-type": self._parse_content_type,
                   "content-url": self._parse_content_url,
                   "content-path": self._parse_content_path,
                   "datastream-id": self._parse_datastream_id,
                   "profile": self._parse_profile_id,
                   "xccdf-id": self._parse_xccdf_id,
                   "xccdf-path": self._parse_content_path,
                   "cpe-path": self._parse_cpe_path,
                   "tailoring-path": self._parse_tailoring_path,
                   "fingerprint": self._parse_fingerprint,
                   "certificates": self._parse_certificates,
                   }

        line = line.strip()
        (pre, sep, post) = line.partition("=")
        pre = pre.strip()
        post = post.strip()
        post = post.strip('"')

        try:
            actions[pre](post)
        except KeyError:
            msg = "Unknown item '%s' for %s addon" % (line, self.name)
            raise KickstartParseError(msg)

    # Analysis note: this code seems to be duplicated in your project.
    def finalize(self):
        """
        The finalize method that is called when the end of the %addon section
        (the %end line) is reached. It means no more kickstart data will come.

        """

        tmpl = "%s missing for the %s addon"

        # check provided data
        if not self.content_type:
            raise KickstartValueError(tmpl % ("content-type", self.name))

        if self.content_type != "scap-security-guide" and not self.content_url:
            raise KickstartValueError(tmpl % ("content-url", self.name))

        if not self.profile_id:
            self.profile_id = "default"

        if self.content_type in ("rpm", "archive") and not self.content_path:
            msg = "Path to the XCCDF file has to be given if content in RPM "\
                  "or archive is used"
            raise KickstartValueError(msg)

        if self.content_type == "rpm" and not self.content_url.endswith(".rpm"):
            msg = "Content type set to RPM, but the content URL doesn't end "\
                  "with '.rpm'"
            raise KickstartValueError(msg)

        if self.content_type == "archive":
            supported_archive = any(self.content_url.endswith(arch_type)
                                    for arch_type in SUPPORTED_ARCHIVES)
            if not supported_archive:
                msg = "Unsupported archive type of the content "\
                      "file '%s'" % self.content_url
                raise KickstartValueError(msg)

        # do some initialization magic in case of SSG
        if self.content_type == "scap-security-guide":
            if not common.ssg_available():
                msg = "SCAP Security Guide not found on the system"
                raise KickstartValueError(msg)

            self.content_path = common.SSG_DIR + common.SSG_CONTENT

    @property
    def content_defined(self):
        return self.content_url or self.content_type == "scap-security-guide"

    # Analysis note: this code seems to be duplicated in your project.
    @property
    def content_name(self):
        if self.content_type == "scap-security-guide":
            raise ValueError("Using scap-security-guide, no single content file")

        rest = "/anonymous_content"
        for prefix in SUPPORTED_URL_PREFIXES:
            if self.content_url.startswith(prefix):
                rest = self.content_url[len(prefix):]
                break

        parts = rest.rsplit("/", 1)
        if len(parts) != 2:
            msg = "Unsupported url '%s' in the %s addon" % (self.content_url,
                                                            self.name)
            raise KickstartValueError(msg)

        return parts[1]

    @property
    def raw_preinst_content_path(self):
        """Path to the raw (unextracted, ...) pre-installation content file"""

        return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
                                self.content_name)

    @property
    def raw_postinst_content_path(self):
        """Path to the raw (unextracted, ...) post-installation content file"""

        return utils.join_paths(common.TARGET_CONTENT_DIR,
                                self.content_name)

    @property
    def preinst_content_path(self):
        """Path to the pre-installation content file"""

        if self.content_type == "datastream":
            return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
                                    self.content_name)
        elif self.content_type == "scap-security-guide":
            # SSG is not copied to the standard place
            return self.content_path
        else:
            return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
                                    self.content_path)

    @property
    def postinst_content_path(self):
        """Path to the post-installation content file"""

        if self.content_type == "datastream":
            return utils.join_paths(common.TARGET_CONTENT_DIR,
                                    self.content_name)
        elif self.content_type in ("rpm", "scap-security-guide"):
            # no path magic in case of RPM (SSG is installed as an RPM)
            return self.content_path
        else:
            return utils.join_paths(common.TARGET_CONTENT_DIR,
                                    self.content_path)

    @property
    def preinst_tailoring_path(self):
        """Path to the pre-installation tailoring file (if any)"""

        if not self.tailoring_path:
            return ""

        return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
                                self.tailoring_path)

    @property
    def postinst_tailoring_path(self):
        """Path to the post-installation tailoring file (if any)"""

        if not self.tailoring_path:
            return ""

        if self.content_type == "rpm":
            # no path magic in case of RPM
            return self.tailoring_path

        return utils.join_paths(common.TARGET_CONTENT_DIR,
                                self.tailoring_path)

    def _fetch_content_and_initialize(self):
        """Fetch content and initialize from it"""

        data_fetch.fetch_data(self.content_url, self.raw_preinst_content_path,
                              self.certificates)
        # RPM is an archive at this phase
        if self.content_type in ("archive", "rpm"):
            # extract the content
            common.extract_data(self.raw_preinst_content_path,
                                common.INSTALLATION_CONTENT_DIR,
                                [self.content_path])

        rules = common.get_fix_rules_pre(self.profile_id,
                                         self.preinst_content_path,
                                         self.datastream_id, self.xccdf_id,
                                         self.preinst_tailoring_path)

        # parse and store rules with a clean RuleData instance
        self.rule_data = rule_handling.RuleData()
        for rule in rules.splitlines():
            self.rule_data.new_rule(rule)

    def setup(self, storage, ksdata, payload):
        """
        The setup method that should make changes to the runtime environment
        according to the data stored in this object.

        :param storage: object storing storage-related information
                        (disks, partitioning, bootloader, etc.)
        :type storage: blivet.Blivet instance
        :param ksdata: data parsed from the kickstart file and set in the
                       installation process
        :type ksdata: pykickstart.base.BaseHandler instance
        :param payload: installation payload object (not used by this method)

        """

        if self.dry_run or not self.profile_id:
            # nothing more to be done in the dry-run mode or if no profile is
            # selected
            return

        if not os.path.exists(self.preinst_content_path) and not os.path.exists(self.raw_preinst_content_path):
            # content not available/fetched yet
            try:
                self._fetch_content_and_initialize()
            except (common.OSCAPaddonError, data_fetch.DataFetchError) as e:
                log.error("Failed to fetch and initialize SCAP content!")
                msg = _("There was an error fetching and loading the security content:\n" +
                        "%s\n" +
                        "The installation should be aborted. Do you wish to continue anyway?") % e

                if flags.flags.automatedInstall and not flags.flags.ksprompt:
                    # cannot ask in a non-interactive kickstart
                    # installation
                    raise errors.CmdlineError(msg)

                answ = errors.errorHandler.ui.showYesNoQuestion(msg)
                if answ == errors.ERROR_CONTINUE:
                    # prevent any further actions here by switching to the dry
                    # run mode and let things go on
                    self.dry_run = True
                    return
                else:
                    # Let's sleep forever to prevent any further actions and
                    # wait for the main thread to quit the process.
                    progressQ.send_quit(1)
                    while True:
                        time.sleep(100000)

        # check fingerprint if given
        if self.fingerprint:
            hash_obj = utils.get_hashing_algorithm(self.fingerprint)
            digest = utils.get_file_fingerprint(self.raw_preinst_content_path,
                                                hash_obj)
            if digest != self.fingerprint:
                # the original logged the fetch-failure message here, which
                # looks like a copy-paste leftover
                log.error("Integrity check of the SCAP content failed!")
                msg = _("The integrity check of the security content failed.\n" +
                        "The installation should be aborted. Do you wish to continue anyway?")

                if flags.flags.automatedInstall and not flags.flags.ksprompt:
                    # cannot ask in a non-interactive kickstart
                    # installation
                    raise errors.CmdlineError(msg)

                answ = errors.errorHandler.ui.showYesNoQuestion(msg)
                if answ == errors.ERROR_CONTINUE:
                    # prevent any further actions here by switching to the dry
                    # run mode and let things go on
                    self.dry_run = True
                    return
                else:
                    # Let's sleep forever to prevent any further actions and
                    # wait for the main thread to quit the process.
                    progressQ.send_quit(1)
                    while True:
                        time.sleep(100000)

        # evaluate rules, do automatic fixes and stop if something that cannot
        # be fixed automatically is wrong
        fatal_messages = [message for message in self.rule_data.eval_rules(ksdata, storage)
                          if message.type == common.MESSAGE_TYPE_FATAL]
        if any(fatal_messages):
            msg = "Wrong configuration detected!\n"
            msg += "\n".join(message.text for message in fatal_messages)
            msg += "\nThe installation should be aborted. Do you wish to continue anyway?"
            if flags.flags.automatedInstall and not flags.flags.ksprompt:
                # cannot ask in a non-interactive kickstart installation
                raise errors.CmdlineError(msg)

            answ = errors.errorHandler.ui.showYesNoQuestion(msg)
            if answ == errors.ERROR_CONTINUE:
                # prevent any further actions here by switching to the dry
                # run mode and let things go on
                self.dry_run = True
                return
            else:
                # Let's sleep forever to prevent any further actions and wait
                # for the main thread to quit the process.
                progressQ.send_quit(1)
                while True:
                    time.sleep(100000)

        # add packages needed on the target system to the list of packages
        # that are requested to be installed
        pkgs_to_install = list(REQUIRED_PACKAGES)
        if self.content_type == "scap-security-guide":
            pkgs_to_install.append("scap-security-guide")
        for pkg in pkgs_to_install:
            if pkg not in ksdata.packages.packageList:
                ksdata.packages.packageList.append(pkg)

    def execute(self, storage, ksdata, users, payload):
        """
        The execute method that should make changes to the installed system. It
        is called only once in the post-install setup phase.

        :see: setup
        :param users: information about created users
        :type users: pyanaconda.users.Users instance

        """

        if self.dry_run or not self.profile_id:
            # nothing more to be done in the dry-run mode or if no profile is
            # selected
            return

        target_content_dir = utils.join_paths(conf.target.system_root,
                                              common.TARGET_CONTENT_DIR)
        utils.ensure_dir_exists(target_content_dir)

        if self.content_type == "datastream":
            shutil.copy2(self.preinst_content_path, target_content_dir)
        elif self.content_type == "rpm":
            # copy the RPM to the target system
            shutil.copy2(self.raw_preinst_content_path, target_content_dir)

            # and install it with yum
            ret = util.execInSysroot("yum", ["-y", "--nogpg", "install",
                                             self.raw_postinst_content_path])
            if ret != 0:
                raise common.ExtractionError("Failed to install content "
                                             "RPM to the target system")
        elif self.content_type == "scap-security-guide":
            # nothing needed
            pass
        else:
            utils.universal_copy(utils.join_paths(common.INSTALLATION_CONTENT_DIR,
                                                  "*"),
                                 target_content_dir)
        if os.path.exists(self.preinst_tailoring_path):
            shutil.copy2(self.preinst_tailoring_path, target_content_dir)

        common.run_oscap_remediate(self.profile_id, self.postinst_content_path,
                                   self.datastream_id, self.xccdf_id,
                                   self.postinst_tailoring_path,
                                   chroot=conf.target.system_root)

    def clear_all(self):
        """Clear all the stored values."""

        self.__init__(self.name, just_clear=True)