| Total Complexity | 42 |
| Total Lines | 314 |
| Duplicated Lines | 27.39 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like those in org_fedora_oscap.service.installation often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # |
||
| 2 | # Copyright (C) 2020 Red Hat, Inc. |
||
| 3 | # |
||
| 4 | # This copyrighted material is made available to anyone wishing to use, |
||
| 5 | # modify, copy, or redistribute it subject to the terms and conditions of |
||
| 6 | # the GNU General Public License v.2, or (at your option) any later version. |
||
| 7 | # This program is distributed in the hope that it will be useful, but WITHOUT |
||
| 8 | # ANY WARRANTY expressed or implied, including the implied warranties of |
||
| 9 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General |
||
| 10 | # Public License for more details. You should have received a copy of the |
||
| 11 | # GNU General Public License along with this program; if not, write to the |
||
| 12 | # Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
||
| 13 | # 02110-1301, USA. Any Red Hat trademarks that are incorporated in the |
||
| 14 | # source code or documentation are not subject to the GNU General Public |
||
| 15 | # License and may only be used or replicated with the express permission of |
||
| 16 | # Red Hat, Inc. |
||
| 17 | # |
||
| 18 | import logging |
||
| 19 | import os |
||
| 20 | import shutil |
||
| 21 | |||
| 22 | from pyanaconda.core import util |
||
| 23 | from pyanaconda.modules.common.task import Task |
||
| 24 | from pyanaconda.modules.common.errors.installation import NonCriticalInstallationError |
||
| 25 | |||
| 26 | from org_fedora_oscap import common, data_fetch, rule_handling, utils |
||
| 27 | from org_fedora_oscap.common import _, get_packages_data, set_packages_data |
||
| 28 | from org_fedora_oscap.content_handling import ContentCheckError |
||
| 29 | from org_fedora_oscap import content_discovery |
||
| 30 | |||
# All messages of this module go to the logger named "anaconda".
log = logging.getLogger("anaconda")


# Packages that are always added to the requested packages of the target
# system; the scanner is useful for the post remediation.
REQUIRED_PACKAGES = ("openscap", "openscap-scanner")
||
| 36 | |||
| 37 | |||
def _handle_error(exception):
    """Log a content failure and terminate with a message matching its cause.

    :param exception: the exception that describes what went wrong while
        fetching or loading the security content
    :raises NonCriticalInstallationError: always, raised by terminate()
    """
    log.error("OSCAP Addon: Failed to fetch and initialize SCAP content!")

    if isinstance(exception, ContentCheckError):
        msg = _("The integrity check of the security content failed.")
    elif isinstance(exception, (common.OSCAPaddonError, data_fetch.DataFetchError)):
        # Translate the constant template first and fill in the details
        # afterwards. Passing the already-interpolated text to _() would
        # produce a different msgid for every error, which no translation
        # catalogue can contain.
        msg = _(
            "There was an error fetching and loading the security content:\n"
            "{error}"
        ).format(error=str(exception))
    else:
        msg = _("There was an unexpected problem with the supplied content.")

    terminate(msg)
||
| 54 | |||
| 55 | |||
def terminate(message):
    """Stop the current task by raising a non-critical installation error.

    :param message: description of the problem; a translated note that the
        installation should be aborted is appended on a new line
    :raises NonCriticalInstallationError: always
    """
    abort_note = _("The installation should be aborted.")
    raise NonCriticalInstallationError("\n".join((message, abort_note)))
||
| 59 | |||
| 60 | |||
class PrepareValidContent(Task):
    """The installation task for fetching the content."""

    def __init__(self, policy_data, file_path, content_path):
        """Create a task.

        :param policy_data: the policy data; its certificates, content_type
            and fingerprint attributes are read when the task runs
        :param file_path: destination passed to the content fetch unless the
            content type is "scap-security-guide"
        :param content_path: path whose existence decides whether the content
            still has to be fetched
        """
        super().__init__()
        self._policy_data = policy_data
        self._file_path = file_path
        self._content_path = content_path
        self.content_bringer = content_discovery.ContentBringer(policy_data)

    @property
    def name(self):
        return "Fetch the content, and optionally perform check or archive extraction"

    def run(self):
        """Run the task.

        :raises NonCriticalInstallationError: raised through _handle_error()
            or terminate() when the content cannot be obtained or no
            preferred content can be determined
        """
        # Is the content available? If not, start fetching it.
        fetching_thread_name = None
        if not os.path.exists(self._content_path):
            fetching_thread_name = self.content_bringer.fetch_content(
                _handle_error, self._policy_data.certificates)

        uses_ssg = self._policy_data.content_type == "scap-security-guide"
        content_dest = None if uses_ssg else self._file_path

        content = self.content_bringer.finish_content_fetch(
            fetching_thread_name, self._policy_data.fingerprint,
            lambda msg: log.info("OSCAP Addon: " + msg), content_dest, _handle_error)

        if not content:
            # this shouldn't happen because error handling is supposed to
            # terminate the addon before finish_content_fetch returns
            _handle_error(Exception())

        # Nothing was fetched by this task, yet the content is not the
        # scap-security-guide one, so the file at the content path is
        # registered with the content object.
        remote_content_was_present = not fetching_thread_name and not uses_ssg
        if remote_content_was_present:
            content.add_file(self._content_path)

        try:
            # Just check that preferred content exists. The result is not
            # bound to "_" on purpose: that would make "_" a local name in
            # this method and hide the module-level gettext alias.
            self.content_bringer.get_preferred_content(content)
        except Exception as exc:
            terminate(str(exc))
||
| 109 | |||
| 110 | |||
class EvaluateRulesTask(Task):
    """The installation task for the evaluation of the rules."""

    def __init__(self, policy_data, content_path, tailoring_path):
        """Create a task.

        :param policy_data: the policy data; its profile_id, datastream_id,
            xccdf_id and content_type attributes are read when the task runs
        :param content_path: path of the content the rule data is read from
        :param tailoring_path: path of the tailoring file passed along when
            reading the rule data
        """
        super().__init__()
        self._policy_data = policy_data
        self._content_path = content_path
        self._tailoring_path = tailoring_path

    @property
    def name(self):
        return "Evaluate the rules"

    def run(self):
        """Run the task."""
        rule_data = self._initialize_rules()
        self._evaluate_rules(rule_data)

    def _initialize_rules(self):
        """Load the rule data of the selected profile from the content.

        :return: the rule data obtained from the content
        :raises NonCriticalInstallationError: raised through _handle_error()
            when the rule data cannot be obtained
        """
        try:
            return rule_handling.get_rule_data_from_content(
                self._policy_data.profile_id, self._content_path,
                self._policy_data.datastream_id, self._policy_data.xccdf_id,
                self._tailoring_path)
        except common.OSCAPaddonError as e:
            # _handle_error() always raises, so nothing is returned here.
            _handle_error(e)

    def _evaluate_rules(self, rule_data):
        """Evaluate the rules and request the packages needed on the target.

        :param rule_data: the rule data to evaluate
        :raises NonCriticalInstallationError: raised through terminate() when
            the evaluation reports at least one fatal message
        """
        # evaluate rules, do automatic fixes and stop if something that cannot
        # be fixed automatically is wrong
        all_messages = rule_data.eval_rules(None, None)
        fatal_messages = [message for message in all_messages
                          if message.type == common.MESSAGE_TYPE_FATAL]

        # Test the list itself: any() would look at the truthiness of the
        # individual messages instead of whether there are any.
        if fatal_messages:
            msg_lines = [_("Wrong configuration detected!")]
            msg_lines.extend(m.text for m in fatal_messages)
            # terminate() always raises, so the code below is not reached.
            terminate("\n".join(msg_lines))

        # add packages needed on the target system to the list of packages
        # that are requested to be installed
        packages_data = get_packages_data()
        pkgs_to_install = list(REQUIRED_PACKAGES)

        if self._policy_data.content_type == "scap-security-guide":
            pkgs_to_install.append("scap-security-guide")

        for pkg in pkgs_to_install:
            if pkg not in packages_data.packages:
                packages_data.packages.append(pkg)

        set_packages_data(packages_data)
||
| 167 | |||
| 168 | |||
class InstallContentTask(Task):
    """The installation task for installation of the content."""

    def __init__(self, sysroot, policy_data, file_path,
                 content_path, tailoring_path, target_directory):
        """Create a task.

        :param sysroot: root of the target system; the target directory is
            joined to it and the RPM installation runs with it as the root
        :param policy_data: the policy data; its content_type attribute
            selects how the content is installed
        :param file_path: file copied to the target for the "rpm" content type
        :param content_path: file copied to the target for the "datastream"
            content type
        :param tailoring_path: tailoring file copied to the target if it exists
        :param target_directory: content directory inside the target system
        """
        super().__init__()
        self._sysroot = sysroot
        self._policy_data = policy_data
        self._file_path = file_path
        self._content_path = content_path
        self._tailoring_path = tailoring_path
        self._target_directory = target_directory

    @property
    def name(self):
        return "Install the content"

    def run(self):
        """Run the task.

        :raises NonCriticalInstallationError: raised through terminate() when
            the content RPM cannot be installed to the target system
        """
        target_content_dir = utils.join_paths(
            self._sysroot,
            self._target_directory
        )

        utils.ensure_dir_exists(target_content_dir)

        content_type = self._policy_data.content_type
        if content_type == "scap-security-guide":
            pass  # nothing needed
        elif content_type == "datastream":
            shutil.copy2(self._content_path, target_content_dir)
        elif content_type == "rpm":
            # copy the RPM to the target system
            shutil.copy2(self._file_path, target_content_dir)

            # get the path of the RPM; it is relative to the sysroot because
            # the installation command below runs with root=sysroot
            content_name = common.get_content_name(self._policy_data)
            package_path = utils.join_paths(self._target_directory, content_name)

            # and install it with yum
            ret = util.execWithRedirect(
                "yum", ["-y", "--nogpg", "install", package_path],
                root=self._sysroot
            )

            if ret != 0:
                # terminate() always raises, so nothing below runs on failure.
                terminate(_("Failed to install content RPM to the target system."))
        else:
            pattern = utils.join_paths(common.INSTALLATION_CONTENT_DIR, "*")
            utils.universal_copy(pattern, target_content_dir)

        # Copy the tailoring file only when a path is set and the file exists.
        if self._tailoring_path and os.path.exists(self._tailoring_path):
            shutil.copy2(self._tailoring_path, target_content_dir)
||
| 224 | |||
| 225 | |||
def _ensure_scanner_works(sysroot):
    """Terminate unless the 'oscap' scanner works in the installed system.

    :param sysroot: root of the installed system the scanner is checked in
    :raises NonCriticalInstallationError: raised through terminate() when
        the scanner check fails
    """
    try:
        common.assert_scanner_works(chroot=sysroot, executable="oscap")
    except Exception as exc:
        # Translate the constant template first and format it afterwards, so
        # that the msgid looked up in the catalogue does not depend on the
        # error text.
        msg_lines = [
            _("The 'oscap' scanner doesn't work in the installed system: {error}")
            .format(error=str(exc)),
            _("As a result, the installed system can't be hardened."),
        ]
        terminate("\n".join(msg_lines))


class RemediateSystemTask(Task):
    """The installation task for running the remediation."""

    def __init__(self, sysroot, policy_data, target_content_path,
                 target_tailoring_path):
        """Create a task.

        :param sysroot: root of the installed system
        :param policy_data: the policy data; its profile_id, datastream_id
            and xccdf_id attributes are passed to the remediation
        :param target_content_path: content path passed to the remediation
        :param target_tailoring_path: tailoring path passed to the remediation
        """
        super().__init__()
        self._sysroot = sysroot
        self._policy_data = policy_data
        self._target_content_path = target_content_path
        self._target_tailoring_path = target_tailoring_path

    @property
    def name(self):
        return "Remediate the system"

    def run(self):
        """Run the task.

        :raises NonCriticalInstallationError: raised through terminate() when
            the scanner does not work or the remediation fails
        """
        _ensure_scanner_works(self._sysroot)

        try:
            common.run_oscap_remediate(
                self._policy_data.profile_id,
                self._target_content_path,
                self._policy_data.datastream_id,
                self._policy_data.xccdf_id,
                self._target_tailoring_path,
                chroot=self._sysroot
            )
        except Exception as exc:
            # Format after translating; an f-string inside _() would be
            # interpolated before the catalogue lookup.
            msg = _(
                "Something went wrong during the final hardening: {error}."
            ).format(error=str(exc))
            terminate(msg)


class ScheduleFirstbootRemediationTask(Task):
    """The installation task for scheduling the first-boot remediation."""

    def __init__(self, sysroot, policy_data, target_content_path,
                 target_tailoring_path):
        """Create a task.

        :param sysroot: root of the installed system
        :param policy_data: the policy data; its profile_id, datastream_id
            and xccdf_id attributes are passed to the scheduling call
        :param target_content_path: content path passed to the scheduling call
        :param target_tailoring_path: tailoring path passed to the scheduling
            call
        """
        super().__init__()
        self._sysroot = sysroot
        self._policy_data = policy_data
        self._target_content_path = target_content_path
        self._target_tailoring_path = target_tailoring_path

    @property
    def name(self):
        return "Schedule first-boot remediation"

    def run(self):
        """Run the task.

        :raises NonCriticalInstallationError: raised through terminate() when
            the scanner does not work or the scheduling fails
        """
        _ensure_scanner_works(self._sysroot)

        try:
            common.schedule_firstboot_remediation(
                self._sysroot,
                self._policy_data.profile_id,
                self._target_content_path,
                self._policy_data.datastream_id,
                self._policy_data.xccdf_id,
                self._target_tailoring_path,
            )
        except Exception as exc:
            # Format after translating so the msgid stays constant.
            msg = _(
                "Something went wrong when scheduling the first-boot remediation: {exc}."
            ).format(exc=str(exc))
            terminate(msg)
||
| 314 |