Passed
Push — master ( 4a808c...338155 )
by
unknown
01:36 queued 11s
created

org_fedora_oscap.common._extract_rpm()   D

Complexity

Conditions 12

Size

Total Lines 69
Code Lines 39

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 39
dl 0
loc 69
rs 4.8
c 0
b 0
f 0
cc 12
nop 3

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like org_fedora_oscap.common._extract_rpm() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Copyright (C) 2013  Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
# Red Hat Author(s): Vratislav Podzimek <[email protected]>
19
#
20
21
"""
22
Module with various classes and functions needed by the OSCAP addon that are
23
not specific to any installation mode (tui, gui, ks).
24
25
"""
26
27
import os
28
import tempfile
29
import subprocess
30
import zipfile
31
import tarfile
32
33
import cpioarchive
34
import re
35
import logging
36
37
from collections import namedtuple
38
import gettext
39
from functools import wraps
40
from pyanaconda.core import constants
41
from pyanaconda.modules.common.constants.services import NETWORK
42
from pyanaconda.threading import threadMgr, AnacondaThread
43
from org_fedora_oscap import utils
44
from org_fedora_oscap.data_fetch import fetch_data
45
46
log = logging.getLogger("anaconda")
47
48
49
# mimic pyanaconda/core/i18n.py
50
def _(string):
    """
    Translate the given string using the "oscap-anaconda-addon" gettext
    domain.

    :param string: the string to translate
    :type string: str
    :return: the translation of the string (the lookup is done with
             fallback=True) or an empty string for a false-like input
    :rtype: str

    """

    # nothing to translate (empty string, None, ...)
    if not string:
        return ""

    translation = gettext.translation("oscap-anaconda-addon", fallback=True)
    return translation.gettext(string)
55
56
57
def N_(string):
    """
    Return the given string unchanged.

    NOTE(review): presumably a no-op marker used to tag strings for later
    translation -- confirm against the i18n tooling.

    """

    return string
58
59
60
# everything else should be private
61
__all__ = ["run_oscap_remediate", "get_fix_rules_pre",
           "wait_and_fetch_net_data", "extract_data", "strip_content_dir",
           "OSCAPaddonError"]

# content directories for the pre-installation and post-installation phase
INSTALLATION_CONTENT_DIR = "/tmp/openscap_data/"
TARGET_CONTENT_DIR = "/root/openscap_data/"

# directory and file name of the SCAP Security Guide content
SSG_DIR = "/usr/share/xml/scap/ssg/content/"
SSG_CONTENT = "ssg-rhel7-ds.xml"
if constants.shortProductName != 'anaconda':
    if constants.shortProductName == 'fedora':
        SSG_CONTENT = "ssg-fedora-ds.xml"
    else:
        # only the major version (the part before the first dot) is used in
        # the file name; split() is needed here, strip() followed by [0]
        # would give just the first character (e.g. "1" for "10.0")
        SSG_CONTENT = "ssg-%s%s-ds.xml" % (constants.shortProductName,
                                            constants.productVersion.split(".")[0])

# where the oscap tool is told to store the results and the report
RESULTS_PATH = utils.join_paths(TARGET_CONTENT_DIR,
                                "eval_remediate_results.xml")
REPORT_PATH = utils.join_paths(TARGET_CONTENT_DIR,
                               "eval_remediate_report.html")

# value of the 'system' attribute of the fix elements for the
# pre-installation environment
PRE_INSTALL_FIX_SYSTEM_ATTR = "urn:redhat:anaconda:pre"

# name of the thread fetching data over network
THREAD_FETCH_DATA = "AnaOSCAPdataFetchThread"

SUPPORTED_ARCHIVES = (".zip", ".tar", ".tar.gz", ".tar.bz2", )

# buffer size for reading and writing out data (in bytes)
IO_BUF_SIZE = 2 * 1024 * 1024
90
91
92
class OSCAPaddonError(Exception):
    """Base exception class for the errors related to the OSCAP addon."""
96
97
98
class OSCAPaddonNetworkError(OSCAPaddonError):
    """Exception class for the network errors related to the OSCAP addon."""
102
103
104
class ExtractionError(OSCAPaddonError):
    """Exception class for the errors happening when extracting archives."""
108
109
110
# kinds of the messages returned from the rules evaluation
MESSAGE_TYPE_FATAL, MESSAGE_TYPE_WARNING, MESSAGE_TYPE_INFO = range(3)

# namedtuple for messages returned from the rules evaluation with fields:
#   origin -- class (inherited from RuleHandler) that generated the message
#   type -- one of the MESSAGE_TYPE_* constants defined above
#   text -- the actual message that should be displayed, logged, ...
RuleMessage = namedtuple("RuleMessage", "origin type text")
119
120
121
class SubprocessLauncher(object):
    """
    Helper class running a command and gathering its results.

    After execute() is called, the following attributes are set:

    * stdout -- decoded standard output of the command
    * stderr -- decoded standard error output of the command
    * messages -- the parts of stderr going from "OpenSCAP Error:" to the end
      of the respective line
    * returncode -- exit code of the command

    """

    def __init__(self, args):
        """
        :param args: the command and its arguments (passed to
                     subprocess.Popen as is)
        :type args: [str]

        """

        self.args = args
        self.stdout = ""
        self.stderr = ""
        self.messages = []
        self.returncode = None

    def execute(self, **kwargs):
        """
        Run the command, wait for it to finish and store its results.

        :param kwargs: extra keyword arguments passed to subprocess.Popen
        :raise OSCAPaddonError: if the command cannot be started

        """

        try:
            proc = subprocess.Popen(self.args, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE, **kwargs)
        except OSError as oserr:
            msg = "Failed to run the oscap tool: %s" % oserr
            raise OSCAPaddonError(msg) from oserr

        (stdout, stderr) = proc.communicate()

        # the output is only reported and searched for messages, a stray
        # undecodable byte must not turn a finished run into a crash
        self.stdout = stdout.decode(errors="replace")
        self.stderr = stderr.decode(errors="replace")
        self.messages = re.findall(r'OpenSCAP Error:.*', self.stderr)

        self.returncode = proc.returncode

    def log_messages(self):
        """Log the messages gathered by the last execute() call as warnings."""

        for message in self.messages:
            log.warning("OSCAP addon: %s", message)
147
148
149
def get_fix_rules_pre(profile, fpath, ds_id="", xccdf_id="", tailoring=""):
    """
    Get the fix rules for the pre-installation environment, i.e. the fix
    rules with the 'system' attribute equal to PRE_INSTALL_FIX_SYSTEM_ATTR,
    for a given profile, datastream and checklist in a given file.

    :see: run_oscap_remediate
    :see: _run_oscap_gen_fix
    :return: fix rules for a given profile
    :rtype: str

    """

    content_selection = {
        "ds_id": ds_id,
        "xccdf_id": xccdf_id,
        "tailoring": tailoring,
    }

    return _run_oscap_gen_fix(profile, fpath, PRE_INSTALL_FIX_SYSTEM_ATTR,
                              **content_selection)
164
165
166
def _run_oscap_gen_fix(profile, fpath, template, ds_id="", xccdf_id="",
                       tailoring=""):
    """
    Run 'oscap xccdf generate fix' on a given file to get the contents of the
    fix elements with the 'system' attribute equal to a given template for a
    given datastream, checklist and profile.

    :see: run_oscap_remediate
    :param template: the value of the 'system' attribute of the fix elements
    :type template: str
    :return: oscap tool's stdout (an empty string if no profile is given)
    :rtype: str
    :raise OSCAPaddonError: if the oscap tool exits with a non-zero code

    """

    if not profile:
        return ""

    args = ["oscap", "xccdf", "generate", "fix", "--template=%s" % template]

    # oscap uses the default profile by default
    if profile.lower() != "default":
        args.append("--profile=%s" % profile)

    # options that are passed only if they have a value
    optional = (
        ("--datastream-id=%s", ds_id),
        ("--xccdf-id=%s", xccdf_id),
        ("--tailoring-file=%s", tailoring),
    )
    for option, value in optional:
        if value:
            args.append(option % value)

    args.append(fpath)

    launcher = SubprocessLauncher(args)
    launcher.execute()
    launcher.log_messages()

    if launcher.returncode == 0:
        return launcher.stdout

    msg = "Failed to generate fix rules with the oscap tool: %s" % launcher.stderr
    raise OSCAPaddonError(msg)
207
208
209
def run_oscap_remediate(profile, fpath, ds_id="", xccdf_id="", tailoring="",
                        chroot=""):
    """
    Evaluate and remediate the system with the oscap tool ('oscap xccdf eval
    --remediate') using a given file. The remediation is driven by a given
    profile defined in a given checklist that is a part of a given
    datastream. If requested, the tool is run in chroot.

    :param profile: id of the profile that will drive the remediation
    :type profile: str
    :param fpath: path to a file with SCAP content
    :type fpath: str
    :param ds_id: ID of the datastream that contains the checklist defining
                  the profile
    :type ds_id: str
    :param xccdf_id: ID of the checklist that defines the profile
    :type xccdf_id: str
    :param tailoring: path to a tailoring file
    :type tailoring: str
    :param chroot: path to the root the oscap tool should be run in
    :type chroot: str
    :return: oscap tool's stdout (summary of the rules, checks and fixes), an
             empty string if no profile is given
    :rtype: str
    :raise OSCAPaddonError: if the oscap tool exits with a code other than
                            0 or 2

    """

    if not profile:
        return ""

    def do_chroot():
        """Helper function doing the chroot if requested."""
        if not chroot or chroot == "/":
            return
        os.chroot(chroot)
        os.chdir("/")

    # make sure the directory for the results exists (under the chroot if one
    # is used)
    results_dir = os.path.dirname(RESULTS_PATH)
    if chroot:
        results_dir = os.path.normpath(chroot + "/" + results_dir)
    utils.ensure_dir_exists(results_dir)

    args = [
        "oscap", "xccdf", "eval",
        "--remediate",
        "--results=%s" % RESULTS_PATH,
        "--report=%s" % REPORT_PATH,
    ]

    # oscap uses the default profile by default
    if profile.lower() != "default":
        args.append("--profile=%s" % profile)

    # options that are passed only if they have a value
    optional = (
        ("--datastream-id=%s", ds_id),
        ("--xccdf-id=%s", xccdf_id),
        ("--tailoring-file=%s", tailoring),
    )
    for option, value in optional:
        if value:
            args.append(option % value)

    args.append(fpath)

    launcher = SubprocessLauncher(args)
    launcher.execute(preexec_fn=do_chroot)
    launcher.log_messages()

    # 0 -- success; 2 -- no error, but checks/remediation failed
    if launcher.returncode in (0, 2):
        return launcher.stdout

    msg = ("Content evaluation and remediation with the oscap tool "
           "failed: %s") % launcher.stderr
    raise OSCAPaddonError(msg)
278
279
280
def wait_and_fetch_net_data(url, out_file, ca_certs=None):
    """
    Wait for the network connection and start a thread that fetches data over
    network. The url, out_file and ca_certs arguments are passed to the
    fetch_data function run in the thread.

    :see: org_fedora_oscap.data_fetch.fetch_data
    :return: the name of the thread running fetch_data
    :rtype: str
    :raise OSCAPaddonNetworkError: if the network is not connected

    """

    # NM may still be connecting, wait for the respective thread (if any) to
    # finish
    connecting_thread = threadMgr.get(constants.THREAD_WAIT_FOR_CONNECTING_NM)
    if connecting_thread:
        connecting_thread.join()

    if not NETWORK.get_proxy().Connected():
        raise OSCAPaddonNetworkError("Network connection needed to fetch data.")

    # register and run the thread
    threadMgr.add(AnacondaThread(name=THREAD_FETCH_DATA,
                                 target=fetch_data,
                                 args=(url, out_file, ca_certs),
                                 fatal=False))

    return THREAD_FETCH_DATA
310
311
312
def extract_data(archive, out_dir, ensure_has_files=None):
    """
    Function that extracts the given archive to the given output directory. It
    tries to find out the archive type by the file name (.zip, .tar, .tar.gz,
    .tar.bz2 and .rpm suffixes are recognized).

    :param archive: path to the archive file that should be extracted
    :type archive: str
    :param out_dir: output directory the archive should be extracted to
    :type out_dir: str
    :param ensure_has_files: relative paths to the files that must exist in the
                             archive
    :type ensure_has_files: iterable of strings or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the archive type is not supported, the archive
                            cannot be opened or a required file is missing

    """

    # get rid of empty file paths (None means no files are required)
    ensure_has_files = [fpath for fpath in ensure_has_files or () if fpath]

    if archive.endswith(".zip"):
        # ZIP file
        return _extract_zip(archive, out_dir, ensure_has_files)
    elif archive.endswith(".tar"):
        # plain tarball
        return _extract_tarball(archive, out_dir, ensure_has_files, None)
    elif archive.endswith(".tar.gz"):
        # gzipped tarball
        return _extract_tarball(archive, out_dir, ensure_has_files, "gz")
    elif archive.endswith(".tar.bz2"):
        # bzipped tarball
        return _extract_tarball(archive, out_dir, ensure_has_files, "bz2")
    elif archive.endswith(".rpm"):
        # RPM
        return _extract_rpm(archive, out_dir, ensure_has_files)
    # elif other types of archives
    else:
        raise ExtractionError("Unsuported archive type")


def _extract_zip(archive, out_dir, ensure_has_files):
    """
    Extract the given ZIP archive to the given output directory and make sure
    the given files exist in the archive.

    :see: extract_data
    :return: a list of files and directories extracted from the archive
    :rtype: [str]

    """

    try:
        zfile = zipfile.ZipFile(archive, "r")
    except zipfile.BadZipfile as err:
        raise ExtractionError(str(err)) from err

    # the archive gets closed even if a required file is missing or the
    # extraction fails
    with zfile:
        names = zfile.namelist()

        # paths of the files found in the archive (dirs end with "/")
        files = set(name for name in names if not name.endswith("/"))
        for fpath in ensure_has_files or ():
            if fpath not in files:
                msg = "File '%s' not found in the archive '%s'" % (fpath,
                                                                   archive)
                raise ExtractionError(msg)

        utils.ensure_dir_exists(out_dir)
        zfile.extractall(path=out_dir)

        return [utils.join_paths(out_dir, name) for name in names]
369
370
371
def _extract_tarball(archive, out_dir, ensure_has_files, alg):
    """
    Extract the given TAR archive to the given output directory and make sure
    the given files exist in the archive.

    :see: extract_data
    :param alg: compression algorithm used for the tarball
    :type alg: str (one of "gz", "bz2") or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the compression algorithm is not supported, the
                            archive cannot be opened or a required file is
                            missing

    """

    if alg and alg not in ("gz", "bz2",):
        raise ExtractionError("Unsupported compression algorithm")

    mode = "r"
    if alg:
        mode += ":%s" % alg

    try:
        tfile = tarfile.open(archive, mode)
    except tarfile.TarError as err:
        raise ExtractionError(str(err)) from err

    # the archive gets closed even if a required file is missing or the
    # extraction fails
    with tfile:
        members = tfile.getmembers()

        # paths of the regular files found in the archive
        files = set(member.path for member in members if member.isfile())

        for fpath in ensure_has_files or ():
            if fpath not in files:
                msg = "File '%s' not found in the archive '%s'" % (fpath, archive)
                raise ExtractionError(msg)

        utils.ensure_dir_exists(out_dir)
        tfile.extractall(path=out_dir)

        return [utils.join_paths(out_dir, member.path) for member in members]
411
412
413
def _extract_rpm(rpm_path, root="/", ensure_has_files=None):
    """
    Extract the given RPM into the directory tree given by the root argument
    and make sure the given file exists in the archive.

    The RPM is converted to a cpio archive stored in a temporary file (by the
    rpm2cpio tool) which is then processed with the cpioarchive module. The
    temporary file is removed and the archive closed no matter if the
    extraction succeeds or fails.

    :param rpm_path: path to the RPM file that should be extracted
    :type rpm_path: str
    :param root: root of the directory tree the RPM should be extracted into
    :type root: str
    :param ensure_has_files: relative paths to the files that must exist in the
                             RPM
    :type ensure_has_files: iterable of strings or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the RPM cannot be converted or read, a required
                            file is missing or writing out the data fails

    """

    # run rpm2cpio and process the output with the cpioarchive module
    temp_fd, temp_path = tempfile.mkstemp(prefix="oscap_rpm")
    try:
        try:
            proc = subprocess.Popen(["rpm2cpio", rpm_path], stdout=temp_fd)
            proc.wait()
        except OSError as err:
            # e.g. the rpm2cpio tool is not available
            msg = "Failed to run rpm2cpio on '%s': %s" % (rpm_path, err)
            raise ExtractionError(msg) from err
        finally:
            os.close(temp_fd)

        if proc.returncode != 0:
            msg = "Failed to convert RPM '%s' to cpio archive" % rpm_path
            raise ExtractionError(msg)

        try:
            archive = cpioarchive.CpioArchive(temp_path)
        except cpioarchive.CpioError as err:
            raise ExtractionError(str(err)) from err

        try:
            return _unpack_cpio_archive(archive, rpm_path, root,
                                        ensure_has_files)
        finally:
            archive.close()
    finally:
        # cleanup
        os.unlink(temp_path)


def _unpack_cpio_archive(archive, rpm_path, root, ensure_has_files):
    """
    Check that the opened cpio archive has all the required files and write
    its entries out under the given root.

    :see: _extract_rpm
    :return: a list of paths (under the root) of all entries of the archive
    :rtype: [str]

    """

    # get entries from the archive (supports only iteration over entries)
    entries = list(archive)

    # cpio entry names (paths) start with the dot
    entry_names = [entry.name.lstrip(".") for entry in entries]
    known_names = set(entry_names)

    for fpath in ensure_has_files or ():
        # RPM->cpio entries have absolute paths
        if fpath not in known_names and \
           os.path.join("/", fpath) not in known_names:
            msg = "File '%s' not found in the archive '%s'" % (fpath, rpm_path)
            raise ExtractionError(msg)

    try:
        for entry, name in zip(entries, entry_names):
            _write_cpio_entry(entry, name, root)
    except (IOError, cpioarchive.CpioError) as err:
        raise ExtractionError(str(err)) from err

    return [os.path.normpath(root + name) for name in entry_names]


def _write_cpio_entry(entry, name, root):
    """
    Write the data of a single cpio entry out to a file under the given root.
    Entries with no data and entries whose target path already exists are
    skipped.

    :see: _extract_rpm
    :param name: name (path) of the entry with the leading dot stripped
    :type name: str

    """

    # nothing to write out
    if entry.size == 0:
        return

    out_dir = os.path.normpath(root + os.path.dirname(name))
    utils.ensure_dir_exists(out_dir)

    # never overwrite anything that already exists
    out_fpath = os.path.normpath(root + name)
    if os.path.exists(out_fpath):
        return

    with open(out_fpath, "wb") as out_file:
        buf = entry.read(IO_BUF_SIZE)
        while buf:
            out_file.write(buf)
            buf = entry.read(IO_BUF_SIZE)
482
483
484
def strip_content_dir(fpaths, phase="preinst"):
    """
    Strip the content directory prefix from the given file paths. The prefix
    is INSTALLATION_CONTENT_DIR for the pre-installation phase and
    TARGET_CONTENT_DIR for any other phase. As many leading characters as the
    prefix has are cut off from every path.

    :param fpaths: iterable of file paths to strip content directory prefix
                   from
    :type fpaths: iterable of strings
    :param phase: specifies pre-installation or post-installation phase
    :type phase: "preinst" or "postinst"
    :return: the same iterable of file paths as given with the content
             directory prefix stripped
    :rtype: same type as fpaths

    """

    def remove_prefix(fpath):
        """Cut the content directory prefix off from a single path."""
        if phase == "preinst":
            content_dir = INSTALLATION_CONTENT_DIR
        else:
            content_dir = TARGET_CONTENT_DIR
        return fpath[len(content_dir):]

    return utils.keep_type_map(remove_prefix, fpaths)
506
507
508
def ssg_available(root="/"):
    """
    Check if the SCAP Security Guide content file (SSG_DIR + SSG_CONTENT)
    exists under the given root.

    :param root: root of the directory tree to look for the SSG in
    :type root: str
    :return: True if SSG was found under the given root, False otherwise
    :rtype: bool

    """

    ssg_path = utils.join_paths(root, SSG_DIR + SSG_CONTENT)
    return os.path.exists(ssg_path)
517
518
519
def dry_run_skip(func):
    """
    Decorator turning the decorated function into a noop (returning None) in
    the dry-run mode.

    :param func: a decorated function that needs to have the first parameter an
                 object with the _addon_data attribute referencing the OSCAP
                 addon's ksdata
    :return: the wrapper calling func only if not in the dry-run mode

    """

    @wraps(func)
    def decorated(self, *args, **kwargs):
        # the real work is done only outside of the dry-run mode
        if not self._addon_data.dry_run:
            return func(self, *args, **kwargs)
        return None

    return decorated
537