| Total Complexity | 87 |
| Total Lines | 671 |
| Duplicated Lines | 2.24 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and corresponding solutions are:
Complex classes like org_fedora_oscap.common often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # |
||
| 2 | # Copyright (C) 2013 Red Hat, Inc. |
||
| 3 | # |
||
| 4 | # This copyrighted material is made available to anyone wishing to use, |
||
| 5 | # modify, copy, or redistribute it subject to the terms and conditions of |
||
| 6 | # the GNU General Public License v.2, or (at your option) any later version. |
||
| 7 | # This program is distributed in the hope that it will be useful, but WITHOUT |
||
| 8 | # ANY WARRANTY expressed or implied, including the implied warranties of |
||
| 9 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General |
||
| 10 | # Public License for more details. You should have received a copy of the |
||
| 11 | # GNU General Public License along with this program; if not, write to the |
||
| 12 | # Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
||
| 13 | # 02110-1301, USA. Any Red Hat trademarks that are incorporated in the |
||
| 14 | # source code or documentation are not subject to the GNU General Public |
||
| 15 | # License and may only be used or replicated with the express permission of |
||
| 16 | # Red Hat, Inc. |
||
| 17 | # |
||
| 18 | # Red Hat Author(s): Vratislav Podzimek <[email protected]> |
||
| 19 | # |
||
| 20 | |||
| 21 | """ |
||
| 22 | Module with various classes and functions needed by the OSCAP addon that are |
||
| 23 | not specific to any installation mode (tui, gui, ks). |
||
| 24 | |||
| 25 | """ |
||
| 26 | |||
| 27 | import os |
||
| 28 | import tempfile |
||
| 29 | import subprocess |
||
| 30 | import zipfile |
||
| 31 | import tarfile |
||
| 32 | |||
| 33 | import cpioarchive |
||
| 34 | import re |
||
| 35 | import logging |
||
| 36 | |||
| 37 | from collections import namedtuple |
||
| 38 | import gettext |
||
| 39 | from functools import wraps |
||
| 40 | |||
| 41 | from dasbus.identifier import DBusServiceIdentifier |
||
| 42 | from pyanaconda.core import constants |
||
| 43 | from pyanaconda.core.dbus import DBus |
||
| 44 | from pyanaconda.core.constants import PAYLOAD_TYPE_DNF |
||
| 45 | from pyanaconda.modules.common.constants.namespaces import ADDONS_NAMESPACE |
||
| 46 | from pyanaconda.modules.common.constants.services import PAYLOADS |
||
| 47 | from pyanaconda.modules.common.structures.payload import PackagesConfigurationData |
||
| 48 | from pyanaconda.threading import threadMgr, AnacondaThread |
||
| 49 | |||
| 50 | from org_fedora_oscap import utils |
||
| 51 | |||
# use Anaconda's logger so addon messages end up in the installer logs
log = logging.getLogger("anaconda")
||
| 53 | |||
| 54 | |||
| 55 | # mimick pyanaconda/core/i18n.py |
||
| 56 | def _(string): |
||
| 57 | if string: |
||
| 58 | return gettext.translation("oscap-anaconda-addon", fallback=True).gettext(string) |
||
| 59 | else: |
||
| 60 | return "" |
||
| 61 | |||
| 62 | |||
def N_(string):
    """Mark *string* for translation without translating it (no-op)."""
    return string
||
| 64 | |||
| 65 | |||
# everything else should be private
__all__ = ["run_oscap_remediate", "get_fix_rules_pre",
           "extract_data", "strip_content_dir",
           "OSCAPaddonError", "get_payload_proxy", "get_packages_data",
           "set_packages_data"]

# where fetched SCAP content lives in the installer environment
INSTALLATION_CONTENT_DIR = "/tmp/openscap_data/"
# where the content is placed on the installed (target) system
TARGET_CONTENT_DIR = "/root/openscap_data/"

# standard location of the scap-security-guide datastreams
SSG_DIR = "/usr/share/xml/scap/ssg/content/"

# Enable patches that set the content name at package-time
DEFAULT_SSG_CONTENT_NAME = ""
SSG_CONTENT = DEFAULT_SSG_CONTENT_NAME
if not SSG_CONTENT:
    if constants.shortProductName != 'anaconda':
        if constants.shortProductName == 'fedora':
            SSG_CONTENT = "ssg-fedora-ds.xml"
        else:
            # NOTE(review): strip(".") only removes leading/trailing dots and
            # [0] then takes the FIRST CHARACTER of the version string -- this
            # works for single-digit majors ("8.3" -> "8") but not for
            # two-digit ones ("10.0" -> "1"); confirm whether
            # split(".")[0] was intended.
            SSG_CONTENT = (
                "ssg-{name}{version}-ds.xml"
                .format(
                    name=constants.shortProductName,
                    version=constants.productVersion.strip(".")[0]))

# output locations of the evaluation/remediation run on the target system
RESULTS_PATH = utils.join_paths(TARGET_CONTENT_DIR,
                                "eval_remediate_results.xml")
REPORT_PATH = utils.join_paths(TARGET_CONTENT_DIR,
                               "eval_remediate_report.html")

# 'system' attribute of XCCDF fix elements that apply before installation
PRE_INSTALL_FIX_SYSTEM_ATTR = "urn:redhat:anaconda:pre"

# name of the thread that fetches the SCAP content
THREAD_FETCH_DATA = "AnaOSCAPdataFetchThread"

# archive suffixes extract_data() knows how to unpack (plus ".rpm")
SUPPORTED_ARCHIVES = (".zip", ".tar", ".tar.gz", ".tar.bz2", )

SUPPORTED_CONTENT_TYPES = (
    "datastream", "rpm", "archive", "scap-security-guide",
)

# URL schemes recognized by get_content_name()
SUPPORTED_URL_PREFIXES = (
    "http://", "https://", "ftp://",  # LABEL:?, hdaX:?,
)

# buffer size for reading and writing out data (in bytes)
IO_BUF_SIZE = 2 * 1024 * 1024

# DBus constants
KDUMP = DBusServiceIdentifier(
    namespace=ADDONS_NAMESPACE,
    basename="Kdump",
    message_bus=DBus
)
||
| 119 | |||
| 120 | |||
class OSCAPaddonError(Exception):
    """Base exception for all OSCAP addon related errors."""
||
| 125 | |||
| 126 | |||
class OSCAPaddonNetworkError(OSCAPaddonError):
    """Exception raised for OSCAP addon related network errors."""
||
| 131 | |||
| 132 | |||
class ExtractionError(OSCAPaddonError):
    """Exception raised when extracting SCAP content archives fails."""
||
| 137 | |||
| 138 | |||
# severity levels for RuleMessage instances
MESSAGE_TYPE_FATAL = 0
MESSAGE_TYPE_WARNING = 1
MESSAGE_TYPE_INFO = 2

# namedtuple for messages returned from the rules evaluation
# origin -- class (inherited from RuleHandler) that generated the message
# type -- one of the MESSAGE_TYPE_* constants defined above
# text -- the actual message that should be displayed, logged, ...
RuleMessage = namedtuple("RuleMessage", ["origin", "type", "text"])
||
| 148 | |||
| 149 | |||
class SubprocessLauncher(object):
    """Thin wrapper around subprocess.Popen that captures output and
    collects OpenSCAP-specific error messages from stderr."""

    def __init__(self, args):
        """
        :param args: the command and its arguments
        :type args: list of str
        """
        self.args = args
        self.stdout = ""
        self.stderr = ""
        self.messages = []
        self.returncode = None

    def execute(self, **kwargs):
        """Run the command and capture its stdout, stderr and return code.

        :param kwargs: extra keyword arguments passed to subprocess.Popen
        :raise OSCAPaddonError: if the command cannot be started
        """
        command_string = " ".join(self.args)
        log.info(
            "OSCAP addon: Executing subprocess: '{command_string}'"
            .format(command_string=command_string))
        try:
            proc = subprocess.Popen(self.args, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE, **kwargs)
        except OSError as oserr:
            msg = "Failed to run the oscap tool: %s" % oserr
            raise OSCAPaddonError(msg)

        (stdout, stderr) = proc.communicate()
        # decode defensively: oscap output is not guaranteed to be valid
        # UTF-8.  The original decoded stdout strictly (which could raise
        # UnicodeDecodeError) while stderr used errors="replace"; use the
        # same replacement policy for both streams.
        self.stdout = stdout.decode(errors="replace")
        self.stderr = stderr.decode(errors="replace")
        # collect OpenSCAP error lines for later logging
        self.messages = re.findall(r'OpenSCAP Error:.*', self.stderr)
        self.messages = self.messages + re.findall(r'E: oscap:.*', self.stderr)

        self.returncode = proc.returncode

    def log_messages(self):
        """Log all collected OpenSCAP error messages as warnings."""
        for message in self.messages:
            log.warning("OSCAP addon: " + message)
||
| 181 | |||
| 182 | |||
def get_fix_rules_pre(profile, fpath, ds_id="", xccdf_id="", tailoring=""):
    """
    Get fix rules for the pre-installation environment for a given profile in
    a given datastream and checklist in a given file.

    :see: run_oscap_remediate
    :see: _run_oscap_gen_fix
    :return: fix rules for a given profile
    :rtype: str

    """
    # delegate to the generic fix generator with the pre-install template
    return _run_oscap_gen_fix(
        profile, fpath, PRE_INSTALL_FIX_SYSTEM_ATTR,
        ds_id=ds_id, xccdf_id=xccdf_id, tailoring=tailoring)
||
| 198 | |||
| 199 | |||
def _run_oscap_gen_fix(profile, fpath, template, ds_id="", xccdf_id="",
                       tailoring=""):
    """
    Run the oscap tool on a given file to get the contents of fix elements
    with the 'system' attribute equal to a given template for a given
    datastream, checklist and profile.

    :see: run_oscap_remediate
    :param template: the value of the 'system' attribute of the fix elements
    :type template: str
    :return: oscap tool's stdout
    :rtype: str
    :raise OSCAPaddonError: if the oscap tool fails

    """

    if not profile:
        return ""

    args = ["oscap", "xccdf", "generate", "fix", "--template=%s" % template]

    # oscap uses the default profile by default
    if profile.lower() != "default":
        args.append("--profile=%s" % profile)

    # optional selectors are only passed when they are set
    for option, value in (("--datastream-id", ds_id),
                          ("--xccdf-id", xccdf_id),
                          ("--tailoring-file", tailoring)):
        if value:
            args.append("%s=%s" % (option, value))

    args.append(fpath)

    proc = SubprocessLauncher(args)
    proc.execute()
    proc.log_messages()
    if proc.returncode != 0:
        msg = "Failed to generate fix rules with the oscap tool: %s" % proc.stderr
        raise OSCAPaddonError(msg)

    return proc.stdout
||
| 241 | |||
| 242 | |||
def run_oscap_remediate(profile, fpath, ds_id="", xccdf_id="", tailoring="",
                        chroot=""):
    """
    Run the evaluation and remediation with the oscap tool on a given file,
    doing the remediation as defined in a given profile defined in a given
    checklist that is a part of a given datastream. If requested, run in
    chroot.

    :param profile: id of the profile that will drive the remediation
    :type profile: str
    :param fpath: path to a file with SCAP content
    :type fpath: str
    :param ds_id: ID of the datastream that contains the checklist defining
                  the profile
    :type ds_id: str
    :param xccdf_id: ID of the checklist that defines the profile
    :type xccdf_id: str
    :param tailoring: path to a tailoring file
    :type tailoring: str
    :param chroot: path to the root the oscap tool should be run in
    :type chroot: str
    :return: oscap tool's stdout (summary of the rules, checks and fixes)
    :rtype: str
    :raise OSCAPaddonError: if the oscap tool fails

    """

    if not profile:
        return ""

    def do_chroot():
        """Helper function doing the chroot if requested."""
        # runs in the child process (via Popen's preexec_fn) just before exec
        if chroot and chroot != "/":
            os.chroot(chroot)
            os.chdir("/")

    # make sure the directory for the results exists
    results_dir = os.path.dirname(RESULTS_PATH)
    if chroot:
        # RESULTS_PATH is a path inside the chroot; create it from outside
        results_dir = os.path.normpath(chroot + "/" + results_dir)
    utils.ensure_dir_exists(results_dir)

    args = ["oscap", "xccdf", "eval"]
    args.append("--remediate")
    args.append("--results=%s" % RESULTS_PATH)
    args.append("--report=%s" % REPORT_PATH)

    # oscap uses the default profile by default
    if profile.lower() != "default":
        args.append("--profile=%s" % profile)
    if ds_id:
        args.append("--datastream-id=%s" % ds_id)
    if xccdf_id:
        args.append("--xccdf-id=%s" % xccdf_id)
    if tailoring:
        args.append("--tailoring-file=%s" % tailoring)

    args.append(fpath)

    proc = SubprocessLauncher(args)
    proc.execute(preexec_fn=do_chroot)
    proc.log_messages()

    if proc.returncode not in (0, 2):
        # 0 -- success; 2 -- no error, but checks/remediation failed
        msg = "Content evaluation and remediation with the oscap tool "\
              "failed: %s" % proc.stderr
        raise OSCAPaddonError(msg)

    return proc.stdout
||
| 312 | |||
| 313 | |||
def extract_data(archive, out_dir, ensure_has_files=None):
    """
    Function that extracts the given archive to the given output directory.
    It tries to find out the archive type by the file name.

    :param archive: path to the archive file that should be extracted
    :type archive: str
    :param out_dir: output directory the archive should be extracted to
    :type out_dir: str
    :param ensure_has_files: relative paths to the files that must exist in
                             the archive
    :type ensure_has_files: iterable of strings or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: on extraction failure or unsupported archive type

    """

    # normalize: treat None as "no required files" and drop empty paths
    # (the original performed this normalization twice; once is enough)
    ensure_has_files = [fpath for fpath in ensure_has_files or () if fpath]

    msg = "OSCAP addon: Extracting {archive}".format(archive=archive)
    if ensure_has_files:
        msg += ", expecting to find {files} there.".format(files=tuple(ensure_has_files))
    log.info(msg)

    if archive.endswith(".zip"):
        # ZIP file
        result = _extract_zip(archive, out_dir, ensure_has_files)
    elif archive.endswith(".tar"):
        # plain tarball
        result = _extract_tarball(archive, out_dir, ensure_has_files, None)
    elif archive.endswith(".tar.gz"):
        # gzipped tarball
        result = _extract_tarball(archive, out_dir, ensure_has_files, "gz")
    elif archive.endswith(".tar.bz2"):
        # bzipped tarball
        result = _extract_tarball(archive, out_dir, ensure_has_files, "bz2")
    elif archive.endswith(".rpm"):
        # RPM
        result = _extract_rpm(archive, out_dir, ensure_has_files)
    # elif other types of archives
    else:
        raise ExtractionError("Unsupported archive type")
    log.info("OSCAP addon: Extracted {files} from the supplied content"
             .format(files=result))
    return result


def _extract_zip(archive, out_dir, ensure_has_files):
    """Extract a ZIP archive; see extract_data for the contract."""
    try:
        zfile = zipfile.ZipFile(archive, "r")
    except Exception as exc:
        msg = _(f"Error extracting archive as a zipfile: {exc}")
        raise ExtractionError(msg)

    # close the archive on every path (the original leaked the open file
    # when a required member was missing)
    with zfile:
        # paths of the regular files in the archive (directories end in "/")
        files = set(info.filename for info in zfile.filelist
                    if not info.filename.endswith("/"))
        for fpath in ensure_has_files or ():
            if fpath not in files:
                msg = "File '%s' not found in the archive '%s'" % (fpath,
                                                                   archive)
                raise ExtractionError(msg)

        utils.ensure_dir_exists(out_dir)
        zfile.extractall(path=out_dir)
        return [utils.join_paths(out_dir, info.filename) for info in zfile.filelist]
||
| 386 | |||
| 387 | |||
def _extract_tarball(archive, out_dir, ensure_has_files, alg):
    """
    Extract the given TAR archive to the given output directory and make sure
    the given files exist in the archive.

    :see: extract_data
    :param alg: compression algorithm used for the tarball
    :type alg: str (one of "gz", "bz2") or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: on any extraction failure

    """

    if alg and alg not in ("gz", "bz2",):
        raise ExtractionError("Unsupported compression algorithm")

    # "r" for a plain tarball, "r:gz"/"r:bz2" for compressed ones
    mode = "r"
    if alg:
        mode += ":%s" % alg

    try:
        tfile = tarfile.TarFile.open(archive, mode)
    except tarfile.TarError as err:
        raise ExtractionError(str(err))

    # close the archive on every path (the original leaked the open file when
    # a required member was missing)
    with tfile:
        # paths of the regular files found in the archive
        files = set(member.path for member in tfile.getmembers()
                    if member.isfile())

        for fpath in ensure_has_files or ():
            if fpath not in files:
                msg = "File '%s' not found in the archive '%s'" % (fpath, archive)
                raise ExtractionError(msg)

        utils.ensure_dir_exists(out_dir)
        tfile.extractall(path=out_dir)
        result = [utils.join_paths(out_dir, member.path) for member in tfile.getmembers()]

    return result
||
| 428 | |||
| 429 | |||
def _extract_rpm(rpm_path, root="/", ensure_has_files=None):
    """
    Extract the given RPM into the directory tree given by the root argument
    and make sure the given file exists in the archive.

    :param rpm_path: path to the RPM file that should be extracted
    :type rpm_path: str
    :param root: root of the directory tree the RPM should be extracted into
    :type root: str
    :param ensure_has_files: relative paths to the files that must exist in the
                             RPM
    :type ensure_has_files: iterable of strings or None
    :return: a list of files and directories extracted from the archive
    :rtype: [str]
    :raise ExtractionError: if the conversion or extraction fails

    """

    # run rpm2cpio and process the output with the cpioarchive module
    temp_fd, temp_path = tempfile.mkstemp(prefix="oscap_rpm")
    proc = subprocess.Popen(["rpm2cpio", rpm_path], stdout=temp_fd)
    proc.wait()
    if proc.returncode != 0:
        msg = "Failed to convert RPM '%s' to cpio archive" % rpm_path
        raise ExtractionError(msg)

    # the converted archive stays on disk at temp_path and is reopened below
    os.close(temp_fd)

    try:
        archive = cpioarchive.CpioArchive(temp_path)
    except cpioarchive.CpioError as err:
        raise ExtractionError(str(err))

    # get entries from the archive (supports only iteration over entries)
    entries = set(entry for entry in archive)

    # cpio entry names (paths) start with the dot
    # NOTE(review): lstrip(".") removes ALL leading dots, so an entry named
    # "..foo" would lose both -- confirm entries always start with "./"
    entry_names = [entry.name.lstrip(".") for entry in entries]

    for fpath in ensure_has_files or ():
        # RPM->cpio entries have absolute paths
        if fpath not in entry_names and \
           os.path.join("/", fpath) not in entry_names:
            msg = "File '%s' not found in the archive '%s'" % (fpath, rpm_path)
            raise ExtractionError(msg)

    try:
        for entry in entries:
            # skip zero-length entries (e.g. directories)
            if entry.size == 0:
                continue
            dirname = os.path.dirname(entry.name.lstrip("."))
            out_dir = os.path.normpath(root + dirname)
            utils.ensure_dir_exists(out_dir)

            out_fpath = os.path.normpath(root + entry.name.lstrip("."))
            # never overwrite an existing file
            if os.path.exists(out_fpath):
                continue
            with open(out_fpath, "wb") as out_file:
                # copy the payload in IO_BUF_SIZE chunks to bound memory use
                buf = entry.read(IO_BUF_SIZE)
                while buf:
                    out_file.write(buf)
                    buf = entry.read(IO_BUF_SIZE)
    except (IOError, cpioarchive.CpioError) as e:
        raise ExtractionError(e)

    # cleanup
    archive.close()
    os.unlink(temp_path)

    return [os.path.normpath(root + name) for name in entry_names]
||
| 499 | |||
| 500 | |||
def strip_content_dir(fpaths, phase="preinst"):
    """
    Strip content directory prefix from the file paths for either
    pre-installation or post-installation phase.

    :param fpaths: iterable of file paths to strip content directory prefix
                   from
    :type fpaths: iterable of strings
    :param phase: specifies pre-installation or post-installation phase
    :type phase: "preinst" or "postinst"
    :return: the same iterable of file paths as given with the content
             directory prefix stripped
    :rtype: same type as fpaths

    """

    # pick the prefix to drop according to the installation phase
    if phase == "preinst":
        prefix_len = len(INSTALLATION_CONTENT_DIR)
    else:
        prefix_len = len(TARGET_CONTENT_DIR)

    def drop_prefix(path):
        return path[prefix_len:]

    return utils.keep_type_map(drop_prefix, fpaths)
||
| 523 | |||
| 524 | |||
def get_ssg_path(root="/"):
    """Return the path to the SSG datastream file under the given root."""
    relative_path = SSG_DIR + SSG_CONTENT
    return utils.join_paths(root, relative_path)
||
| 527 | |||
| 528 | |||
def ssg_available(root="/"):
    """
    Tries to find the SCAP Security Guide under the given root.

    :return: True if SSG was found under the given root, False otherwise

    """
    ssg_path = get_ssg_path(root)
    return os.path.exists(ssg_path)
||
| 538 | |||
| 539 | |||
def get_content_name(data):
    """Return the base file name the content is (or will be) stored under.

    Derived from the content URL; a default name is used when the URL does
    not start with one of the supported prefixes.

    :raise ValueError: for SSG content (no single file) or an unsupported URL
    """
    if data.content_type == "scap-security-guide":
        raise ValueError("Using scap-security-guide, no single content file")

    rest = "/anonymous_content"
    for prefix in SUPPORTED_URL_PREFIXES:
        if data.content_url.startswith(prefix):
            rest = data.content_url[len(prefix):]
            break

    # everything after the last slash is the file name
    _dirname, sep, basename = rest.rpartition("/")
    if not sep:
        raise ValueError("Unsupported url '%s'" % data.content_url)

    return basename
||
| 555 | |||
| 556 | |||
def get_raw_preinst_content_path(data):
    """Path to the raw (unextracted, ...) pre-installation content file"""
    if data.content_type == "scap-security-guide":
        # SSG content is never fetched as a single file
        log.debug("Using scap-security-guide, no single content file")
        return None

    return utils.join_paths(INSTALLATION_CONTENT_DIR, get_content_name(data))
||
| 565 | |||
| 566 | |||
def get_preinst_content_path(data):
    """Path to the pre-installation content file"""
    content_type = data.content_type
    if content_type == "scap-security-guide":
        # SSG is not copied to the standard place
        return data.content_path

    # datastreams are stored under the name derived from the URL,
    # everything else under its own content path
    if content_type == "datastream":
        name = get_content_name(data)
    else:
        name = data.content_path
    return utils.join_paths(INSTALLATION_CONTENT_DIR, name)
||
| 583 | |||
| 584 | |||
def get_postinst_content_path(data):
    """Path to the post-installation content file"""
    content_type = data.content_type
    if content_type in ("rpm", "scap-security-guide"):
        # no path magic in case of RPM (SSG is installed as an RPM)
        return data.content_path

    # datastreams are stored under the name derived from the URL,
    # everything else under its own content path
    if content_type == "datastream":
        name = get_content_name(data)
    else:
        name = data.content_path
    return utils.join_paths(TARGET_CONTENT_DIR, name)
||
| 601 | |||
| 602 | |||
def get_preinst_tailoring_path(data):
    """Path to the pre-installation tailoring file (if any)"""
    tailoring = data.tailoring_path
    if not tailoring:
        return ""

    return utils.join_paths(INSTALLATION_CONTENT_DIR, tailoring)
||
| 612 | |||
| 613 | |||
def get_postinst_tailoring_path(data):
    """Path to the post-installation tailoring file (if any)"""
    tailoring = data.tailoring_path
    if not tailoring:
        return ""

    if data.content_type == "rpm":
        # no path magic in case of RPM
        return tailoring

    return utils.join_paths(TARGET_CONTENT_DIR, tailoring)
||
| 627 | |||
| 628 | |||
def get_payload_proxy():
    """Get the DBus proxy of the active payload.

    :return: a DBus proxy
    :raise ValueError: if no active payload is set
    """
    object_path = PAYLOADS.get_proxy().ActivePayload
    if not object_path:
        raise ValueError("Active payload is not set.")

    return PAYLOADS.get_proxy(object_path)
||
| 641 | |||
| 642 | |||
def get_packages_data() -> PackagesConfigurationData:
    """Get the DBus data with the packages configuration.

    :return: a packages configuration
    """
    proxy = get_payload_proxy()

    if proxy.Type == PAYLOAD_TYPE_DNF:
        return PackagesConfigurationData.from_structure(proxy.Packages)

    # non-DNF payloads carry no package configuration
    return PackagesConfigurationData()
||
| 656 | |||
| 657 | |||
def set_packages_data(data: PackagesConfigurationData):
    """Set the DBus data with the packages configuration.

    :param data: a packages configuration
    """
    proxy = get_payload_proxy()

    if proxy.Type != PAYLOAD_TYPE_DNF:
        log.debug("The payload doesn't support packages.")
        return

    return proxy.SetPackages(PackagesConfigurationData.to_structure(data))
||
| 672 |