Passed
Pull Request — rhel8-branch (#190)
by Matěj
01:54
created

OSCAPdata._parse_remediate()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
#
2
# Copyright (C) 2013  Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
# Red Hat Author(s): Vratislav Podzimek <[email protected]>
19
#
20
21
"""Module with the OSCAPdata class."""
22
23
import shutil
24
import re
25
import os
26
import time
27
import logging
28
import pathlib
29
30
from pyanaconda.addons import AddonData
31
from pyanaconda.core.configuration.anaconda import conf
32
from pyanaconda.progress import progressQ
33
from pyanaconda import errors
34
from pyanaconda.core import util
35
from pyanaconda import flags
36
from pykickstart.errors import KickstartParseError, KickstartValueError
37
from org_fedora_oscap import utils, common, rule_handling, data_fetch
38
from org_fedora_oscap.common import SUPPORTED_ARCHIVES, _
39
from org_fedora_oscap.content_handling import ContentCheckError, ContentHandlingError
40
from org_fedora_oscap import content_discovery
41
42
log = logging.getLogger("anaconda")
43
44
# export OSCAPdata class to prevent Anaconda's collect method from taking
45
# AddonData class instead of the OSCAPdata class
46
# @see: pyanaconda.kickstart.AnacondaKSHandler.__init__
47
__all__ = ["OSCAPdata"]
48
49
SUPPORTED_CONTENT_TYPES = ("datastream", "rpm", "archive",
50
                           "scap-security-guide",
51
                           )
52
53
SUPPORTED_URL_PREFIXES = ("http://", "https://", "ftp://", "file://"
54
                          # LABEL:?, hdaX:?,
55
                          )
56
57
REQUIRED_PACKAGES = ("openscap", "openscap-scanner", "openscap-utils",)
58
59
FINGERPRINT_REGEX = re.compile(r'^[a-z0-9]+$')
60
61
62
class MisconfigurationError(common.OSCAPaddonError):
63
    """Exception for reporting misconfiguration."""
64
65
    pass
66
67
68
class OSCAPdata(AddonData):
69
    """
70
    Class parsing and storing data for the OSCAP addon.
71
72
    :see: pyanaconda.addons.AddonData
73
74
    """
75
76
    def __init__(self, name, just_clear=False):
77
        """
78
        :param name: name of the addon
79
        :type name: str
80
81
        """
82
83
        if not just_clear:
84
            # do not call the parent's __init__ more than once
85
            AddonData.__init__(self, name)
86
87
        # values specifying the content
88
        self.content_type = ""
89
        self.content_url = ""
90
        self.datastream_id = ""
91
        self.xccdf_id = ""
92
        self.profile_id = ""
93
        self.content_path = ""
94
        self.cpe_path = ""
95
        self.tailoring_path = ""
96
97
        # additional values
98
        self.fingerprint = ""
99
100
        # certificate to verify HTTPS connection or signed data
101
        self.certificates = ""
102
103
        # What remediation(s) to execute
104
        self.remediate = ""
105
106
        # internal values
107
        self.rule_data = rule_handling.RuleData()
108
        self.dry_run = False
109
110
        self.content_bringer = content_discovery.ContentBringer(self)
111
112
    def __str__(self):
113
        """
114
        What should end up in the resulting kickstart file, i.e. string
115
        representation of the stored data.
116
117
        """
118
119
        if self.dry_run or not self.profile_id:
120
            # the addon was run in the dry run mode, omit it from the kickstart
121
            return ""
122
123
        def key_value_pair(key, value, indent=4):
124
            return "%s%s = %s" % (indent * " ", key, value)
125
126
        ret = "%%addon %s" % self.name
127
        ret += "\n%s" % key_value_pair("content-type", self.content_type)
128
129
        if self.content_url:
130
            ret += "\n%s" % key_value_pair("content-url", self.content_url)
131
        if self.datastream_id:
132
            ret += "\n%s" % key_value_pair("datastream-id", self.datastream_id)
133
        if self.xccdf_id:
134
            ret += "\n%s" % key_value_pair("xccdf-id", self.xccdf_id)
135
        if self.content_path and self.content_type != "scap-security-guide":
136
            ret += "\n%s" % key_value_pair("content-path", self.content_path)
137
        if self.cpe_path:
138
            ret += "\n%s" % key_value_pair("cpe-path", self.cpe_path)
139
        if self.tailoring_path:
140
            ret += "\n%s" % key_value_pair("tailoring-path",
141
                                           self.tailoring_path)
142
143
        ret += "\n%s" % key_value_pair("profile", self.profile_id)
144
145
        if self.fingerprint:
146
            ret += "\n%s" % key_value_pair("fingerprint", self.fingerprint)
147
148
        if self.certificates:
149
            ret += "\n%s" % key_value_pair("certificates", self.certificates)
150
151
        if self.remediate:
152
            ret += "\n%s" % key_value_pair("remediate", self.remediate)
153
154
        ret += "\n%end\n\n"
155
        return ret
156
157
    def _parse_content_type(self, value):
158
        value_low = value.lower()
159
        if value_low in SUPPORTED_CONTENT_TYPES:
160
            self.content_type = value_low
161
        else:
162
            msg = "Unsupported content type '%s' in the %s addon" % (value,
163
                                                                     self.name)
164
            raise KickstartValueError(msg)
165
166
    def _parse_content_url(self, value):
167
        if any(value.startswith(prefix)
168
               for prefix in SUPPORTED_URL_PREFIXES):
169
            self.content_url = value
170
        else:
171
            msg = "Unsupported url '%s' in the %s addon" % (value, self.name)
172
            raise KickstartValueError(msg)
173
174
    def _parse_datastream_id(self, value):
175
        # need to be checked?
176
        self.datastream_id = value
177
178
    def _parse_xccdf_id(self, value):
179
        # need to be checked?
180
        self.xccdf_id = value
181
182
    def _parse_profile_id(self, value):
183
        # need to be checked?
184
        self.profile_id = value
185
186
    def _parse_content_path(self, value):
187
        # need to be checked?
188
        self.content_path = value
189
190
    def _parse_cpe_path(self, value):
191
        # need to be checked?
192
        self.cpe_path = value
193
194
    def _parse_tailoring_path(self, value):
195
        # need to be checked?
196
        self.tailoring_path = value
197
198
    def _parse_fingerprint(self, value):
199
        if FINGERPRINT_REGEX.match(value) is None:
200
            msg = "Unsupported or invalid fingerprint"
201
            raise KickstartValueError(msg)
202
203
        if utils.get_hashing_algorithm(value) is None:
204
            msg = "Unsupported fingerprint"
205
            raise KickstartValueError(msg)
206
207
        self.fingerprint = value
208
209
    def _parse_certificates(self, value):
210
        self.certificates = value
211
212
    def _parse_remediate(self, value):
213
        assert value in ("none", "post", "firstboot", "both")
214
        self.remediate = value
215
216
    def handle_line(self, line):
217
        """
218
        The handle_line method that is called with every line from this addon's
219
        %addon section of the kickstart file.
220
221
        :param line: a single line from the %addon section
222
        :type line: str
223
224
        """
225
226
        actions = {"content-type": self._parse_content_type,
227
                   "content-url": self._parse_content_url,
228
                   "content-path": self._parse_content_path,
229
                   "datastream-id": self._parse_datastream_id,
230
                   "profile": self._parse_profile_id,
231
                   "xccdf-id": self._parse_xccdf_id,
232
                   "xccdf-path": self._parse_content_path,
233
                   "cpe-path": self._parse_cpe_path,
234
                   "tailoring-path": self._parse_tailoring_path,
235
                   "fingerprint": self._parse_fingerprint,
236
                   "certificates": self._parse_certificates,
237
                   "remediate": self._parse_remediate,
238
                   }
239
240
        line = line.strip()
241
        (pre, sep, post) = line.partition("=")
242
        pre = pre.strip()
243
        post = post.strip()
244
        post = post.strip('"')
245
246
        try:
247
            actions[pre](post)
248
        except KeyError:
249
            msg = "Unknown item '%s' for %s addon" % (line, self.name)
250
            raise KickstartParseError(msg)
251
252
    def finalize(self):
253
        """
254
        The finalize method that is called when the end of the %addon section
255
        (the %end line) is reached. It means no more kickstart data will come.
256
257
        """
258
259
        tmpl = "%s missing for the %s addon"
260
261
        # check provided data
262
        if not self.content_type:
263
            raise KickstartValueError(tmpl % ("content-type", self.name))
264
265
        if self.content_type != "scap-security-guide" and not self.content_url:
266
            raise KickstartValueError(tmpl % ("content-url", self.name))
267
268
        if not self.profile_id:
269
            self.profile_id = "default"
270
271
        if self.content_type in ("rpm", "archive") and not self.content_path:
272
            msg = "Path to the XCCDF file has to be given if content in RPM "\
273
                  "or archive is used"
274
            raise KickstartValueError(msg)
275
276
        if self.content_type == "rpm" and not self.content_url.endswith(".rpm"):
277
            msg = "Content type set to RPM, but the content URL doesn't end "\
278
                  "with '.rpm'"
279
            raise KickstartValueError(msg)
280
281
        if self.content_type == "archive":
282
            supported_archive = any(self.content_url.endswith(arch_type)
283
                                    for arch_type in SUPPORTED_ARCHIVES)
284
            if not supported_archive:
285
                msg = "Unsupported archive type of the content "\
286
                      "file '%s'" % self.content_url
287
                raise KickstartValueError(msg)
288
289
        # do some initialization magic in case of SSG
290
        if self.content_type == "scap-security-guide":
291
            if not common.ssg_available():
292
                msg = "SCAP Security Guide not found on the system"
293
                raise KickstartValueError(msg)
294
295
            self.content_path = common.SSG_DIR + common.SSG_CONTENT
296
297
    @property
298
    def content_defined(self):
299
        return self.content_url or self.content_type == "scap-security-guide"
300
301
    @property
302
    def content_name(self):
303
        if self.content_type == "scap-security-guide":
304
            raise ValueError("Using scap-security-guide, no single content file")
305
306
        rest = "/anonymous_content"
307
        for prefix in SUPPORTED_URL_PREFIXES:
308
            if self.content_url.startswith(prefix):
309
                rest = self.content_url[len(prefix):]
310
                break
311
312
        parts = rest.rsplit("/", 1)
313
        if len(parts) != 2:
314
            msg = "Unsupported url '%s' in the %s addon" % (self.content_url,
315
                                                            self.name)
316
            raise KickstartValueError(msg)
317
318
        return parts[1]
319
320
    @property
321
    def raw_preinst_content_path(self):
322
        """Path to the raw (unextracted, ...) pre-installation content file"""
323
324
        return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
325
                                self.content_name)
326
327
    @property
328
    def raw_postinst_content_path(self):
329
        """Path to the raw (unextracted, ...) post-installation content file"""
330
331
        return utils.join_paths(common.TARGET_CONTENT_DIR,
332
                                self.content_name)
333
334
    @property
335
    def preinst_content_path(self):
336
        """Path to the pre-installation content file"""
337
338
        if self.content_type == "datastream":
339
            return self.raw_preinst_content_path
340
        elif self.content_type == "scap-security-guide":
341
            # SSG is not copied to the standard place
342
            return self.content_path
343
        else:
344
            return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
345
                                    self.content_path)
346
347
    @property
348
    def postinst_content_path(self):
349
        """Path to the post-installation content file"""
350
351
        if self.content_type == "datastream":
352
            return utils.join_paths(common.TARGET_CONTENT_DIR,
353
                                    self.content_name)
354
        elif self.content_type in ("rpm", "scap-security-guide"):
355
            # no path magic in case of RPM (SSG is installed as an RPM)
356
            return self.content_path
357
        else:
358
            return utils.join_paths(common.TARGET_CONTENT_DIR,
359
                                    self.content_path)
360
361
    @property
362
    def preinst_tailoring_path(self):
363
        """Path to the pre-installation tailoring file (if any)"""
364
365
        if not self.tailoring_path:
366
            return ""
367
368
        return utils.join_paths(common.INSTALLATION_CONTENT_DIR,
369
                                self.tailoring_path)
370
371
    @property
372
    def postinst_tailoring_path(self):
373
        """Path to the post-installation tailoring file (if any)"""
374
375
        if not self.tailoring_path:
376
            return ""
377
378
        if self.content_type == "rpm":
379
            # no path magic in case of RPM
380
            return self.tailoring_path
381
382
        return utils.join_paths(common.TARGET_CONTENT_DIR,
383
                                self.tailoring_path)
384
385
    def _terminate(self, message):
386
        if flags.flags.automatedInstall and not flags.flags.ksprompt:
387
            # cannot have ask in a non-interactive kickstart
388
            # installation
389
            message += "\n" + _("Aborting the installation.")
390
            raise errors.CmdlineError(message)
391
392
        message += "\n" + _("The installation should be aborted.")
393
        message += " " + _("Do you wish to continue anyway?")
394
        answ = errors.errorHandler.ui.showYesNoQuestion(message)
395
        if answ == errors.ERROR_CONTINUE:
396
            # prevent any futher actions here by switching to the dry
397
            # run mode and let things go on
398
            self.dry_run = True
399
            return
400
        else:
401
            # Let's sleep forever to prevent any further actions and
402
            # wait for the main thread to quit the process.
403
            progressQ.send_quit(1)
404
            while True:
405
                time.sleep(100000)
406
407
    def _handle_error(self, exception):
408
        log.error(f"Failed to fetch and initialize SCAP content: {str(exception)}")
409
410
        if isinstance(exception, ContentCheckError):
411
            msg = _("The integrity check of the security content failed.")
412
            self._terminate(msg)
413
        elif (isinstance(exception, common.OSCAPaddonError)
414
            or isinstance(exception, data_fetch.DataFetchError)):
415
            msg = _("There was an error fetching and loading the security content:\n" +
416
                    f"{str(exception)}")
417
            self._terminate(msg)
418
419
        else:
420
            msg = _("There was an unexpected problem with the supplied content.")
421
            self._terminate(msg)
422
423
    def setup(self, storage, ksdata, payload):
424
        """
425
        The setup method that should make changes to the runtime environment
426
        according to the data stored in this object.
427
428
        :param storage: object storing storage-related information
429
                        (disks, partitioning, bootloader, etc.)
430
        :type storage: blivet.Blivet instance
431
        :param ksdata: data parsed from the kickstart file and set in the
432
                       installation process
433
        :type ksdata: pykickstart.base.BaseHandler instance
434
435
        """
436
437
        if self.dry_run or not self.profile_id:
438
            # nothing more to be done in the dry-run mode or if no profile is
439
            # selected
440
            return
441
442
        thread_name = None
443
        if not os.path.exists(self.preinst_content_path) and not os.path.exists(self.raw_preinst_content_path):
444
            # content not available/fetched yet
445
            thread_name = self.content_bringer.fetch_content(self._handle_error, self.certificates)
446
447
        content_dest = None
448
        if self.content_type != "scap-security-guide":
449
            content_dest = self.raw_preinst_content_path
450
451
        content = self.content_bringer.finish_content_fetch(
452
            thread_name, self.fingerprint, lambda msg: log.info(msg), content_dest, self._handle_error)
453
454
        if not content:
455
            return
456
457
        try:
458
            # just check that preferred content exists
459
            self.content_bringer.get_preferred_content(content)
460
        except Exception as exc:
461
            self._terminate(str(exc))
462
            return
463
464
        self.rule_data = rule_handling.get_rule_data_from_content(
465
            self.profile_id, self.preinst_content_path,
466
            self.datastream_id, self.xccdf_id, self.preinst_tailoring_path)
467
468
        # evaluate rules, do automatic fixes and stop if something that cannot
469
        # be fixed automatically is wrong
470
        fatal_messages = [message for message in self.rule_data.eval_rules(ksdata, storage)
471
                          if message.type == common.MESSAGE_TYPE_FATAL]
472
        if any(fatal_messages):
473
            msg_lines = [_("Wrong configuration detected!")]
474
            msg_lines.extend([m.text for m in fatal_messages])
475
            self._terminate("\n".join(msg_lines))
476
            return
477
478
        # add packages needed on the target system to the list of packages
479
        # that are requested to be installed
480
        pkgs_to_install = list(REQUIRED_PACKAGES)
481
        if self.content_type == "scap-security-guide":
482
            pkgs_to_install.append("scap-security-guide")
483
        for pkg in pkgs_to_install:
484
            if pkg not in ksdata.packages.packageList:
485
                ksdata.packages.packageList.append(pkg)
486
487
    def execute(self, storage, ksdata, users, payload):
488
        """
489
        The execute method that should make changes to the installed system. It
490
        is called only once in the post-install setup phase.
491
492
        :see: setup
493
        :param users: information about created users
494
        :type users: pyanaconda.users.Users instance
495
496
        """
497
498
        if self.dry_run or not self.profile_id:
499
            # nothing more to be done in the dry-run mode or if no profile is
500
            # selected
501
            return
502
503
        try:
504
            common.assert_scanner_works(
505
                chroot=conf.target.system_root, executable="oscap")
506
        except Exception as exc:
507
            msg_lines = [_(
508
                "The 'oscap' scanner doesn't work in the installed system: {error}"
509
                .format(error=str(exc)))]
510
            msg_lines.append(_("As a result, the installed system can't be hardened."))
511
            self._terminate("\n".join(msg_lines))
512
            return
513
514
        target_content_dir = utils.join_paths(conf.target.system_root,
515
                                              common.TARGET_CONTENT_DIR)
516
        utils.ensure_dir_exists(target_content_dir)
517
518
        if self.content_type == "datastream":
519
            shutil.copy2(self.preinst_content_path, target_content_dir)
520
        elif self.content_type == "rpm":
521
            # copy the RPM to the target system
522
            shutil.copy2(self.raw_preinst_content_path, target_content_dir)
523
524
            # and install it with yum
525
            ret = util.execInSysroot("yum", ["-y", "--nogpg", "install",
526
                                             self.raw_postinst_content_path])
527
            if ret != 0:
528
                msg = _(f"Failed to install content RPM to the target system.")
529
                self._terminate(msg)
530
                return
531
        elif self.content_type == "scap-security-guide":
532
            # nothing needed
533
            pass
534
        else:
535
            utils.universal_copy(utils.join_paths(common.INSTALLATION_CONTENT_DIR,
536
                                                  "*"),
537
                                 target_content_dir)
538
        if os.path.exists(self.preinst_tailoring_path):
539
            shutil.copy2(self.preinst_tailoring_path, target_content_dir)
540
541
        if self.remediate in ("", "post", "both"):
542
            try:
543
                common.run_oscap_remediate(self.profile_id, self.postinst_content_path,
544
                                           self.datastream_id, self.xccdf_id,
545
                                           self.postinst_tailoring_path,
546
                                           chroot=conf.target.system_root)
547
            except Exception as exc:
548
                msg = _(f"Something went wrong during the final hardening: {str(exc)}.")
549
                self._terminate(msg)
550
                return
551
552
        if self.remediate in ("", "firstboot", "both"):
553
            try:
554
                common.schedule_firstboot_remediation(
555
                    conf.target.system_root,
556
                    self.profile_id,
557
                    self.postinst_content_path,
558
                    self.datastream_id,
559
                    self.xccdf_id,
560
                    self.postinst_tailoring_path,
561
                )
562
            except Exception as exc:
563
                msg = _("Something went wrong when scheduling the first-boot remediation: {exc}."
564
                        .format(exc=str(exc)))
565
                terminate(msg)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable terminate does not seem to be defined.
Loading history...
566
                return
567
568
    def clear_all(self):
569
        """Clear all the stored values."""
570
571
        self.__init__(self.name, just_clear=True)
572