org_fedora_oscap.common.N_()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 1
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
nop 1
dl 0
loc 1
rs 10
c 0
b 0
f 0
1
#
2
# Copyright (C) 2013  Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
# Red Hat Author(s): Vratislav Podzimek <[email protected]>
19
#
20
21
"""
22
Module with various classes and functions needed by the OSCAP addon that are
23
not specific to any installation mode (tui, gui, ks).
24
25
"""
26
27
import os
28
import tempfile
29
import subprocess
30
import zipfile
31
import tarfile
32
import textwrap
33
34
import re
35
import logging
36
37
from collections import namedtuple
38
import gettext
39
from functools import wraps
40
41
from dasbus.identifier import DBusServiceIdentifier
42
from pyanaconda.core import constants
43
from pyanaconda.core.dbus import DBus
44
from pyanaconda.core.constants import PAYLOAD_TYPE_DNF
45
from pyanaconda.modules.common.constants.namespaces import ADDONS_NAMESPACE
46
from pyanaconda.modules.common.constants.services import NETWORK, PAYLOADS
47
from pyanaconda.modules.common.structures.packages import PackagesSelectionData
48
from pyanaconda.threading import threadMgr, AnacondaThread
49
50
from org_fedora_oscap import utils
51
from org_fedora_oscap import cpioarchive
52
53
54
log = logging.getLogger("anaconda")
55
56
57
# mimic pyanaconda/core/i18n.py
58
def _(string):
    """Translate a string using the "oscap-anaconda-addon" gettext domain.

    :param string: message to translate
    :type string: str
    :return: the translated message, or an empty string for falsy input
    :rtype: str

    """
    if not string:
        return ""

    # fallback=True: when no catalog is found the message is returned as-is
    catalog = gettext.translation("oscap-anaconda-addon", fallback=True)
    return catalog.gettext(string)
63
64
65
def N_(string):
    """Identity helper: hand the given string back untouched."""
    return string
66
67
68
# Names exported by this module; anything not listed here should be private.
__all__ = [
    "run_oscap_remediate",
    "get_fix_rules_pre",
    "extract_data",
    "strip_content_dir",
    "OSCAPaddonError",
    "get_payload_proxy",
    "get_packages_data",
    "set_packages_data",
]
73
74
# Content directories used by the pre-installation and the post-installation
# path helpers below, respectively.
INSTALLATION_CONTENT_DIR = "/tmp/openscap_data/"
TARGET_CONTENT_DIR = "/root/openscap_data/"

# Directory that is combined with SSG_CONTENT to locate the SSG datastream.
SSG_DIR = "/usr/share/xml/scap/ssg/content/"

# Kept on a single line so that it is easy to change, e.g. by a sed
# substitution in spec files.
# The first name is the canonical addon name, the rest are adapters.
ADDON_NAMES = ["org_fedora_oscap", "com_redhat_oscap"]

COMPLAIN_ABOUT_NON_CANONICAL_NAMES = True
84
85
# Enable patches that set the content name at package-time
DEFAULT_SSG_CONTENT_NAME = ""
SSG_CONTENT = DEFAULT_SSG_CONTENT_NAME
if not SSG_CONTENT:
    # No packaged default: derive the datastream file name from the product.
    # Nothing is derived for the generic 'anaconda' product name.
    if constants.shortProductName != 'anaconda':
        if constants.shortProductName == 'fedora':
            SSG_CONTENT = "ssg-fedora-ds.xml"
        else:
            # Use the whole major version component. split(".")[0] keeps
            # multi-digit majors intact ("10.0" -> "10"); the previous
            # strip(".")[0] only took the first character ("10.0" -> "1")
            # and raised IndexError on an empty version string.
            SSG_CONTENT = (
                "ssg-{name}{version}-ds.xml"
                .format(
                    name=constants.shortProductName,
                    version=constants.productVersion.split(".")[0]))
98
99
# Files passed to oscap as --results / --report; both live in the target
# content directory.
RESULTS_PATH = utils.join_paths(TARGET_CONTENT_DIR, "eval_remediate_results.xml")
REPORT_PATH = utils.join_paths(TARGET_CONTENT_DIR, "eval_remediate_report.html")
103
104
# value of the 'system' attribute selecting pre-installation fix elements
PRE_INSTALL_FIX_SYSTEM_ATTR = "urn:redhat:anaconda:pre"

THREAD_FETCH_DATA = "AnaOSCAPdataFetchThread"

SUPPORTED_ARCHIVES = (
    ".zip",
    ".tar",
    ".tar.gz",
    ".tar.bz2",
)

SUPPORTED_CONTENT_TYPES = (
    "datastream",
    "rpm",
    "archive",
    "scap-security-guide",
)

SUPPORTED_URL_PREFIXES = (
    "http://",
    "https://",
    "ftp://",
    # LABEL:?, hdaX:?,
)

# size of the chunks used when reading and writing out data (2 MiB, in bytes)
IO_BUF_SIZE = 2 * 1024 ** 2
120
121
# DBus service identifier registered in the addons namespace.
# NOTE(review): the "Kdump" basename/identifier name looks unrelated to the
# OSCAP addon -- confirm it is intended before relying on it.
KDUMP = DBusServiceIdentifier(
    message_bus=DBus,
    namespace=ADDONS_NAMESPACE,
    basename="Kdump",
)
127
128
129
class OSCAPaddonError(Exception):
    """Base class of the errors raised by the OSCAP addon."""
133
134
135
class OSCAPaddonNetworkError(OSCAPaddonError):
    """OSCAP addon error related to network problems."""
139
140
141
class ExtractionError(OSCAPaddonError):
    """OSCAP addon error raised when extracting content fails."""
145
146
147
# severity levels of the messages returned from the rules evaluation
MESSAGE_TYPE_FATAL, MESSAGE_TYPE_WARNING, MESSAGE_TYPE_INFO = range(3)

# Message returned from the rules evaluation, with the fields:
#   origin -- class (inherited from RuleHandler) that generated the message
#   type -- one of the MESSAGE_TYPE_* constants defined above
#   text -- the actual message that should be displayed, logged, ...
RuleMessage = namedtuple("RuleMessage", "origin type text")
156
157
158
class SubprocessLauncher(object):
    """Run an external command and keep its output and exit status.

    After :meth:`execute` finishes, ``stdout`` and ``stderr`` hold the decoded
    output streams, ``messages`` the error lines found in stderr and
    ``returncode`` the exit status of the process.

    """

    def __init__(self, args):
        """
        :param args: the command and its arguments
        :type args: [str]

        """
        self.args = args
        self.stdout = ""
        self.stderr = ""
        self.messages = []
        self.returncode = None

    def execute(self, ** kwargs):
        """
        Run the command, wait for it to finish and store its results.

        :param kwargs: extra keyword arguments passed to subprocess.Popen
        :raise OSCAPaddonError: if the command cannot be started

        """
        command_string = " ".join(self.args)
        log.info(
            "OSCAP addon: Executing subprocess: '{command_string}'"
            .format(command_string=command_string))
        try:
            proc = subprocess.Popen(self.args, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE, ** kwargs)
        except OSError as oserr:
            msg = ("Failed to execute command '{command_string}': {oserr}"
                   .format(command_string=command_string, oserr=oserr))
            raise OSCAPaddonError(msg) from oserr

        (stdout, stderr) = proc.communicate()
        # Decode both streams leniently: tool output is not guaranteed to be
        # valid UTF-8 and a stray byte must not abort the whole run.
        self.stdout = stdout.decode(errors="replace")
        self.stderr = stderr.decode(errors="replace")
        self.messages = (
            re.findall(r'OpenSCAP Error:.*', self.stderr)
            + re.findall(r'E: oscap:.*', self.stderr)
        )

        self.returncode = proc.returncode

    def log_messages(self):
        """Log every collected error message as a warning."""
        for message in self.messages:
            log.warning("OSCAP addon: %s", message)
190
191
192
def get_fix_rules_pre(profile, fpath, ds_id="", xccdf_id="", tailoring=""):
    """
    Generate the pre-installation fix rules of the given profile, i.e. the
    fixes whose 'system' attribute equals PRE_INSTALL_FIX_SYSTEM_ATTR.

    :see: run_oscap_remediate
    :see: _run_oscap_gen_fix
    :return: fix rules for a given profile
    :rtype: str

    """

    selection = {
        "ds_id": ds_id,
        "xccdf_id": xccdf_id,
        "tailoring": tailoring,
    }
    return _run_oscap_gen_fix(
        profile, fpath, PRE_INSTALL_FIX_SYSTEM_ATTR, **selection)
207
208
209
def _run_oscap_gen_fix(profile, fpath, template, ds_id="", xccdf_id="",
                       tailoring=""):
    """
    Call 'oscap xccdf generate fix' on the given file and return the fixes
    whose 'system' attribute matches the given template, for the selected
    datastream, checklist and profile.

    :see: run_oscap_remediate
    :param template: the value of the 'system' attribute of the fix elements
    :type template: str
    :return: oscap tool's stdout (an empty string if no profile is given)
    :rtype: str
    :raise OSCAPaddonError: if the oscap tool exits with a non-zero code

    """

    if not profile:
        return ""

    command = ["oscap", "xccdf", "generate", "fix",
               "--template=%s" % template]

    # oscap uses the default profile by default
    if profile.lower() != "default":
        command.append("--profile=%s" % profile)

    # options that are only passed when a value was supplied
    optional = (
        ("--datastream-id=%s", ds_id),
        ("--xccdf-id=%s", xccdf_id),
        ("--tailoring-file=%s", tailoring),
    )
    command.extend(option % value for option, value in optional if value)
    command.append(fpath)

    launcher = SubprocessLauncher(command)
    launcher.execute()
    launcher.log_messages()
    if launcher.returncode == 0:
        return launcher.stdout

    raise OSCAPaddonError(
        "Failed to generate fix rules with the oscap tool: %s" % launcher.stderr)
250
251
252
def do_chroot(chroot):
    """Change root to the given directory unless it is empty or "/"."""
    if not chroot or chroot == "/":
        return

    os.chroot(chroot)
    os.chdir("/")
257
258
259
def assert_scanner_works(chroot, executable="oscap"):
    """
    Check that the scanner can be run by invoking it with '--version',
    optionally inside the given chroot.

    :param chroot: path to the root the scanner should be run in
    :type chroot: str
    :param executable: name of (or path to) the scanner executable
    :type executable: str
    :return: True if the invocation succeeded
    :rtype: bool
    :raise OSCAPaddonError: if the scanner cannot be started or exits with a
                            non-zero code

    """
    args = [executable, "--version"]
    command = " ".join(args)

    try:
        proc = subprocess.Popen(
            args, preexec_fn=lambda: do_chroot(chroot),
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (_stdout, stderr) = proc.communicate()
        stderr = stderr.decode(errors="replace")
    except OSError as exc:
        msg = _(f"Basic invocation '{command}' fails: {str(exc)}")
        raise OSCAPaddonError(msg) from exc
    if proc.returncode != 0:
        # Both literals need the f prefix; without it the second half was
        # emitted with the raw "{proc.returncode}: {stderr}" placeholders.
        msg = _(
            f"Basic scanner invocation '{command}' exited "
            f"with non-zero error code {proc.returncode}: {stderr}")
        raise OSCAPaddonError(msg)
    return True
278
279
280
def run_oscap_remediate(profile, fpath, ds_id="", xccdf_id="", tailoring="",
                        chroot=""):
    """
    Evaluate and remediate with 'oscap xccdf eval --remediate' using the given
    SCAP content file. The remediation is driven by the given profile from the
    given checklist of the given datastream. The tool is run in a chroot if
    one is requested.

    :param profile: id of the profile that will drive the remediation
    :type profile: str
    :param fpath: path to a file with SCAP content
    :type fpath: str
    :param ds_id: ID of the datastream that contains the checklist defining
                  the profile
    :type ds_id: str
    :param xccdf_id: ID of the checklist that defines the profile
    :type xccdf_id: str
    :param tailoring: path to a tailoring file
    :type tailoring: str
    :param chroot: path to the root the oscap tool should be run in
    :type chroot: str
    :return: oscap tool's stdout (summary of the rules, checks and fixes),
             an empty string if no profile is given
    :rtype: str
    :raise OSCAPaddonError: if the oscap tool exits with a code other than
                            0 or 2

    """

    if not profile:
        return ""

    # the results directory has to exist (under the chroot, if there is one)
    results_dir = os.path.dirname(RESULTS_PATH)
    if chroot:
        results_dir = os.path.normpath(chroot + "/" + results_dir)
    utils.ensure_dir_exists(results_dir)

    command = [
        "oscap", "xccdf", "eval",
        "--remediate",
        "--results=%s" % RESULTS_PATH,
        "--report=%s" % REPORT_PATH,
    ]

    # oscap uses the default profile by default
    if profile.lower() != "default":
        command.append("--profile=%s" % profile)

    # options that are only passed when a value was supplied
    optional = (
        ("--datastream-id=%s", ds_id),
        ("--xccdf-id=%s", xccdf_id),
        ("--tailoring-file=%s", tailoring),
    )
    command.extend(option % value for option, value in optional if value)
    command.append(fpath)

    launcher = SubprocessLauncher(command)
    launcher.execute(preexec_fn=lambda: do_chroot(chroot))
    launcher.log_messages()

    # 0 -- success; 2 -- no error, but checks/remediation failed
    if launcher.returncode in (0, 2):
        return launcher.stdout

    raise OSCAPaddonError(
        "Content evaluation and remediation with the oscap tool failed: %s"
        % launcher.stderr)
343
344
345
def _create_firstboot_config_string(
        profile, ds_path, results_path, report_path, ds_id, xccdf_id, tailoring_path):
    """
    Build the text of the firstboot remediation configuration: one
    single-quoted OSCAP_REMEDIATE_* assignment per line.

    The datastream ID, XCCDF ID and tailoring lines are only emitted for
    non-empty values.

    :return: contents of the configuration
    :rtype: str

    """
    mandatory = textwrap.dedent(f"""\
    OSCAP_REMEDIATE_DS='{ds_path}'
    OSCAP_REMEDIATE_PROFILE_ID='{profile}'
    OSCAP_REMEDIATE_ARF_RESULT='{results_path}'
    OSCAP_REMEDIATE_HTML_REPORT='{report_path}'
    """)

    optional = (
        ("OSCAP_REMEDIATE_DATASTREAM_ID", ds_id),
        ("OSCAP_REMEDIATE_XCCDF_ID", xccdf_id),
        ("OSCAP_REMEDIATE_TAILORING", tailoring_path),
    )
    extra = "".join(
        f"{key}='{value}'\n" for key, value in optional if value)

    return mandatory + extra
360
361
362
def _schedule_firstboot_remediation(
        chroot, profile, ds_path, results_path, report_path, ds_id, xccdf_id, tailoring_path):
    """
    Write the firstboot remediation configuration under the given chroot and
    create the 'system-update' symlink in the chroot that points to it.

    :param chroot: path to the root of the target system
    :type chroot: str

    """
    config_relpath = "var/tmp/oscap-remediate-offline.conf.sh"

    contents = _create_firstboot_config_string(
        profile, ds_path, results_path, report_path, ds_id, xccdf_id, tailoring_path)
    with open(os.path.join(chroot, config_relpath), "w") as config_file:
        config_file.write(contents)

    # the link target is the path of the config as seen from inside the chroot
    link_target = f"/{config_relpath}"
    link_path = os.path.join(chroot, "system-update")
    os.symlink(link_target, link_path)
373
374
375
def schedule_firstboot_remediation(chroot, profile, fpath, ds_id="", xccdf_id="", tailoring=""):
    """
    Schedule the remediation of the system under the given chroot. Nothing is
    done if no profile is given.

    :param chroot: path to the root of the target system
    :type chroot: str
    :param profile: id of the profile that will drive the remediation
    :type profile: str
    :param fpath: path to a file with SCAP content
    :type fpath: str
    :return: always an empty string
    :rtype: str

    """
    if profile:
        # the directory for the results has to exist in the target system
        target_results_dir = os.path.normpath(
            chroot + "/" + os.path.dirname(RESULTS_PATH))
        utils.ensure_dir_exists(target_results_dir)

        log.info("OSCAP addon: Scheduling firstboot remediation")
        _schedule_firstboot_remediation(
            chroot, profile, fpath, RESULTS_PATH, REPORT_PATH, ds_id, xccdf_id, tailoring)

    return ""
389
390
391
def extract_data(archive, out_dir, ensure_has_files=None):
    """
    Function that extracts the given archive to the given output directory. It
    tries to find out the archive type by the file name.

    :param archive: path to the archive file that should be extracted
    :type archive: str
    :param out_dir: output directory the archive should be extracted to
    :type out_dir: str
    :param ensure_has_files: relative paths to the files that must exist in the
                             archive
    :type ensure_has_files: iterable of strings or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the archive type is not supported, the archive
                            cannot be read or a required file is missing

    """

    # get rid of empty file paths (None/empty input means nothing to check)
    ensure_has_files = [fpath for fpath in ensure_has_files or () if fpath]

    msg = "OSCAP addon: Extracting {archive}".format(archive=archive)
    if ensure_has_files:
        msg += ", expecting to find {files} there.".format(files=tuple(ensure_has_files))
    log.info(msg)

    if archive.endswith(".zip"):
        # ZIP file
        result = _extract_zip(archive, out_dir, ensure_has_files)
    elif archive.endswith(".tar"):
        # plain tarball
        result = _extract_tarball(archive, out_dir, ensure_has_files, None)
    elif archive.endswith(".tar.gz"):
        # gzipped tarball
        result = _extract_tarball(archive, out_dir, ensure_has_files, "gz")
    elif archive.endswith(".tar.bz2"):
        # bzipped tarball
        result = _extract_tarball(archive, out_dir, ensure_has_files, "bz2")
    elif archive.endswith(".rpm"):
        # RPM
        result = _extract_rpm(archive, out_dir, ensure_has_files)
    # elif other types of archives
    else:
        raise ExtractionError("Unsuported archive type")
    log.info("OSCAP addon: Extracted {files} from the supplied content"
             .format(files=result))
    return result


def _extract_zip(archive, out_dir, ensure_has_files):
    """
    Extract the given ZIP archive to the given output directory and make sure
    the given files exist in the archive.

    :see: extract_data
    :return: a list of files and directories extracted from the archive
    :rtype: [str]

    """

    try:
        zfile = zipfile.ZipFile(archive, "r")
    except Exception as exc:
        msg = _(f"Error extracting archive as a zipfile: {exc}")
        raise ExtractionError(msg) from exc

    # the context manager closes the archive on the error paths as well
    with zfile:
        # paths of the files found in the archive (dirs end with "/")
        files = {info.filename for info in zfile.filelist
                 if not info.filename.endswith("/")}
        for fpath in ensure_has_files or ():
            if fpath not in files:
                msg = "File '%s' not found in the archive '%s'" % (fpath,
                                                                   archive)
                raise ExtractionError(msg)

        utils.ensure_dir_exists(out_dir)
        zfile.extractall(path=out_dir)
        return [utils.join_paths(out_dir, info.filename) for info in zfile.filelist]
463
464
465
def _extract_tarball(archive, out_dir, ensure_has_files, alg):
    """
    Extract the given TAR archive to the given output directory and make sure
    the given files exist in the archive.

    :see: extract_data
    :param alg: compression algorithm used for the tarball
    :type alg: str (one of "gz", "bz2") or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the algorithm is not supported, the tarball
                            cannot be opened or a required file is missing

    """

    if alg and alg not in ("gz", "bz2",):
        raise ExtractionError("Unsupported compression algorithm")

    mode = "r:%s" % alg if alg else "r"

    try:
        tfile = tarfile.TarFile.open(archive, mode)
    except tarfile.TarError as err:
        raise ExtractionError(str(err)) from err

    # the context manager closes the tarball on the error paths as well
    with tfile:
        members = tfile.getmembers()

        # paths of the regular files found in the archive
        files = {member.path for member in members if member.isfile()}

        for fpath in ensure_has_files or ():
            if fpath not in files:
                msg = "File '%s' not found in the archive '%s'" % (fpath, archive)
                raise ExtractionError(msg)

        utils.ensure_dir_exists(out_dir)
        # NOTE(review): extractall() trusts the member paths stored in the
        # archive; consider an extraction filter if the content is untrusted.
        tfile.extractall(path=out_dir)
        return [utils.join_paths(out_dir, member.path) for member in members]
505
506
507
def _extract_rpm(rpm_path, root="/", ensure_has_files=None):
    """
    Extract the given RPM into the directory tree given by the root argument
    and make sure the given file exists in the archive.

    :param rpm_path: path to the RPM file that should be extracted
    :type rpm_path: str
    :param root: root of the directory tree the RPM should be extracted into
    :type root: str
    :param ensure_has_files: relative paths to the files that must exist in the
                             RPM
    :type ensure_has_files: iterable of strings or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the RPM cannot be converted or read, a required
                            file is missing or writing the files out fails

    """

    # run rpm2cpio and process the output with the cpioarchive module
    temp_fd, temp_path = tempfile.mkstemp(prefix="oscap_rpm")
    try:
        try:
            proc = subprocess.Popen(["rpm2cpio", rpm_path], stdout=temp_fd)
            proc.wait()
        finally:
            # the descriptor only serves as rpm2cpio's stdout; close it even
            # when the conversion could not be run
            os.close(temp_fd)

        if proc.returncode != 0:
            msg = "Failed to convert RPM '%s' to cpio archive" % rpm_path
            raise ExtractionError(msg)

        try:
            archive = cpioarchive.CpioArchive(temp_path)
        except cpioarchive.CpioError as err:
            raise ExtractionError(str(err)) from err

        try:
            return _unpack_cpio_entries(archive, rpm_path, root,
                                        ensure_has_files)
        finally:
            archive.close()
    finally:
        # cleanup of the temporary cpio file, on success and failure alike
        os.unlink(temp_path)


def _unpack_cpio_entries(archive, rpm_path, root, ensure_has_files):
    """
    Write the non-empty entries of an opened cpio archive under the given root
    after checking that the required files are present.

    :see: _extract_rpm
    :return: paths (under the root) of all the entries of the archive
    :rtype: [str]

    """

    # get entries from the archive (supports only iteration over entries)
    entries = set(archive)

    # cpio entry names (paths) start with the dot
    entry_names = [entry.name.lstrip(".") for entry in entries]

    for fpath in ensure_has_files or ():
        # RPM->cpio entries have absolute paths
        if fpath not in entry_names and \
           os.path.join("/", fpath) not in entry_names:
            msg = "File '%s' not found in the archive '%s'" % (fpath, rpm_path)
            raise ExtractionError(msg)

    # NOTE(review): entry names are joined onto the root as they are; confirm
    # the RPM content is trusted or validate the resulting paths.
    try:
        for entry in entries:
            if entry.size == 0:
                continue
            entry_path = entry.name.lstrip(".")
            out_dir = os.path.normpath(root + os.path.dirname(entry_path))
            utils.ensure_dir_exists(out_dir)

            out_fpath = os.path.normpath(root + entry_path)
            if os.path.exists(out_fpath):
                # never overwrite files that already exist
                continue
            with open(out_fpath, "wb") as out_file:
                buf = entry.read(IO_BUF_SIZE)
                while buf:
                    out_file.write(buf)
                    buf = entry.read(IO_BUF_SIZE)
    except (IOError, cpioarchive.CpioError) as e:
        raise ExtractionError(e) from e

    return [os.path.normpath(root + name) for name in entry_names]
576
577
578
def strip_content_dir(fpaths, phase="preinst"):
    """
    Cut the leading content directory off the given file paths. The
    pre-installation content directory is used for the "preinst" phase, the
    post-installation one for any other phase.

    :param fpaths: iterable of file paths to strip content directory prefix
                   from
    :type fpaths: iterable of strings
    :param phase: specifies pre-installation or post-installation phase
    :type phase: "preinst" or "postinst"
    :return: the same iterable of file paths as given with the content
             directory prefix stripped
    :rtype: same type as fpaths

    """

    if phase == "preinst":
        content_dir = INSTALLATION_CONTENT_DIR
    else:
        content_dir = TARGET_CONTENT_DIR
    prefix_len = len(content_dir)

    def remove_prefix(fpath):
        # the path is assumed to start with the content directory
        return fpath[prefix_len:]

    return utils.keep_type_map(remove_prefix, fpaths)
600
601
602
def get_ssg_path(root="/"):
    """Path to the SSG content file under the given root."""
    ssg_file = SSG_DIR + SSG_CONTENT
    return utils.join_paths(root, ssg_file)
604
605
606
def ssg_available(root="/"):
    """
    Check whether the SCAP Security Guide content file exists under the given
    root.

    :return: True if SSG was found under the given root, False otherwise

    """

    ssg_path = get_ssg_path(root)
    return os.path.exists(ssg_path)
615
616
617
def get_content_name(data):
    """
    Name of the content file, i.e. the last component of the content URL.

    :param data: addon data providing content_type and content_url
    :return: the part of the content URL after its last slash
    :rtype: str
    :raise ValueError: for the scap-security-guide content type or if the URL
                       contains no slash after a supported prefix

    """
    if data.content_type == "scap-security-guide":
        raise ValueError("Using scap-security-guide, no single content file")

    url = data.content_url
    matched = next(
        (prefix for prefix in SUPPORTED_URL_PREFIXES if url.startswith(prefix)),
        None)
    # URLs without a supported prefix fall back to a placeholder name
    rest = "/anonymous_content" if matched is None else url[len(matched):]

    _head, slash, name = rest.rpartition("/")
    if not slash:
        raise ValueError("Unsupported url '%s'" % data.content_url)

    return name
632
633
634
def get_raw_preinst_content_path(data):
    """Path to the raw (unextracted, ...) pre-installation content file.

    Returns None for the scap-security-guide content type, which has no
    single content file.
    """
    if data.content_type != "scap-security-guide":
        return utils.join_paths(INSTALLATION_CONTENT_DIR, get_content_name(data))

    log.debug("OSCAP addon: Using scap-security-guide, no single content file")
    return None
642
643
644
def get_preinst_content_path(data):
    """Path to the content file used in the pre-installation phase."""
    kind = data.content_type

    if kind == "scap-security-guide":
        # SSG is not copied to the standard place
        return data.content_path

    if kind == "datastream":
        # a datastream is used as downloaded
        return get_raw_preinst_content_path(data)

    # any other content type: the content path is relative to the
    # pre-installation content directory
    return utils.join_paths(INSTALLATION_CONTENT_DIR, data.content_path)
657
658
659
def get_postinst_content_path(data):
    """Path to the content file used in the post-installation phase."""
    kind = data.content_type

    if kind in ("rpm", "scap-security-guide"):
        # no path magic in case of RPM (SSG is installed as an RPM)
        return data.content_path

    # a datastream is stored under its file name, other content under its
    # content path; both end up in the target content directory
    if kind == "datastream":
        relative_path = get_content_name(data)
    else:
        relative_path = data.content_path

    return utils.join_paths(TARGET_CONTENT_DIR, relative_path)
675
676
677
def get_preinst_tailoring_path(data):
    """Path to the pre-installation tailoring file, "" if there is none."""
    tailoring = data.tailoring_path
    if tailoring:
        return utils.join_paths(INSTALLATION_CONTENT_DIR, tailoring)

    return ""
686
687
688
def get_postinst_tailoring_path(data):
    """Path to the post-installation tailoring file, "" if there is none."""
    tailoring = data.tailoring_path
    if not tailoring:
        return ""

    # no path magic in case of RPM; other content types keep the tailoring
    # file in the target content directory
    if data.content_type == "rpm":
        return tailoring

    return utils.join_paths(TARGET_CONTENT_DIR, tailoring)
701
702
703
def get_payload_proxy():
    """Get the DBus proxy of the active payload.

    :return: a DBus proxy
    :raise ValueError: if no payload is active
    """
    active_path = PAYLOADS.get_proxy().ActivePayload

    if active_path:
        return PAYLOADS.get_proxy(active_path)

    raise ValueError("Active payload is not set.")
715
716
717
def get_packages_data() -> PackagesSelectionData:
    """Get the DBus data with the packages configuration.

    A fresh PackagesSelectionData is returned when the active payload is not
    a DNF payload.

    :return: a packages configuration
    """
    proxy = get_payload_proxy()

    if proxy.Type == PAYLOAD_TYPE_DNF:
        return PackagesSelectionData.from_structure(proxy.PackagesSelection)

    return PackagesSelectionData()
730
731
732
def set_packages_data(data: PackagesSelectionData):
    """Set the DBus data with the packages configuration.

    Nothing is set (only a debug message is logged) when the active payload
    is not a DNF payload.

    :param data: a packages configuration
    """
    proxy = get_payload_proxy()

    if proxy.Type == PAYLOAD_TYPE_DNF:
        proxy.PackagesSelection = PackagesSelectionData.to_structure(data)
        return

    log.debug("OSCAP addon: The payload doesn't support packages.")
745