Passed
Pull Request — rhel8-branch (#148)
by Matěj
01:28
created

org_fedora_oscap.ks.oscap.OSCAPdata.setup()   D

Complexity

Conditions 13

Size

Total Lines 65
Code Lines 36

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 36
dl 0
loc 65
rs 4.2
c 0
b 0
f 0
cc 13
nop 4

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like org_fedora_oscap.ks.oscap.OSCAPdata.setup() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Copyright (C) 2013  Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
# Red Hat Author(s): Vratislav Podzimek <[email protected]>
19
#
20
21
"""Module with the OSCAPdata class."""
22
23
import shutil
24
import re
25
import os
26
import time
27
import logging
28
import pathlib
29
30
from pyanaconda.addons import AddonData
31
from pyanaconda.core.configuration.anaconda import conf
32
from pyanaconda.progress import progressQ
33
from pyanaconda import errors
34
from pyanaconda.core import util
35
from pyanaconda import flags
36
from pykickstart.errors import KickstartParseError, KickstartValueError
37
from org_fedora_oscap import utils, common, rule_handling, data_fetch
38
from org_fedora_oscap.common import SUPPORTED_ARCHIVES, _
39
from org_fedora_oscap.content_handling import ContentCheckError, ContentHandlingError
40
from org_fedora_oscap import model
41
42
log = logging.getLogger("anaconda")
43
44
# export OSCAPdata class to prevent Anaconda's collect method from taking
45
# AddonData class instead of the OSCAPdata class
46
# @see: pyanaconda.kickstart.AnacondaKSHandler.__init__
47
__all__ = ["OSCAPdata"]
48
49
SUPPORTED_CONTENT_TYPES = ("datastream", "rpm", "archive",
50
                           "scap-security-guide",
51
                           )
52
53
SUPPORTED_URL_PREFIXES = ("http://", "https://", "ftp://", "file://"
54
                          # LABEL:?, hdaX:?,
55
                          )
56
57
REQUIRED_PACKAGES = ("openscap", "openscap-scanner", )
58
59
FINGERPRINT_REGEX = re.compile(r'^[a-z0-9]+$')
60
61
62
class MisconfigurationError(common.OSCAPaddonError):
63
    """Exception for reporting misconfiguration."""
64
65
    pass
66
67
68
class OSCAPdata(AddonData):
69
    """
70
    Class parsing and storing data for the OSCAP addon.
71
72
    :see: pyanaconda.addons.AddonData
73
74
    """
75
76
    def __init__(self, name, just_clear=False):
77
        """
78
        :param name: name of the addon
79
        :type name: str
80
81
        """
82
83
        if not just_clear:
84
            # do not call the parent's __init__ more than once
85
            AddonData.__init__(self, name)
86
87
        # values specifying the content
88
        self.content_type = ""
89
        self.content_url = ""
90
        self.datastream_id = ""
91
        self.xccdf_id = ""
92
        self.profile_id = ""
93
        self.content_path = ""
94
        self.cpe_path = ""
95
        self.tailoring_path = ""
96
97
        # additional values
98
        self.fingerprint = ""
99
100
        # certificate to verify HTTPS connection or signed data
101
        self.certificates = ""
102
103
        # internal values
104
        self.rule_data = rule_handling.RuleData()
105
        self.dry_run = False
106
107
        self.model = model.Model(self)
108
        self.model.CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
109
110
    def __str__(self):
111
        """
112
        What should end up in the resulting kickstart file, i.e. string
113
        representation of the stored data.
114
115
        """
116
117
        if self.dry_run or not self.profile_id:
118
            # the addon was run in the dry run mode, omit it from the kickstart
119
            return ""
120
121
        def key_value_pair(key, value, indent=4):
122
            return "%s%s = %s" % (indent * " ", key, value)
123
124
        ret = "%%addon %s" % self.name
125
        ret += "\n%s" % key_value_pair("content-type", self.content_type)
126
127
        if self.content_url:
128
            ret += "\n%s" % key_value_pair("content-url", self.content_url)
129
        if self.datastream_id:
130
            ret += "\n%s" % key_value_pair("datastream-id", self.datastream_id)
131
        if self.xccdf_id:
132
            ret += "\n%s" % key_value_pair("xccdf-id", self.xccdf_id)
133
        if self.content_path and self.content_type != "scap-security-guide":
134
            ret += "\n%s" % key_value_pair("content-path", self.content_path)
135
        if self.cpe_path:
136
            ret += "\n%s" % key_value_pair("cpe-path", self.cpe_path)
137
        if self.tailoring_path:
138
            ret += "\n%s" % key_value_pair("tailoring-path",
139
                                           self.tailoring_path)
140
141
        ret += "\n%s" % key_value_pair("profile", self.profile_id)
142
143
        if self.fingerprint:
144
            ret += "\n%s" % key_value_pair("fingerprint", self.fingerprint)
145
146
        if self.certificates:
147
            ret += "\n%s" % key_value_pair("certificates", self.certificates)
148
149
        ret += "\n%end\n\n"
150
        return ret
151
152
    def _parse_content_type(self, value):
153
        value_low = value.lower()
154
        if value_low in SUPPORTED_CONTENT_TYPES:
155
            self.content_type = value_low
156
        else:
157
            msg = "Unsupported content type '%s' in the %s addon" % (value,
158
                                                                     self.name)
159
            raise KickstartValueError(msg)
160
161
    def _parse_content_url(self, value):
162
        if any(value.startswith(prefix)
163
               for prefix in SUPPORTED_URL_PREFIXES):
164
            self.content_url = value
165
        else:
166
            msg = "Unsupported url '%s' in the %s addon" % (value, self.name)
167
            raise KickstartValueError(msg)
168
169
    def _parse_datastream_id(self, value):
170
        # need to be checked?
171
        self.datastream_id = value
172
173
    def _parse_xccdf_id(self, value):
174
        # need to be checked?
175
        self.xccdf_id = value
176
177
    def _parse_profile_id(self, value):
178
        # need to be checked?
179
        self.profile_id = value
180
181
    def _parse_content_path(self, value):
182
        # need to be checked?
183
        self.content_path = value
184
185
    def _parse_cpe_path(self, value):
186
        # need to be checked?
187
        self.cpe_path = value
188
189
    def _parse_tailoring_path(self, value):
190
        # need to be checked?
191
        self.tailoring_path = value
192
193
    def _parse_fingerprint(self, value):
194
        if FINGERPRINT_REGEX.match(value) is None:
195
            msg = "Unsupported or invalid fingerprint"
196
            raise KickstartValueError(msg)
197
198
        if utils.get_hashing_algorithm(value) is None:
199
            msg = "Unsupported fingerprint"
200
            raise KickstartValueError(msg)
201
202
        self.fingerprint = value
203
204
    def _parse_certificates(self, value):
205
        self.certificates = value
206
207
    def handle_line(self, line):
208
        """
209
        The handle_line method that is called with every line from this addon's
210
        %addon section of the kickstart file.
211
212
        :param line: a single line from the %addon section
213
        :type line: str
214
215
        """
216
217
        actions = {"content-type": self._parse_content_type,
218
                   "content-url": self._parse_content_url,
219
                   "content-path": self._parse_content_path,
220
                   "datastream-id": self._parse_datastream_id,
221
                   "profile": self._parse_profile_id,
222
                   "xccdf-id": self._parse_xccdf_id,
223
                   "xccdf-path": self._parse_content_path,
224
                   "cpe-path": self._parse_cpe_path,
225
                   "tailoring-path": self._parse_tailoring_path,
226
                   "fingerprint": self._parse_fingerprint,
227
                   "certificates": self._parse_certificates,
228
                   }
229
230
        line = line.strip()
231
        (pre, sep, post) = line.partition("=")
232
        pre = pre.strip()
233
        post = post.strip()
234
        post = post.strip('"')
235
236
        try:
237
            actions[pre](post)
238
        except KeyError:
239
            msg = "Unknown item '%s' for %s addon" % (line, self.name)
240
            raise KickstartParseError(msg)
241
242
    def finalize(self):
243
        """
244
        The finalize method that is called when the end of the %addon section
245
        (the %end line) is reached. It means no more kickstart data will come.
246
247
        """
248
249
        tmpl = "%s missing for the %s addon"
250
251
        # check provided data
252
        if not self.content_type:
253
            raise KickstartValueError(tmpl % ("content-type", self.name))
254
255
        if self.content_type != "scap-security-guide" and not self.content_url:
256
            raise KickstartValueError(tmpl % ("content-url", self.name))
257
258
        if not self.profile_id:
259
            self.profile_id = "default"
260
261
        if self.content_type in ("rpm", "archive") and not self.content_path:
262
            msg = "Path to the XCCDF file has to be given if content in RPM "\
263
                  "or archive is used"
264
            raise KickstartValueError(msg)
265
266
        if self.content_type == "rpm" and not self.content_url.endswith(".rpm"):
267
            msg = "Content type set to RPM, but the content URL doesn't end "\
268
                  "with '.rpm'"
269
            raise KickstartValueError(msg)
270
271
        if self.content_type == "archive":
272
            supported_archive = any(self.content_url.endswith(arch_type)
273
                                    for arch_type in SUPPORTED_ARCHIVES)
274
            if not supported_archive:
275
                msg = "Unsupported archive type of the content "\
276
                      "file '%s'" % self.content_url
277
                raise KickstartValueError(msg)
278
279
        # do some initialization magic in case of SSG
280
        if self.content_type == "scap-security-guide":
281
            if not common.ssg_available():
282
                msg = "SCAP Security Guide not found on the system"
283
                raise KickstartValueError(msg)
284
285
            self.content_path = common.SSG_DIR + common.SSG_CONTENT
286
287
    @property
288
    def content_defined(self):
289
        return self.content_url or self.content_type == "scap-security-guide"
290
291
    @property
292
    def content_name(self):
293
        if self.content_type == "scap-security-guide":
294
            raise ValueError("Using scap-security-guide, no single content file")
295
296
        rest = "/anonymous_content"
297
        for prefix in SUPPORTED_URL_PREFIXES:
298
            if self.content_url.startswith(prefix):
299
                rest = self.content_url[len(prefix):]
300
                break
301
302
        parts = rest.rsplit("/", 1)
303
        if len(parts) != 2:
304
            msg = "Unsupported url '%s' in the %s addon" % (self.content_url,
305
                                                            self.name)
306
            raise KickstartValueError(msg)
307
308
        return parts[1]
309
310
    @property
311
    def raw_preinst_content_path(self):
312
        """Path to the raw (unextracted, ...) pre-installation content file"""
313
314
        return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
315
                                self.content_name)
316
317
    @property
318
    def raw_postinst_content_path(self):
319
        """Path to the raw (unextracted, ...) post-installation content file"""
320
321
        return utils.join_paths(common.TARGET_CONTENT_DIR,
322
                                self.content_name)
323
324
    @property
325
    def preinst_content_path(self):
326
        """Path to the pre-installation content file"""
327
328
        if self.content_type == "datastream":
329
            return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
330
                                    self.content_name)
331
        elif self.content_type == "scap-security-guide":
332
            # SSG is not copied to the standard place
333
            return self.content_path
334
        else:
335
            return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
336
                                    self.content_path)
337
338
    @property
339
    def postinst_content_path(self):
340
        """Path to the post-installation content file"""
341
342
        if self.content_type == "datastream":
343
            return utils.join_paths(common.TARGET_CONTENT_DIR,
344
                                    self.content_name)
345
        elif self.content_type in ("rpm", "scap-security-guide"):
346
            # no path magic in case of RPM (SSG is installed as an RPM)
347
            return self.content_path
348
        else:
349
            return utils.join_paths(common.TARGET_CONTENT_DIR,
350
                                    self.content_path)
351
352
    @property
353
    def preinst_tailoring_path(self):
354
        """Path to the pre-installation tailoring file (if any)"""
355
356
        if not self.tailoring_path:
357
            return ""
358
359
        return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
360
                                self.tailoring_path)
361
362
    @property
363
    def postinst_tailoring_path(self):
364
        """Path to the post-installation tailoring file (if any)"""
365
366
        if not self.tailoring_path:
367
            return ""
368
369
        if self.content_type == "rpm":
370
            # no path magic in case of RPM
371
            return self.tailoring_path
372
373
        return utils.join_paths(common.TARGET_CONTENT_DIR,
374
                                self.tailoring_path)
375
376
    def _terminate(self, message):
377
        if flags.flags.automatedInstall and not flags.flags.ksprompt:
378
            # cannot have ask in a non-interactive kickstart
379
            # installation
380
            raise errors.CmdlineError(message)
381
382
        answ = errors.errorHandler.ui.showYesNoQuestion(msg)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable msg does not seem to be defined.
Loading history...
383
        if answ == errors.ERROR_CONTINUE:
384
            # prevent any futher actions here by switching to the dry
385
            # run mode and let things go on
386
            self.dry_run = True
387
            return
388
        else:
389
            # Let's sleep forever to prevent any further actions and
390
            # wait for the main thread to quit the process.
391
            progressQ.send_quit(1)
392
            while True:
393
                time.sleep(100000)
394
395
    def _handle_error(self, exception):
396
        log.error("Failed to fetch and initialize SCAP content!")
397
398
        if isinstance(exception, ContentCheckError):
399
            msg = _("The integrity check of the security content failed.\n" +
400
                    "The installation should be aborted. Do you wish to continue anyway?")
401
            self._terminate(msg)
402
        elif (isinstance(exception, common.OSCAPaddonError)
403
            or isinstance(exception, data_fetch.DataFetchError)):
404
            msg = _("There was an error fetching and loading the security content:\n" +
405
                    "%s\n" +
406
                    "The installation should be aborted. Do you wish to continue anyway?") % str(exception)
407
            self._terminate(msg)
408
409
        else:
410
            raise exception
411
412
    def setup(self, storage, ksdata, payload):
413
        """
414
        The setup method that should make changes to the runtime environment
415
        according to the data stored in this object.
416
417
        :param storage: object storing storage-related information
418
                        (disks, partitioning, bootloader, etc.)
419
        :type storage: blivet.Blivet instance
420
        :param ksdata: data parsed from the kickstart file and set in the
421
                       installation process
422
        :type ksdata: pykickstart.base.BaseHandler instance
423
424
        """
425
426
        if self.dry_run or not self.profile_id:
427
            # nothing more to be done in the dry-run mode or if no profile is
428
            # selected
429
            return
430
431
        thread_name = None
432
        if not os.path.exists(self.preinst_content_path) and not os.path.exists(self.raw_preinst_content_path):
433
            # content not available/fetched yet
434
            self.model.content_uri = self.content_url
435
            thread_name = self.model.fetch_content(self._handle_error, self.certificates)
436
437
        content_dest = None
438
        if self.content_type != "scap-security-guide":
439
            content_dest = self.raw_preinst_content_path
440
441
        content = self.model.finish_content_fetch(
442
            thread_name, self.fingerprint, lambda msg: log.info(msg), content_dest, self._handle_error)
443
444
        if not content:
445
            return
446
447
        try:
448
            # just check that preferred content exists
449
            _ = self.get_preferred_content(content)
450
        except Exception as exc:
451
            self._terminate(str(exc))
452
            return
453
454
        self.rule_data = rule_handling.get_rule_data_from_content(
455
            self.profile_id, self.preinst_content_path,
456
            self.datastream_id, self.xccdf_id, self.preinst_tailoring_path)
457
458
        # evaluate rules, do automatic fixes and stop if something that cannot
459
        # be fixed automatically is wrong
460
        fatal_messages = [message for message in self.rule_data.eval_rules(ksdata, storage)
461
                          if message.type == common.MESSAGE_TYPE_FATAL]
462
        if any(fatal_messages):
463
            msg = ["Wrong configuration detected!"]
464
            msg.extend(fatal_messages)
465
            msg.append("The installation should be aborted. Do you wish to continue anyway?")
466
            self._terminate("\n".join(msg))
467
            return
468
469
        # add packages needed on the target system to the list of packages
470
        # that are requested to be installed
471
        pkgs_to_install = list(REQUIRED_PACKAGES)
472
        if self.content_type == "scap-security-guide":
473
            pkgs_to_install.append("scap-security-guide")
474
        for pkg in pkgs_to_install:
475
            if pkg not in ksdata.packages.packageList:
476
                ksdata.packages.packageList.append(pkg)
477
478
    def execute(self, storage, ksdata, users, payload):
479
        """
480
        The execute method that should make changes to the installed system. It
481
        is called only once in the post-install setup phase.
482
483
        :see: setup
484
        :param users: information about created users
485
        :type users: pyanaconda.users.Users instance
486
487
        """
488
489
        if self.dry_run or not self.profile_id:
490
            # nothing more to be done in the dry-run mode or if no profile is
491
            # selected
492
            return
493
494
        target_content_dir = utils.join_paths(conf.target.system_root,
495
                                              common.TARGET_CONTENT_DIR)
496
        utils.ensure_dir_exists(target_content_dir)
497
498
        if self.content_type == "datastream":
499
            shutil.copy2(self.preinst_content_path, target_content_dir)
500
        elif self.content_type == "rpm":
501
            # copy the RPM to the target system
502
            shutil.copy2(self.raw_preinst_content_path, target_content_dir)
503
504
            # and install it with yum
505
            ret = util.execInSysroot("yum", ["-y", "--nogpg", "install",
506
                                             self.raw_postinst_content_path])
507
            if ret != 0:
508
                raise common.ExtractionError("Failed to install content "
509
                                             "RPM to the target system")
510
        elif self.content_type == "scap-security-guide":
511
            # nothing needed
512
            pass
513
        else:
514
            utils.universal_copy(utils.join_paths(common.INSTALLATION_CONTENT_DIR,
515
                                                  "*"),
516
                                 target_content_dir)
517
        if os.path.exists(self.preinst_tailoring_path):
518
            shutil.copy2(self.preinst_tailoring_path, target_content_dir)
519
520
        common.run_oscap_remediate(self.profile_id, self.postinst_content_path,
521
                                   self.datastream_id, self.xccdf_id,
522
                                   self.postinst_tailoring_path,
523
                                   chroot=conf.target.system_root)
524
525
    def clear_all(self):
526
        """Clear all the stored values."""
527
528
        self.__init__(self.name, just_clear=True)
529