Passed
Pull Request — rhel8-branch (#190)
by Matěj
01:04
created

schedule_firstboot_remediation()   A

Complexity

Conditions 2

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 10
nop 6
dl 0
loc 14
rs 9.9
c 0
b 0
f 0
1
#
2
# Copyright (C) 2013  Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
# Red Hat Author(s): Vratislav Podzimek <[email protected]>
19
#
20
21
"""
22
Module with various classes and functions needed by the OSCAP addon that are
23
not specific to any installation mode (tui, gui, ks).
24
25
"""
26
27
import os
28
import tempfile
29
import subprocess
30
import zipfile
31
import tarfile
32
import textwrap
33
34
import cpioarchive
35
import re
36
import logging
37
38
from collections import namedtuple
39
import gettext
40
from functools import wraps
41
from pyanaconda.core import constants
42
from org_fedora_oscap import utils
43
44
# module-wide logger; all records go to the logger named "anaconda"
log = logging.getLogger("anaconda")
45
46
47
# mimic pyanaconda/core/i18n.py
48
def _(string):
    """
    Translate the given string using the "oscap-anaconda-addon" gettext
    domain.

    :param string: message to be translated
    :type string: str
    :return: the translation of the message (the message itself if no
             translation catalog is found) or an empty string for an empty
             or None message
    :rtype: str

    """

    if not string:
        return ""

    catalog = gettext.translation("oscap-anaconda-addon", fallback=True)
    return catalog.gettext(string)
53
54
55
def N_(string):
    """
    Return the given string unchanged (presumably just a marker for strings
    that get translated later -- confirm against the callers).

    """

    return string
56
57
58
# everything else should be private
59
__all__ = ["run_oscap_remediate", "get_fix_rules_pre",
           "extract_data", "strip_content_dir",
           "OSCAPaddonError"]

# content directories; strip_content_dir() removes the first one in the
# "preinst" phase and the second one otherwise
INSTALLATION_CONTENT_DIR = "/tmp/openscap_data/"
TARGET_CONTENT_DIR = "/root/openscap_data/"

SSG_DIR = "/usr/share/xml/scap/ssg/content/"

# Enable patches that set the content name at package-time
DEFAULT_SSG_CONTENT_NAME = ""
SSG_CONTENT = DEFAULT_SSG_CONTENT_NAME
if not SSG_CONTENT:
    # no name set at package-time, derive it from the product
    if constants.shortProductName != 'anaconda':
        if constants.shortProductName == 'fedora':
            SSG_CONTENT = "ssg-fedora-ds.xml"
        else:
            # use the whole major version (everything before the first dot),
            # not just the first character -- "10.0" has to give "10"
            SSG_CONTENT = (
                "ssg-{name}{version}-ds.xml"
                .format(
                    name=constants.shortProductName,
                    version=constants.productVersion.split(".")[0]))

# where the oscap tool is told to store the results and the report
RESULTS_PATH = utils.join_paths(TARGET_CONTENT_DIR,
                                "eval_remediate_results.xml")
REPORT_PATH = utils.join_paths(TARGET_CONTENT_DIR,
                               "eval_remediate_report.html")

# value of the 'system' attribute of the fix elements requested by
# get_fix_rules_pre()
PRE_INSTALL_FIX_SYSTEM_ATTR = "urn:redhat:anaconda:pre"

THREAD_FETCH_DATA = "AnaOSCAPdataFetchThread"

SUPPORTED_ARCHIVES = (".zip", ".tar", ".tar.gz", ".tar.bz2", )

# buffer size for reading and writing out data (in bytes)
IO_BUF_SIZE = 2 * 1024 * 1024
95
96
97
class OSCAPaddonError(Exception):
    """Exception raised for errors related to the OSCAP addon."""
101
102
103
class OSCAPaddonNetworkError(OSCAPaddonError):
    """Exception raised for network errors related to the OSCAP addon."""
107
108
109
class ExtractionError(OSCAPaddonError):
    """Exception raised when extracting an archive fails."""
113
114
115
# types of the messages returned from the rules evaluation
MESSAGE_TYPE_FATAL, MESSAGE_TYPE_WARNING, MESSAGE_TYPE_INFO = range(3)

# a single message returned from the rules evaluation:
#   origin -- class (inherited from RuleHandler) that generated the message
#   type -- one of the MESSAGE_TYPE_* constants defined above
#   text -- the actual message that should be displayed, logged, ...
RuleMessage = namedtuple("RuleMessage", "origin type text")
124
125
126
class SubprocessLauncher(object):
    """
    Runs a command as a subprocess and keeps its decoded output, the error
    messages found in its stderr and its return code.

    """

    def __init__(self, args):
        """
        :param args: the command followed by its arguments
        :type args: [str]

        """

        self.args = args
        self.returncode = None
        self.messages = []
        self.stdout = ""
        self.stderr = ""

    def execute(self, **kwargs):
        """
        Run the command, wait for it to finish and store its stdout, stderr,
        error messages and return code in the attributes of this object.

        :param kwargs: additional keyword arguments for subprocess.Popen
        :raise OSCAPaddonError: if the command cannot be executed

        """

        command_string = " ".join(self.args)
        log.info(f"OSCAP addon: Executing subprocess: '{command_string}'")

        try:
            process = subprocess.Popen(
                self.args,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                **kwargs)
        except OSError as oserr:
            raise OSCAPaddonError(
                f"Failed to execute command '{command_string}': {oserr}")

        out_bytes, err_bytes = process.communicate()
        self.stdout = out_bytes.decode()
        # undecodable bytes in stderr are replaced instead of raising
        self.stderr = err_bytes.decode(errors="replace")

        # gather the stderr lines matching the known error patterns
        found = []
        for pattern in (r'OpenSCAP Error:.*', r'E: oscap:.*'):
            found.extend(re.findall(pattern, self.stderr))
        self.messages = found

        self.returncode = process.returncode

    def log_messages(self):
        """Log the gathered error messages as warnings."""

        for text in self.messages:
            log.warning("OSCAP addon: " + text)
158
159
160
def get_fix_rules_pre(profile, fpath, ds_id="", xccdf_id="", tailoring=""):
    """
    Return the fix rules meant for the pre-installation environment that
    belong to the given profile of the given checklist and datastream in the
    given file.

    :see: run_oscap_remediate
    :see: _run_oscap_gen_fix
    :return: fix rules for a given profile
    :rtype: str

    """

    selection = dict(ds_id=ds_id, xccdf_id=xccdf_id, tailoring=tailoring)
    return _run_oscap_gen_fix(
        profile, fpath, PRE_INSTALL_FIX_SYSTEM_ATTR, **selection)
175
176
177
def _run_oscap_gen_fix(profile, fpath, template, ds_id="", xccdf_id="",
                       tailoring=""):
    """
    Use the oscap tool to generate the fixes from the given file, i.e. to get
    the contents of the fix elements whose 'system' attribute equals the
    given template, for the given datastream, checklist and profile.

    :see: run_oscap_remediate
    :param template: the value of the 'system' attribute of the fix elements
    :type template: str
    :return: oscap tool's stdout (an empty string if no profile is given)
    :rtype: str
    :raise OSCAPaddonError: if the oscap tool exits with a non-zero code

    """

    if not profile:
        return ""

    args = ["oscap", "xccdf", "generate", "fix", "--template=%s" % template]

    # oscap uses the default profile by default
    if profile.lower() != "default":
        args.append("--profile=%s" % profile)

    # the remaining options are passed only if they have a value
    optional = (
        ("--datastream-id=%s", ds_id),
        ("--xccdf-id=%s", xccdf_id),
        ("--tailoring-file=%s", tailoring),
    )
    for option, value in optional:
        if value:
            args.append(option % value)

    args.append(fpath)

    launcher = SubprocessLauncher(args)
    launcher.execute()
    launcher.log_messages()
    if launcher.returncode == 0:
        return launcher.stdout

    raise OSCAPaddonError(
        "Failed to generate fix rules with the oscap tool: %s"
        % launcher.stderr)
218
219
220
def do_chroot(chroot):
    """
    Change the root to the given path and the working directory to the new
    root; do nothing for an empty path or "/".

    """

    if not chroot or chroot == "/":
        return

    os.chroot(chroot)
    os.chdir("/")
225
226
227
def assert_scanner_works(chroot, executable="oscap"):
    """
    Check that the scanner can be run by invoking it with the '--version'
    option, in the chroot if requested.

    :param chroot: path to the root the scanner should be run in
    :type chroot: str
    :param executable: the scanner executable to run
    :type executable: str
    :return: True if the invocation exits with zero
    :rtype: bool
    :raise OSCAPaddonError: if the scanner cannot be executed or if it exits
                            with a non-zero code

    """

    args = [executable, "--version"]
    command = " ".join(args)

    try:
        proc = subprocess.Popen(
            args, preexec_fn=lambda: do_chroot(chroot),
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # only stderr is of interest (NB: '_' is the translation function
        # here, so it must not be used as a throw-away name)
        (_stdout, stderr) = proc.communicate()
        stderr = stderr.decode(errors="replace")
    except OSError as exc:
        msg = _(f"Basic invocation '{command}' fails: {str(exc)}")
        raise OSCAPaddonError(msg) from exc
    if proc.returncode != 0:
        # both parts of the message have to be f-strings, otherwise the
        # placeholders of the second part get into the message verbatim
        msg = _(
            f"Basic scanner invocation '{command}' exited "
            f"with non-zero error code {proc.returncode}: {stderr}")
        raise OSCAPaddonError(msg)
    return True
246
247
248
def run_oscap_remediate(profile, fpath, ds_id="", xccdf_id="", tailoring="",
                        chroot=""):
    """
    Evaluate the given file with the oscap tool and remediate the system as
    defined by the given profile from the given checklist and datastream.
    The tool is run in the chroot if one is given.

    :param profile: id of the profile that will drive the remediation
    :type profile: str
    :param fpath: path to a file with SCAP content
    :type fpath: str
    :param ds_id: ID of the datastream that contains the checklist defining
                  the profile
    :type ds_id: str
    :param xccdf_id: ID of the checklist that defines the profile
    :type xccdf_id: str
    :param tailoring: path to a tailoring file
    :type tailoring: str
    :param chroot: path to the root the oscap tool should be run in
    :type chroot: str
    :return: oscap tool's stdout (summary of the rules, checks and fixes),
             an empty string if no profile is given
    :rtype: str
    :raise OSCAPaddonError: if the oscap tool exits with a code other than
                            0 and 2

    """

    if not profile:
        return ""

    # make sure the directory for the results exists
    results_dir = os.path.dirname(RESULTS_PATH)
    if chroot:
        results_dir = os.path.normpath(chroot + "/" + results_dir)
    utils.ensure_dir_exists(results_dir)

    args = [
        "oscap", "xccdf", "eval",
        "--remediate",
        "--results=%s" % RESULTS_PATH,
        "--report=%s" % REPORT_PATH,
    ]

    # oscap uses the default profile by default
    if profile.lower() != "default":
        args.append("--profile=%s" % profile)

    # the remaining options are passed only if they have a value
    optional = (
        ("--datastream-id=%s", ds_id),
        ("--xccdf-id=%s", xccdf_id),
        ("--tailoring-file=%s", tailoring),
    )
    for option, value in optional:
        if value:
            args.append(option % value)

    args.append(fpath)

    launcher = SubprocessLauncher(args)
    launcher.execute(preexec_fn=lambda: do_chroot(chroot))
    launcher.log_messages()

    # 0 -- success; 2 -- no error, but checks/remediation failed
    if launcher.returncode in (0, 2):
        return launcher.stdout

    raise OSCAPaddonError(
        "Content evaluation and remediation with the oscap tool failed: %s"
        % launcher.stderr)
311
312
313
def _schedule_firstboot_remediation(
        chroot, profile, ds_path, results_path, report_path, ds_id, xccdf_id, tailoring_path):
    """
    Write the remediation configuration to
    var/tmp/oscap-remediate-offline.conf.sh under the given chroot and create
    the 'system-update' symlink in the chroot that points to the
    configuration file (by its path inside of the chroot).

    The configuration is a list of NAME='value' lines, the datastream ID,
    the checklist ID and the tailoring lines are written only if the
    respective value is given.

    :param chroot: path to the root the files should be created in
    :type chroot: str
    :param profile: id of the profile that will drive the remediation
    :type profile: str
    :param ds_path: path to the file with SCAP content
    :type ds_path: str
    :param results_path: path the results should be stored to
    :type results_path: str
    :param report_path: path the report should be stored to
    :type report_path: str
    :param ds_id: ID of the datastream (may be empty)
    :type ds_id: str
    :param xccdf_id: ID of the checklist (may be empty)
    :type xccdf_id: str
    :param tailoring_path: path to a tailoring file (may be empty)
    :type tailoring_path: str

    """

    settings = [
        ("OSCAP_REMEDIATE_DS", ds_path),
        ("OSCAP_REMEDIATE_PROFILE_ID", profile),
        ("OSCAP_REMEDIATE_ARF_RESULT", results_path),
        ("OSCAP_REMEDIATE_HTML_REPORT", report_path),
        ("OSCAP_REMEDIATE_VERBOSE_LOGS", "/var/tmp/oscap_verbose.log"),
    ]
    if ds_id:
        settings.append(("OSCAP_REMEDIATE_DATASTREAM_ID", ds_id))
    if xccdf_id:
        settings.append(("OSCAP_REMEDIATE_XCCDF_ID", xccdf_id))
    if tailoring_path:
        settings.append(("OSCAP_REMEDIATE_TAILORING", tailoring_path))

    # the values really have to be interpolated, plain (non-f) strings would
    # put the literal '{ds_id}' etc. placeholders into the file
    config = "".join(f"{name}='{value}'\n" for name, value in settings)

    relative_filename = "var/tmp/oscap-remediate-offline.conf.sh"
    local_config_filename = f"/{relative_filename}"
    chroot_config_filename = os.path.join(chroot, relative_filename)
    with open(chroot_config_filename, "w") as f:
        f.write(config)

    # the link target is the path as seen from inside of the chroot
    os.symlink(local_config_filename,
               os.path.join(chroot, "system-update"))
336
337
338
def schedule_firstboot_remediation(chroot, profile, fpath, ds_id="", xccdf_id="", tailoring=""):
    """
    Schedule the remediation driven by the given profile in the given chroot;
    do nothing if no profile is given.

    :param chroot: path to the root the remediation should be scheduled in
    :type chroot: str
    :param profile: id of the profile that will drive the remediation
    :type profile: str
    :param fpath: path to a file with SCAP content
    :type fpath: str
    :param ds_id: ID of the datastream that contains the checklist defining
                  the profile
    :type ds_id: str
    :param xccdf_id: ID of the checklist that defines the profile
    :type xccdf_id: str
    :param tailoring: path to a tailoring file
    :type tailoring: str
    :return: always an empty string
    :rtype: str

    """

    if profile:
        # make sure the directory for the results exists
        target_dir = os.path.normpath(
            chroot + "/" + os.path.dirname(RESULTS_PATH))
        utils.ensure_dir_exists(target_dir)

        log.info("OSCAP addon: Scheduling firstboot remediation")
        _schedule_firstboot_remediation(
            chroot, profile, fpath, RESULTS_PATH, REPORT_PATH,
            ds_id, xccdf_id, tailoring)

    return ""
352
353
354
def extract_data(archive, out_dir, ensure_has_files=None):
    """
    Function that extracts the given archive to the given output directory.
    It tries to find out the archive type by the file name (.zip, .tar,
    .tar.gz, .tar.bz2 and .rpm are recognized).

    :param archive: path to the archive file that should be extracted
    :type archive: str
    :param out_dir: output directory the archive should be extracted to
    :type out_dir: str
    :param ensure_has_files: relative paths to the files that must exist in the
                             archive
    :type ensure_has_files: iterable of strings or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the archive type is not supported, the archive
                            cannot be opened or a required file is missing

    """

    # get rid of empty file paths (None means no required files)
    ensure_has_files = [fpath for fpath in ensure_has_files or () if fpath]

    msg = "OSCAP addon: Extracting {archive}".format(archive=archive)
    if ensure_has_files:
        msg += ", expecting to find {files} there.".format(files=tuple(ensure_has_files))
    log.info(msg)

    if archive.endswith(".zip"):
        # ZIP file
        try:
            zfile = zipfile.ZipFile(archive, "r")
        except Exception as exc:
            msg = _(f"Error extracting archive as a zipfile: {exc}")
            raise ExtractionError(msg) from exc

        # the context manager closes the archive even if a required file is
        # missing or the extraction fails
        with zfile:
            # paths of the files found in the archive (dirs end with "/")
            files = set(info.filename for info in zfile.filelist
                        if not info.filename.endswith("/"))
            for fpath in ensure_has_files:
                if fpath not in files:
                    msg = "File '%s' not found in the archive '%s'" % (fpath,
                                                                       archive)
                    raise ExtractionError(msg)

            utils.ensure_dir_exists(out_dir)
            zfile.extractall(path=out_dir)
            result = [utils.join_paths(out_dir, info.filename)
                      for info in zfile.filelist]
    elif archive.endswith(".tar"):
        # plain tarball
        result = _extract_tarball(archive, out_dir, ensure_has_files, None)
    elif archive.endswith(".tar.gz"):
        # gzipped tarball
        result = _extract_tarball(archive, out_dir, ensure_has_files, "gz")
    elif archive.endswith(".tar.bz2"):
        # bzipped tarball
        result = _extract_tarball(archive, out_dir, ensure_has_files, "bz2")
    elif archive.endswith(".rpm"):
        # RPM
        result = _extract_rpm(archive, out_dir, ensure_has_files)
    # elif other types of archives
    else:
        raise ExtractionError("Unsuported archive type")
    log.info("OSCAP addon: Extracted {files} from the supplied content"
             .format(files=result))
    return result
426
427
428
def _extract_tarball(archive, out_dir, ensure_has_files, alg):
    """
    Extract the given TAR archive to the given output directory and make sure
    the given files exist in the archive.

    :see: extract_data
    :param alg: compression algorithm used for the tarball
    :type alg: str (one of "gz", "bz2") or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the compression algorithm is not supported,
                            the archive cannot be opened or a required file
                            is missing

    """

    if alg and alg not in ("gz", "bz2",):
        raise ExtractionError("Unsupported compression algorithm")

    mode = "r"
    if alg:
        mode += ":%s" % alg

    try:
        tfile = tarfile.TarFile.open(archive, mode)
    except tarfile.TarError as err:
        raise ExtractionError(str(err)) from err

    # the context manager closes the archive even if a required file is
    # missing or the extraction fails
    with tfile:
        members = tfile.getmembers()

        # paths of the regular files found in the archive
        files = set(member.path for member in members if member.isfile())

        for fpath in ensure_has_files or ():
            if fpath not in files:
                msg = "File '%s' not found in the archive '%s'" % (fpath, archive)
                raise ExtractionError(msg)

        utils.ensure_dir_exists(out_dir)
        tfile.extractall(path=out_dir)
        result = [utils.join_paths(out_dir, member.path) for member in members]

    return result
468
469
470
def _extract_rpm(rpm_path, root="/", ensure_has_files=None):
    """
    Extract the given RPM into the directory tree given by the root argument
    and make sure the given files exist in the archive.

    Entries of zero size are not written out and files that already exist
    under the root are not overwritten.

    :param rpm_path: path to the RPM file that should be extracted
    :type rpm_path: str
    :param root: root of the directory tree the RPM should be extracted into
    :type root: str
    :param ensure_has_files: relative paths to the files that must exist in the
                             RPM
    :type ensure_has_files: iterable of strings or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the RPM cannot be converted to a cpio archive,
                            the cpio archive cannot be processed or a required
                            file is missing

    """

    # run rpm2cpio and process the output with the cpioarchive module
    temp_fd, temp_path = tempfile.mkstemp(prefix="oscap_rpm")
    archive = None
    try:
        try:
            proc = subprocess.Popen(["rpm2cpio", rpm_path], stdout=temp_fd)
            proc.wait()
        finally:
            # our copy of the descriptor is not needed any more, no matter
            # how the conversion ended up
            os.close(temp_fd)

        if proc.returncode != 0:
            msg = "Failed to convert RPM '%s' to cpio archive" % rpm_path
            raise ExtractionError(msg)

        try:
            archive = cpioarchive.CpioArchive(temp_path)
        except cpioarchive.CpioError as err:
            raise ExtractionError(str(err)) from err

        # get entries from the archive (supports only iteration over entries)
        entries = set(archive)

        # cpio entry names (paths) start with the dot
        entry_names = [entry.name.lstrip(".") for entry in entries]

        for fpath in ensure_has_files or ():
            # RPM->cpio entries have absolute paths
            if fpath not in entry_names and \
               os.path.join("/", fpath) not in entry_names:
                msg = "File '%s' not found in the archive '%s'" % (fpath, rpm_path)
                raise ExtractionError(msg)

        try:
            for entry in entries:
                if entry.size == 0:
                    continue
                entry_path = entry.name.lstrip(".")
                utils.ensure_dir_exists(
                    os.path.normpath(root + os.path.dirname(entry_path)))

                out_fpath = os.path.normpath(root + entry_path)
                if os.path.exists(out_fpath):
                    continue
                with open(out_fpath, "wb") as out_file:
                    buf = entry.read(IO_BUF_SIZE)
                    while buf:
                        out_file.write(buf)
                        buf = entry.read(IO_BUF_SIZE)
        except (IOError, cpioarchive.CpioError) as e:
            raise ExtractionError(e) from e

        return [os.path.normpath(root + name) for name in entry_names]
    finally:
        # cleanup, done on the error paths as well so that neither the open
        # archive nor the temporary file are left behind
        if archive is not None:
            archive.close()
        os.unlink(temp_path)
539
540
541
def strip_content_dir(fpaths, phase="preinst"):
    """
    Remove the content directory prefix from the given file paths. The
    prefix of the pre-installation phase is removed for the "preinst" phase,
    the one of the post-installation phase otherwise.

    The paths are not checked for really starting with the prefix, as many
    leading characters as the prefix has are cut off from each of them.

    :param fpaths: iterable of file paths to strip content directory prefix
                   from
    :type fpaths: iterable of strings
    :param phase: specifies pre-installation or post-installation phase
    :type phase: "preinst" or "postinst"
    :return: the same iterable of file paths as given with the content
             directory prefix stripped
    :rtype: same type as fpaths

    """

    content_dir = (INSTALLATION_CONTENT_DIR if phase == "preinst"
                   else TARGET_CONTENT_DIR)
    prefix_len = len(content_dir)

    def remove_prefix(fpath):
        return fpath[prefix_len:]

    return utils.keep_type_map(remove_prefix, fpaths)
563
564
565
def get_ssg_path(root="/"):
    """
    Get the path to the SSG content file under the given root.

    :param root: root the path should be relative to
    :type root: str
    :return: the given root joined with SSG_DIR + SSG_CONTENT
    :rtype: str

    """

    content_path = SSG_DIR + SSG_CONTENT
    return utils.join_paths(root, content_path)
567
568
569
def ssg_available(root="/"):
    """
    Check whether the SCAP Security Guide content exists under the given
    root.

    :param root: root the content should be looked up in
    :type root: str
    :return: True if SSG was found under the given root, False otherwise
    :rtype: bool

    """

    ssg_path = get_ssg_path(root)
    return os.path.exists(ssg_path)
578
579
580
def dry_run_skip(func):
    """
    Decorator turning the decorated function into a noop (returning None) in
    the dry-run mode.

    :param func: a decorated function that needs to have the first parameter an
                 object with the _addon_data attribute referencing the OSCAP
                 addon's ksdata
    :return: the wrapper calling func only if not in the dry-run mode

    """

    @wraps(func)
    def decorated(self, *args, **kwargs):
        if not self._addon_data.dry_run:
            return func(self, *args, **kwargs)
        return None

    return decorated
598