Passed
Pull Request — rhel8-branch (#148)
by Matěj
01:06
created

org_fedora_oscap.ks.oscap.OSCAPdata._terminate()   A

Complexity

Conditions 5

Size

Total Lines 18
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 10
dl 0
loc 18
rs 9.3333
c 0
b 0
f 0
cc 5
nop 2
1
#
2
# Copyright (C) 2013  Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
# Red Hat Author(s): Vratislav Podzimek <[email protected]>
19
#
20
21
"""Module with the OSCAPdata class."""
22
23
import shutil
24
import re
25
import os
26
import time
27
import logging
28
import pathlib
29
30
from pyanaconda.addons import AddonData
31
from pyanaconda.core.configuration.anaconda import conf
32
from pyanaconda.progress import progressQ
33
from pyanaconda import errors
34
from pyanaconda.core import util
35
from pyanaconda import flags
36
from pykickstart.errors import KickstartParseError, KickstartValueError
37
from org_fedora_oscap import utils, common, rule_handling, data_fetch
38
from org_fedora_oscap.common import SUPPORTED_ARCHIVES, _
39
from org_fedora_oscap.content_handling import ContentCheckError, ContentHandlingError
40
from org_fedora_oscap import model
41
42
log = logging.getLogger("anaconda")
43
44
# export OSCAPdata class to prevent Anaconda's collect method from taking
45
# AddonData class instead of the OSCAPdata class
46
# @see: pyanaconda.kickstart.AnacondaKSHandler.__init__
47
__all__ = ["OSCAPdata"]
48
49
SUPPORTED_CONTENT_TYPES = ("datastream", "rpm", "archive",
50
                           "scap-security-guide",
51
                           )
52
53
SUPPORTED_URL_PREFIXES = ("http://", "https://", "ftp://", "file://"
54
                          # LABEL:?, hdaX:?,
55
                          )
56
57
REQUIRED_PACKAGES = ("openscap", "openscap-scanner", )
58
59
FINGERPRINT_REGEX = re.compile(r'^[a-z0-9]+$')
60
61
62
class MisconfigurationError(common.OSCAPaddonError):
63
    """Exception for reporting misconfiguration."""
64
65
    pass
66
67
68
class OSCAPdata(AddonData):
69
    """
70
    Class parsing and storing data for the OSCAP addon.
71
72
    :see: pyanaconda.addons.AddonData
73
74
    """
75
76
    def __init__(self, name, just_clear=False):
77
        """
78
        :param name: name of the addon
79
        :type name: str
80
81
        """
82
83
        if not just_clear:
84
            # do not call the parent's __init__ more than once
85
            AddonData.__init__(self, name)
86
87
        # values specifying the content
88
        self.content_type = ""
89
        self.content_url = ""
90
        self.datastream_id = ""
91
        self.xccdf_id = ""
92
        self.profile_id = ""
93
        self.content_path = ""
94
        self.cpe_path = ""
95
        self.tailoring_path = ""
96
97
        # additional values
98
        self.fingerprint = ""
99
100
        # certificate to verify HTTPS connection or signed data
101
        self.certificates = ""
102
103
        # internal values
104
        self.rule_data = rule_handling.RuleData()
105
        self.dry_run = False
106
107
        self.model = model.Model(self)
108
        self.model.CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
109
110
    def __str__(self):
111
        """
112
        What should end up in the resulting kickstart file, i.e. string
113
        representation of the stored data.
114
115
        """
116
117
        if self.dry_run or not self.profile_id:
118
            # the addon was run in the dry run mode, omit it from the kickstart
119
            return ""
120
121
        def key_value_pair(key, value, indent=4):
122
            return "%s%s = %s" % (indent * " ", key, value)
123
124
        ret = "%%addon %s" % self.name
125
        ret += "\n%s" % key_value_pair("content-type", self.content_type)
126
127
        if self.content_url:
128
            ret += "\n%s" % key_value_pair("content-url", self.content_url)
129
        if self.datastream_id:
130
            ret += "\n%s" % key_value_pair("datastream-id", self.datastream_id)
131
        if self.xccdf_id:
132
            ret += "\n%s" % key_value_pair("xccdf-id", self.xccdf_id)
133
        if self.content_path and self.content_type != "scap-security-guide":
134
            ret += "\n%s" % key_value_pair("content-path", self.content_path)
135
        if self.cpe_path:
136
            ret += "\n%s" % key_value_pair("cpe-path", self.cpe_path)
137
        if self.tailoring_path:
138
            ret += "\n%s" % key_value_pair("tailoring-path",
139
                                           self.tailoring_path)
140
141
        ret += "\n%s" % key_value_pair("profile", self.profile_id)
142
143
        if self.fingerprint:
144
            ret += "\n%s" % key_value_pair("fingerprint", self.fingerprint)
145
146
        if self.certificates:
147
            ret += "\n%s" % key_value_pair("certificates", self.certificates)
148
149
        ret += "\n%end\n\n"
150
        return ret
151
152
    def _parse_content_type(self, value):
153
        value_low = value.lower()
154
        if value_low in SUPPORTED_CONTENT_TYPES:
155
            self.content_type = value_low
156
        else:
157
            msg = "Unsupported content type '%s' in the %s addon" % (value,
158
                                                                     self.name)
159
            raise KickstartValueError(msg)
160
161
    def _parse_content_url(self, value):
162
        if any(value.startswith(prefix)
163
               for prefix in SUPPORTED_URL_PREFIXES):
164
            self.content_url = value
165
        else:
166
            msg = "Unsupported url '%s' in the %s addon" % (value, self.name)
167
            raise KickstartValueError(msg)
168
169
    def _parse_datastream_id(self, value):
170
        # need to be checked?
171
        self.datastream_id = value
172
173
    def _parse_xccdf_id(self, value):
174
        # need to be checked?
175
        self.xccdf_id = value
176
177
    def _parse_profile_id(self, value):
178
        # need to be checked?
179
        self.profile_id = value
180
181
    def _parse_content_path(self, value):
182
        # need to be checked?
183
        self.content_path = value
184
185
    def _parse_cpe_path(self, value):
186
        # need to be checked?
187
        self.cpe_path = value
188
189
    def _parse_tailoring_path(self, value):
190
        # need to be checked?
191
        self.tailoring_path = value
192
193
    def _parse_fingerprint(self, value):
194
        if FINGERPRINT_REGEX.match(value) is None:
195
            msg = "Unsupported or invalid fingerprint"
196
            raise KickstartValueError(msg)
197
198
        if utils.get_hashing_algorithm(value) is None:
199
            msg = "Unsupported fingerprint"
200
            raise KickstartValueError(msg)
201
202
        self.fingerprint = value
203
204
    def _parse_certificates(self, value):
205
        self.certificates = value
206
207
    def handle_line(self, line):
208
        """
209
        The handle_line method that is called with every line from this addon's
210
        %addon section of the kickstart file.
211
212
        :param line: a single line from the %addon section
213
        :type line: str
214
215
        """
216
217
        actions = {"content-type": self._parse_content_type,
218
                   "content-url": self._parse_content_url,
219
                   "content-path": self._parse_content_path,
220
                   "datastream-id": self._parse_datastream_id,
221
                   "profile": self._parse_profile_id,
222
                   "xccdf-id": self._parse_xccdf_id,
223
                   "xccdf-path": self._parse_content_path,
224
                   "cpe-path": self._parse_cpe_path,
225
                   "tailoring-path": self._parse_tailoring_path,
226
                   "fingerprint": self._parse_fingerprint,
227
                   "certificates": self._parse_certificates,
228
                   }
229
230
        line = line.strip()
231
        (pre, sep, post) = line.partition("=")
232
        pre = pre.strip()
233
        post = post.strip()
234
        post = post.strip('"')
235
236
        try:
237
            actions[pre](post)
238
        except KeyError:
239
            msg = "Unknown item '%s' for %s addon" % (line, self.name)
240
            raise KickstartParseError(msg)
241
242
    def finalize(self):
243
        """
244
        The finalize method that is called when the end of the %addon section
245
        (the %end line) is reached. It means no more kickstart data will come.
246
247
        """
248
249
        tmpl = "%s missing for the %s addon"
250
251
        # check provided data
252
        if not self.content_type:
253
            raise KickstartValueError(tmpl % ("content-type", self.name))
254
255
        if self.content_type != "scap-security-guide" and not self.content_url:
256
            raise KickstartValueError(tmpl % ("content-url", self.name))
257
258
        if not self.profile_id:
259
            self.profile_id = "default"
260
261
        if self.content_type in ("rpm", "archive") and not self.content_path:
262
            msg = "Path to the XCCDF file has to be given if content in RPM "\
263
                  "or archive is used"
264
            raise KickstartValueError(msg)
265
266
        if self.content_type == "rpm" and not self.content_url.endswith(".rpm"):
267
            msg = "Content type set to RPM, but the content URL doesn't end "\
268
                  "with '.rpm'"
269
            raise KickstartValueError(msg)
270
271
        if self.content_type == "archive":
272
            supported_archive = any(self.content_url.endswith(arch_type)
273
                                    for arch_type in SUPPORTED_ARCHIVES)
274
            if not supported_archive:
275
                msg = "Unsupported archive type of the content "\
276
                      "file '%s'" % self.content_url
277
                raise KickstartValueError(msg)
278
279
        # do some initialization magic in case of SSG
280
        if self.content_type == "scap-security-guide":
281
            if not common.ssg_available():
282
                msg = "SCAP Security Guide not found on the system"
283
                raise KickstartValueError(msg)
284
285
            self.content_path = common.SSG_DIR + common.SSG_CONTENT
286
287
    @property
288
    def content_defined(self):
289
        return self.content_url or self.content_type == "scap-security-guide"
290
291
    @property
292
    def content_name(self):
293
        if self.content_type == "scap-security-guide":
294
            raise ValueError("Using scap-security-guide, no single content file")
295
296
        rest = "/anonymous_content"
297
        for prefix in SUPPORTED_URL_PREFIXES:
298
            if self.content_url.startswith(prefix):
299
                rest = self.content_url[len(prefix):]
300
                break
301
302
        parts = rest.rsplit("/", 1)
303
        if len(parts) != 2:
304
            msg = "Unsupported url '%s' in the %s addon" % (self.content_url,
305
                                                            self.name)
306
            raise KickstartValueError(msg)
307
308
        return parts[1]
309
310
    @property
311
    def raw_preinst_content_path(self):
312
        """Path to the raw (unextracted, ...) pre-installation content file"""
313
314
        return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
315
                                self.content_name)
316
317
    @property
318
    def raw_postinst_content_path(self):
319
        """Path to the raw (unextracted, ...) post-installation content file"""
320
321
        return utils.join_paths(common.TARGET_CONTENT_DIR,
322
                                self.content_name)
323
324
    @property
325
    def preinst_content_path(self):
326
        """Path to the pre-installation content file"""
327
328
        if self.content_type == "datastream":
329
            return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
330
                                    self.content_name)
331
        elif self.content_type == "scap-security-guide":
332
            # SSG is not copied to the standard place
333
            return self.content_path
334
        else:
335
            return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
336
                                    self.content_path)
337
338
    @property
339
    def postinst_content_path(self):
340
        """Path to the post-installation content file"""
341
342
        if self.content_type == "datastream":
343
            return utils.join_paths(common.TARGET_CONTENT_DIR,
344
                                    self.content_name)
345
        elif self.content_type in ("rpm", "scap-security-guide"):
346
            # no path magic in case of RPM (SSG is installed as an RPM)
347
            return self.content_path
348
        else:
349
            return utils.join_paths(common.TARGET_CONTENT_DIR,
350
                                    self.content_path)
351
352
    @property
353
    def preinst_tailoring_path(self):
354
        """Path to the pre-installation tailoring file (if any)"""
355
356
        if not self.tailoring_path:
357
            return ""
358
359
        return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
360
                                self.tailoring_path)
361
362
    @property
363
    def postinst_tailoring_path(self):
364
        """Path to the post-installation tailoring file (if any)"""
365
366
        if not self.tailoring_path:
367
            return ""
368
369
        if self.content_type == "rpm":
370
            # no path magic in case of RPM
371
            return self.tailoring_path
372
373
        return utils.join_paths(common.TARGET_CONTENT_DIR,
374
                                self.tailoring_path)
375
376
    def get_preferred_content(self, content):
377
        if self.content_path:
378
            preferred_content = content.find_expected_usable_content(self.content_path)
379
        else:
380
            preferred_content = content.select_main_usable_content()
381
382
        if self.tailoring_path:
383
            if self.tailoring_path != str(content.tailoring.relative_to(content.root)):
384
                msg = f"Expected a tailoring {self.tailoring_path}, but it couldn't be found"
385
                raise content_handling.ContentHandlingError(msg)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable content_handling does not seem to be defined.
Loading history...
386
        return preferred_content
387
388
    def update_with_content(self, content):
389
        preferred_content = self.get_preferred_content(content)
390
391
        self.content_path = str(preferred_content.relative_to(content.root))
392
        if content.tailoring:
393
            self.tailoring_path = str(content.tailoring.relative_to(content.root))
394
395
    def _terminate(self, message):
396
        if flags.flags.automatedInstall and not flags.flags.ksprompt:
397
            # cannot have ask in a non-interactive kickstart
398
            # installation
399
            raise errors.CmdlineError(message)
400
401
        answ = errors.errorHandler.ui.showYesNoQuestion(msg)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable msg does not seem to be defined.
Loading history...
402
        if answ == errors.ERROR_CONTINUE:
403
            # prevent any futher actions here by switching to the dry
404
            # run mode and let things go on
405
            self.dry_run = True
406
            return
407
        else:
408
            # Let's sleep forever to prevent any further actions and
409
            # wait for the main thread to quit the process.
410
            progressQ.send_quit(1)
411
            while True:
412
                time.sleep(100000)
413
414
    def _handle_error(self, exception):
415
        log.error("Failed to fetch and initialize SCAP content!")
416
417
        if isinstance(exception, ContentCheckError):
418
            msg = _("The integrity check of the security content failed.\n" +
419
                    "The installation should be aborted. Do you wish to continue anyway?")
420
            self._terminate(msg)
421
        elif (isinstance(exception, common.OSCAPaddonError)
422
            or isinstance(exception, data_fetch.DataFetchError)):
423
            msg = _("There was an error fetching and loading the security content:\n" +
424
                    "%s\n" +
425
                    "The installation should be aborted. Do you wish to continue anyway?") % str(exception)
426
            self._terminate(msg)
427
428
        else:
429
            raise exception
430
431
    def setup(self, storage, ksdata, payload):
432
        """
433
        The setup method that should make changes to the runtime environment
434
        according to the data stored in this object.
435
436
        :param storage: object storing storage-related information
437
                        (disks, partitioning, bootloader, etc.)
438
        :type storage: blivet.Blivet instance
439
        :param ksdata: data parsed from the kickstart file and set in the
440
                       installation process
441
        :type ksdata: pykickstart.base.BaseHandler instance
442
443
        """
444
445
        if self.dry_run or not self.profile_id:
446
            # nothing more to be done in the dry-run mode or if no profile is
447
            # selected
448
            return
449
450
        thread_name = None
451
        if not os.path.exists(self.preinst_content_path) and not os.path.exists(self.raw_preinst_content_path):
452
            # content not available/fetched yet
453
            self.model.content_uri = self.content_url
454
            thread_name = self.model.fetch_content(self.certificates, self._handle_error)
455
456
        content = self.model.finish_content_fetch(thread_name, self.fingerprint, lambda msg: log.info(msg), self.raw_preinst_content_path, lambda content: content, self._handle_error)
457
458
        if not content:
459
            return
460
461
        try:
462
            # just check that preferred content exists
463
            _ = self.get_preferred_content(content)
464
        except Exception as exc:
465
            self._terminate(str(exc))
466
            return
467
468
        self.rule_data = rule_handling.get_rule_data_from_content(
469
            self.profile_id, self.preinst_content_path,
470
            self.datastream_id, self.xccdf_id, self.preinst_tailoring_path)
471
472
        # evaluate rules, do automatic fixes and stop if something that cannot
473
        # be fixed automatically is wrong
474
        fatal_messages = [message for message in self.rule_data.eval_rules(ksdata, storage)
475
                          if message.type == common.MESSAGE_TYPE_FATAL]
476
        if any(fatal_messages):
477
            msg = ["Wrong configuration detected!"]
478
            msg.extend(fatal_messages)
479
            msg.append("The installation should be aborted. Do you wish to continue anyway?")
480
            self._terminate("\n".join(msg))
481
            return
482
483
        # add packages needed on the target system to the list of packages
484
        # that are requested to be installed
485
        pkgs_to_install = list(REQUIRED_PACKAGES)
486
        if self.content_type == "scap-security-guide":
487
            pkgs_to_install.append("scap-security-guide")
488
        for pkg in pkgs_to_install:
489
            if pkg not in ksdata.packages.packageList:
490
                ksdata.packages.packageList.append(pkg)
491
492
    def execute(self, storage, ksdata, users, payload):
493
        """
494
        The execute method that should make changes to the installed system. It
495
        is called only once in the post-install setup phase.
496
497
        :see: setup
498
        :param users: information about created users
499
        :type users: pyanaconda.users.Users instance
500
501
        """
502
503
        if self.dry_run or not self.profile_id:
504
            # nothing more to be done in the dry-run mode or if no profile is
505
            # selected
506
            return
507
508
        target_content_dir = utils.join_paths(conf.target.system_root,
509
                                              common.TARGET_CONTENT_DIR)
510
        utils.ensure_dir_exists(target_content_dir)
511
512
        if self.content_type == "datastream":
513
            shutil.copy2(self.preinst_content_path, target_content_dir)
514
        elif self.content_type == "rpm":
515
            # copy the RPM to the target system
516
            shutil.copy2(self.raw_preinst_content_path, target_content_dir)
517
518
            # and install it with yum
519
            ret = util.execInSysroot("yum", ["-y", "--nogpg", "install",
520
                                             self.raw_postinst_content_path])
521
            if ret != 0:
522
                raise common.ExtractionError("Failed to install content "
523
                                             "RPM to the target system")
524
        elif self.content_type == "scap-security-guide":
525
            # nothing needed
526
            pass
527
        else:
528
            utils.universal_copy(utils.join_paths(common.INSTALLATION_CONTENT_DIR,
529
                                                  "*"),
530
                                 target_content_dir)
531
        if os.path.exists(self.preinst_tailoring_path):
532
            shutil.copy2(self.preinst_tailoring_path, target_content_dir)
533
534
        common.run_oscap_remediate(self.profile_id, self.postinst_content_path,
535
                                   self.datastream_id, self.xccdf_id,
536
                                   self.postinst_tailoring_path,
537
                                   chroot=conf.target.system_root)
538
539
    def clear_all(self):
540
        """Clear all the stored values."""
541
542
        self.__init__(self.name, just_clear=True)
543