Completed
Push — rhel9-branch ( cf5a6a...b257de )
by Jan
15s queued 12s
created

org_fedora_oscap.common.extract_data()   D

Complexity

Conditions 12

Size

Total Lines 72
Code Lines 40

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 40
dl 0
loc 72
rs 4.8
c 0
b 0
f 0
cc 12
nop 3

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like org_fedora_oscap.common.extract_data() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Copyright (C) 2013  Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
# Red Hat Author(s): Vratislav Podzimek <[email protected]>
19
#
20
21
"""
22
Module with various classes and functions needed by the OSCAP addon that are
23
not specific to any installation mode (tui, gui, ks).
24
25
"""
26
27
import os
28
import tempfile
29
import subprocess
30
import zipfile
31
import tarfile
32
33
import cpioarchive
34
import re
35
import logging
36
37
from collections import namedtuple
38
import gettext
39
from functools import wraps
40
41
from dasbus.identifier import DBusServiceIdentifier
42
from pyanaconda.core import constants
43
from pyanaconda.core.dbus import DBus
44
from pyanaconda.core.constants import PAYLOAD_TYPE_DNF
45
from pyanaconda.modules.common.constants.namespaces import ADDONS_NAMESPACE
46
from pyanaconda.modules.common.constants.services import PAYLOADS
47
from pyanaconda.modules.common.structures.payload import PackagesConfigurationData
48
from pyanaconda.threading import threadMgr, AnacondaThread
49
50
from org_fedora_oscap import utils
51
52
# Module-level logger; all messages of this module go to the "anaconda" logger.
log = logging.getLogger("anaconda")
53
54
55
# mimic pyanaconda/core/i18n.py
56
def _(string):
    """
    Translate a message using the "oscap-anaconda-addon" gettext domain.

    :param string: message to translate
    :type string: str or None
    :return: the translated message, or "" for an empty or None message
    :rtype: str

    """

    if not string:
        return ""

    # fallback=True: no exception is raised when no catalog is available
    catalog = gettext.translation("oscap-anaconda-addon", fallback=True)
    return catalog.gettext(string)
61
62
63
def N_(string):
    """Return the given string unchanged (no translation is done here)."""
    return string
64
65
66
# everything else should be private
67
__all__ = [
    "run_oscap_remediate",
    "get_fix_rules_pre",
    "extract_data",
    "strip_content_dir",
    "OSCAPaddonError",
    "get_payload_proxy",
    "get_packages_data",
    "set_packages_data",
]
71
72
# Content directory used for the pre-installation phase
INSTALLATION_CONTENT_DIR = "/tmp/openscap_data/"
# Content directory used for the post-installation phase
TARGET_CONTENT_DIR = "/root/openscap_data/"

# Directory the SCAP Security Guide content is looked up in (see get_ssg_path)
SSG_DIR = "/usr/share/xml/scap/ssg/content/"

# Enable patches that set the content name at package-time
DEFAULT_SSG_CONTENT_NAME = ""
SSG_CONTENT = DEFAULT_SSG_CONTENT_NAME
if not SSG_CONTENT and constants.shortProductName != 'anaconda':
    if constants.shortProductName == 'fedora':
        SSG_CONTENT = "ssg-fedora-ds.xml"
    else:
        # Use the whole major version (everything before the first dot), not
        # just the first character, so that e.g. "10.0" gives "10".
        SSG_CONTENT = (
            "ssg-{name}{version}-ds.xml"
            .format(
                name=constants.shortProductName,
                version=constants.productVersion.split(".")[0]))
90
91
# Files passed to the oscap tool as the --results and --report options
# (see run_oscap_remediate)
RESULTS_PATH = utils.join_paths(
    TARGET_CONTENT_DIR, "eval_remediate_results.xml"
)
REPORT_PATH = utils.join_paths(
    TARGET_CONTENT_DIR, "eval_remediate_report.html"
)
95
96
# Template passed to 'oscap xccdf generate fix' by get_fix_rules_pre
PRE_INSTALL_FIX_SYSTEM_ATTR = "urn:redhat:anaconda:pre"

THREAD_FETCH_DATA = "AnaOSCAPdataFetchThread"

SUPPORTED_ARCHIVES = (".zip", ".tar", ".tar.gz", ".tar.bz2")

SUPPORTED_CONTENT_TYPES = ("datastream", "rpm", "archive", "scap-security-guide")

SUPPORTED_URL_PREFIXES = ("http://", "https://", "ftp://")  # LABEL:?, hdaX:?,

# buffer size for reading and writing out data (in bytes), i.e. 2 MiB
IO_BUF_SIZE = 2 * 1024 ** 2
112
113
# DBus constants
114
# NOTE(review): this identifier is called "Kdump" although the module belongs
# to the OSCAP addon -- confirm that the name is intentional.
KDUMP = DBusServiceIdentifier(
    message_bus=DBus,
    namespace=ADDONS_NAMESPACE,
    basename="Kdump",
)
119
120
121
class OSCAPaddonError(Exception):
    """Base class of the exceptions raised for OSCAP addon related errors."""
125
126
127
class OSCAPaddonNetworkError(OSCAPaddonError):
    """OSCAP addon error caused by a network problem."""
131
132
133
class ExtractionError(OSCAPaddonError):
    """OSCAP addon error raised when extracting an archive fails."""
137
138
139
# types of the messages, numbered 0, 1, 2
(MESSAGE_TYPE_FATAL,
 MESSAGE_TYPE_WARNING,
 MESSAGE_TYPE_INFO) = range(3)

# namedtuple for messages returned from the rules evaluation
#   origin -- class (inherited from RuleHandler) that generated the message
#   type -- one of the MESSAGE_TYPE_* constants defined above
#   text -- the actual message that should be displayed, logged, ...
RuleMessage = namedtuple("RuleMessage", "origin type text")
148
149
150
class SubprocessLauncher(object):
    """
    Runs a command and keeps its outputs, return code and error messages.

    After execute() finishes, the decoded outputs are available as the stdout
    and stderr attributes, the exit status as returncode and the parts of the
    error output matching 'OpenSCAP Error:...' or 'E: oscap:...' as messages.

    """

    def __init__(self, args):
        """
        :param args: the command and its arguments
        :type args: [str]

        """

        self.args = args
        self.stdout = ""
        self.stderr = ""
        self.messages = []
        self.returncode = None

    def execute(self, **kwargs):
        """
        Run the command, wait for it to finish and store its results.

        :param kwargs: extra keyword arguments passed to subprocess.Popen
        :raise OSCAPaddonError: if the process cannot be started

        """

        command_string = " ".join(self.args)
        log.info(
            "OSCAP addon: Executing subprocess: '{command_string}'"
            .format(command_string=command_string))
        try:
            proc = subprocess.Popen(self.args, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE, **kwargs)
        except OSError as oserr:
            msg = "Failed to run the oscap tool: %s" % oserr
            raise OSCAPaddonError(msg) from oserr

        (stdout, stderr) = proc.communicate()
        # Decode both streams leniently: undecodable bytes in the output must
        # not turn a finished run into a UnicodeDecodeError.
        self.stdout = stdout.decode(errors="replace")
        self.stderr = stderr.decode(errors="replace")
        self.messages = re.findall(r'OpenSCAP Error:.*', self.stderr)
        self.messages += re.findall(r'E: oscap:.*', self.stderr)

        self.returncode = proc.returncode

    def log_messages(self):
        """Log every collected error message as a warning."""

        for message in self.messages:
            log.warning("OSCAP addon: %s", message)
181
182
183
def get_fix_rules_pre(profile, fpath, ds_id="", xccdf_id="", tailoring=""):
    """
    Generate the pre-installation fix rules of a profile.

    Calls _run_oscap_gen_fix with the PRE_INSTALL_FIX_SYSTEM_ATTR template for
    the given profile, datastream and checklist in the given file.

    :see: run_oscap_remediate
    :see: _run_oscap_gen_fix
    :return: fix rules for a given profile
    :rtype: str

    """

    selection = {
        "ds_id": ds_id,
        "xccdf_id": xccdf_id,
        "tailoring": tailoring,
    }
    return _run_oscap_gen_fix(
        profile, fpath, PRE_INSTALL_FIX_SYSTEM_ATTR, **selection
    )
198
199
200
def _run_oscap_gen_fix(profile, fpath, template, ds_id="", xccdf_id="",
                       tailoring=""):
    """
    Run 'oscap xccdf generate fix' on a given file with a given template for
    a given datastream, checklist and profile.

    :see: run_oscap_remediate
    :param template: the value of the 'system' attribute of the fix elements
    :type template: str
    :return: oscap tool's stdout ("" if no profile is given)
    :rtype: str
    :raise OSCAPaddonError: if the oscap tool returns a non-zero code

    """

    if not profile:
        return ""

    command = ["oscap", "xccdf", "generate", "fix",
               "--template=%s" % template]

    # oscap uses the default profile by default
    if profile.lower() != "default":
        command.append("--profile=%s" % profile)

    # options that are passed only if they have a value
    optional = (
        ("--datastream-id=%s", ds_id),
        ("--xccdf-id=%s", xccdf_id),
        ("--tailoring-file=%s", tailoring),
    )
    command.extend(option % value for option, value in optional if value)
    command.append(fpath)

    launcher = SubprocessLauncher(command)
    launcher.execute()
    launcher.log_messages()
    if launcher.returncode == 0:
        return launcher.stdout

    msg = "Failed to generate fix rules with the oscap tool: %s" % launcher.stderr
    raise OSCAPaddonError(msg)
241
242
243
def run_oscap_remediate(profile, fpath, ds_id="", xccdf_id="", tailoring="",
                        chroot=""):
    """
    Evaluate and remediate with the oscap tool using a given file, following
    a given profile from a given checklist that is a part of a given
    datastream. If requested, the tool is run in a chroot.

    :param profile: id of the profile that will drive the remediation
    :type profile: str
    :param fpath: path to a file with SCAP content
    :type fpath: str
    :param ds_id: ID of the datastream that contains the checklist defining
                  the profile
    :type ds_id: str
    :param xccdf_id: ID of the checklist that defines the profile
    :type xccdf_id: str
    :param tailoring: path to a tailoring file
    :type tailoring: str
    :param chroot: path to the root the oscap tool should be run in
    :type chroot: str
    :return: oscap tool's stdout (summary of the rules, checks and fixes),
             "" if no profile is given
    :rtype: str
    :raise OSCAPaddonError: if the oscap tool returns a code other than 0 or 2

    """

    if not profile:
        return ""

    def do_chroot():
        """Helper function doing the chroot if requested."""
        if not chroot or chroot == "/":
            return
        os.chroot(chroot)
        os.chdir("/")

    # make sure the directory for the results exists
    results_dir = os.path.dirname(RESULTS_PATH)
    if chroot:
        results_dir = os.path.normpath(chroot + "/" + results_dir)
    utils.ensure_dir_exists(results_dir)

    command = [
        "oscap", "xccdf", "eval",
        "--remediate",
        "--results=%s" % RESULTS_PATH,
        "--report=%s" % REPORT_PATH,
    ]

    # oscap uses the default profile by default
    if profile.lower() != "default":
        command.append("--profile=%s" % profile)

    # options that are passed only if they have a value
    optional = (
        ("--datastream-id=%s", ds_id),
        ("--xccdf-id=%s", xccdf_id),
        ("--tailoring-file=%s", tailoring),
    )
    command.extend(option % value for option, value in optional if value)
    command.append(fpath)

    launcher = SubprocessLauncher(command)
    launcher.execute(preexec_fn=do_chroot)
    launcher.log_messages()

    # 0 -- success; 2 -- no error, but checks/remediation failed
    if launcher.returncode in (0, 2):
        return launcher.stdout

    msg = ("Content evaluation and remediation with the oscap tool failed: %s"
           % launcher.stderr)
    raise OSCAPaddonError(msg)
312
313
314
def extract_data(archive, out_dir, ensure_has_files=None):
    """
    Function that extracts the given archive to the given output directory.
    It tries to find out the archive type by the file name; the recognized
    suffixes are .zip, .tar, .tar.gz, .tar.bz2 and .rpm.

    :param archive: path to the archive file that should be extracted
    :type archive: str
    :param out_dir: output directory the archive should be extracted to
    :type out_dir: str
    :param ensure_has_files: relative paths to the files that must exist in the
                             archive (empty paths are ignored)
    :type ensure_has_files: iterable of strings or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the archive type is not supported, the archive
                            cannot be processed or a required file is missing

    """

    # get rid of empty file paths (None means no required files)
    ensure_has_files = [fpath for fpath in ensure_has_files or () if fpath]

    msg = "OSCAP addon: Extracting {archive}".format(archive=archive)
    if ensure_has_files:
        msg += ", expecting to find {files} there.".format(files=tuple(ensure_has_files))
    log.info(msg)

    if archive.endswith(".zip"):
        # ZIP file
        result = _extract_zip(archive, out_dir, ensure_has_files)
    elif archive.endswith(".tar"):
        # plain tarball
        result = _extract_tarball(archive, out_dir, ensure_has_files, None)
    elif archive.endswith(".tar.gz"):
        # gzipped tarball
        result = _extract_tarball(archive, out_dir, ensure_has_files, "gz")
    elif archive.endswith(".tar.bz2"):
        # bzipped tarball
        result = _extract_tarball(archive, out_dir, ensure_has_files, "bz2")
    elif archive.endswith(".rpm"):
        # RPM
        result = _extract_rpm(archive, out_dir, ensure_has_files)
    # elif other types of archives
    else:
        raise ExtractionError("Unsuported archive type")

    log.info("OSCAP addon: Extracted {files} from the supplied content"
             .format(files=result))
    return result


def _extract_zip(archive, out_dir, ensure_has_files):
    """
    Extract the given ZIP archive to the given output directory and make sure
    the given files exist in the archive.

    :see: extract_data
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the archive cannot be opened or a required file
                            is missing

    """

    try:
        zfile = zipfile.ZipFile(archive, "r")
    except Exception as exc:
        # translate the constant template, then fill in the error (an f-string
        # would be formatted before the translation lookup)
        msg = _("Error extracting archive as a zipfile: {exc}").format(exc=exc)
        raise ExtractionError(msg) from exc

    # the context manager closes the archive on the error paths as well
    with zfile:
        # paths of the files found in the archive (dirs end with "/")
        files = set(info.filename for info in zfile.filelist
                    if not info.filename.endswith("/"))
        for fpath in ensure_has_files or ():
            if fpath not in files:
                msg = "File '%s' not found in the archive '%s'" % (fpath,
                                                                   archive)
                raise ExtractionError(msg)

        utils.ensure_dir_exists(out_dir)
        zfile.extractall(path=out_dir)
        return [utils.join_paths(out_dir, info.filename)
                for info in zfile.filelist]
386
387
388
def _extract_tarball(archive, out_dir, ensure_has_files, alg):
    """
    Extract the given TAR archive to the given output directory and make sure
    the given files exist in the archive.

    :see: extract_data
    :param alg: compression algorithm used for the tarball
    :type alg: str (one of "gz", "bz2") or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the algorithm is not supported, the archive
                            cannot be opened or a required file is missing

    """

    if alg and alg not in ("gz", "bz2",):
        raise ExtractionError("Unsupported compression algorithm")

    mode = "r"
    if alg:
        mode += ":%s" % alg

    try:
        tfile = tarfile.open(archive, mode)
    except tarfile.TarError as err:
        raise ExtractionError(str(err)) from err

    # the context manager closes the archive on the error paths as well
    with tfile:
        members = tfile.getmembers()

        # paths of the regular files found in the archive
        files = set(member.path for member in members if member.isfile())

        for fpath in ensure_has_files or ():
            if fpath not in files:
                msg = "File '%s' not found in the archive '%s'" % (fpath, archive)
                raise ExtractionError(msg)

        utils.ensure_dir_exists(out_dir)
        tfile.extractall(path=out_dir)
        return [utils.join_paths(out_dir, member.path) for member in members]
428
429
430
def _extract_rpm(rpm_path, root="/", ensure_has_files=None):
    """
    Extract the given RPM into the directory tree given by the root argument
    and make sure the given files exist in the archive.

    :param rpm_path: path to the RPM file that should be extracted
    :type rpm_path: str
    :param root: root of the directory tree the RPM should be extracted into
    :type root: str
    :param ensure_has_files: relative paths to the files that must exist in the
                             RPM
    :type ensure_has_files: iterable of strings or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the RPM cannot be converted or read, a required
                            file is missing or writing out the files fails

    """

    # run rpm2cpio and process the output with the cpioarchive module
    temp_fd, temp_path = tempfile.mkstemp(prefix="oscap_rpm")
    try:
        try:
            proc = subprocess.Popen(["rpm2cpio", rpm_path], stdout=temp_fd)
            proc.wait()
        finally:
            # the descriptor is only needed as the stdout of rpm2cpio
            os.close(temp_fd)

        if proc.returncode != 0:
            msg = "Failed to convert RPM '%s' to cpio archive" % rpm_path
            raise ExtractionError(msg)

        try:
            archive = cpioarchive.CpioArchive(temp_path)
        except cpioarchive.CpioError as err:
            raise ExtractionError(str(err)) from err

        try:
            # get entries from the archive (supports only iteration over
            # entries)
            entries = set(entry for entry in archive)

            # cpio entry names (paths) start with the dot
            entry_names = [entry.name.lstrip(".") for entry in entries]

            for fpath in ensure_has_files or ():
                # RPM->cpio entries have absolute paths
                if fpath not in entry_names and \
                   os.path.join("/", fpath) not in entry_names:
                    msg = "File '%s' not found in the archive '%s'" % (fpath, rpm_path)
                    raise ExtractionError(msg)

            _write_cpio_entries(entries, root)
        finally:
            archive.close()
    finally:
        # remove the temporary cpio file on both success and failure
        os.unlink(temp_path)

    return [os.path.normpath(root + name) for name in entry_names]


def _write_cpio_entries(entries, root):
    """
    Write the non-empty cpio entries out under the given root, skipping the
    files that already exist.

    :raise ExtractionError: if reading an entry or writing a file fails

    """

    try:
        for entry in entries:
            # entries of zero size have no data to write out
            if entry.size == 0:
                continue

            rel_path = entry.name.lstrip(".")
            out_dir = os.path.normpath(root + os.path.dirname(rel_path))
            utils.ensure_dir_exists(out_dir)

            out_fpath = os.path.normpath(root + rel_path)
            if os.path.exists(out_fpath):
                continue
            with open(out_fpath, "wb") as out_file:
                buf = entry.read(IO_BUF_SIZE)
                while buf:
                    out_file.write(buf)
                    buf = entry.read(IO_BUF_SIZE)
    except (IOError, cpioarchive.CpioError) as e:
        raise ExtractionError(e) from e
499
500
501
def strip_content_dir(fpaths, phase="preinst"):
    """
    Cut the content directory prefix off the given file paths.

    The prefix is not checked; as many leading characters as the content
    directory of the given phase has are dropped from every path.

    :param fpaths: iterable of file paths to strip content directory prefix
                   from
    :type fpaths: iterable of strings
    :param phase: specifies pre-installation or post-installation phase
    :type phase: "preinst" or "postinst"
    :return: the same iterable of file paths as given with the content
             directory prefix stripped
    :rtype: same type as fpaths

    """

    # anything else than "preinst" is handled as the post-installation phase
    if phase == "preinst":
        content_dir = INSTALLATION_CONTENT_DIR
    else:
        content_dir = TARGET_CONTENT_DIR
    prefix_len = len(content_dir)

    def remove_prefix(fpath):
        return fpath[prefix_len:]

    return utils.keep_type_map(remove_prefix, fpaths)
523
524
525
def get_ssg_path(root="/"):
    """
    Get the path of the SCAP Security Guide content file (SSG_DIR followed by
    SSG_CONTENT) under the given root.

    """

    ssg_file = SSG_DIR + SSG_CONTENT
    return utils.join_paths(root, ssg_file)
527
528
529
def ssg_available(root="/"):
    """
    Check if the SCAP Security Guide content file exists under the given root.

    :return: True if SSG was found under the given root, False otherwise

    """

    ssg_path = get_ssg_path(root)
    return os.path.exists(ssg_path)
538
539
540
def get_content_name(data):
    """
    Get the name of the content file, i.e. the part of the content URL after
    its last slash.

    A URL that starts with none of the SUPPORTED_URL_PREFIXES gives
    "anonymous_content".

    :raise ValueError: for the scap-security-guide content type and for a URL
                       with no slash after the prefix

    """

    if data.content_type == "scap-security-guide":
        raise ValueError("Using scap-security-guide, no single content file")

    # the first supported prefix the URL starts with (if any)
    prefix = next(
        (candidate for candidate in SUPPORTED_URL_PREFIXES
         if data.content_url.startswith(candidate)),
        None
    )
    if prefix is None:
        rest = "/anonymous_content"
    else:
        rest = data.content_url[len(prefix):]

    _head, slash, name = rest.rpartition("/")
    if not slash:
        raise ValueError("Unsupported url '%s'" % data.content_url)

    return name
555
556
557
def get_raw_preinst_content_path(data):
    """
    Path to the raw (unextracted, ...) pre-installation content file

    :return: the path, or None for the scap-security-guide content type

    """

    if data.content_type != "scap-security-guide":
        return utils.join_paths(INSTALLATION_CONTENT_DIR,
                                get_content_name(data))

    log.debug("Using scap-security-guide, no single content file")
    return None
565
566
567
def get_preinst_content_path(data):
    """Path to the pre-installation content file"""

    content_type = data.content_type

    if content_type == "scap-security-guide":
        # SSG is not copied to the standard place
        return data.content_path

    # a datastream is stored under its file name, anything else under its
    # content path
    if content_type == "datastream":
        relative_path = get_content_name(data)
    else:
        relative_path = data.content_path

    return utils.join_paths(INSTALLATION_CONTENT_DIR, relative_path)
583
584
585
def get_postinst_content_path(data):
    """Path to the post-installation content file"""

    content_type = data.content_type

    if content_type == "datastream":
        relative_path = get_content_name(data)
    elif content_type in ("rpm", "scap-security-guide"):
        # no path magic in case of RPM (SSG is installed as an RPM)
        return data.content_path
    else:
        relative_path = data.content_path

    return utils.join_paths(TARGET_CONTENT_DIR, relative_path)
601
602
603
def get_preinst_tailoring_path(data):
    """Path to the pre-installation tailoring file (if any)"""

    tailoring_path = data.tailoring_path
    if tailoring_path:
        return utils.join_paths(INSTALLATION_CONTENT_DIR, tailoring_path)

    # no tailoring file
    return ""
612
613
614
def get_postinst_tailoring_path(data):
    """Path to the post-installation tailoring file (if any)"""

    tailoring_path = data.tailoring_path
    if not tailoring_path:
        # no tailoring file
        return ""

    # no path magic in case of RPM
    if data.content_type == "rpm":
        return tailoring_path

    return utils.join_paths(TARGET_CONTENT_DIR, tailoring_path)
627
628
629
def get_payload_proxy():
    """Get the DBus proxy of the active payload.

    :return: a DBus proxy
    :raise ValueError: if no active payload is set
    """
    active_payload_path = PAYLOADS.get_proxy().ActivePayload

    if active_payload_path:
        return PAYLOADS.get_proxy(active_payload_path)

    raise ValueError("Active payload is not set.")
641
642
643
def get_packages_data() -> PackagesConfigurationData:
    """Get the DBus data with the packages configuration.

    :return: a packages configuration (a new, default one if the type of the
             active payload is not PAYLOAD_TYPE_DNF)
    """
    proxy = get_payload_proxy()

    if proxy.Type == PAYLOAD_TYPE_DNF:
        return PackagesConfigurationData.from_structure(proxy.Packages)

    return PackagesConfigurationData()
656
657
658
def set_packages_data(data: PackagesConfigurationData):
    """Set the DBus data with the packages configuration.

    Nothing is set if the type of the active payload is not PAYLOAD_TYPE_DNF.

    :param data: a packages configuration
    :return: the result of the SetPackages call, or None if nothing was set
    """
    proxy = get_payload_proxy()

    if proxy.Type == PAYLOAD_TYPE_DNF:
        structure = PackagesConfigurationData.to_structure(data)
        return proxy.SetPackages(structure)

    log.debug("The payload doesn't support packages.")
    return None
672