Completed
Push — rhel8-branch ( 38296b...b8bdaf )
by Jan
21s queued 14s
created

org_fedora_oscap.common   F

Complexity

Total Complexity 64

Size/Duplication

Total Lines 530
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 64
eloc 255
dl 0
loc 530
rs 3.28
c 0
b 0
f 0

12 Functions

Rating   Name   Duplication   Size   Complexity  
A _() 0 5 2
A N_() 0 1 1
A dry_run_skip() 0 18 2
A get_ssg_path() 0 2 1
B _extract_tarball() 0 40 7
C run_oscap_remediate() 0 69 10
A strip_content_dir() 0 22 4
A get_fix_rules_pre() 0 15 1
B _run_oscap_gen_fix() 0 41 7
C extract_data() 0 69 11
A ssg_available() 0 9 1
D _extract_rpm() 0 69 12

3 Methods

Rating   Name   Duplication   Size   Complexity  
A SubprocessLauncher.execute() 0 19 2
A SubprocessLauncher.__init__() 0 6 1
A SubprocessLauncher.log_messages() 0 3 2

How to fix   Complexity   

Complexity

Complex classes like org_fedora_oscap.common often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Copyright (C) 2013  Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
# Red Hat Author(s): Vratislav Podzimek <[email protected]>
19
#
20
21
"""
22
Module with various classes and functions needed by the OSCAP addon that are
23
not specific to any installation mode (tui, gui, ks).
24
25
"""
26
27
import os
28
import tempfile
29
import subprocess
30
import zipfile
31
import tarfile
32
33
import cpioarchive
34
import re
35
import logging
36
37
from collections import namedtuple
38
import gettext
39
from functools import wraps
40
from pyanaconda.core import constants
41
from org_fedora_oscap import utils
42
43
log = logging.getLogger("anaconda")
44
45
46
# mimick pyanaconda/core/i18n.py
47
def _(string):
48
    if string:
49
        return gettext.translation("oscap-anaconda-addon", fallback=True).gettext(string)
50
    else:
51
        return ""
52
53
54
def N_(string): return string
55
56
57
# everything else should be private
58
__all__ = ["run_oscap_remediate", "get_fix_rules_pre",
59
           "extract_data", "strip_content_dir",
60
           "OSCAPaddonError"]
61
62
INSTALLATION_CONTENT_DIR = "/tmp/openscap_data/"
63
TARGET_CONTENT_DIR = "/root/openscap_data/"
64
65
SSG_DIR = "/usr/share/xml/scap/ssg/content/"
66
67
# Enable patches that set the content name at package-time
68
DEFAULT_SSG_CONTENT_NAME = ""
69
SSG_CONTENT = DEFAULT_SSG_CONTENT_NAME
70
if not SSG_CONTENT:
71
    if constants.shortProductName != 'anaconda':
72
        if constants.shortProductName == 'fedora':
73
            SSG_CONTENT = "ssg-fedora-ds.xml"
74
        else:
75
            SSG_CONTENT = (
76
                "ssg-{name}{version}-ds.xml"
77
                .format(
78
                    name=constants.shortProductName,
79
                    version=constants.productVersion.strip(".")[0]))
80
81
RESULTS_PATH = utils.join_paths(TARGET_CONTENT_DIR,
82
                                "eval_remediate_results.xml")
83
REPORT_PATH = utils.join_paths(TARGET_CONTENT_DIR,
84
                               "eval_remediate_report.html")
85
86
PRE_INSTALL_FIX_SYSTEM_ATTR = "urn:redhat:anaconda:pre"
87
88
THREAD_FETCH_DATA = "AnaOSCAPdataFetchThread"
89
90
SUPPORTED_ARCHIVES = (".zip", ".tar", ".tar.gz", ".tar.bz2", )
91
92
# buffer size for reading and writing out data (in bytes)
93
IO_BUF_SIZE = 2 * 1024 * 1024
94
95
96
class OSCAPaddonError(Exception):
97
    """Exception class for OSCAP addon related errors."""
98
99
    pass
100
101
102
class OSCAPaddonNetworkError(OSCAPaddonError):
103
    """Exception class for OSCAP addon related network errors."""
104
105
    pass
106
107
108
class ExtractionError(OSCAPaddonError):
109
    """Exception class for the extraction errors."""
110
111
    pass
112
113
114
MESSAGE_TYPE_FATAL = 0
115
MESSAGE_TYPE_WARNING = 1
116
MESSAGE_TYPE_INFO = 2
117
118
# namedtuple for messages returned from the rules evaluation
119
#   origin -- class (inherited from RuleHandler) that generated the message
120
#   type -- one of the MESSAGE_TYPE_* constants defined above
121
#   text -- the actual message that should be displayed, logged, ...
122
RuleMessage = namedtuple("RuleMessage", ["origin", "type", "text"])
123
124
125
class SubprocessLauncher(object):
126
    def __init__(self, args):
127
        self.args = args
128
        self.stdout = ""
129
        self.stderr = ""
130
        self.messages = []
131
        self.returncode = None
132
133
    def execute(self, ** kwargs):
134
        command_string = " ".join(self.args)
135
        log.info(
136
            "OSCAP addon: Executing subprocess: '{command_string}'"
137
            .format(command_string=command_string))
138
        try:
139
            proc = subprocess.Popen(self.args, stdout=subprocess.PIPE,
140
                                    stderr=subprocess.PIPE, ** kwargs)
141
        except OSError as oserr:
142
            msg = "Failed to run the oscap tool: %s" % oserr
143
            raise OSCAPaddonError(msg)
144
145
        (stdout, stderr) = proc.communicate()
146
        self.stdout = stdout.decode()
147
        self.stderr = stderr.decode(errors="replace")
148
        self.messages = re.findall(r'OpenSCAP Error:.*', self.stderr)
149
        self.messages = self.messages + re.findall(r'E: oscap:.*', self.stderr)
150
151
        self.returncode = proc.returncode
152
153
    def log_messages(self):
154
        for message in self.messages:
155
            log.warning("OSCAP addon: " + message)
156
157
158
def get_fix_rules_pre(profile, fpath, ds_id="", xccdf_id="", tailoring=""):
159
    """
160
    Get fix rules for the pre-installation environment for a given profile in a
161
    given datastream and checklist in a given file.
162
163
    :see: run_oscap_remediate
164
    :see: _run_oscap_gen_fix
165
    :return: fix rules for a given profile
166
    :rtype: str
167
168
    """
169
170
    return _run_oscap_gen_fix(profile, fpath, PRE_INSTALL_FIX_SYSTEM_ATTR,
171
                              ds_id=ds_id, xccdf_id=xccdf_id,
172
                              tailoring=tailoring)
173
174
175
def _run_oscap_gen_fix(profile, fpath, template, ds_id="", xccdf_id="",
176
                       tailoring=""):
177
    """
178
    Run oscap tool on a given file to get the contents of fix elements with the
179
    'system' attribute equal to a given template for a given datastream,
180
    checklist and profile.
181
182
    :see: run_oscap_remediate
183
    :param template: the value of the 'system' attribute of the fix elements
184
    :type template: str
185
    :return: oscap tool's stdout
186
    :rtype: str
187
188
    """
189
190
    if not profile:
191
        return ""
192
193
    args = ["oscap", "xccdf", "generate", "fix"]
194
    args.append("--template=%s" % template)
195
196
    # oscap uses the default profile by default
197
    if profile.lower() != "default":
198
        args.append("--profile=%s" % profile)
199
    if ds_id:
200
        args.append("--datastream-id=%s" % ds_id)
201
    if xccdf_id:
202
        args.append("--xccdf-id=%s" % xccdf_id)
203
    if tailoring:
204
        args.append("--tailoring-file=%s" % tailoring)
205
206
    args.append(fpath)
207
208
    proc = SubprocessLauncher(args)
209
    proc.execute()
210
    proc.log_messages()
211
    if proc.returncode != 0:
212
        msg = "Failed to generate fix rules with the oscap tool: %s" % proc.stderr
213
        raise OSCAPaddonError(msg)
214
215
    return proc.stdout
216
217
218
def run_oscap_remediate(profile, fpath, ds_id="", xccdf_id="", tailoring="",
219
                        chroot=""):
220
    """
221
    Run the evaluation and remediation with the oscap tool on a given file,
222
    doing the remediation as defined in a given profile defined in a given
223
    checklist that is a part of a given datastream. If requested, run in
224
    chroot.
225
226
    :param profile: id of the profile that will drive the remediation
227
    :type profile: str
228
    :param fpath: path to a file with SCAP content
229
    :type fpath: str
230
    :param ds_id: ID of the datastream that contains the checklist defining
231
                  the profile
232
    :type ds_id: str
233
    :param xccdf_id: ID of the checklist that defines the profile
234
    :type xccdf_id: str
235
    :param tailoring: path to a tailoring file
236
    :type tailoring: str
237
    :param chroot: path to the root the oscap tool should be run in
238
    :type chroot: str
239
    :return: oscap tool's stdout (summary of the rules, checks and fixes)
240
    :rtype: str
241
242
    """
243
244
    if not profile:
245
        return ""
246
247
    def do_chroot():
248
        """Helper function doing the chroot if requested."""
249
        if chroot and chroot != "/":
250
            os.chroot(chroot)
251
            os.chdir("/")
252
253
    # make sure the directory for the results exists
254
    results_dir = os.path.dirname(RESULTS_PATH)
255
    if chroot:
256
        results_dir = os.path.normpath(chroot + "/" + results_dir)
257
    utils.ensure_dir_exists(results_dir)
258
259
    args = ["oscap", "xccdf", "eval"]
260
    args.append("--remediate")
261
    args.append("--results=%s" % RESULTS_PATH)
262
    args.append("--report=%s" % REPORT_PATH)
263
264
    # oscap uses the default profile by default
265
    if profile.lower() != "default":
266
        args.append("--profile=%s" % profile)
267
    if ds_id:
268
        args.append("--datastream-id=%s" % ds_id)
269
    if xccdf_id:
270
        args.append("--xccdf-id=%s" % xccdf_id)
271
    if tailoring:
272
        args.append("--tailoring-file=%s" % tailoring)
273
274
    args.append(fpath)
275
276
    proc = SubprocessLauncher(args)
277
    proc.execute(preexec_fn=do_chroot)
278
    proc.log_messages()
279
280
    if proc.returncode not in (0, 2):
281
        # 0 -- success; 2 -- no error, but checks/remediation failed
282
        msg = "Content evaluation and remediation with the oscap tool "\
283
            "failed: %s" % proc.stderr
284
        raise OSCAPaddonError(msg)
285
286
    return proc.stdout
287
288
289
def extract_data(archive, out_dir, ensure_has_files=None):
290
    """
291
    Fuction that extracts the given archive to the given output directory. It
292
    tries to find out the archive type by the file name.
293
294
    :param archive: path to the archive file that should be extracted
295
    :type archive: str
296
    :param out_dir: output directory the archive should be extracted to
297
    :type out_dir: str
298
    :param ensure_has_files: relative paths to the files that must exist in the
299
                             archive
300
    :type ensure_has_files: iterable of strings or None
301
    :return: a list of files and directories extracted from the archive
302
    :rtype: [str]
303
304
    """
305
306
    if not ensure_has_files:
307
        ensure_has_files = []
308
309
    # get rid of empty file paths
310
    ensure_has_files = [fpath for fpath in ensure_has_files if fpath]
311
312
    msg = "OSCAP addon: Extracting {archive}".format(archive=archive)
313
    if ensure_has_files:
314
        msg += ", expecting to find {files} there.".format(files=tuple(ensure_has_files))
315
    log.info(msg)
316
317
    result = []
318
    if archive.endswith(".zip"):
319
        # ZIP file
320
        try:
321
            zfile = zipfile.ZipFile(archive, "r")
322
        except Exception as exc:
323
            msg = _(f"Error extracting archive as a zipfile: {exc}")
324
            raise ExtractionError(msg)
325
326
        # generator for the paths of the files found in the archive (dirs end
327
        # with "/")
328
        files = set(info.filename for info in zfile.filelist
329
                    if not info.filename.endswith("/"))
330
        for fpath in ensure_has_files or ():
331
            if fpath not in files:
332
                msg = "File '%s' not found in the archive '%s'" % (fpath,
333
                                                                   archive)
334
                raise ExtractionError(msg)
335
336
        utils.ensure_dir_exists(out_dir)
337
        zfile.extractall(path=out_dir)
338
        result = [utils.join_paths(out_dir, info.filename) for info in zfile.filelist]
339
        zfile.close()
340
    elif archive.endswith(".tar"):
341
        # plain tarball
342
        result = _extract_tarball(archive, out_dir, ensure_has_files, None)
343
    elif archive.endswith(".tar.gz"):
344
        # gzipped tarball
345
        result = _extract_tarball(archive, out_dir, ensure_has_files, "gz")
346
    elif archive.endswith(".tar.bz2"):
347
        # bzipped tarball
348
        result = _extract_tarball(archive, out_dir, ensure_has_files, "bz2")
349
    elif archive.endswith(".rpm"):
350
        # RPM
351
        result = _extract_rpm(archive, out_dir, ensure_has_files)
352
    # elif other types of archives
353
    else:
354
        raise ExtractionError("Unsuported archive type")
355
    log.info("OSCAP addon: Extracted {files} from the supplied content"
356
             .format(files=result))
357
    return result
358
359
360
def _extract_tarball(archive, out_dir, ensure_has_files, alg):
361
    """
362
    Extract the given TAR archive to the given output directory and make sure
363
    the given file exists in the archive.
364
365
    :see: extract_data
366
    :param alg: compression algorithm used for the tarball
367
    :type alg: str (one of "gz", "bz2") or None
368
    :return: a list of files and directories extracted from the archive
369
    :rtype: [str]
370
371
    """
372
373
    if alg and alg not in ("gz", "bz2",):
374
        raise ExtractionError("Unsupported compression algorithm")
375
376
    mode = "r"
377
    if alg:
378
        mode += ":%s" % alg
379
380
    try:
381
        tfile = tarfile.TarFile.open(archive, mode)
382
    except tarfile.TarError as err:
383
        raise ExtractionError(str(err))
384
385
    # generator for the paths of the files found in the archive
386
    files = set(member.path for member in tfile.getmembers()
387
                if member.isfile())
388
389
    for fpath in ensure_has_files or ():
390
        if fpath not in files:
391
            msg = "File '%s' not found in the archive '%s'" % (fpath, archive)
392
            raise ExtractionError(msg)
393
394
    utils.ensure_dir_exists(out_dir)
395
    tfile.extractall(path=out_dir)
396
    result = [utils.join_paths(out_dir, member.path) for member in tfile.getmembers()]
397
    tfile.close()
398
399
    return result
400
401
402
def _extract_rpm(rpm_path, root="/", ensure_has_files=None):
403
    """
404
    Extract the given RPM into the directory tree given by the root argument
405
    and make sure the given file exists in the archive.
406
407
    :param rpm_path: path to the RPM file that should be extracted
408
    :type rpm_path: str
409
    :param root: root of the directory tree the RPM should be extracted into
410
    :type root: str
411
    :param ensure_has_files: relative paths to the files that must exist in the
412
                             RPM
413
    :type ensure_has_files: iterable of strings or None
414
    :return: a list of files and directories extracted from the archive
415
    :rtype: [str]
416
417
    """
418
419
    # run rpm2cpio and process the output with the cpioarchive module
420
    temp_fd, temp_path = tempfile.mkstemp(prefix="oscap_rpm")
421
    proc = subprocess.Popen(["rpm2cpio", rpm_path], stdout=temp_fd)
422
    proc.wait()
423
    if proc.returncode != 0:
424
        msg = "Failed to convert RPM '%s' to cpio archive" % rpm_path
425
        raise ExtractionError(msg)
426
427
    os.close(temp_fd)
428
429
    try:
430
        archive = cpioarchive.CpioArchive(temp_path)
431
    except cpioarchive.CpioError as err:
432
        raise ExtractionError(str(err))
433
434
    # get entries from the archive (supports only iteration over entries)
435
    entries = set(entry for entry in archive)
436
437
    # cpio entry names (paths) start with the dot
438
    entry_names = [entry.name.lstrip(".") for entry in entries]
439
440
    for fpath in ensure_has_files or ():
441
        # RPM->cpio entries have absolute paths
442
        if fpath not in entry_names and \
443
           os.path.join("/", fpath) not in entry_names:
444
            msg = "File '%s' not found in the archive '%s'" % (fpath, rpm_path)
445
            raise ExtractionError(msg)
446
447
    try:
448
        for entry in entries:
449
            if entry.size == 0:
450
                continue
451
            dirname = os.path.dirname(entry.name.lstrip("."))
452
            out_dir = os.path.normpath(root + dirname)
453
            utils.ensure_dir_exists(out_dir)
454
455
            out_fpath = os.path.normpath(root + entry.name.lstrip("."))
456
            if os.path.exists(out_fpath):
457
                continue
458
            with open(out_fpath, "wb") as out_file:
459
                buf = entry.read(IO_BUF_SIZE)
460
                while buf:
461
                    out_file.write(buf)
462
                    buf = entry.read(IO_BUF_SIZE)
463
    except (IOError, cpioarchive.CpioError) as e:
464
        raise ExtractionError(e)
465
466
    # cleanup
467
    archive.close()
468
    os.unlink(temp_path)
469
470
    return [os.path.normpath(root + name) for name in entry_names]
471
472
473
def strip_content_dir(fpaths, phase="preinst"):
474
    """
475
    Strip content directory prefix from the file paths for either
476
    pre-installation or post-installation phase.
477
478
    :param fpaths: iterable of file paths to strip content directory prefix
479
                   from
480
    :type fpaths: iterable of strings
481
    :param phase: specifies pre-installation or post-installation phase
482
    :type phase: "preinst" or "postinst"
483
    :return: the same iterable of file paths as given with the content
484
             directory prefix stripped
485
    :rtype: same type as fpaths
486
487
    """
488
489
    if phase == "preinst":
490
        remove_prefix = lambda x: x[len(INSTALLATION_CONTENT_DIR):]
491
    else:
492
        remove_prefix = lambda x: x[len(TARGET_CONTENT_DIR):]
493
494
    return utils.keep_type_map(remove_prefix, fpaths)
495
496
497
def get_ssg_path(root="/"):
498
    return utils.join_paths(root, SSG_DIR + SSG_CONTENT)
499
500
501
def ssg_available(root="/"):
502
    """
503
    Tries to find the SCAP Security Guide under the given root.
504
505
    :return: True if SSG was found under the given root, False otherwise
506
507
    """
508
509
    return os.path.exists(get_ssg_path(root))
510
511
512
def dry_run_skip(func):
513
    """
514
    Decorator that makes sure the decorated function is noop in the dry-run
515
    mode.
516
517
    :param func: a decorated function that needs to have the first parameter an
518
                 object with the _addon_data attribute referencing the OSCAP
519
                 addon's ksdata
520
    """
521
522
    @wraps(func)
523
    def decorated(self, *args, **kwargs):
524
        if self._addon_data.dry_run:
525
            return
526
        else:
527
            return func(self, *args, **kwargs)
528
529
    return decorated
530