Passed
Pull Request — rhel8-branch (#148)
by Matěj
01:05
created

org_fedora_oscap.ks.oscap.OSCAPdata._terminate()   A

Complexity

Conditions 5

Size

Total Lines 20
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 20
rs 9.3333
c 0
b 0
f 0
cc 5
nop 2
1
#
2
# Copyright (C) 2013  Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
# Red Hat Author(s): Vratislav Podzimek <[email protected]>
19
#
20
21
"""Module with the OSCAPdata class."""
22
23
import shutil
24
import re
25
import os
26
import time
27
import logging
28
import pathlib
29
30
from pyanaconda.addons import AddonData
31
from pyanaconda.core.configuration.anaconda import conf
32
from pyanaconda.progress import progressQ
33
from pyanaconda import errors
34
from pyanaconda.core import util
35
from pyanaconda import flags
36
from pykickstart.errors import KickstartParseError, KickstartValueError
37
from org_fedora_oscap import utils, common, rule_handling, data_fetch
38
from org_fedora_oscap.common import SUPPORTED_ARCHIVES, _
39
from org_fedora_oscap.content_handling import ContentCheckError, ContentHandlingError
40
from org_fedora_oscap import content_discovery
41
42
log = logging.getLogger("anaconda")

# Only the OSCAPdata class is exported so that Anaconda's collect method does
# not take the imported AddonData class instead of the OSCAPdata class.
# @see: pyanaconda.kickstart.AnacondaKSHandler.__init__
__all__ = ["OSCAPdata"]
48
49
# values accepted (case-insensitively) for the "content-type" key
SUPPORTED_CONTENT_TYPES = (
    "datastream",
    "rpm",
    "archive",
    "scap-security-guide",
)

# a "content-url" value has to start with one of these prefixes
SUPPORTED_URL_PREFIXES = (
    "http://",
    "https://",
    "ftp://",
    "file://",
    # LABEL:?, hdaX:?,
)

# packages requested for installation whenever the addon is active
REQUIRED_PACKAGES = (
    "openscap",
    "openscap-scanner",
)

# a fingerprint may consist only of lowercase letters and digits
FINGERPRINT_REGEX = re.compile(r'^[a-z0-9]+$')
60
61
62
class MisconfigurationError(common.OSCAPaddonError):
    """Exception for reporting misconfiguration."""
66
67
68
class OSCAPdata(AddonData):
    """
    Class parsing and storing data for the OSCAP addon.

    :see: pyanaconda.addons.AddonData

    """

    def __init__(self, name, just_clear=False):
        """
        :param name: name of the addon
        :type name: str
        :param just_clear: if True, only (re)set the stored values and do not
                           call the parent's __init__ (used by clear_all)
        :type just_clear: bool

        """

        if not just_clear:
            # do not call the parent's __init__ more than once
            AddonData.__init__(self, name)

        # values specifying the content
        self.content_type = ""
        self.content_url = ""
        self.datastream_id = ""
        self.xccdf_id = ""
        self.profile_id = ""
        self.content_path = ""
        self.cpe_path = ""
        self.tailoring_path = ""

        # additional values
        self.fingerprint = ""

        # certificate to verify HTTPS connection or signed data
        self.certificates = ""

        # internal values
        self.rule_data = rule_handling.RuleData()
        self.dry_run = False

        self.content_bringer = content_discovery.ContentBringer(self)

    def __str__(self):
        """
        What should end up in the resulting kickstart file, i.e. string
        representation of the stored data.

        :return: the %addon section, or an empty string in the dry-run mode
                 or if no profile is selected
        :rtype: str

        """

        if self.dry_run or not self.profile_id:
            # the addon was run in the dry run mode, omit it from the kickstart
            return ""

        def key_value_pair(key, value, indent=4):
            return "%s%s = %s" % (indent * " ", key, value)

        lines = ["%%addon %s" % self.name,
                 key_value_pair("content-type", self.content_type)]

        if self.content_url:
            lines.append(key_value_pair("content-url", self.content_url))
        if self.datastream_id:
            lines.append(key_value_pair("datastream-id", self.datastream_id))
        if self.xccdf_id:
            lines.append(key_value_pair("xccdf-id", self.xccdf_id))
        if self.content_path and self.content_type != "scap-security-guide":
            # for SSG the content path is filled in by finalize(), it is not
            # something that came from the kickstart
            lines.append(key_value_pair("content-path", self.content_path))
        if self.cpe_path:
            lines.append(key_value_pair("cpe-path", self.cpe_path))
        if self.tailoring_path:
            lines.append(key_value_pair("tailoring-path",
                                        self.tailoring_path))

        lines.append(key_value_pair("profile", self.profile_id))

        if self.fingerprint:
            lines.append(key_value_pair("fingerprint", self.fingerprint))

        if self.certificates:
            lines.append(key_value_pair("certificates", self.certificates))

        return "\n".join(lines) + "\n%end\n\n"

    def _parse_content_type(self, value):
        """
        Store the (lowercased) content type.

        :raise KickstartValueError: if the type is not one of
                                    SUPPORTED_CONTENT_TYPES

        """

        value_low = value.lower()
        if value_low not in SUPPORTED_CONTENT_TYPES:
            msg = "Unsupported content type '%s' in the %s addon" % (value,
                                                                     self.name)
            raise KickstartValueError(msg)

        self.content_type = value_low

    def _parse_content_url(self, value):
        """
        Store the content URL.

        :raise KickstartValueError: if the URL starts with none of
                                    SUPPORTED_URL_PREFIXES

        """

        # str.startswith accepts a tuple of alternatives
        if not value.startswith(SUPPORTED_URL_PREFIXES):
            msg = "Unsupported url '%s' in the %s addon" % (value, self.name)
            raise KickstartValueError(msg)

        self.content_url = value

    def _parse_datastream_id(self, value):
        """Store the datastream ID (no validation is done)."""

        # need to be checked?
        self.datastream_id = value

    def _parse_xccdf_id(self, value):
        """Store the XCCDF ID (no validation is done)."""

        # need to be checked?
        self.xccdf_id = value

    def _parse_profile_id(self, value):
        """Store the profile ID (no validation is done)."""

        # need to be checked?
        self.profile_id = value

    def _parse_content_path(self, value):
        """Store the content path (no validation is done)."""

        # need to be checked?
        self.content_path = value

    def _parse_cpe_path(self, value):
        """Store the CPE path (no validation is done)."""

        # need to be checked?
        self.cpe_path = value

    def _parse_tailoring_path(self, value):
        """Store the tailoring path (no validation is done)."""

        # need to be checked?
        self.tailoring_path = value

    def _parse_fingerprint(self, value):
        """
        Store the fingerprint of the content.

        :raise KickstartValueError: if the value does not match
                                    FINGERPRINT_REGEX or no hashing algorithm
                                    is found for it

        """

        if FINGERPRINT_REGEX.match(value) is None:
            msg = "Unsupported or invalid fingerprint"
            raise KickstartValueError(msg)

        if utils.get_hashing_algorithm(value) is None:
            msg = "Unsupported fingerprint"
            raise KickstartValueError(msg)

        self.fingerprint = value

    def _parse_certificates(self, value):
        """Store the certificates value (no validation is done)."""

        self.certificates = value

    def handle_line(self, line):
        """
        The handle_line method that is called with every line from this addon's
        %addon section of the kickstart file.

        :param line: a single line from the %addon section
        :type line: str
        :raise KickstartParseError: if the key of the line is not known
        :raise KickstartValueError: if the value is rejected by its parser

        """

        actions = {"content-type": self._parse_content_type,
                   "content-url": self._parse_content_url,
                   "content-path": self._parse_content_path,
                   "datastream-id": self._parse_datastream_id,
                   "profile": self._parse_profile_id,
                   "xccdf-id": self._parse_xccdf_id,
                   # "xccdf-path" is handled the same way as "content-path"
                   "xccdf-path": self._parse_content_path,
                   "cpe-path": self._parse_cpe_path,
                   "tailoring-path": self._parse_tailoring_path,
                   "fingerprint": self._parse_fingerprint,
                   "certificates": self._parse_certificates,
                   }

        line = line.strip()
        (pre, sep, post) = line.partition("=")
        pre = pre.strip()
        post = post.strip()
        post = post.strip('"')

        # only the lookup is guarded so that a KeyError raised inside of a
        # parser is not misreported as an unknown item
        try:
            action = actions[pre]
        except KeyError:
            msg = "Unknown item '%s' for %s addon" % (line, self.name)
            raise KickstartParseError(msg)

        action(post)

    def finalize(self):
        """
        The finalize method that is called when the end of the %addon section
        (the %end line) is reached. It means no more kickstart data will come.

        :raise KickstartValueError: if the collected data are incomplete or
                                    inconsistent

        """

        tmpl = "%s missing for the %s addon"

        # check provided data
        if not self.content_type:
            raise KickstartValueError(tmpl % ("content-type", self.name))

        if self.content_type != "scap-security-guide" and not self.content_url:
            raise KickstartValueError(tmpl % ("content-url", self.name))

        if not self.profile_id:
            self.profile_id = "default"

        if self.content_type in ("rpm", "archive") and not self.content_path:
            msg = "Path to the XCCDF file has to be given if content in RPM "\
                  "or archive is used"
            raise KickstartValueError(msg)

        if self.content_type == "rpm" and not self.content_url.endswith(".rpm"):
            msg = "Content type set to RPM, but the content URL doesn't end "\
                  "with '.rpm'"
            raise KickstartValueError(msg)

        if self.content_type == "archive":
            supported_archive = any(self.content_url.endswith(arch_type)
                                    for arch_type in SUPPORTED_ARCHIVES)
            if not supported_archive:
                msg = "Unsupported archive type of the content "\
                      "file '%s'" % self.content_url
                raise KickstartValueError(msg)

        # do some initialization magic in case of SSG
        if self.content_type == "scap-security-guide":
            if not common.ssg_available():
                msg = "SCAP Security Guide not found on the system"
                raise KickstartValueError(msg)

            self.content_path = common.SSG_DIR + common.SSG_CONTENT

    @property
    def content_defined(self):
        """
        Truthy if a content URL is set or the scap-security-guide content
        type is used.

        """

        return self.content_url or self.content_type == "scap-security-guide"

    @property
    def content_name(self):
        """
        Name of the content file, i.e. what follows the last slash of the
        content URL with its supported prefix removed.

        :raise ValueError: if the scap-security-guide content type is used
        :raise KickstartValueError: if there is no slash after the prefix

        """

        if self.content_type == "scap-security-guide":
            raise ValueError("Using scap-security-guide, no single content file")

        # used if the URL starts with none of the supported prefixes
        rest = "/anonymous_content"
        for prefix in SUPPORTED_URL_PREFIXES:
            if self.content_url.startswith(prefix):
                rest = self.content_url[len(prefix):]
                break

        parts = rest.rsplit("/", 1)
        if len(parts) != 2:
            msg = "Unsupported url '%s' in the %s addon" % (self.content_url,
                                                            self.name)
            raise KickstartValueError(msg)

        return parts[1]

    @property
    def raw_preinst_content_path(self):
        """Path to the raw (unextracted, ...) pre-installation content file"""

        return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
                                self.content_name)

    @property
    def raw_postinst_content_path(self):
        """Path to the raw (unextracted, ...) post-installation content file"""

        return utils.join_paths(common.TARGET_CONTENT_DIR,
                                self.content_name)

    @property
    def preinst_content_path(self):
        """Path to the pre-installation content file"""

        if self.content_type == "datastream":
            return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
                                    self.content_name)
        elif self.content_type == "scap-security-guide":
            # SSG is not copied to the standard place
            return self.content_path
        else:
            return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
                                    self.content_path)

    @property
    def postinst_content_path(self):
        """Path to the post-installation content file"""

        if self.content_type == "datastream":
            return utils.join_paths(common.TARGET_CONTENT_DIR,
                                    self.content_name)
        elif self.content_type in ("rpm", "scap-security-guide"):
            # no path magic in case of RPM (SSG is installed as an RPM)
            return self.content_path
        else:
            return utils.join_paths(common.TARGET_CONTENT_DIR,
                                    self.content_path)

    @property
    def preinst_tailoring_path(self):
        """Path to the pre-installation tailoring file (if any)"""

        if not self.tailoring_path:
            return ""

        return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
                                self.tailoring_path)

    @property
    def postinst_tailoring_path(self):
        """Path to the post-installation tailoring file (if any)"""

        if not self.tailoring_path:
            return ""

        if self.content_type == "rpm":
            # no path magic in case of RPM
            return self.tailoring_path

        return utils.join_paths(common.TARGET_CONTENT_DIR,
                                self.tailoring_path)

    def _terminate(self, message):
        """
        Report a fatal problem and ask the user whether to continue anyway.

        If the user chooses to continue, the addon is switched to the dry-run
        mode and the method returns. Otherwise a quit request is sent and the
        method never returns.

        :param message: description of the problem
        :type message: str
        :raise errors.CmdlineError: in an automated installation without the
                                    ksprompt flag, where no question can be
                                    asked

        """

        message += "\n" + _("The installation should be aborted.")
        message += " " + _("Do you wish to continue anyway?")
        if flags.flags.automatedInstall and not flags.flags.ksprompt:
            # cannot have ask in a non-interactive kickstart
            # installation
            raise errors.CmdlineError(message)

        answ = errors.errorHandler.ui.showYesNoQuestion(message)
        if answ == errors.ERROR_CONTINUE:
            # prevent any further actions here by switching to the dry
            # run mode and let things go on
            self.dry_run = True
            return

        # Let's sleep forever to prevent any further actions and
        # wait for the main thread to quit the process.
        progressQ.send_quit(1)
        while True:
            time.sleep(100000)

    def _handle_error(self, exception):
        """
        Log a content fetching/loading failure and hand a message describing
        it over to _terminate.

        :param exception: the problem that occurred
        :type exception: Exception

        """

        log.error("Failed to fetch and initialize SCAP content!")

        if isinstance(exception, ContentCheckError):
            msg = _("The integrity check of the security content failed.")
        elif isinstance(exception, (common.OSCAPaddonError,
                                    data_fetch.DataFetchError)):
            # only the constant part may go through gettext, a message with
            # the exception text baked in can never be found in a catalog
            msg = _("There was an error fetching and loading the security content:\n")
            msg += str(exception)
        else:
            msg = _("There was an unexpected problem with the supplied content.")

        self._terminate(msg)

    def setup(self, storage, ksdata, payload):
        """
        The setup method that should make changes to the runtime environment
        according to the data stored in this object.

        :param storage: object storing storage-related information
                        (disks, partitioning, bootloader, etc.)
        :type storage: blivet.Blivet instance
        :param ksdata: data parsed from the kickstart file and set in the
                       installation process
        :type ksdata: pykickstart.base.BaseHandler instance
        :param payload: not used by this method

        """

        if self.dry_run or not self.profile_id:
            # nothing more to be done in the dry-run mode or if no profile is
            # selected
            return

        thread_name = None
        content_present = (os.path.exists(self.preinst_content_path)
                           or os.path.exists(self.raw_preinst_content_path))
        if not content_present:
            # content not available/fetched yet
            thread_name = self.content_bringer.fetch_content(
                self._handle_error, self.certificates)

        content_dest = None
        if self.content_type != "scap-security-guide":
            content_dest = self.raw_preinst_content_path

        content = self.content_bringer.finish_content_fetch(
            thread_name, self.fingerprint, log.info, content_dest,
            self._handle_error)

        if not content:
            return

        try:
            # just check that preferred content exists; the result must not
            # be bound to "_", that would shadow the gettext function used
            # further below
            self.content_bringer.get_preferred_content(content)
        except Exception as exc:
            self._terminate(str(exc))
            return

        self.rule_data = rule_handling.get_rule_data_from_content(
            self.profile_id, self.preinst_content_path,
            self.datastream_id, self.xccdf_id, self.preinst_tailoring_path)

        # evaluate rules, do automatic fixes and stop if something that cannot
        # be fixed automatically is wrong
        fatal_messages = [message for message in self.rule_data.eval_rules(ksdata, storage)
                          if message.type == common.MESSAGE_TYPE_FATAL]
        if fatal_messages:
            msg_lines = [_("Wrong configuration detected!")]
            # NOTE(review): the messages are objects (they carry a "type"
            # attribute), so they cannot be joined as they are. Their "text"
            # attribute is assumed to hold the human-readable part -- confirm
            # against the message type defined in org_fedora_oscap.common.
            msg_lines.extend(str(getattr(message, "text", message))
                             for message in fatal_messages)
            self._terminate("\n".join(msg_lines))
            return

        # add packages needed on the target system to the list of packages
        # that are requested to be installed
        pkgs_to_install = list(REQUIRED_PACKAGES)
        if self.content_type == "scap-security-guide":
            pkgs_to_install.append("scap-security-guide")
        for pkg in pkgs_to_install:
            if pkg not in ksdata.packages.packageList:
                ksdata.packages.packageList.append(pkg)

    def execute(self, storage, ksdata, users, payload):
        """
        The execute method that should make changes to the installed system. It
        is called only once in the post-install setup phase.

        :see: setup
        :param users: information about created users
        :type users: pyanaconda.users.Users instance
        :raise common.ExtractionError: if the content RPM fails to install

        """

        if self.dry_run or not self.profile_id:
            # nothing more to be done in the dry-run mode or if no profile is
            # selected
            return

        target_content_dir = utils.join_paths(conf.target.system_root,
                                              common.TARGET_CONTENT_DIR)
        utils.ensure_dir_exists(target_content_dir)

        if self.content_type == "datastream":
            shutil.copy2(self.preinst_content_path, target_content_dir)
        elif self.content_type == "rpm":
            # copy the RPM to the target system
            shutil.copy2(self.raw_preinst_content_path, target_content_dir)

            # and install it with yum
            ret = util.execInSysroot("yum", ["-y", "--nogpg", "install",
                                             self.raw_postinst_content_path])
            if ret != 0:
                raise common.ExtractionError("Failed to install content "
                                             "RPM to the target system")
        elif self.content_type == "scap-security-guide":
            # nothing needed
            pass
        else:
            utils.universal_copy(utils.join_paths(common.INSTALLATION_CONTENT_DIR,
                                                  "*"),
                                 target_content_dir)

        # the path is an empty string (which does not exist) if no tailoring
        # is used
        if os.path.exists(self.preinst_tailoring_path):
            shutil.copy2(self.preinst_tailoring_path, target_content_dir)

        common.run_oscap_remediate(self.profile_id, self.postinst_content_path,
                                   self.datastream_id, self.xccdf_id,
                                   self.postinst_tailoring_path,
                                   chroot=conf.target.system_root)

    def clear_all(self):
        """Clear all the stored values."""

        self.__init__(self.name, just_clear=True)
527