Passed
Push — rhel9-branch ( 1d9a8a...9c10fe )
by Jan
02:20 queued 13s
created

org_fedora_oscap.common   F

Complexity

Total Complexity 100

Size/Duplication

Total Lines 739
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 100
eloc 382
dl 0
loc 739
rs 2
c 0
b 0
f 0

24 Functions

Rating   Name   Duplication   Size   Complexity  
C run_oscap_remediate() 0 63 9
A assert_scanner_works() 0 19 4
A do_chroot() 0 5 3
A get_fix_rules_pre() 0 15 1
B _run_oscap_gen_fix() 0 41 7
A _() 0 5 2
A N_() 0 1 1
A get_postinst_tailoring_path() 0 12 3
A get_ssg_path() 0 2 1
A get_postinst_content_path() 0 15 3
A get_preinst_content_path() 0 12 3
A get_raw_preinst_content_path() 0 8 2
B _extract_tarball() 0 40 7
A get_packages_data() 0 12 2
A get_content_name() 0 15 5
A _schedule_firstboot_remediation() 0 22 5
A strip_content_dir() 0 22 4
A schedule_firstboot_remediation() 0 14 2
A get_preinst_tailoring_path() 0 8 2
D extract_data() 0 72 12
A get_payload_proxy() 0 12 2
A ssg_available() 0 9 1
D _extract_rpm() 0 69 12
A set_packages_data() 0 13 2

3 Methods

Rating   Name   Duplication   Size   Complexity  
A SubprocessLauncher.execute() 0 20 2
A SubprocessLauncher.__init__() 0 6 1
A SubprocessLauncher.log_messages() 0 3 2

How to fix   Complexity   

Complexity

Complex classes like org_fedora_oscap.common often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Copyright (C) 2013  Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
# Red Hat Author(s): Vratislav Podzimek <[email protected]>
19
#
20
21
"""
22
Module with various classes and functions needed by the OSCAP addon that are
23
not specific to any installation mode (tui, gui, ks).
24
25
"""
26
27
import os
28
import tempfile
29
import subprocess
30
import zipfile
31
import tarfile
32
import textwrap
33
34
import re
35
import logging
36
37
from collections import namedtuple
38
import gettext
39
from functools import wraps
40
41
from dasbus.identifier import DBusServiceIdentifier
42
from pyanaconda.core import constants
43
from pyanaconda.core.dbus import DBus
44
from pyanaconda.core.constants import PAYLOAD_TYPE_DNF
45
from pyanaconda.modules.common.constants.namespaces import ADDONS_NAMESPACE
46
from pyanaconda.modules.common.constants.services import PAYLOADS
47
from pyanaconda.modules.common.structures.payload import PackagesConfigurationData
48
from pyanaconda.threading import threadMgr, AnacondaThread
49
50
from org_fedora_oscap import utils
51
from org_fedora_oscap import cpioarchive
52
53
54
log = logging.getLogger("anaconda")
55
56
57
# mimic pyanaconda/core/i18n.py
58
def _(string):
    """
    Translate the given string with the "oscap-anaconda-addon" gettext domain.

    Falsy input short-circuits to an empty string without consulting the
    catalog. The translation is looked up with ``fallback=True``.

    :param string: message to translate
    :type string: str
    :return: the translated message or "" for falsy input
    :rtype: str

    """

    if not string:
        return ""

    catalog = gettext.translation("oscap-anaconda-addon", fallback=True)
    return catalog.gettext(string)
63
64
65
def N_(string):
    """Return the given string unchanged."""
    # presumably only marks the string for later translation -- the function
    # itself does no work
    return string
66
67
68
# everything else should be private
69
__all__ = [
    "run_oscap_remediate",
    "get_fix_rules_pre",
    "extract_data",
    "strip_content_dir",
    "OSCAPaddonError",
    "get_payload_proxy",
    "get_packages_data",
    "set_packages_data",
]
73
74
# content directories (both end with a slash)
INSTALLATION_CONTENT_DIR = "/tmp/openscap_data/"
TARGET_CONTENT_DIR = "/root/openscap_data/"

# directory SSG_CONTENT is looked up in (see get_ssg_path)
SSG_DIR = "/usr/share/xml/scap/ssg/content/"

# Make it easy to change e.g. by sed substitution in spec files
# First name is the canonical addon name, rest are adapters
ADDON_NAMES = ["com_redhat_oscap", "org_fedora_oscap"]

COMPLAIN_ABOUT_NON_CANONICAL_NAMES = True
84
85
# Enable patches that set the content name at package-time
DEFAULT_SSG_CONTENT_NAME = ""
SSG_CONTENT = DEFAULT_SSG_CONTENT_NAME
# When no name was patched in, derive it from the product the installer runs
# for; nothing is derived for the generic 'anaconda' product.
if not SSG_CONTENT and constants.shortProductName != 'anaconda':
    if constants.shortProductName == 'fedora':
        SSG_CONTENT = "ssg-fedora-ds.xml"
    else:
        # Only the major version is part of the file name. Take everything
        # before the first dot ("9.2" -> "9", "10.0" -> "10"); indexing the
        # first character would truncate multi-digit versions ("10.0" -> "1").
        SSG_CONTENT = (
            "ssg-{name}{version}-ds.xml"
            .format(
                name=constants.shortProductName,
                version=constants.productVersion.split(".")[0]))
98
99
# files passed to oscap as --results and --report (see run_oscap_remediate)
RESULTS_PATH = utils.join_paths(
    TARGET_CONTENT_DIR, "eval_remediate_results.xml")
REPORT_PATH = utils.join_paths(
    TARGET_CONTENT_DIR, "eval_remediate_report.html")
103
104
# value of the 'system' attribute of the fix elements requested by
# get_fix_rules_pre
PRE_INSTALL_FIX_SYSTEM_ATTR = "urn:redhat:anaconda:pre"

THREAD_FETCH_DATA = "AnaOSCAPdataFetchThread"

SUPPORTED_ARCHIVES = (".zip", ".tar", ".tar.gz", ".tar.bz2")

SUPPORTED_CONTENT_TYPES = ("datastream", "rpm", "archive",
                           "scap-security-guide")

SUPPORTED_URL_PREFIXES = ("http://", "https://", "ftp://")  # LABEL:?, hdaX:?,

# buffer size for reading and writing out data (in bytes), i.e. 2 MiB
IO_BUF_SIZE = 2 << 20
120
121
# DBus constants
# NOTE(review): the identifier is named after Kdump although this module
# belongs to the OSCAP addon -- confirm this is intended.
KDUMP = DBusServiceIdentifier(
    namespace=ADDONS_NAMESPACE, basename="Kdump", message_bus=DBus,
)
127
128
129
class OSCAPaddonError(Exception):
    """Base exception for errors related to the OSCAP addon."""
133
134
135
class OSCAPaddonNetworkError(OSCAPaddonError):
    """Exception for network errors related to the OSCAP addon."""
139
140
141
class ExtractionError(OSCAPaddonError):
    """Exception raised when extracting content from an archive fails."""
145
146
147
# message severities: fatal = 0, warning = 1, info = 2
MESSAGE_TYPE_FATAL, MESSAGE_TYPE_WARNING, MESSAGE_TYPE_INFO = range(3)

# namedtuple for messages returned from the rules evaluation
#   origin -- class (inherited from RuleHandler) that generated the message
#   type -- one of the MESSAGE_TYPE_* constants defined above
#   text -- the actual message that should be displayed, logged, ...
RuleMessage = namedtuple("RuleMessage", "origin type text")
156
157
158
class SubprocessLauncher(object):
    """
    Run a command and keep its decoded output, the oscap error messages found
    in its stderr and its return code.

    """

    def __init__(self, args):
        """
        :param args: the command and its arguments
        :type args: [str]

        """

        self.args = args
        self.stdout = ""
        self.stderr = ""
        # lines of stderr matching 'OpenSCAP Error:' or 'E: oscap:'
        self.messages = []
        # stays None until execute() finishes
        self.returncode = None

    def execute(self, ** kwargs):
        """
        Run the command, wait for it to finish and store its results in the
        stdout, stderr, messages and returncode attributes.

        :param kwargs: extra keyword arguments passed to subprocess.Popen
        :raise OSCAPaddonError: if the command cannot be started

        """

        command_string = " ".join(self.args)
        log.info(
            "OSCAP addon: Executing subprocess: '{command_string}'"
            .format(command_string=command_string))
        try:
            proc = subprocess.Popen(self.args, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE, ** kwargs)
        except OSError as oserr:
            msg = ("Failed to execute command '{command_string}': {oserr}"
                   .format(command_string=command_string, oserr=oserr))
            raise OSCAPaddonError(msg) from oserr

        (stdout, stderr) = proc.communicate()
        # decode both streams leniently so that stray bytes in the output
        # cannot abort the run with a UnicodeDecodeError
        self.stdout = stdout.decode(errors="replace")
        self.stderr = stderr.decode(errors="replace")
        self.messages = re.findall(r'OpenSCAP Error:.*', self.stderr)
        self.messages += re.findall(r'E: oscap:.*', self.stderr)

        self.returncode = proc.returncode

    def log_messages(self):
        """Log every collected message as a warning."""
        for message in self.messages:
            log.warning("OSCAP addon: " + message)
190
191
192
def get_fix_rules_pre(profile, fpath, ds_id="", xccdf_id="", tailoring=""):
    """
    Generate the fix rules meant for the pre-installation environment, i.e.
    the fix elements whose 'system' attribute is PRE_INSTALL_FIX_SYSTEM_ATTR,
    for the given profile, datastream and checklist of the given file.

    :see: run_oscap_remediate
    :see: _run_oscap_gen_fix
    :return: fix rules for a given profile
    :rtype: str

    """

    selectors = {
        "ds_id": ds_id,
        "xccdf_id": xccdf_id,
        "tailoring": tailoring,
    }
    return _run_oscap_gen_fix(profile, fpath, PRE_INSTALL_FIX_SYSTEM_ATTR,
                              **selectors)
207
208
209
def _run_oscap_gen_fix(profile, fpath, template, ds_id="", xccdf_id="",
                       tailoring=""):
    """
    Call 'oscap xccdf generate fix' on the given file and return what it
    prints, i.e. the contents of the fix elements whose 'system' attribute
    equals the given template, for the given datastream, checklist and
    profile. Nothing is run for an empty profile.

    :see: run_oscap_remediate
    :param template: the value of the 'system' attribute of the fix elements
    :type template: str
    :return: oscap tool's stdout ("" if no profile is given)
    :rtype: str
    :raise OSCAPaddonError: if the oscap tool exits with a non-zero code

    """

    if not profile:
        return ""

    # oscap uses the default profile by default
    selected_profile = profile if profile.lower() != "default" else ""

    # optional arguments are only passed if they have a value
    optional_args = (
        ("--profile=%s", selected_profile),
        ("--datastream-id=%s", ds_id),
        ("--xccdf-id=%s", xccdf_id),
        ("--tailoring-file=%s", tailoring),
    )

    args = ["oscap", "xccdf", "generate", "fix", "--template=%s" % template]
    args.extend(option % value for option, value in optional_args if value)
    args.append(fpath)

    launcher = SubprocessLauncher(args)
    launcher.execute()
    launcher.log_messages()

    if launcher.returncode == 0:
        return launcher.stdout

    msg = "Failed to generate fix rules with the oscap tool: %s" % launcher.stderr
    raise OSCAPaddonError(msg)
250
251
252
def do_chroot(chroot):
    """
    Helper function doing the chroot if requested.

    Nothing happens for an empty chroot or for "/"; otherwise the process is
    chrooted and its working directory changed to the new root.

    """

    if not chroot or chroot == "/":
        return

    os.chroot(chroot)
    os.chdir("/")
257
258
259
def assert_scanner_works(chroot, executable="oscap"):
    """
    Check that the scanner can be run by calling it with '--version',
    optionally in the given chroot.

    :param chroot: path to the root the scanner should be run in
    :type chroot: str
    :param executable: the scanner executable to try
    :type executable: str
    :return: True if the scanner exited with zero
    :rtype: bool
    :raise OSCAPaddonError: if the scanner cannot be started or exits with a
                            non-zero code

    """

    args = [executable, "--version"]
    command = " ".join(args)

    try:
        proc = subprocess.Popen(
            args, preexec_fn=lambda: do_chroot(chroot),
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # only stderr is of interest, it ends up in the error message
        stderr = proc.communicate()[1].decode(errors="replace")
    except OSError as exc:
        msg = _(f"Basic invocation '{command}' fails: {str(exc)}")
        raise OSCAPaddonError(msg) from exc

    if proc.returncode != 0:
        # both literals need the f prefix, otherwise the return code and
        # stderr placeholders end up verbatim in the message
        msg = _(
            f"Basic scanner invocation '{command}' exited "
            f"with non-zero error code {proc.returncode}: {stderr}")
        raise OSCAPaddonError(msg)
    return True
278
279
280
def run_oscap_remediate(profile, fpath, ds_id="", xccdf_id="", tailoring="",
                        chroot=""):
    """
    Evaluate the given SCAP content with the oscap tool and remediate the
    system according to the given profile of the given checklist and
    datastream. The tool is run in the chroot if one is requested. Nothing is
    run for an empty profile.

    :param profile: id of the profile that will drive the remediation
    :type profile: str
    :param fpath: path to a file with SCAP content
    :type fpath: str
    :param ds_id: ID of the datastream that contains the checklist defining
                  the profile
    :type ds_id: str
    :param xccdf_id: ID of the checklist that defines the profile
    :type xccdf_id: str
    :param tailoring: path to a tailoring file
    :type tailoring: str
    :param chroot: path to the root the oscap tool should be run in
    :type chroot: str
    :return: oscap tool's stdout (summary of the rules, checks and fixes)
    :rtype: str
    :raise OSCAPaddonError: if the oscap tool exits with a code other than
                            0 or 2

    """

    if not profile:
        return ""

    # make sure the directory for the results exists
    results_dir = os.path.dirname(RESULTS_PATH)
    if chroot:
        results_dir = os.path.normpath(chroot + "/" + results_dir)
    utils.ensure_dir_exists(results_dir)

    # oscap uses the default profile by default
    selected_profile = profile if profile.lower() != "default" else ""

    # optional arguments are only passed if they have a value
    optional_args = (
        ("--profile=%s", selected_profile),
        ("--datastream-id=%s", ds_id),
        ("--xccdf-id=%s", xccdf_id),
        ("--tailoring-file=%s", tailoring),
    )

    args = [
        "oscap", "xccdf", "eval",
        "--remediate",
        "--results=%s" % RESULTS_PATH,
        "--report=%s" % REPORT_PATH,
    ]
    args.extend(option % value for option, value in optional_args if value)
    args.append(fpath)

    launcher = SubprocessLauncher(args)
    launcher.execute(preexec_fn=lambda: do_chroot(chroot))
    launcher.log_messages()

    # 0 -- success; 2 -- no error, but checks/remediation failed
    if launcher.returncode in (0, 2):
        return launcher.stdout

    msg = ("Content evaluation and remediation with the oscap tool "
           "failed: %s" % launcher.stderr)
    raise OSCAPaddonError(msg)
343
344
345
def _schedule_firstboot_remediation(
        chroot, profile, ds_path, results_path, report_path, ds_id, xccdf_id, tailoring_path):
    """
    Write the configuration of the offline remediation into
    var/tmp/oscap-remediate-offline.conf.sh under the given chroot and create
    the 'system-update' symlink in the chroot pointing to that file (by its
    path relative to the chroot).

    :param chroot: path to the root the files are created under
    :type chroot: str
    :param profile: id of the profile to remediate with
    :type profile: str
    :param ds_path: path to the SCAP content
    :type ds_path: str
    :param results_path: path for the ARF results
    :type results_path: str
    :param report_path: path for the HTML report
    :type report_path: str
    :param ds_id: ID of the datastream (left out of the config if empty)
    :type ds_id: str
    :param xccdf_id: ID of the checklist (left out of the config if empty)
    :type xccdf_id: str
    :param tailoring_path: path to a tailoring file (left out of the config
                           if empty)
    :type tailoring_path: str

    """

    settings = [
        ("OSCAP_REMEDIATE_DS", ds_path),
        ("OSCAP_REMEDIATE_PROFILE_ID", profile),
        ("OSCAP_REMEDIATE_ARF_RESULT", results_path),
        ("OSCAP_REMEDIATE_HTML_REPORT", report_path),
    ]
    # the optional settings are written only if they have a value
    if ds_id:
        settings.append(("OSCAP_REMEDIATE_DATASTREAM_ID", ds_id))
    if xccdf_id:
        settings.append(("OSCAP_REMEDIATE_XCCDF_ID", xccdf_id))
    if tailoring_path:
        settings.append(("OSCAP_REMEDIATE_TAILORING", tailoring_path))

    # the values have to be interpolated, writing the placeholder names
    # verbatim would make the optional settings useless
    config = "".join(f"{name}='{value}'\n" for name, value in settings)

    relative_filename = "var/tmp/oscap-remediate-offline.conf.sh"
    local_config_filename = f"/{relative_filename}"
    chroot_config_filename = os.path.join(chroot, relative_filename)
    with open(chroot_config_filename, "w") as f:
        f.write(config)

    # the link target is the path as seen from inside of the chroot
    os.symlink(local_config_filename,
               os.path.join(chroot, "system-update"))
367
368
369
def schedule_firstboot_remediation(chroot, profile, fpath, ds_id="", xccdf_id="", tailoring=""):
    """
    Schedule the remediation with the given profile and content under the
    given chroot. Nothing is scheduled for an empty profile.

    :see: _schedule_firstboot_remediation
    :return: always an empty string
    :rtype: str

    """

    if profile:
        # make sure the directory for the results exists
        results_dir = os.path.normpath(
            chroot + "/" + os.path.dirname(RESULTS_PATH))
        utils.ensure_dir_exists(results_dir)

        log.info("OSCAP addon: Scheduling firstboot remediation")
        _schedule_firstboot_remediation(
            chroot, profile, fpath, RESULTS_PATH, REPORT_PATH,
            ds_id, xccdf_id, tailoring)

    return ""
383
384
385
def extract_data(archive, out_dir, ensure_has_files=None):
    """
    Function that extracts the given archive to the given output directory. It
    tries to find out the archive type by the file name (.zip, .tar, .tar.gz,
    .tar.bz2 and .rpm are recognized).

    :param archive: path to the archive file that should be extracted
    :type archive: str
    :param out_dir: output directory the archive should be extracted to
    :type out_dir: str
    :param ensure_has_files: relative paths to the files that must exist in the
                             archive
    :type ensure_has_files: iterable of strings or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the archive type is not supported, the archive
                            cannot be opened or a required file is missing

    """

    # get rid of empty file paths (and of None given instead of an iterable)
    ensure_has_files = [fpath for fpath in ensure_has_files or () if fpath]

    msg = "OSCAP addon: Extracting {archive}".format(archive=archive)
    if ensure_has_files:
        msg += ", expecting to find {files} there.".format(files=tuple(ensure_has_files))
    log.info(msg)

    if archive.endswith(".zip"):
        # ZIP file
        result = _extract_zip(archive, out_dir, ensure_has_files)
    elif archive.endswith(".tar"):
        # plain tarball
        result = _extract_tarball(archive, out_dir, ensure_has_files, None)
    elif archive.endswith(".tar.gz"):
        # gzipped tarball
        result = _extract_tarball(archive, out_dir, ensure_has_files, "gz")
    elif archive.endswith(".tar.bz2"):
        # bzipped tarball
        result = _extract_tarball(archive, out_dir, ensure_has_files, "bz2")
    elif archive.endswith(".rpm"):
        # RPM
        result = _extract_rpm(archive, out_dir, ensure_has_files)
    # elif other types of archives
    else:
        raise ExtractionError("Unsuported archive type")
    log.info("OSCAP addon: Extracted {files} from the supplied content"
             .format(files=result))
    return result


def _extract_zip(archive, out_dir, ensure_has_files):
    """
    Extract the given ZIP archive to the given output directory and make sure
    the given files exist in the archive.

    :see: extract_data
    :return: a list of files and directories extracted from the archive
    :rtype: [str]

    """

    try:
        zfile = zipfile.ZipFile(archive, "r")
    except Exception as exc:
        msg = _(f"Error extracting archive as a zipfile: {exc}")
        raise ExtractionError(msg) from exc

    # the archive gets closed even if a file is missing or extraction fails
    with zfile:
        # paths of the files found in the archive (dirs end with "/")
        files = set(info.filename for info in zfile.filelist
                    if not info.filename.endswith("/"))
        for fpath in ensure_has_files or ():
            if fpath not in files:
                msg = "File '%s' not found in the archive '%s'" % (fpath,
                                                                   archive)
                raise ExtractionError(msg)

        utils.ensure_dir_exists(out_dir)
        zfile.extractall(path=out_dir)
        return [utils.join_paths(out_dir, info.filename)
                for info in zfile.filelist]
457
458
459
def _extract_tarball(archive, out_dir, ensure_has_files, alg):
    """
    Extract the given TAR archive to the given output directory and make sure
    the given files exist in the archive.

    :see: extract_data
    :param alg: compression algorithm used for the tarball
    :type alg: str (one of "gz", "bz2") or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the algorithm is not supported, the archive
                            cannot be opened or a required file is missing

    """

    if alg and alg not in ("gz", "bz2",):
        raise ExtractionError("Unsupported compression algorithm")

    mode = "r"
    if alg:
        mode += ":%s" % alg

    try:
        tfile = tarfile.TarFile.open(archive, mode)
    except tarfile.TarError as err:
        raise ExtractionError(str(err)) from err

    # the archive gets closed even if a file is missing or extraction fails
    with tfile:
        members = tfile.getmembers()

        # paths of the regular files found in the archive
        files = set(member.path for member in members if member.isfile())

        for fpath in ensure_has_files or ():
            if fpath not in files:
                msg = "File '%s' not found in the archive '%s'" % (fpath, archive)
                raise ExtractionError(msg)

        utils.ensure_dir_exists(out_dir)
        tfile.extractall(path=out_dir)
        return [utils.join_paths(out_dir, member.path) for member in members]
499
500
501
def _extract_rpm(rpm_path, root="/", ensure_has_files=None):
    """
    Extract the given RPM into the directory tree given by the root argument
    and make sure the given files exist in the archive.

    Entries of zero size are not written out and files that already exist
    under the root are not overwritten.

    :param rpm_path: path to the RPM file that should be extracted
    :type rpm_path: str
    :param root: root of the directory tree the RPM should be extracted into
    :type root: str
    :param ensure_has_files: relative paths to the files that must exist in the
                             RPM
    :type ensure_has_files: iterable of strings or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the RPM cannot be converted or read, a required
                            file is missing or writing out a file fails

    """

    # run rpm2cpio and process the output with the cpioarchive module
    temp_fd, temp_path = tempfile.mkstemp(prefix="oscap_rpm")
    try:
        try:
            proc = subprocess.Popen(["rpm2cpio", rpm_path], stdout=temp_fd)
            proc.wait()
        finally:
            # the descriptor is not needed once rpm2cpio is done (or failed
            # to start)
            os.close(temp_fd)

        if proc.returncode != 0:
            msg = "Failed to convert RPM '%s' to cpio archive" % rpm_path
            raise ExtractionError(msg)

        try:
            archive = cpioarchive.CpioArchive(temp_path)
        except cpioarchive.CpioError as err:
            raise ExtractionError(str(err)) from err

        try:
            # get entries from the archive (supports only iteration over
            # entries)
            entries = set(entry for entry in archive)

            # cpio entry names (paths) start with the dot
            entry_names = [entry.name.lstrip(".") for entry in entries]

            for fpath in ensure_has_files or ():
                # RPM->cpio entries have absolute paths
                if fpath not in entry_names and \
                   os.path.join("/", fpath) not in entry_names:
                    msg = "File '%s' not found in the archive '%s'" % (fpath, rpm_path)
                    raise ExtractionError(msg)

            try:
                for entry in entries:
                    if entry.size == 0:
                        continue
                    dirname = os.path.dirname(entry.name.lstrip("."))
                    out_dir = os.path.normpath(root + dirname)
                    utils.ensure_dir_exists(out_dir)

                    out_fpath = os.path.normpath(root + entry.name.lstrip("."))
                    if os.path.exists(out_fpath):
                        continue
                    with open(out_fpath, "wb") as out_file:
                        buf = entry.read(IO_BUF_SIZE)
                        while buf:
                            out_file.write(buf)
                            buf = entry.read(IO_BUF_SIZE)
            except (IOError, cpioarchive.CpioError) as e:
                raise ExtractionError(e) from e

            return [os.path.normpath(root + name) for name in entry_names]
        finally:
            archive.close()
    finally:
        # cleanup of the temporary cpio archive, also on errors
        os.unlink(temp_path)
570
571
572
def strip_content_dir(fpaths, phase="preinst"):
    """
    Remove the content directory prefix from the given file paths, using the
    installation content directory for the "preinst" phase and the target
    content directory for any other phase.

    The prefix is cut off by its length, the paths are not checked to really
    start with it.

    :param fpaths: iterable of file paths to strip content directory prefix
                   from
    :type fpaths: iterable of strings
    :param phase: specifies pre-installation or post-installation phase
    :type phase: "preinst" or "postinst"
    :return: the same iterable of file paths as given with the content
             directory prefix stripped
    :rtype: same type as fpaths

    """

    if phase == "preinst":
        content_dir = INSTALLATION_CONTENT_DIR
    else:
        content_dir = TARGET_CONTENT_DIR
    prefix_len = len(content_dir)

    def remove_prefix(fpath):
        return fpath[prefix_len:]

    return utils.keep_type_map(remove_prefix, fpaths)
594
595
596
def get_ssg_path(root="/"):
    """
    Get the path to the SCAP Security Guide content file under the given
    root.

    :param root: root of the directory tree to get the path in
    :type root: str
    :return: SSG_DIR and SSG_CONTENT joined with the root

    """

    ssg_file = SSG_DIR + SSG_CONTENT
    return utils.join_paths(root, ssg_file)
598
599
600
def ssg_available(root="/"):
    """
    Check whether the SCAP Security Guide content file exists under the given
    root.

    :see: get_ssg_path
    :return: True if SSG was found under the given root, False otherwise

    """

    ssg_path = get_ssg_path(root)
    return os.path.exists(ssg_path)
609
610
611
def get_content_name(data):
    """
    Get the name of the content file, i.e. what follows the last slash of the
    content URL once a supported URL prefix is removed.

    :param data: addon data with the content_type and content_url attributes
    :return: name of the content file
    :rtype: str
    :raise ValueError: for the "scap-security-guide" content type or if no
                       file name can be found in the URL

    """

    if data.content_type == "scap-security-guide":
        raise ValueError("Using scap-security-guide, no single content file")

    url = data.content_url
    prefix = next(
        (known for known in SUPPORTED_URL_PREFIXES if url.startswith(known)),
        None)

    # URLs without a supported prefix get a fixed name
    rest = "/anonymous_content" if prefix is None else url[len(prefix):]

    _head, slash, name = rest.rpartition("/")
    if not slash:
        raise ValueError("Unsupported url '%s'" % data.content_url)

    return name
626
627
628
def get_raw_preinst_content_path(data):
    """
    Path to the raw (unextracted, ...) pre-installation content file

    :return: the content name joined with the installation content directory
             or None for the "scap-security-guide" content type

    """

    if data.content_type != "scap-security-guide":
        return utils.join_paths(INSTALLATION_CONTENT_DIR,
                                get_content_name(data))

    log.debug("OSCAP addon: Using scap-security-guide, no single content file")
    return None
636
637
638
def get_preinst_content_path(data):
    """
    Path to the pre-installation content file

    :return: the content path as is for "scap-security-guide", the raw
             content path for "datastream", otherwise the content path joined
             with the installation content directory

    """

    content_type = data.content_type

    # SSG is not copied to the standard place
    if content_type == "scap-security-guide":
        return data.content_path

    if content_type == "datastream":
        return get_raw_preinst_content_path(data)

    return utils.join_paths(INSTALLATION_CONTENT_DIR, data.content_path)
651
652
653
def get_postinst_content_path(data):
    """
    Path to the post-installation content file

    :return: the content name joined with the target content directory for
             "datastream", the content path as is for "rpm" and
             "scap-security-guide", otherwise the content path joined with
             the target content directory

    """

    content_type = data.content_type

    if content_type == "datastream":
        return utils.join_paths(TARGET_CONTENT_DIR, get_content_name(data))

    # no path magic in case of RPM (SSG is installed as an RPM)
    if content_type in ("rpm", "scap-security-guide"):
        return data.content_path

    return utils.join_paths(TARGET_CONTENT_DIR, data.content_path)
669
670
671
def get_preinst_tailoring_path(data):
    """
    Path to the pre-installation tailoring file (if any)

    :return: the tailoring path joined with the installation content
             directory or "" if there is no tailoring path

    """

    tailoring_path = data.tailoring_path
    if tailoring_path:
        return utils.join_paths(INSTALLATION_CONTENT_DIR, tailoring_path)

    return ""
680
681
682
def get_postinst_tailoring_path(data):
    """
    Path to the post-installation tailoring file (if any)

    :return: "" if there is no tailoring path, the tailoring path as is for
             the "rpm" content type, otherwise the tailoring path joined with
             the target content directory

    """

    tailoring_path = data.tailoring_path
    if not tailoring_path:
        return ""

    # no path magic in case of RPM
    if data.content_type == "rpm":
        return tailoring_path

    return utils.join_paths(TARGET_CONTENT_DIR, tailoring_path)
695
696
697
def get_payload_proxy():
    """Get the DBus proxy of the active payload.

    :return: a DBus proxy
    :raise ValueError: if no payload is active
    """
    active_payload_path = PAYLOADS.get_proxy().ActivePayload

    if active_payload_path:
        return PAYLOADS.get_proxy(active_payload_path)

    raise ValueError("Active payload is not set.")
709
710
711
def get_packages_data() -> PackagesConfigurationData:
    """Get the DBus data with the packages configuration.

    A payload of a type other than DNF yields a default-constructed
    configuration.

    :return: a packages configuration
    """
    proxy = get_payload_proxy()

    if proxy.Type == PAYLOAD_TYPE_DNF:
        return PackagesConfigurationData.from_structure(proxy.Packages)

    return PackagesConfigurationData()
724
725
726
def set_packages_data(data: PackagesConfigurationData):
    """Set the DBus data with the packages configuration.

    Nothing is set (and None is returned) for a payload of a type other than
    DNF.

    :param data: a packages configuration
    :return: the result of the payload's SetPackages call
    """
    proxy = get_payload_proxy()

    if proxy.Type == PAYLOAD_TYPE_DNF:
        structure = PackagesConfigurationData.to_structure(data)
        return proxy.SetPackages(structure)

    log.debug("OSCAP addon: The payload doesn't support packages.")
    return None
740