Passed
Pull Request — rhel9-branch (#189)
by Matěj
58s
created

schedule_firstboot_remediation()   A

Complexity

Conditions 2

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 10
nop 6
dl 0
loc 14
rs 9.9
c 0
b 0
f 0
1
#
2
# Copyright (C) 2013  Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
# Red Hat Author(s): Vratislav Podzimek <[email protected]>
19
#
20
21
"""
22
Module with various classes and functions needed by the OSCAP addon that are
23
not specific to any installation mode (tui, gui, ks).
24
25
"""
26
27
import os
28
import tempfile
29
import subprocess
30
import zipfile
31
import tarfile
32
import textwrap
33
34
import re
35
import logging
36
37
from collections import namedtuple
38
import gettext
39
from functools import wraps
40
41
from dasbus.identifier import DBusServiceIdentifier
42
from pyanaconda.core import constants
43
from pyanaconda.core.dbus import DBus
44
from pyanaconda.core.constants import PAYLOAD_TYPE_DNF
45
from pyanaconda.modules.common.constants.namespaces import ADDONS_NAMESPACE
46
from pyanaconda.modules.common.constants.services import PAYLOADS
47
from pyanaconda.modules.common.structures.payload import PackagesConfigurationData
48
from pyanaconda.threading import threadMgr, AnacondaThread
49
50
from org_fedora_oscap import utils
51
from org_fedora_oscap import cpioarchive
52
53
54
log = logging.getLogger("anaconda")
55
56
57
# mimick pyanaconda/core/i18n.py
58
def _(string):
59
    if string:
60
        return gettext.translation("oscap-anaconda-addon", fallback=True).gettext(string)
61
    else:
62
        return ""
63
64
65
def N_(string): return string
66
67
68
# everything else should be private
69
__all__ = ["run_oscap_remediate", "get_fix_rules_pre",
70
           "extract_data", "strip_content_dir",
71
           "OSCAPaddonError", "get_payload_proxy", "get_packages_data",
72
           "set_packages_data"]
73
74
INSTALLATION_CONTENT_DIR = "/tmp/openscap_data/"
75
TARGET_CONTENT_DIR = "/root/openscap_data/"
76
77
SSG_DIR = "/usr/share/xml/scap/ssg/content/"
78
79
# Make it easy to change e.g. by sed substitution in spec files
80
# First name is the canonical addon name, rest are adapters
81
ADDON_NAMES = ["com_redhat_oscap", "org_fedora_oscap"]
82
83
COMPLAIN_ABOUT_NON_CANONICAL_NAMES = True
84
85
# Enable patches that set the content name at package-time
86
DEFAULT_SSG_CONTENT_NAME = ""
87
SSG_CONTENT = DEFAULT_SSG_CONTENT_NAME
88
if not SSG_CONTENT:
89
    if constants.shortProductName != 'anaconda':
90
        if constants.shortProductName == 'fedora':
91
            SSG_CONTENT = "ssg-fedora-ds.xml"
92
        else:
93
            SSG_CONTENT = (
94
                "ssg-{name}{version}-ds.xml"
95
                .format(
96
                    name=constants.shortProductName,
97
                    version=constants.productVersion.strip(".")[0]))
98
99
RESULTS_PATH = utils.join_paths(TARGET_CONTENT_DIR,
100
                                "eval_remediate_results.xml")
101
REPORT_PATH = utils.join_paths(TARGET_CONTENT_DIR,
102
                               "eval_remediate_report.html")
103
104
PRE_INSTALL_FIX_SYSTEM_ATTR = "urn:redhat:anaconda:pre"
105
106
THREAD_FETCH_DATA = "AnaOSCAPdataFetchThread"
107
108
SUPPORTED_ARCHIVES = (".zip", ".tar", ".tar.gz", ".tar.bz2", )
109
110
SUPPORTED_CONTENT_TYPES = (
111
    "datastream", "rpm", "archive", "scap-security-guide",
112
)
113
114
SUPPORTED_URL_PREFIXES = (
115
    "http://", "https://", "ftp://",  # LABEL:?, hdaX:?,
116
)
117
118
# buffer size for reading and writing out data (in bytes)
119
IO_BUF_SIZE = 2 * 1024 * 1024
120
121
# DBus constants
122
KDUMP = DBusServiceIdentifier(
123
    namespace=ADDONS_NAMESPACE,
124
    basename="Kdump",
125
    message_bus=DBus
126
)
127
128
129
class OSCAPaddonError(Exception):
130
    """Exception class for OSCAP addon related errors."""
131
132
    pass
133
134
135
class OSCAPaddonNetworkError(OSCAPaddonError):
136
    """Exception class for OSCAP addon related network errors."""
137
138
    pass
139
140
141
class ExtractionError(OSCAPaddonError):
142
    """Exception class for the extraction errors."""
143
144
    pass
145
146
147
MESSAGE_TYPE_FATAL = 0
148
MESSAGE_TYPE_WARNING = 1
149
MESSAGE_TYPE_INFO = 2
150
151
# namedtuple for messages returned from the rules evaluation
152
#   origin -- class (inherited from RuleHandler) that generated the message
153
#   type -- one of the MESSAGE_TYPE_* constants defined above
154
#   text -- the actual message that should be displayed, logged, ...
155
RuleMessage = namedtuple("RuleMessage", ["origin", "type", "text"])
156
157
158
class SubprocessLauncher(object):
159
    def __init__(self, args):
160
        self.args = args
161
        self.stdout = ""
162
        self.stderr = ""
163
        self.messages = []
164
        self.returncode = None
165
166
    def execute(self, ** kwargs):
167
        command_string = " ".join(self.args)
168
        log.info(
169
            "OSCAP addon: Executing subprocess: '{command_string}'"
170
            .format(command_string=command_string))
171
        try:
172
            proc = subprocess.Popen(self.args, stdout=subprocess.PIPE,
173
                                    stderr=subprocess.PIPE, ** kwargs)
174
        except OSError as oserr:
175
            msg = ("Failed to execute command '{command_string}': {oserr}"
176
                   .format(command_string=command_string, oserr=oserr))
177
            raise OSCAPaddonError(msg)
178
179
        (stdout, stderr) = proc.communicate()
180
        self.stdout = stdout.decode()
181
        self.stderr = stderr.decode(errors="replace")
182
        self.messages = re.findall(r'OpenSCAP Error:.*', self.stderr)
183
        self.messages = self.messages + re.findall(r'E: oscap:.*', self.stderr)
184
185
        self.returncode = proc.returncode
186
187
    def log_messages(self):
188
        for message in self.messages:
189
            log.warning("OSCAP addon: " + message)
190
191
192
def get_fix_rules_pre(profile, fpath, ds_id="", xccdf_id="", tailoring=""):
193
    """
194
    Get fix rules for the pre-installation environment for a given profile in a
195
    given datastream and checklist in a given file.
196
197
    :see: run_oscap_remediate
198
    :see: _run_oscap_gen_fix
199
    :return: fix rules for a given profile
200
    :rtype: str
201
202
    """
203
204
    return _run_oscap_gen_fix(profile, fpath, PRE_INSTALL_FIX_SYSTEM_ATTR,
205
                              ds_id=ds_id, xccdf_id=xccdf_id,
206
                              tailoring=tailoring)
207
208
209
def _run_oscap_gen_fix(profile, fpath, template, ds_id="", xccdf_id="",
210
                       tailoring=""):
211
    """
212
    Run oscap tool on a given file to get the contents of fix elements with the
213
    'system' attribute equal to a given template for a given datastream,
214
    checklist and profile.
215
216
    :see: run_oscap_remediate
217
    :param template: the value of the 'system' attribute of the fix elements
218
    :type template: str
219
    :return: oscap tool's stdout
220
    :rtype: str
221
222
    """
223
224
    if not profile:
225
        return ""
226
227
    args = ["oscap", "xccdf", "generate", "fix"]
228
    args.append("--template=%s" % template)
229
230
    # oscap uses the default profile by default
231
    if profile.lower() != "default":
232
        args.append("--profile=%s" % profile)
233
    if ds_id:
234
        args.append("--datastream-id=%s" % ds_id)
235
    if xccdf_id:
236
        args.append("--xccdf-id=%s" % xccdf_id)
237
    if tailoring:
238
        args.append("--tailoring-file=%s" % tailoring)
239
240
    args.append(fpath)
241
242
    proc = SubprocessLauncher(args)
243
    proc.execute()
244
    proc.log_messages()
245
    if proc.returncode != 0:
246
        msg = "Failed to generate fix rules with the oscap tool: %s" % proc.stderr
247
        raise OSCAPaddonError(msg)
248
249
    return proc.stdout
250
251
252
def do_chroot(chroot):
253
    """Helper function doing the chroot if requested."""
254
    if chroot and chroot != "/":
255
        os.chroot(chroot)
256
        os.chdir("/")
257
258
259
def assert_scanner_works(chroot, executable="oscap"):
260
    args = [executable, "--version"]
261
    command = " ".join(args)
262
263
    try:
264
        proc = subprocess.Popen(
265
            args, preexec_fn=lambda: do_chroot(chroot),
266
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
267
        (stdout, stderr) = proc.communicate()
268
        stderr = stderr.decode(errors="replace")
269
    except OSError as exc:
270
        msg = _(f"Basic invocation '{command}' fails: {str(exc)}")
271
        raise OSCAPaddonError(msg)
272
    if proc.returncode != 0:
273
        msg = _(
274
            f"Basic scanner invocation '{command}' exited "
275
            "with non-zero error code {proc.returncode}: {stderr}")
276
        raise OSCAPaddonError(msg)
277
    return True
278
279
280
def run_oscap_remediate(profile, fpath, ds_id="", xccdf_id="", tailoring="",
281
                        chroot=""):
282
    """
283
    Run the evaluation and remediation with the oscap tool on a given file,
284
    doing the remediation as defined in a given profile defined in a given
285
    checklist that is a part of a given datastream. If requested, run in
286
    chroot.
287
288
    :param profile: id of the profile that will drive the remediation
289
    :type profile: str
290
    :param fpath: path to a file with SCAP content
291
    :type fpath: str
292
    :param ds_id: ID of the datastream that contains the checklist defining
293
                  the profile
294
    :type ds_id: str
295
    :param xccdf_id: ID of the checklist that defines the profile
296
    :type xccdf_id: str
297
    :param tailoring: path to a tailoring file
298
    :type tailoring: str
299
    :param chroot: path to the root the oscap tool should be run in
300
    :type chroot: str
301
    :return: oscap tool's stdout (summary of the rules, checks and fixes)
302
    :rtype: str
303
304
    """
305
306
    if not profile:
307
        return ""
308
309
    # make sure the directory for the results exists
310
    results_dir = os.path.dirname(RESULTS_PATH)
311
    if chroot:
312
        results_dir = os.path.normpath(chroot + "/" + results_dir)
313
    utils.ensure_dir_exists(results_dir)
314
315
    args = ["oscap", "xccdf", "eval"]
316
    args.append("--remediate")
317
    args.append("--results=%s" % RESULTS_PATH)
318
    args.append("--report=%s" % REPORT_PATH)
319
320
    # oscap uses the default profile by default
321
    if profile.lower() != "default":
322
        args.append("--profile=%s" % profile)
323
    if ds_id:
324
        args.append("--datastream-id=%s" % ds_id)
325
    if xccdf_id:
326
        args.append("--xccdf-id=%s" % xccdf_id)
327
    if tailoring:
328
        args.append("--tailoring-file=%s" % tailoring)
329
330
    args.append(fpath)
331
332
    proc = SubprocessLauncher(args)
333
    proc.execute(preexec_fn=lambda: do_chroot(chroot))
334
    proc.log_messages()
335
336
    if proc.returncode not in (0, 2):
337
        # 0 -- success; 2 -- no error, but checks/remediation failed
338
        msg = "Content evaluation and remediation with the oscap tool "\
339
            "failed: %s" % proc.stderr
340
        raise OSCAPaddonError(msg)
341
342
    return proc.stdout
343
344
345
346
def _schedule_firstboot_remediation(
347
        chroot, profile, ds_path, results_path, report_path, ds_id, xccdf_id, tailoring_path):
348
    config = textwrap.dedent(f"""\
349
    OSCAP_REMEDIATE_DS='{ds_path}'
350
    OSCAP_REMEDIATE_PROFILE_ID='{profile}'
351
    OSCAP_REMEDIATE_ARF_RESULT='{results_path}'
352
    OSCAP_REMEDIATE_HTML_REPORT='{report_path}'
353
    OSCAP_REMEDIATE_VERBOSE_LOGS='/var/tmp/oscap_verbose.log'
354
    """)
355
    if ds_id:
356
        config += "OSCAP_REMEDIATE_DATASTREAM_ID='{ds_id}'\n"
357
    if xccdf_id:
358
        config += "OSCAP_REMEDIATE_XCCDF_ID='{xccdf_id}'\n"
359
    if tailoring_path:
360
        config += "OSCAP_REMEDIATE_TAILORING='{tailoring_path}'\n"
361
362
    relative_filename = "var/tmp/oscap-remediate-offline.conf.sh"
363
    local_config_filename = f"/{relative_filename}"
364
    chroot_config_filename = os.path.join(chroot, relative_filename)
365
    with open(chroot_config_filename, "w") as f:
366
        f.write(config)
367
    os.symlink(local_config_filename,
368
               os.path.join(chroot, "system-update"))
369
370
371
def schedule_firstboot_remediation(chroot, profile, fpath, ds_id="", xccdf_id="", tailoring=""):
372
    if not profile:
373
        return ""
374
375
    # make sure the directory for the results exists
376
    results_dir = os.path.dirname(RESULTS_PATH)
377
    results_dir = os.path.normpath(chroot + "/" + results_dir)
378
    utils.ensure_dir_exists(results_dir)
379
380
    log.info("OSCAP addon: Scheduling firstboot remediation")
381
    _schedule_firstboot_remediation(
382
        chroot, profile, fpath, RESULTS_PATH, REPORT_PATH, ds_id, xccdf_id, tailoring)
383
384
    return ""
385
386
387
def extract_data(archive, out_dir, ensure_has_files=None):
388
    """
389
    Fuction that extracts the given archive to the given output directory. It
390
    tries to find out the archive type by the file name.
391
392
    :param archive: path to the archive file that should be extracted
393
    :type archive: str
394
    :param out_dir: output directory the archive should be extracted to
395
    :type out_dir: str
396
    :param ensure_has_files: relative paths to the files that must exist in the
397
                             archive
398
    :type ensure_has_files: iterable of strings or None
399
    :return: a list of files and directories extracted from the archive
400
    :rtype: [str]
401
402
    """
403
404
    if not ensure_has_files:
405
        ensure_has_files = []
406
407
    # get rid of empty file paths
408
    if not ensure_has_files:
409
        ensure_has_files = []
410
    else:
411
        ensure_has_files = [fpath for fpath in ensure_has_files if fpath]
412
413
    msg = "OSCAP addon: Extracting {archive}".format(archive=archive)
414
    if ensure_has_files:
415
        msg += ", expecting to find {files} there.".format(files=tuple(ensure_has_files))
416
    log.info(msg)
417
418
    result = []
419
    if archive.endswith(".zip"):
420
        # ZIP file
421
        try:
422
            zfile = zipfile.ZipFile(archive, "r")
423
        except Exception as exc:
424
            msg = _(f"Error extracting archive as a zipfile: {exc}")
425
            raise ExtractionError(msg)
426
427
        # generator for the paths of the files found in the archive (dirs end
428
        # with "/")
429
        files = set(info.filename for info in zfile.filelist
430
                    if not info.filename.endswith("/"))
431
        for fpath in ensure_has_files or ():
432
            if fpath not in files:
433
                msg = "File '%s' not found in the archive '%s'" % (fpath,
434
                                                                   archive)
435
                raise ExtractionError(msg)
436
437
        utils.ensure_dir_exists(out_dir)
438
        zfile.extractall(path=out_dir)
439
        result = [utils.join_paths(out_dir, info.filename) for info in zfile.filelist]
440
        zfile.close()
441
    elif archive.endswith(".tar"):
442
        # plain tarball
443
        result = _extract_tarball(archive, out_dir, ensure_has_files, None)
444
    elif archive.endswith(".tar.gz"):
445
        # gzipped tarball
446
        result = _extract_tarball(archive, out_dir, ensure_has_files, "gz")
447
    elif archive.endswith(".tar.bz2"):
448
        # bzipped tarball
449
        result = _extract_tarball(archive, out_dir, ensure_has_files, "bz2")
450
    elif archive.endswith(".rpm"):
451
        # RPM
452
        result = _extract_rpm(archive, out_dir, ensure_has_files)
453
    # elif other types of archives
454
    else:
455
        raise ExtractionError("Unsuported archive type")
456
    log.info("OSCAP addon: Extracted {files} from the supplied content"
457
             .format(files=result))
458
    return result
459
460
461
def _extract_tarball(archive, out_dir, ensure_has_files, alg):
462
    """
463
    Extract the given TAR archive to the given output directory and make sure
464
    the given file exists in the archive.
465
466
    :see: extract_data
467
    :param alg: compression algorithm used for the tarball
468
    :type alg: str (one of "gz", "bz2") or None
469
    :return: a list of files and directories extracted from the archive
470
    :rtype: [str]
471
472
    """
473
474
    if alg and alg not in ("gz", "bz2",):
475
        raise ExtractionError("Unsupported compression algorithm")
476
477
    mode = "r"
478
    if alg:
479
        mode += ":%s" % alg
480
481
    try:
482
        tfile = tarfile.TarFile.open(archive, mode)
483
    except tarfile.TarError as err:
484
        raise ExtractionError(str(err))
485
486
    # generator for the paths of the files found in the archive
487
    files = set(member.path for member in tfile.getmembers()
488
                if member.isfile())
489
490
    for fpath in ensure_has_files or ():
491
        if fpath not in files:
492
            msg = "File '%s' not found in the archive '%s'" % (fpath, archive)
493
            raise ExtractionError(msg)
494
495
    utils.ensure_dir_exists(out_dir)
496
    tfile.extractall(path=out_dir)
497
    result = [utils.join_paths(out_dir, member.path) for member in tfile.getmembers()]
498
    tfile.close()
499
500
    return result
501
502
503
def _extract_rpm(rpm_path, root="/", ensure_has_files=None):
504
    """
505
    Extract the given RPM into the directory tree given by the root argument
506
    and make sure the given file exists in the archive.
507
508
    :param rpm_path: path to the RPM file that should be extracted
509
    :type rpm_path: str
510
    :param root: root of the directory tree the RPM should be extracted into
511
    :type root: str
512
    :param ensure_has_files: relative paths to the files that must exist in the
513
                             RPM
514
    :type ensure_has_files: iterable of strings or None
515
    :return: a list of files and directories extracted from the archive
516
    :rtype: [str]
517
518
    """
519
520
    # run rpm2cpio and process the output with the cpioarchive module
521
    temp_fd, temp_path = tempfile.mkstemp(prefix="oscap_rpm")
522
    proc = subprocess.Popen(["rpm2cpio", rpm_path], stdout=temp_fd)
523
    proc.wait()
524
    if proc.returncode != 0:
525
        msg = "Failed to convert RPM '%s' to cpio archive" % rpm_path
526
        raise ExtractionError(msg)
527
528
    os.close(temp_fd)
529
530
    try:
531
        archive = cpioarchive.CpioArchive(temp_path)
532
    except cpioarchive.CpioError as err:
533
        raise ExtractionError(str(err))
534
535
    # get entries from the archive (supports only iteration over entries)
536
    entries = set(entry for entry in archive)
537
538
    # cpio entry names (paths) start with the dot
539
    entry_names = [entry.name.lstrip(".") for entry in entries]
540
541
    for fpath in ensure_has_files or ():
542
        # RPM->cpio entries have absolute paths
543
        if fpath not in entry_names and \
544
           os.path.join("/", fpath) not in entry_names:
545
            msg = "File '%s' not found in the archive '%s'" % (fpath, rpm_path)
546
            raise ExtractionError(msg)
547
548
    try:
549
        for entry in entries:
550
            if entry.size == 0:
551
                continue
552
            dirname = os.path.dirname(entry.name.lstrip("."))
553
            out_dir = os.path.normpath(root + dirname)
554
            utils.ensure_dir_exists(out_dir)
555
556
            out_fpath = os.path.normpath(root + entry.name.lstrip("."))
557
            if os.path.exists(out_fpath):
558
                continue
559
            with open(out_fpath, "wb") as out_file:
560
                buf = entry.read(IO_BUF_SIZE)
561
                while buf:
562
                    out_file.write(buf)
563
                    buf = entry.read(IO_BUF_SIZE)
564
    except (IOError, cpioarchive.CpioError) as e:
565
        raise ExtractionError(e)
566
567
    # cleanup
568
    archive.close()
569
    os.unlink(temp_path)
570
571
    return [os.path.normpath(root + name) for name in entry_names]
572
573
574
def strip_content_dir(fpaths, phase="preinst"):
575
    """
576
    Strip content directory prefix from the file paths for either
577
    pre-installation or post-installation phase.
578
579
    :param fpaths: iterable of file paths to strip content directory prefix
580
                   from
581
    :type fpaths: iterable of strings
582
    :param phase: specifies pre-installation or post-installation phase
583
    :type phase: "preinst" or "postinst"
584
    :return: the same iterable of file paths as given with the content
585
             directory prefix stripped
586
    :rtype: same type as fpaths
587
588
    """
589
590
    if phase == "preinst":
591
        remove_prefix = lambda x: x[len(INSTALLATION_CONTENT_DIR):]
592
    else:
593
        remove_prefix = lambda x: x[len(TARGET_CONTENT_DIR):]
594
595
    return utils.keep_type_map(remove_prefix, fpaths)
596
597
598
def get_ssg_path(root="/"):
599
    return utils.join_paths(root, SSG_DIR + SSG_CONTENT)
600
601
602
def ssg_available(root="/"):
603
    """
604
    Tries to find the SCAP Security Guide under the given root.
605
606
    :return: True if SSG was found under the given root, False otherwise
607
608
    """
609
610
    return os.path.exists(get_ssg_path(root))
611
612
613
def get_content_name(data):
614
    if data.content_type == "scap-security-guide":
615
        raise ValueError("Using scap-security-guide, no single content file")
616
617
    rest = "/anonymous_content"
618
    for prefix in SUPPORTED_URL_PREFIXES:
619
        if data.content_url.startswith(prefix):
620
            rest = data.content_url[len(prefix):]
621
            break
622
623
    parts = rest.rsplit("/", 1)
624
    if len(parts) != 2:
625
        raise ValueError("Unsupported url '%s'" % data.content_url)
626
627
    return parts[1]
628
629
630
def get_raw_preinst_content_path(data):
631
    """Path to the raw (unextracted, ...) pre-installation content file"""
632
    if data.content_type == "scap-security-guide":
633
        log.debug("OSCAP addon: Using scap-security-guide, no single content file")
634
        return None
635
636
    content_name = get_content_name(data)
637
    return utils.join_paths(INSTALLATION_CONTENT_DIR, content_name)
638
639
640
def get_preinst_content_path(data):
641
    """Path to the pre-installation content file"""
642
    if data.content_type == "scap-security-guide":
643
        # SSG is not copied to the standard place
644
        return data.content_path
645
646
    if data.content_type == "datastream":
647
        return get_raw_preinst_content_path(data)
648
649
    return utils.join_paths(
650
        INSTALLATION_CONTENT_DIR,
651
        data.content_path
652
    )
653
654
655
def get_postinst_content_path(data):
656
    """Path to the post-installation content file"""
657
    if data.content_type == "datastream":
658
        return utils.join_paths(
659
            TARGET_CONTENT_DIR,
660
            get_content_name(data)
661
        )
662
663
    if data.content_type in ("rpm", "scap-security-guide"):
664
        # no path magic in case of RPM (SSG is installed as an RPM)
665
        return data.content_path
666
667
    return utils.join_paths(
668
        TARGET_CONTENT_DIR,
669
        data.content_path
670
    )
671
672
673
def get_preinst_tailoring_path(data):
674
    """Path to the pre-installation tailoring file (if any)"""
675
    if not data.tailoring_path:
676
        return ""
677
678
    return utils.join_paths(
679
        INSTALLATION_CONTENT_DIR,
680
        data.tailoring_path
681
    )
682
683
684
def get_postinst_tailoring_path(data):
685
    """Path to the post-installation tailoring file (if any)"""
686
    if not data.tailoring_path:
687
        return ""
688
689
    if data.content_type == "rpm":
690
        # no path magic in case of RPM
691
        return data.tailoring_path
692
693
    return utils.join_paths(
694
        TARGET_CONTENT_DIR,
695
        data.tailoring_path
696
    )
697
698
699
def get_payload_proxy():
700
    """Get the DBus proxy of the active payload.
701
702
    :return: a DBus proxy
703
    """
704
    payloads_proxy = PAYLOADS.get_proxy()
705
    object_path = payloads_proxy.ActivePayload
706
707
    if not object_path:
708
        raise ValueError("Active payload is not set.")
709
710
    return PAYLOADS.get_proxy(object_path)
711
712
713
def get_packages_data() -> PackagesConfigurationData:
714
    """Get the DBus data with the packages configuration.
715
716
    :return: a packages configuration
717
    """
718
    payload_proxy = get_payload_proxy()
719
720
    if payload_proxy.Type != PAYLOAD_TYPE_DNF:
721
        return PackagesConfigurationData()
722
723
    return PackagesConfigurationData.from_structure(
724
        payload_proxy.Packages
725
    )
726
727
728
def set_packages_data(data: PackagesConfigurationData):
729
    """Set the DBus data with the packages configuration.
730
731
    :param data: a packages configuration
732
    """
733
    payload_proxy = get_payload_proxy()
734
735
    if payload_proxy.Type != PAYLOAD_TYPE_DNF:
736
        log.debug("OSCAP addon: The payload doesn't support packages.")
737
        return
738
739
    return payload_proxy.SetPackages(
740
        PackagesConfigurationData.to_structure(data)
741
    )
742