Passed
Pull Request — rhel9-branch (#189)
by Matěj
01:02
created

schedule_firstboot_remediation()   A

Complexity

Conditions 2

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 10
nop 6
dl 0
loc 14
rs 9.9
c 0
b 0
f 0
1
#
2
# Copyright (C) 2013  Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
# Red Hat Author(s): Vratislav Podzimek <[email protected]>
19
#
20
21
"""
22
Module with various classes and functions needed by the OSCAP addon that are
23
not specific to any installation mode (tui, gui, ks).
24
25
"""
26
27
import os
28
import tempfile
29
import subprocess
30
import zipfile
31
import tarfile
32
import textwrap
33
34
import re
35
import logging
36
37
from collections import namedtuple
38
import gettext
39
from functools import wraps
40
41
from dasbus.identifier import DBusServiceIdentifier
42
from pyanaconda.core import constants
43
from pyanaconda.core.dbus import DBus
44
from pyanaconda.core.constants import PAYLOAD_TYPE_DNF
45
from pyanaconda.modules.common.constants.namespaces import ADDONS_NAMESPACE
46
from pyanaconda.modules.common.constants.services import PAYLOADS
47
from pyanaconda.modules.common.structures.payload import PackagesConfigurationData
48
from pyanaconda.threading import threadMgr, AnacondaThread
49
50
from org_fedora_oscap import utils
51
from org_fedora_oscap import cpioarchive
52
53
54
log = logging.getLogger("anaconda")
55
56
57
# mimick pyanaconda/core/i18n.py
58
def _(string):
59
    if string:
60
        return gettext.translation("oscap-anaconda-addon", fallback=True).gettext(string)
61
    else:
62
        return ""
63
64
65
def N_(string): return string
66
67
68
# everything else should be private
69
__all__ = ["run_oscap_remediate", "get_fix_rules_pre",
70
           "extract_data", "strip_content_dir",
71
           "OSCAPaddonError", "get_payload_proxy", "get_packages_data",
72
           "set_packages_data"]
73
74
INSTALLATION_CONTENT_DIR = "/tmp/openscap_data/"
75
TARGET_CONTENT_DIR = "/root/openscap_data/"
76
77
SSG_DIR = "/usr/share/xml/scap/ssg/content/"
78
79
# Make it easy to change e.g. by sed substitution in spec files
80
# First name is the canonical addon name, rest are adapters
81
ADDON_NAMES = ["com_redhat_oscap", "org_fedora_oscap"]
82
83
COMPLAIN_ABOUT_NON_CANONICAL_NAMES = True
84
85
# Enable patches that set the content name at package-time
86
DEFAULT_SSG_CONTENT_NAME = ""
87
SSG_CONTENT = DEFAULT_SSG_CONTENT_NAME
88
if not SSG_CONTENT:
89
    if constants.shortProductName != 'anaconda':
90
        if constants.shortProductName == 'fedora':
91
            SSG_CONTENT = "ssg-fedora-ds.xml"
92
        else:
93
            SSG_CONTENT = (
94
                "ssg-{name}{version}-ds.xml"
95
                .format(
96
                    name=constants.shortProductName,
97
                    version=constants.productVersion.strip(".")[0]))
98
99
RESULTS_PATH = utils.join_paths(TARGET_CONTENT_DIR,
100
                                "eval_remediate_results.xml")
101
REPORT_PATH = utils.join_paths(TARGET_CONTENT_DIR,
102
                               "eval_remediate_report.html")
103
104
PRE_INSTALL_FIX_SYSTEM_ATTR = "urn:redhat:anaconda:pre"
105
106
THREAD_FETCH_DATA = "AnaOSCAPdataFetchThread"
107
108
SUPPORTED_ARCHIVES = (".zip", ".tar", ".tar.gz", ".tar.bz2", )
109
110
SUPPORTED_CONTENT_TYPES = (
111
    "datastream", "rpm", "archive", "scap-security-guide",
112
)
113
114
SUPPORTED_URL_PREFIXES = (
115
    "http://", "https://", "ftp://",  # LABEL:?, hdaX:?,
116
)
117
118
# buffer size for reading and writing out data (in bytes)
119
IO_BUF_SIZE = 2 * 1024 * 1024
120
121
# DBus constants
122
KDUMP = DBusServiceIdentifier(
123
    namespace=ADDONS_NAMESPACE,
124
    basename="Kdump",
125
    message_bus=DBus
126
)
127
128
129
class OSCAPaddonError(Exception):
130
    """Exception class for OSCAP addon related errors."""
131
132
    pass
133
134
135
class OSCAPaddonNetworkError(OSCAPaddonError):
136
    """Exception class for OSCAP addon related network errors."""
137
138
    pass
139
140
141
class ExtractionError(OSCAPaddonError):
142
    """Exception class for the extraction errors."""
143
144
    pass
145
146
147
MESSAGE_TYPE_FATAL = 0
148
MESSAGE_TYPE_WARNING = 1
149
MESSAGE_TYPE_INFO = 2
150
151
# namedtuple for messages returned from the rules evaluation
152
#   origin -- class (inherited from RuleHandler) that generated the message
153
#   type -- one of the MESSAGE_TYPE_* constants defined above
154
#   text -- the actual message that should be displayed, logged, ...
155
RuleMessage = namedtuple("RuleMessage", ["origin", "type", "text"])
156
157
158
class SubprocessLauncher(object):
159
    def __init__(self, args):
160
        self.args = args
161
        self.stdout = ""
162
        self.stderr = ""
163
        self.messages = []
164
        self.returncode = None
165
166
    def execute(self, ** kwargs):
167
        command_string = " ".join(self.args)
168
        log.info(
169
            "OSCAP addon: Executing subprocess: '{command_string}'"
170
            .format(command_string=command_string))
171
        try:
172
            proc = subprocess.Popen(self.args, stdout=subprocess.PIPE,
173
                                    stderr=subprocess.PIPE, ** kwargs)
174
        except OSError as oserr:
175
            msg = ("Failed to execute command '{command_string}': {oserr}"
176
                   .format(command_string=command_string, oserr=oserr))
177
            raise OSCAPaddonError(msg)
178
179
        (stdout, stderr) = proc.communicate()
180
        self.stdout = stdout.decode()
181
        self.stderr = stderr.decode(errors="replace")
182
        self.messages = re.findall(r'OpenSCAP Error:.*', self.stderr)
183
        self.messages = self.messages + re.findall(r'E: oscap:.*', self.stderr)
184
185
        self.returncode = proc.returncode
186
187
    def log_messages(self):
188
        for message in self.messages:
189
            log.warning("OSCAP addon: " + message)
190
191
192
def get_fix_rules_pre(profile, fpath, ds_id="", xccdf_id="", tailoring=""):
193
    """
194
    Get fix rules for the pre-installation environment for a given profile in a
195
    given datastream and checklist in a given file.
196
197
    :see: run_oscap_remediate
198
    :see: _run_oscap_gen_fix
199
    :return: fix rules for a given profile
200
    :rtype: str
201
202
    """
203
204
    return _run_oscap_gen_fix(profile, fpath, PRE_INSTALL_FIX_SYSTEM_ATTR,
205
                              ds_id=ds_id, xccdf_id=xccdf_id,
206
                              tailoring=tailoring)
207
208
209
def _run_oscap_gen_fix(profile, fpath, template, ds_id="", xccdf_id="",
210
                       tailoring=""):
211
    """
212
    Run oscap tool on a given file to get the contents of fix elements with the
213
    'system' attribute equal to a given template for a given datastream,
214
    checklist and profile.
215
216
    :see: run_oscap_remediate
217
    :param template: the value of the 'system' attribute of the fix elements
218
    :type template: str
219
    :return: oscap tool's stdout
220
    :rtype: str
221
222
    """
223
224
    if not profile:
225
        return ""
226
227
    args = ["oscap", "xccdf", "generate", "fix"]
228
    args.append("--template=%s" % template)
229
230
    # oscap uses the default profile by default
231
    if profile.lower() != "default":
232
        args.append("--profile=%s" % profile)
233
    if ds_id:
234
        args.append("--datastream-id=%s" % ds_id)
235
    if xccdf_id:
236
        args.append("--xccdf-id=%s" % xccdf_id)
237
    if tailoring:
238
        args.append("--tailoring-file=%s" % tailoring)
239
240
    args.append(fpath)
241
242
    proc = SubprocessLauncher(args)
243
    proc.execute()
244
    proc.log_messages()
245
    if proc.returncode != 0:
246
        msg = "Failed to generate fix rules with the oscap tool: %s" % proc.stderr
247
        raise OSCAPaddonError(msg)
248
249
    return proc.stdout
250
251
252
def do_chroot(chroot):
253
    """Helper function doing the chroot if requested."""
254
    if chroot and chroot != "/":
255
        os.chroot(chroot)
256
        os.chdir("/")
257
258
259
def assert_scanner_works(chroot, executable="oscap"):
260
    args = [executable, "--version"]
261
    command = " ".join(args)
262
263
    try:
264
        proc = subprocess.Popen(
265
            args, preexec_fn=lambda: do_chroot(chroot),
266
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
267
        (stdout, stderr) = proc.communicate()
268
        stderr = stderr.decode(errors="replace")
269
    except OSError as exc:
270
        msg = _(f"Basic invocation '{command}' fails: {str(exc)}")
271
        raise OSCAPaddonError(msg)
272
    if proc.returncode != 0:
273
        msg = _(
274
            f"Basic scanner invocation '{command}' exited "
275
            "with non-zero error code {proc.returncode}: {stderr}")
276
        raise OSCAPaddonError(msg)
277
    return True
278
279
280
def run_oscap_remediate(profile, fpath, ds_id="", xccdf_id="", tailoring="",
281
                        chroot=""):
282
    """
283
    Run the evaluation and remediation with the oscap tool on a given file,
284
    doing the remediation as defined in a given profile defined in a given
285
    checklist that is a part of a given datastream. If requested, run in
286
    chroot.
287
288
    :param profile: id of the profile that will drive the remediation
289
    :type profile: str
290
    :param fpath: path to a file with SCAP content
291
    :type fpath: str
292
    :param ds_id: ID of the datastream that contains the checklist defining
293
                  the profile
294
    :type ds_id: str
295
    :param xccdf_id: ID of the checklist that defines the profile
296
    :type xccdf_id: str
297
    :param tailoring: path to a tailoring file
298
    :type tailoring: str
299
    :param chroot: path to the root the oscap tool should be run in
300
    :type chroot: str
301
    :return: oscap tool's stdout (summary of the rules, checks and fixes)
302
    :rtype: str
303
304
    """
305
306
    if not profile:
307
        return ""
308
309
    # make sure the directory for the results exists
310
    results_dir = os.path.dirname(RESULTS_PATH)
311
    if chroot:
312
        results_dir = os.path.normpath(chroot + "/" + results_dir)
313
    utils.ensure_dir_exists(results_dir)
314
315
    args = ["oscap", "xccdf", "eval"]
316
    args.append("--remediate")
317
    args.append("--results=%s" % RESULTS_PATH)
318
    args.append("--report=%s" % REPORT_PATH)
319
320
    # oscap uses the default profile by default
321
    if profile.lower() != "default":
322
        args.append("--profile=%s" % profile)
323
    if ds_id:
324
        args.append("--datastream-id=%s" % ds_id)
325
    if xccdf_id:
326
        args.append("--xccdf-id=%s" % xccdf_id)
327
    if tailoring:
328
        args.append("--tailoring-file=%s" % tailoring)
329
330
    args.append(fpath)
331
332
    proc = SubprocessLauncher(args)
333
    proc.execute(preexec_fn=lambda: do_chroot(chroot))
334
    proc.log_messages()
335
336
    if proc.returncode not in (0, 2):
337
        # 0 -- success; 2 -- no error, but checks/remediation failed
338
        msg = "Content evaluation and remediation with the oscap tool "\
339
            "failed: %s" % proc.stderr
340
        raise OSCAPaddonError(msg)
341
342
    return proc.stdout
343
344
345
def _schedule_firstboot_remediation(
346
        chroot, profile, ds_path, results_path, report_path, ds_id, xccdf_id, tailoring_path):
347
    config = textwrap.dedent(f"""\
348
    OSCAP_REMEDIATE_DS='{ds_path}'
349
    OSCAP_REMEDIATE_PROFILE_ID='{profile}'
350
    OSCAP_REMEDIATE_ARF_RESULT='{results_path}'
351
    OSCAP_REMEDIATE_HTML_REPORT='{report_path}'
352
    OSCAP_REMEDIATE_VERBOSE_LOGS='/var/tmp/oscap_verbose.log'
353
    """)
354
    if ds_id:
355
        config += "OSCAP_REMEDIATE_DATASTREAM_ID='{ds_id}'\n"
356
    if xccdf_id:
357
        config += "OSCAP_REMEDIATE_XCCDF_ID='{xccdf_id}'\n"
358
    if tailoring_path:
359
        config += "OSCAP_REMEDIATE_TAILORING='{tailoring_path}'\n"
360
361
    relative_filename = "var/tmp/oscap-remediate-offline.conf.sh"
362
    local_config_filename = f"/{relative_filename}"
363
    chroot_config_filename = os.path.join(chroot, relative_filename)
364
    with open(chroot_config_filename, "w") as f:
365
        f.write(config)
366
    os.symlink(local_config_filename,
367
               os.path.join(chroot, "system-update"))
368
369
370
def schedule_firstboot_remediation(chroot, profile, fpath, ds_id="", xccdf_id="", tailoring=""):
371
    if not profile:
372
        return ""
373
374
    # make sure the directory for the results exists
375
    results_dir = os.path.dirname(RESULTS_PATH)
376
    results_dir = os.path.normpath(chroot + "/" + results_dir)
377
    utils.ensure_dir_exists(results_dir)
378
379
    log.info("OSCAP addon: Scheduling firstboot remediation")
380
    _schedule_firstboot_remediation(
381
        chroot, profile, fpath, RESULTS_PATH, REPORT_PATH, ds_id, xccdf_id, tailoring)
382
383
    return ""
384
385
386
def extract_data(archive, out_dir, ensure_has_files=None):
387
    """
388
    Fuction that extracts the given archive to the given output directory. It
389
    tries to find out the archive type by the file name.
390
391
    :param archive: path to the archive file that should be extracted
392
    :type archive: str
393
    :param out_dir: output directory the archive should be extracted to
394
    :type out_dir: str
395
    :param ensure_has_files: relative paths to the files that must exist in the
396
                             archive
397
    :type ensure_has_files: iterable of strings or None
398
    :return: a list of files and directories extracted from the archive
399
    :rtype: [str]
400
401
    """
402
403
    if not ensure_has_files:
404
        ensure_has_files = []
405
406
    # get rid of empty file paths
407
    if not ensure_has_files:
408
        ensure_has_files = []
409
    else:
410
        ensure_has_files = [fpath for fpath in ensure_has_files if fpath]
411
412
    msg = "OSCAP addon: Extracting {archive}".format(archive=archive)
413
    if ensure_has_files:
414
        msg += ", expecting to find {files} there.".format(files=tuple(ensure_has_files))
415
    log.info(msg)
416
417
    result = []
418
    if archive.endswith(".zip"):
419
        # ZIP file
420
        try:
421
            zfile = zipfile.ZipFile(archive, "r")
422
        except Exception as exc:
423
            msg = _(f"Error extracting archive as a zipfile: {exc}")
424
            raise ExtractionError(msg)
425
426
        # generator for the paths of the files found in the archive (dirs end
427
        # with "/")
428
        files = set(info.filename for info in zfile.filelist
429
                    if not info.filename.endswith("/"))
430
        for fpath in ensure_has_files or ():
431
            if fpath not in files:
432
                msg = "File '%s' not found in the archive '%s'" % (fpath,
433
                                                                   archive)
434
                raise ExtractionError(msg)
435
436
        utils.ensure_dir_exists(out_dir)
437
        zfile.extractall(path=out_dir)
438
        result = [utils.join_paths(out_dir, info.filename) for info in zfile.filelist]
439
        zfile.close()
440
    elif archive.endswith(".tar"):
441
        # plain tarball
442
        result = _extract_tarball(archive, out_dir, ensure_has_files, None)
443
    elif archive.endswith(".tar.gz"):
444
        # gzipped tarball
445
        result = _extract_tarball(archive, out_dir, ensure_has_files, "gz")
446
    elif archive.endswith(".tar.bz2"):
447
        # bzipped tarball
448
        result = _extract_tarball(archive, out_dir, ensure_has_files, "bz2")
449
    elif archive.endswith(".rpm"):
450
        # RPM
451
        result = _extract_rpm(archive, out_dir, ensure_has_files)
452
    # elif other types of archives
453
    else:
454
        raise ExtractionError("Unsuported archive type")
455
    log.info("OSCAP addon: Extracted {files} from the supplied content"
456
             .format(files=result))
457
    return result
458
459
460
def _extract_tarball(archive, out_dir, ensure_has_files, alg):
461
    """
462
    Extract the given TAR archive to the given output directory and make sure
463
    the given file exists in the archive.
464
465
    :see: extract_data
466
    :param alg: compression algorithm used for the tarball
467
    :type alg: str (one of "gz", "bz2") or None
468
    :return: a list of files and directories extracted from the archive
469
    :rtype: [str]
470
471
    """
472
473
    if alg and alg not in ("gz", "bz2",):
474
        raise ExtractionError("Unsupported compression algorithm")
475
476
    mode = "r"
477
    if alg:
478
        mode += ":%s" % alg
479
480
    try:
481
        tfile = tarfile.TarFile.open(archive, mode)
482
    except tarfile.TarError as err:
483
        raise ExtractionError(str(err))
484
485
    # generator for the paths of the files found in the archive
486
    files = set(member.path for member in tfile.getmembers()
487
                if member.isfile())
488
489
    for fpath in ensure_has_files or ():
490
        if fpath not in files:
491
            msg = "File '%s' not found in the archive '%s'" % (fpath, archive)
492
            raise ExtractionError(msg)
493
494
    utils.ensure_dir_exists(out_dir)
495
    tfile.extractall(path=out_dir)
496
    result = [utils.join_paths(out_dir, member.path) for member in tfile.getmembers()]
497
    tfile.close()
498
499
    return result
500
501
502
def _extract_rpm(rpm_path, root="/", ensure_has_files=None):
503
    """
504
    Extract the given RPM into the directory tree given by the root argument
505
    and make sure the given file exists in the archive.
506
507
    :param rpm_path: path to the RPM file that should be extracted
508
    :type rpm_path: str
509
    :param root: root of the directory tree the RPM should be extracted into
510
    :type root: str
511
    :param ensure_has_files: relative paths to the files that must exist in the
512
                             RPM
513
    :type ensure_has_files: iterable of strings or None
514
    :return: a list of files and directories extracted from the archive
515
    :rtype: [str]
516
517
    """
518
519
    # run rpm2cpio and process the output with the cpioarchive module
520
    temp_fd, temp_path = tempfile.mkstemp(prefix="oscap_rpm")
521
    proc = subprocess.Popen(["rpm2cpio", rpm_path], stdout=temp_fd)
522
    proc.wait()
523
    if proc.returncode != 0:
524
        msg = "Failed to convert RPM '%s' to cpio archive" % rpm_path
525
        raise ExtractionError(msg)
526
527
    os.close(temp_fd)
528
529
    try:
530
        archive = cpioarchive.CpioArchive(temp_path)
531
    except cpioarchive.CpioError as err:
532
        raise ExtractionError(str(err))
533
534
    # get entries from the archive (supports only iteration over entries)
535
    entries = set(entry for entry in archive)
536
537
    # cpio entry names (paths) start with the dot
538
    entry_names = [entry.name.lstrip(".") for entry in entries]
539
540
    for fpath in ensure_has_files or ():
541
        # RPM->cpio entries have absolute paths
542
        if fpath not in entry_names and \
543
           os.path.join("/", fpath) not in entry_names:
544
            msg = "File '%s' not found in the archive '%s'" % (fpath, rpm_path)
545
            raise ExtractionError(msg)
546
547
    try:
548
        for entry in entries:
549
            if entry.size == 0:
550
                continue
551
            dirname = os.path.dirname(entry.name.lstrip("."))
552
            out_dir = os.path.normpath(root + dirname)
553
            utils.ensure_dir_exists(out_dir)
554
555
            out_fpath = os.path.normpath(root + entry.name.lstrip("."))
556
            if os.path.exists(out_fpath):
557
                continue
558
            with open(out_fpath, "wb") as out_file:
559
                buf = entry.read(IO_BUF_SIZE)
560
                while buf:
561
                    out_file.write(buf)
562
                    buf = entry.read(IO_BUF_SIZE)
563
    except (IOError, cpioarchive.CpioError) as e:
564
        raise ExtractionError(e)
565
566
    # cleanup
567
    archive.close()
568
    os.unlink(temp_path)
569
570
    return [os.path.normpath(root + name) for name in entry_names]
571
572
573
def strip_content_dir(fpaths, phase="preinst"):
574
    """
575
    Strip content directory prefix from the file paths for either
576
    pre-installation or post-installation phase.
577
578
    :param fpaths: iterable of file paths to strip content directory prefix
579
                   from
580
    :type fpaths: iterable of strings
581
    :param phase: specifies pre-installation or post-installation phase
582
    :type phase: "preinst" or "postinst"
583
    :return: the same iterable of file paths as given with the content
584
             directory prefix stripped
585
    :rtype: same type as fpaths
586
587
    """
588
589
    if phase == "preinst":
590
        remove_prefix = lambda x: x[len(INSTALLATION_CONTENT_DIR):]
591
    else:
592
        remove_prefix = lambda x: x[len(TARGET_CONTENT_DIR):]
593
594
    return utils.keep_type_map(remove_prefix, fpaths)
595
596
597
def get_ssg_path(root="/"):
598
    return utils.join_paths(root, SSG_DIR + SSG_CONTENT)
599
600
601
def ssg_available(root="/"):
602
    """
603
    Tries to find the SCAP Security Guide under the given root.
604
605
    :return: True if SSG was found under the given root, False otherwise
606
607
    """
608
609
    return os.path.exists(get_ssg_path(root))
610
611
612
def get_content_name(data):
613
    if data.content_type == "scap-security-guide":
614
        raise ValueError("Using scap-security-guide, no single content file")
615
616
    rest = "/anonymous_content"
617
    for prefix in SUPPORTED_URL_PREFIXES:
618
        if data.content_url.startswith(prefix):
619
            rest = data.content_url[len(prefix):]
620
            break
621
622
    parts = rest.rsplit("/", 1)
623
    if len(parts) != 2:
624
        raise ValueError("Unsupported url '%s'" % data.content_url)
625
626
    return parts[1]
627
628
629
def get_raw_preinst_content_path(data):
630
    """Path to the raw (unextracted, ...) pre-installation content file"""
631
    if data.content_type == "scap-security-guide":
632
        log.debug("OSCAP addon: Using scap-security-guide, no single content file")
633
        return None
634
635
    content_name = get_content_name(data)
636
    return utils.join_paths(INSTALLATION_CONTENT_DIR, content_name)
637
638
639
def get_preinst_content_path(data):
640
    """Path to the pre-installation content file"""
641
    if data.content_type == "scap-security-guide":
642
        # SSG is not copied to the standard place
643
        return data.content_path
644
645
    if data.content_type == "datastream":
646
        return get_raw_preinst_content_path(data)
647
648
    return utils.join_paths(
649
        INSTALLATION_CONTENT_DIR,
650
        data.content_path
651
    )
652
653
654
def get_postinst_content_path(data):
655
    """Path to the post-installation content file"""
656
    if data.content_type == "datastream":
657
        return utils.join_paths(
658
            TARGET_CONTENT_DIR,
659
            get_content_name(data)
660
        )
661
662
    if data.content_type in ("rpm", "scap-security-guide"):
663
        # no path magic in case of RPM (SSG is installed as an RPM)
664
        return data.content_path
665
666
    return utils.join_paths(
667
        TARGET_CONTENT_DIR,
668
        data.content_path
669
    )
670
671
672
def get_preinst_tailoring_path(data):
673
    """Path to the pre-installation tailoring file (if any)"""
674
    if not data.tailoring_path:
675
        return ""
676
677
    return utils.join_paths(
678
        INSTALLATION_CONTENT_DIR,
679
        data.tailoring_path
680
    )
681
682
683
def get_postinst_tailoring_path(data):
684
    """Path to the post-installation tailoring file (if any)"""
685
    if not data.tailoring_path:
686
        return ""
687
688
    if data.content_type == "rpm":
689
        # no path magic in case of RPM
690
        return data.tailoring_path
691
692
    return utils.join_paths(
693
        TARGET_CONTENT_DIR,
694
        data.tailoring_path
695
    )
696
697
698
def get_payload_proxy():
699
    """Get the DBus proxy of the active payload.
700
701
    :return: a DBus proxy
702
    """
703
    payloads_proxy = PAYLOADS.get_proxy()
704
    object_path = payloads_proxy.ActivePayload
705
706
    if not object_path:
707
        raise ValueError("Active payload is not set.")
708
709
    return PAYLOADS.get_proxy(object_path)
710
711
712
def get_packages_data() -> PackagesConfigurationData:
713
    """Get the DBus data with the packages configuration.
714
715
    :return: a packages configuration
716
    """
717
    payload_proxy = get_payload_proxy()
718
719
    if payload_proxy.Type != PAYLOAD_TYPE_DNF:
720
        return PackagesConfigurationData()
721
722
    return PackagesConfigurationData.from_structure(
723
        payload_proxy.Packages
724
    )
725
726
727
def set_packages_data(data: PackagesConfigurationData):
728
    """Set the DBus data with the packages configuration.
729
730
    :param data: a packages configuration
731
    """
732
    payload_proxy = get_payload_proxy()
733
734
    if payload_proxy.Type != PAYLOAD_TYPE_DNF:
735
        log.debug("OSCAP addon: The payload doesn't support packages.")
736
        return
737
738
    return payload_proxy.SetPackages(
739
        PackagesConfigurationData.to_structure(data)
740
    )
741