| Total Complexity | 367 |
| Total Lines | 1824 |
| Duplicated Lines | 1.32 % |
| Coverage | 33.33% |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like ssg.build_yaml often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | 2 | from __future__ import absolute_import |
|
| 2 | 2 | from __future__ import print_function |
|
| 3 | |||
| 4 | 2 | from collections import defaultdict |
|
| 5 | 2 | from copy import deepcopy |
|
| 6 | 2 | import datetime |
|
| 7 | 2 | import json |
|
| 8 | 2 | import os |
|
| 9 | 2 | import os.path |
|
| 10 | 2 | import re |
|
| 11 | 2 | import sys |
|
| 12 | 2 | from xml.sax.saxutils import escape |
|
| 13 | 2 | import glob |
|
| 14 | |||
| 15 | 2 | import yaml |
|
| 16 | |||
| 17 | 2 | from .build_cpe import CPEDoesNotExist |
|
| 18 | 2 | from .constants import XCCDF_REFINABLE_PROPERTIES, SCE_SYSTEM |
|
| 19 | 2 | from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir |
|
| 20 | 2 | from .rule_yaml import parse_prodtype |
|
| 21 | |||
| 22 | 2 | from .cce import is_cce_format_valid, is_cce_value_valid |
|
| 23 | 2 | from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand |
|
| 24 | 2 | from .utils import required_key, mkdir_p |
|
| 25 | |||
| 26 | 2 | from .xml import ElementTree as ET |
|
| 27 | 2 | from .shims import unicode_func |
|
| 28 | |||
| 29 | |||
def add_sub_element(parent, tag, data):
    """
    Append a new ``tag`` child to ``parent`` whose content is ``data``.

    ``data`` is handled as XML markup: it is wrapped in ``<tag>...</tag>``
    and parsed, so elements contained in it become real sub-elements of
    the new child.

    If ``data`` must not be parsed as XML, either escape it before calling
    this function, or use ElementTree.SubElement() instead.

    Raises RuntimeError if the wrapped string cannot be parsed.

    Returns the newly created child element.
    """
    # Our YAML data contain XML and XHTML elements. ET.SubElement() would
    # escape the < > characters as &lt; and &gt;, so no child elements
    # would be created. Parsing the serialized snippet is the workaround.
    # TODO: Remove this function after we move to Markdown everywhere in SSG
    template = unicode_func("<{0}>{1}</{0}>")
    markup = template.format(tag, data)

    try:
        child = ET.fromstring(markup.encode("utf-8"))
    except Exception:
        raise RuntimeError(
            "Error adding subelement to an element '{0}' from string: '{1}'"
            .format(parent.tag, markup))

    parent.append(child)
    return child
||
| 58 | |||
| 59 | |||
def reorder_according_to_ordering(unordered, ordering, regex=None):
    """
    Return the items of ``unordered`` as a list, prioritized by ``ordering``.

    Items that match ``regex`` at their start are emitted first, grouped by
    the first entry of ``ordering`` that occurs in them as a substring, in
    the order they appear in ``unordered``. Everything that was not emitted
    that way follows in sorted order. Duplicates are emitted once.

    When ``regex`` is None, it is built as an alternation of the entries of
    ``ordering``, each wrapped in its own group.
    """
    if regex is None:
        regex = "|".join("({0})".format(entry) for entry in ordering)
    pattern = re.compile(regex)

    # Candidates are collected before the input is turned into a set, so
    # that they keep their original relative order.
    candidates = [entry for entry in unordered if pattern.match(entry)]
    remaining = set(unordered)

    prioritized = []
    for priority_type in ordering:
        for entry in candidates:
            if priority_type in entry and entry in remaining:
                prioritized.append(entry)
                remaining.remove(entry)

    return prioritized + sorted(remaining)
|
| 76 | |||
| 77 | |||
def add_warning_elements(element, warnings):
    """
    Add one <warning> child to ``element`` for every entry of ``warnings``.

    ``warnings`` is a list of dictionaries rather than a single dictionary,
    so that several warnings can share the same category, which is valid
    in SCAP and in our content:

        warnings:
            - general: Some general warning
            - general: Some other general warning
            - general: |-
                Some really long multiline general warning

    Each dictionary should hold exactly one key/value pair: the key becomes
    the ``category`` attribute, the value becomes the warning content.
    """
    for entry in warnings:
        category, text = list(entry.items())[0]
        add_sub_element(element, "warning", text).set("category", category)
||
| 93 | |||
| 94 | |||
def add_nondata_subelements(element, subelement, attribute, attr_data):
    """Add one ``subelement`` child to ``element`` per item of ``attr_data``.

    Each child carries the item as the value of ``attribute`` and has no
    content, for example <requires id="my_required_id"/>.
    """
    for value in attr_data:
        ET.SubElement(element, subelement).set(attribute, value)
||
| 101 | |||
| 102 | |||
def check_warnings(xccdf_structure):
    """
    Verify that every entry of ``xccdf_structure.warnings`` has one item.

    Raises ValueError as soon as an entry whose length differs from one
    is encountered.
    """
    if any(len(entry) != 1 for entry in xccdf_structure.warnings):
        raise ValueError(
            "Only one key/value pair should exist for each warnings dictionary")
||
| 108 | |||
| 109 | |||
class SelectionHandler(object):
    """
    Keeps track of selections expressed as strings.

    Four kinds of entries are understood by :meth:`apply_selection`:

    - ``rule.property=value`` refines a property of a rule,
    - ``variable=selector`` assigns a selector to a variable,
    - ``!rule`` unselects a rule,
    - anything else selects a rule.
    """
    def __init__(self):
        # rule ID -> list of (property, value) tuples
        self.refine_rules = defaultdict(list)
        # variable ID -> selector
        self.variables = dict()
        self.unselected = []
        self.selected = []

    @property
    def selections(self):
        """
        The stored state rendered back as a list of selection strings:
        selected rules, unselected rules, variables, then refinements.
        """
        selections = [str(item) for item in self.selected]
        selections.extend("!" + str(item) for item in self.unselected)
        for varname, selector in self.variables.items():
            selections.append(varname + "=" + selector)
        for rule, refinements in self.refine_rules.items():
            for prop, val in refinements:
                selections.append("{rule}.{property}={value}"
                                  .format(rule=rule, property=prop, value=val))
        return selections

    @selections.setter
    def selections(self, entries):
        # Entries are added on top of the current state, nothing is reset.
        for item in entries:
            self.apply_selection(item)

    def apply_selection(self, item):
        """
        Record a single selection string.

        Raises ValueError if a refinement targets a property that is not
        listed in XCCDF_REFINABLE_PROPERTIES.
        """
        if "." in item:
            rule, refinement = item.split(".", 1)
            property_, value = refinement.split("=", 1)
            if property_ not in XCCDF_REFINABLE_PROPERTIES:
                msg = ("Property '{property_}' cannot be refined. "
                       "Rule properties that can be refined are {refinables}. "
                       "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
                       .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
                               rule_id=rule, value=value, profile=self.id_)
                       )
                raise ValueError(msg)
            self.refine_rules[rule].append((property_, value))
        elif "=" in item:
            varname, value = item.split("=", 1)
            self.variables[varname] = value
        elif item.startswith("!"):
            self.unselected.append(item[1:])
        else:
            self.selected.append(item)

    def _subtract_refinements(self, extended_refinements):
        """
        Given a dict of rule refinements from the extended profile,
        "undo" every refinement prefixed with '!' in this profile.

        The '!'-prefixed entries are removed from this object. A rule whose
        inherited refinements have all been undone is dropped from the
        returned dict, so that no rule is left with an empty refinement list.
        """
        for rule, refinements in list(self.refine_rules.items()):
            if not rule.startswith("!"):
                continue
            target = rule[1:]
            for prop, val in refinements:
                extended_refinements[target].remove((prop, val))
            # An emptied list carries no information, and code that reads
            # the first refinement of every rule would fail on it.
            if target in extended_refinements and not extended_refinements[target]:
                del extended_refinements[target]
            del self.refine_rules[rule]
        return extended_refinements

    def update_with(self, rhs):
        """
        Merge selections of ``rhs`` into this object.

        Rules selected by ``rhs`` are added to the selected rules. Variables
        and refinements of ``rhs`` are taken over unless this object defines
        them as well, in which case the local definition wins.
        ``rhs`` itself is not modified.
        """
        extended_selects = set(rhs.selected)
        extra_selections = extended_selects.difference(set(self.selected))
        self.selected.extend(list(extra_selections))

        updated_variables = dict(rhs.variables)
        updated_variables.update(self.variables)
        self.variables = updated_variables

        # Work on a copy, as undoing refinements mutates the lists.
        extended_refinements = deepcopy(rhs.refine_rules)
        updated_refinements = self._subtract_refinements(extended_refinements)
        updated_refinements.update(self.refine_rules)
        self.refine_rules = updated_refinements
||
| 183 | |||
| 184 | |||
class XCCDFEntity(object):
    """
    This class can load itself from a YAML with Jinja macros,
    and it can also save itself to YAML.

    It is supposed to work with the content in the project,
    when entities are defined in the benchmark tree,
    and they are compiled into flat YAMLs to the build directory.
    """
    # Attribute name -> zero-argument callable that produces its default.
    KEYS = dict(
            id_=lambda: "",
            definition_location=lambda: "",
    )

    # Keys of KEYS that a definition has to supply.
    MANDATORY_KEYS = set()

    # Name of a definition file that takes the entity ID from its directory.
    GENERIC_FILENAME = ""
    # Key under which the entity ID is exposed to the macro environment.
    ID_LABEL = "id"

    def __init__(self, id_):
        super(XCCDFEntity, self).__init__()
        self._assign_defaults()
        self.id_ = id_

    def _assign_defaults(self):
        """Set every attribute listed in KEYS to its default value."""
        for key, default in self.KEYS.items():
            default_val = default()
            # A default that is a RuntimeError instance is stored as None.
            if isinstance(default_val, RuntimeError):
                default_val = None
            setattr(self, key, default_val)

    @classmethod
    def get_instance_from_full_dict(cls, data):
        """
        Given a defining dictionary, produce an instance
        by treating all dict elements as attributes.

        Extend this if you want tight control over the instance creation process.
        """
        entity = cls(data["id_"])
        for key, value in data.items():
            setattr(entity, key, value)
        return entity

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """
        Take the contents of the definition as a dictionary, and
        add defaults or raise errors if a required member is not present.

        Keys that are recognized are removed from ``input_contents``,
        so whatever remains in it afterwards has not been consumed.

        Raises ValueError if a mandatory key is missing.

        Extend this if you want to add, remove or alter the result
        that will constitute the new instance.
        """
        data = dict()

        for key, default in cls.KEYS.items():
            if key in input_contents:
                data[key] = input_contents.pop(key)
            elif key not in cls.MANDATORY_KEYS:
                data[key] = default()
            else:
                msg = (
                    "Key '{key}' is mandatory for definition of '{class_name}'."
                    .format(key=key, class_name=cls.__name__))
                raise ValueError(msg)

        return data

    @classmethod
    def parse_yaml_into_processed_dict(cls, yaml_file, env_yaml=None):
        """
        Given yaml filename and environment info, produce a dictionary
        that defines the instance to be created.
        This wraps :meth:`process_input_dict` and it adds generic keys on the top:

        - `id_` as the entity ID that is deduced either from the filename,
          or from the parent directory name.
        - `definition_location` as the original location where the entity got defined.

        Raises ValueError if the definition cannot be processed, and
        RuntimeError if the file contains keys that were not consumed.
        """
        file_basename = os.path.basename(yaml_file)
        entity_id = file_basename.split(".")[0]
        if file_basename == cls.GENERIC_FILENAME:
            entity_id = os.path.basename(os.path.dirname(yaml_file))

        if env_yaml:
            # Note that this writes into the dictionary that was passed in.
            env_yaml[cls.ID_LABEL] = entity_id
        yaml_data = open_and_macro_expand(yaml_file, env_yaml)

        try:
            processed_data = cls.process_input_dict(yaml_data, env_yaml)
        except ValueError as exc:
            msg = (
                "Error processing {yaml_file}: {exc}"
                .format(yaml_file=yaml_file, exc=str(exc)))
            raise ValueError(msg)

        if yaml_data:
            msg = (
                "Unparsed YAML data in '{yaml_file}': {keys}"
                .format(yaml_file=yaml_file, keys=list(yaml_data.keys())))
            raise RuntimeError(msg)

        if not processed_data.get("definition_location", ""):
            processed_data["definition_location"] = yaml_file

        processed_data["id_"] = entity_id

        return processed_data

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        """Create an instance from the definition stored in ``yaml_file``."""
        yaml_file = os.path.normpath(yaml_file)

        # Parsing stores the entity ID in the environment, so hand over
        # a shallow copy and leave the caller's dictionary untouched.
        local_env_yaml = None
        if env_yaml:
            local_env_yaml = dict()
            local_env_yaml.update(env_yaml)

        data_dict = cls.parse_yaml_into_processed_dict(yaml_file, local_env_yaml)

        result = cls.get_instance_from_full_dict(data_dict)

        return result

    def represent_as_dict(self):
        """
        Produce a dict representation of the class.

        All attributes listed in KEYS are included regardless of their value,
        with the exception of ``id_``.

        Extend this method if you need the representation to be different from the object.
        """
        data = {key: getattr(self, key) for key in self.KEYS}
        del data["id_"]
        return data

    def dump_yaml(self, file_name, documentation_complete=True):
        """Write the dict representation of the entity to ``file_name`` as YAML."""
        to_dump = self.represent_as_dict()
        to_dump["documentation_complete"] = documentation_complete
        with open(file_name, "w+") as f:
            yaml.dump(to_dump, f, indent=4, sort_keys=False)

    def to_xml_element(self):
        """Produce the XML form of the entity. Subclasses have to implement this."""
        raise NotImplementedError()
||
| 334 | |||
| 335 | |||
class Profile(XCCDFEntity, SelectionHandler):
    """Represents XCCDF profile
    """
    KEYS = {
        "title": lambda: "",
        "description": lambda: "",
        "extends": lambda: "",
        "metadata": lambda: None,
        "reference": lambda: None,
        "selections": lambda: list(),
        "platforms": lambda: set(),
        "cpe_names": lambda: set(),
        "platform": lambda: None,
        "filter_rules": lambda: "",
        ** XCCDFEntity.KEYS
    }

    MANDATORY_KEYS = {
        "title",
        "description",
        "selections",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """
        Extend the generic processing by platform handling.

        A ``platform`` that is not None is added to ``platforms``. If
        ``env_yaml`` is supplied, every platform is translated to a CPE name
        by means of ``env_yaml["product_cpes"]`` and stored in ``cpe_names``.

        Raises CPEDoesNotExist if a platform has no corresponding CPE.
        """
        input_contents = super(Profile, cls).process_input_dict(input_contents, env_yaml)

        platform = input_contents.get("platform")
        if platform is not None:
            input_contents["platforms"].add(platform)

        if env_yaml:
            for platform in input_contents["platforms"]:
                try:
                    new_cpe_name = env_yaml["product_cpes"].get_cpe_name(platform)
                    input_contents["cpe_names"].add(new_cpe_name)
                except CPEDoesNotExist:
                    msg = (
                        "Unsupported platform '{platform}' in a profile."
                        .format(platform=platform))
                    raise CPEDoesNotExist(msg)

        return input_contents

    @property
    def rule_filter(self):
        """
        The filter built from ``filter_rules`` by rule_filter_from_def(),
        or noop_rule_filterfunc if ``filter_rules`` is empty.
        """
        if self.filter_rules:
            return rule_filter_from_def(self.filter_rules)
        else:
            return noop_rule_filterfunc

    def to_xml_element(self):
        """
        Build and return the <Profile> element.

        Children are emitted in this order: title, description, reference
        (if set), platforms, selected rules, unselected rules, refined
        values, and refined rules.
        """
        element = ET.Element('Profile')
        element.set("id", self.id_)
        if self.extends:
            element.set("extends", self.extends)
        title = add_sub_element(element, "title", self.title)
        title.set("override", "true")
        desc = add_sub_element(element, "description", self.description)
        desc.set("override", "true")

        if self.reference:
            # The reference is plain text, so it is escaped before it gets
            # parsed as XML by add_sub_element().
            add_sub_element(element, "reference", escape(self.reference))

        for cpe_name in self.cpe_names:
            plat = ET.SubElement(element, "platform")
            plat.set("idref", cpe_name)

        for selection in self.selected:
            select = ET.Element("select")
            select.set("idref", selection)
            select.set("selected", "true")
            element.append(select)

        for selection in self.unselected:
            unselect = ET.Element("select")
            unselect.set("idref", selection)
            unselect.set("selected", "false")
            element.append(unselect)

        for value_id, selector in self.variables.items():
            refine_value = ET.Element("refine-value")
            refine_value.set("idref", value_id)
            refine_value.set("selector", selector)
            element.append(refine_value)

        for refined_rule, refinement_list in self.refine_rules.items():
            refine_rule = ET.Element("refine-rule")
            refine_rule.set("idref", refined_rule)
            for refinement in refinement_list:
                refine_rule.set(refinement[0], refinement[1])
            element.append(refine_rule)

        return element

    def get_rule_selectors(self):
        """Return IDs of all rules that are either selected or unselected."""
        return self.selected + self.unselected

    def get_variable_selectors(self):
        """Return the mapping of variable IDs to their selectors."""
        return self.variables

    def validate_refine_rules(self, rules):
        """
        Check that every refined rule exists and is referenced by the profile.

        ``rules`` is an iterable of objects with the ``id_`` attribute.
        Rules with an empty list of refinements are skipped.

        Raises ValueError if a refined rule is not among ``rules``,
        or if it is neither selected nor unselected by the profile.
        """
        # Sets are built once, so that the lookups in the loop are cheap.
        existing_rule_ids = {r.id_ for r in rules}
        rule_selectors = set(self.get_rule_selectors())

        for refine_rule, refinement_list in self.refine_rules.items():
            if not refinement_list:
                # Nothing is actually refined, and there is no refinement
                # that could illustrate the error.
                continue

            # Take first refinement to illustrate where the error is
            # all refinements in list are invalid, so it doesn't really matter
            a_refinement = refinement_list[0]

            if refine_rule not in existing_rule_ids:
                msg = (
                    "You are trying to refine a rule that doesn't exist. "
                    "Rule '{rule_id}' was not found in the benchmark. "
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
                    .format(rule_id=refine_rule, profile_id=self.id_,
                            property_=a_refinement[0], value=a_refinement[1])
                    )
                raise ValueError(msg)

            if refine_rule not in rule_selectors:
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
                       "a rule that is not selected by it. The refinement will not have any "
                       "noticeable effect. Either select the rule or remove the rule refinement."
                       .format(rule_id=refine_rule, property_=a_refinement[0],
                               value=a_refinement[1], profile_id=self.id_)
                       )
                raise ValueError(msg)

    def validate_variables(self, variables):
        """
        Check that variables set by the profile exist and use known selectors.

        ``variables`` is an iterable of objects with the ``id_`` attribute
        and with ``options``, a mapping keyed by selectors.

        Raises ValueError on an unknown variable or an unknown selector.
        """
        variables_by_id = {var.id_: var for var in variables}

        for var_id, our_val in self.variables.items():
            if var_id not in variables_by_id:
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
                    "We know only variables:\n{var_names}"
                    .format(
                        var_id=var_id, profile_name=self.id_,
                        var_names="\n".join(sorted(all_vars_list)))
                )
                raise ValueError(msg)

            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
            if our_val not in allowed_selectors:
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' "
                    "uses the selector '{our_val}'. "
                    "This is not possible, as only selectors {all_selectors} are available. "
                    "Either change the selector used in the profile, or "
                    "add the selector-value pair to the variable definition."
                    .format(
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
                        all_selectors=allowed_selectors,
                    )
                )
                raise ValueError(msg)

    def validate_rules(self, rules, groups):
        """
        Check that the profile references only existing rules.

        ``rules`` is an iterable of objects with the ``id_`` attribute,
        ``groups`` is a container that is tested for membership of IDs.

        Raises ValueError if the profile selects or unselects a group,
        or an ID that does not belong to any of ``rules``.
        """
        # A set is built once, so that the lookups in the loop are cheap.
        existing_rule_ids = {r.id_ for r in rules}
        rule_selectors = self.get_rule_selectors()
        for id_ in rule_selectors:
            if id_ in groups:
                msg = (
                    "You have selected a group '{group_id}' instead of a "
                    "rule. Groups have no effect in the profile and are not "
                    "allowed to be selected. Please remove '{group_id}' "
                    "from profile '{profile_id}' before proceeding."
                    .format(group_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)
            if id_ not in existing_rule_ids:
                msg = (
                    "Rule '{rule_id}' was not found in the benchmark. Please "
                    "remove rule '{rule_id}' from profile '{profile_id}' "
                    "before proceeding."
                    .format(rule_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)

    def __sub__(self, other):
        """
        Return a new Profile that holds what this profile has on top of ``other``.

        Title, description, extends, platforms and platform are taken over
        from this profile. Selected and unselected rules are those that are
        not present in the respective list of ``other``, and variables are
        those that ``other`` lacks or sets to a different selector.
        """
        profile = Profile(self.id_)
        profile.title = self.title
        profile.description = self.description
        profile.extends = self.extends
        profile.platforms = self.platforms
        profile.platform = self.platform
        profile.selected = list(set(self.selected) - set(other.selected))
        profile.selected.sort()
        profile.unselected = list(set(self.unselected) - set(other.unselected))
        profile.variables = dict((k, v) for (k, v) in self.variables.items()
                                 if k not in other.variables or v != other.variables[k])
        return profile
||
| 531 | |||
| 532 | |||
class ResolvableProfile(Profile):
    """
    A profile that can be resolved, i.e. merged with the profile it
    extends, so that it carries the complete list of selected rules.
    """
    def __init__(self, * args, ** kwargs):
        super(ResolvableProfile, self).__init__(* args, ** kwargs)
        self.resolved = False

    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
        """Look up every ID of ``control_id_list`` in the given policy."""
        return [
            controls_manager.get_control(policy_id, control_id)
            for control_id in control_id_list
        ]

    def resolve_controls(self, controls_manager):
        """Does nothing here, it is a hook for subclasses."""
        pass

    def extend_by(self, extended_profile):
        """Merge selections of ``extended_profile`` into this profile."""
        self.update_with(extended_profile)

    def resolve_selections_with_rules(self, rules_by_id):
        """
        Keep only those selected rules that are present in ``rules_by_id``
        and that are accepted by the rule filter of the profile.
        """
        surviving = set()
        for rule_id in self.selected:
            if rule_id in rules_by_id and self.rule_filter(rules_by_id[rule_id]):
                surviving.add(rule_id)
        self.selected = list(surviving)

    def resolve(self, all_profiles, rules_by_id, controls_manager=None):
        """
        Resolve the profile in place. Nothing happens if it is resolved already.

        Controls are resolved if ``controls_manager`` is supplied, selections
        are filtered against ``rules_by_id``, and the extended profile,
        if there is any, is resolved and merged in. Afterwards, ``selected``
        is sorted and free of unselected rules, ``unselected`` is empty and
        ``extends`` is None.

        Raises RuntimeError if the extended profile is not in ``all_profiles``,
        and ValueError if a selected rule is missing from ``rules_by_id``.
        """
        if self.resolved:
            return

        if controls_manager:
            self.resolve_controls(controls_manager)

        self.resolve_selections_with_rules(rules_by_id)

        if self.extends:
            if self.extends not in all_profiles:
                raise RuntimeError(
                    "Profile {name} extends profile {extended}, but "
                    "only profiles {known_profiles} are available for resolution."
                    .format(name=self.id_, extended=self.extends,
                            known_profiles=list(all_profiles.keys())))
            parent = all_profiles[self.extends]
            parent.resolve(all_profiles, rules_by_id, controls_manager)
            self.extend_by(parent)

        effective = [rid for rid in set(self.selected) if rid not in self.unselected]
        self.unselected = []
        self.extends = None
        self.selected = sorted(effective)

        for rid in self.selected:
            if rid in rules_by_id:
                continue
            raise ValueError(
                "Rule {rid} is selected by {profile}, but the rule is not available. "
                "This may be caused by a discrepancy of prodtypes."
                .format(rid=rid, profile=self.id_))

        self.resolved = True
||
| 597 | |||
| 598 | |||
class ProfileWithInlinePolicies(ResolvableProfile):
    """
    A resolvable profile whose selections may reference controls
    of a policy by means of the ``policy_id:control_id`` notation.
    """
    def __init__(self, * args, ** kwargs):
        super(ProfileWithInlinePolicies, self).__init__(* args, ** kwargs)
        # policy ID -> list of control IDs as written in the selections
        self.controls_by_policy = defaultdict(list)

    def apply_selection(self, item):
        """
        Record ``item`` as a control reference if it contains a colon
        and no equals sign, handle it as a regular selection otherwise.
        """
        # ":" is the delimiter for controls but not when the item is a variable
        is_control_reference = ":" in item and "=" not in item
        if not is_control_reference:
            super(ProfileWithInlinePolicies, self).apply_selection(item)
            return

        policy_id, control_id = item.split(":", 1)
        self.controls_by_policy[policy_id].append(control_id)

    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        """
        Translate control IDs of a policy to the list of controls.

        An ID that starts with "all" and contains a colon stands for all
        controls of the level given after the colon. Any other ID that starts
        with "all" stands for all controls of the policy. The remaining IDs
        are looked up individually.
        """
        controls = []
        for cid in controls_ids:
            if cid.startswith("all"):
                if ":" in cid:
                    level_id = cid.split(":", 1)[1]
                    found = controls_manager.get_all_controls_of_level(policy_id, level_id)
                else:
                    found = controls_manager.get_all_controls(policy_id)
            else:
                found = self._controls_ids_to_controls(controls_manager, policy_id, [cid])
            controls.extend(found)
        return controls

    def resolve_controls(self, controls_manager):
        """Merge selections of all referenced controls into the profile."""
        for policy_id, controls_ids in self.controls_by_policy.items():
            referenced = self._process_controls_ids_into_controls(
                controls_manager, policy_id, controls_ids)
            for control in referenced:
                self.update_with(control)
||
| 634 | |||
| 635 | |||
class Value(XCCDFEntity):
    """Represents XCCDF Value
    """
    KEYS = {
        "title": lambda: "",
        "description": lambda: "",
        "type": lambda: "",
        "operator": lambda: "equals",
        "interactive": lambda: False,
        "options": lambda: dict(),
        "warnings": lambda: list(),
        ** XCCDFEntity.KEYS
    }

    MANDATORY_KEYS = {
        "title",
        "description",
        "type",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """
        Extend the generic processing by normalization of ``interactive``
        to a boolean, and by validation of ``operator``.

        Raises ValueError if the operator is not one of the supported ones.
        """
        # The raw value goes through str(), so that a real boolean is
        # accepted as well as the strings "true" and "false" in any case.
        input_contents["interactive"] = (
            str(input_contents.get("interactive", "false")).lower() == "true")

        data = super(Value, cls).process_input_dict(input_contents, env_yaml)

        possible_operators = ["equals", "not equal", "greater than",
                              "less than", "greater than or equal",
                              "less than or equal", "pattern match"]

        if data["operator"] not in possible_operators:
            raise ValueError(
                "Found an invalid operator value '%s'. "
                "Expected one of: %s"
                % (data["operator"], ", ".join(possible_operators))
            )

        return data

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        """
        Load the value from ``yaml_file`` and check the shape of its warnings.
        """
        value = super(Value, cls).from_yaml(yaml_file, env_yaml)

        check_warnings(value)

        return value

    def to_xml_element(self):
        """
        Build and return the <Value> element.

        The ``operator`` and ``interactive`` attributes are emitted only
        if they differ from "equals" and False, respectively.
        """
        value = ET.Element('Value')
        value.set('id', self.id_)
        value.set('type', self.type)
        if self.operator != "equals":  # equals is the default
            value.set('operator', self.operator)
        if self.interactive:  # False is the default
            value.set('interactive', 'true')
        # The title is set as text, whereas the description is parsed as XML.
        title = ET.SubElement(value, 'title')
        title.text = self.title
        add_sub_element(value, 'description', self.description)
        add_warning_elements(value, self.warnings)

        for selector, option in self.options.items():
            # do not confuse Value with big V with value with small v
            # value is child element of Value
            value_small = ET.SubElement(value, 'value')
            # by XCCDF spec, default value is value without selector
            if selector != "default":
                value_small.set('selector', str(selector))
            value_small.text = str(option)

        return value

    def to_file(self, file_name):
        """Write the <Value> element to ``file_name`` as an XML document."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)
||
| 712 | |||
| 713 | |||
class Benchmark(XCCDFEntity):
    """Represents XCCDF Benchmark.

    A benchmark is the root container: it holds profiles, values, groups and
    rules, and knows how to render itself as a ``Benchmark`` XML element.
    """
    # Attribute name -> zero-argument factory producing the attribute's default.
    # Entries coming from XCCDFEntity.KEYS are unpacked last, so they take
    # precedence over same-named entries listed here.
    KEYS = {
        "title": lambda: "",
        "status": lambda: "",
        "description": lambda: "",
        "notice_id": lambda: "",
        "notice_description": lambda: "",
        "front_matter": lambda: "",
        "rear_matter": lambda: "",
        "cpes": lambda: list(),
        "version": lambda: "0",
        "profiles": lambda: list(),
        "values": lambda: dict(),
        "groups": lambda: dict(),
        "rules": lambda: dict(),
        "product_cpe_names": lambda: list(),
        ** XCCDFEntity.KEYS
    }

    MANDATORY_KEYS = {
        "title",
        "status",
        "description",
        "front_matter",
        "rear_matter",
        "version",
    }

    GENERIC_FILENAME = "benchmark.yml"

    def add_value_needed_for_ocil_clauses(self):
        """Register the placeholder ``conditional_clause`` Value on this benchmark."""
        conditional_clause = Value("conditional_clause")
        conditional_clause.title = "A conditional clause for check statements."
        conditional_clause.description = conditional_clause.title
        conditional_clause.type = "string"
        conditional_clause.options = {"": "This is a placeholder"}

        self.add_value(conditional_clause)

    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
        """Replace falsy placeholders in rules/values/groups with real entities.

        Every ID whose current entry is falsy is looked up in the matching
        ``*_by_id`` mapping; a missing ID raises ``KeyError``.
        """
        for rid, val in self.rules.items():
            if not val:
                self.rules[rid] = rules_by_id[rid]

        for vid, val in self.values.items():
            if not val:
                self.values[vid] = values_by_id[vid]

        for gid, val in self.groups.items():
            if not val:
                self.groups[gid] = groups_by_id[gid]

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """Convert the raw YAML dictionary into benchmark attribute data.

        The hyphenated ``front-matter`` / ``rear-matter`` keys are renamed to
        their underscore form before the parent class processes the input.
        The ``notice`` mapping is then flattened into ``notice_id`` and
        ``notice_description``, and ``version`` is coerced to a string.

        ``input_contents`` is modified in place: consumed keys are removed.
        Raises ``KeyError`` when ``front-matter`` or ``rear-matter`` is absent.
        """
        input_contents["front_matter"] = input_contents.pop("front-matter")
        input_contents["rear_matter"] = input_contents.pop("rear-matter")

        data = super(Benchmark, cls).process_input_dict(input_contents, env_yaml)

        notice_contents = required_key(input_contents, "notice")
        del input_contents["notice"]

        data["notice_id"] = required_key(notice_contents, "id")
        del notice_contents["id"]

        data["notice_description"] = required_key(notice_contents, "description")
        del notice_contents["description"]

        data["version"] = str(data["version"])

        return data

    def represent_as_dict(self):
        """Return the dictionary form of the benchmark with hyphenated matter keys.

        This is the inverse of the key renaming done in ``process_input_dict``.
        """
        # This is an instance method, so super() has to be bound to ``self``;
        # the previous ``super(Benchmark, cls)`` raised NameError on every call.
        data = super(Benchmark, self).represent_as_dict()
        data["rear-matter"] = data.pop("rear_matter")
        data["front-matter"] = data.pop("front_matter")
        return data

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, benchmark_id="product-name"):
        """Load a benchmark from ``yaml_file`` and assign it ``benchmark_id``.

        When ``env_yaml`` is given, the product CPE names are taken from
        ``env_yaml["product_cpes"]``.
        """
        benchmark = super(Benchmark, cls).from_yaml(yaml_file, env_yaml)
        if env_yaml:
            benchmark.product_cpe_names = env_yaml["product_cpes"].get_product_cpe_names()

        benchmark.id_ = benchmark_id

        return benchmark

    def add_profiles_from_dir(self, dir_, env_yaml):
        """Load every ``*.profile`` file found directly in ``dir_``.

        Files with another extension are reported on stderr and skipped, and
        profiles raising ``DocumentationNotComplete`` are skipped silently.
        Any other failure is re-raised as ``RuntimeError`` naming the file.
        """
        for dir_item in sorted(os.listdir(dir_)):
            dir_item_path = os.path.join(dir_, dir_item)
            if not os.path.isfile(dir_item_path):
                continue

            _, ext = os.path.splitext(os.path.basename(dir_item_path))
            if ext != '.profile':
                sys.stderr.write(
                    "Encountered file '%s' while looking for profiles, "
                    "extension '%s' is unknown. Skipping..\n"
                    % (dir_item, ext)
                )
                continue

            try:
                new_profile = ProfileWithInlinePolicies.from_yaml(dir_item_path, env_yaml)
            except DocumentationNotComplete:
                continue
            except Exception as exc:
                msg = ("Error building profile from '{fname}': '{error}'"
                       .format(fname=dir_item_path, error=str(exc)))
                raise RuntimeError(msg)
            if new_profile is None:
                continue

            self.profiles.append(new_profile)

    def to_xml_element(self):
        """Build and return the ``Benchmark`` XML element for this benchmark.

        Children are appended in this order: status, title, description,
        notice, front/rear matter, platforms, version, metadata, profiles,
        values, groups and finally rules.
        """
        root = ET.Element('Benchmark')
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
        root.set('xmlns:xhtml', 'http://www.w3.org/1999/xhtml')
        root.set('xmlns:dc', 'http://purl.org/dc/elements/1.1/')
        root.set('id', 'product-name')
        root.set('xsi:schemaLocation',
                 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd')
        root.set('style', 'SCAP_1.1')
        root.set('resolved', 'false')
        root.set('xml:lang', 'en-US')
        status = ET.SubElement(root, 'status')
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
        status.text = self.status
        add_sub_element(root, "title", self.title)
        add_sub_element(root, "description", self.description)
        notice = add_sub_element(root, "notice", self.notice_description)
        notice.set('id', self.notice_id)
        add_sub_element(root, "front-matter", self.front_matter)
        add_sub_element(root, "rear-matter", self.rear_matter)

        # The Benchmark applicability is determined by the CPEs
        # defined in the product.yml
        for cpe_name in self.product_cpe_names:
            plat = ET.SubElement(root, "platform")
            plat.set("idref", cpe_name)

        version = ET.SubElement(root, 'version')
        version.text = self.version
        ET.SubElement(root, "metadata")

        for profile in self.profiles:
            root.append(profile.to_xml_element())

        for value in self.values.values():
            root.append(value.to_xml_element())

        # Make system group the first, followed by services group
        groups_in_bench = list(self.groups.keys())
        priority_order = ["system", "services"]
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)

        for group_id in groups_in_bench:
            group = self.groups.get(group_id)
            # Products using application benchmark don't have system or services group
            if group is not None:
                root.append(group.to_xml_element())

        for rule in self.rules.values():
            root.append(rule.to_xml_element())

        return root

    def to_file(self, file_name):
        """Write the benchmark's XML representation to ``file_name``."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        """Register ``value`` under its ID; ``None`` is ignored."""
        if value is None:
            return
        self.values[value.id_] = value

    # The benchmark is also considered a group, so this function signature needs to match
    # Group()'s add_group()
    def add_group(self, group, env_yaml=None):
        """Register ``group`` under its ID; ``None`` is ignored.

        ``env_yaml`` is accepted only for signature compatibility and is unused.
        """
        if group is None:
            return
        self.groups[group.id_] = group

    def add_rule(self, rule):
        """Register ``rule`` under its ID; ``None`` is ignored."""
        if rule is None:
            return
        self.rules[rule.id_] = rule

    def to_xccdf(self):
        """We can easily extend this script to generate a valid XCCDF instead
        of SSG SHORTHAND.
        """
        raise NotImplementedError

    def __str__(self):
        return self.id_
||
| 920 | |||
| 921 | |||
class Group(XCCDFEntity):
    """Represents XCCDF Group.

    A group owns values, rules and nested groups, and renders them as a
    ``Group`` XML element in a remediation-friendly order.
    """
    # Attributes offered to children whose own value is None
    # (see _pass_our_properties_on_to).
    ATTRIBUTES_TO_PASS_ON = (
        "platforms",
    )

    GENERIC_FILENAME = "group.yml"

    # Attribute name -> zero-argument factory producing the attribute's default.
    KEYS = {
        "prodtype": lambda: "all",
        "title": lambda: "",
        "description": lambda: "",
        "warnings": lambda: list(),
        "requires": lambda: list(),
        "conflicts": lambda: list(),
        "values": lambda: dict(),
        "groups": lambda: dict(),
        "rules": lambda: dict(),
        "platform": lambda: "",
        "platforms": lambda: set(),
        "cpe_names": lambda: set(),
        ** XCCDFEntity.KEYS
    }

    MANDATORY_KEYS = {
        "title",
        "status",
        "description",
        "front_matter",
        "rear_matter",
        "version",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """Turn raw YAML contents into group attribute data.

        Non-empty ``rules``, ``groups`` and ``values`` collections become
        ``{id: None}`` placeholder mappings, and a non-empty ``platform`` is
        added to the ``platforms`` set.
        """
        data = super(Group, cls).process_input_dict(input_contents, env_yaml)

        for collection in ("rules", "groups", "values"):
            member_ids = data[collection]
            if member_ids:
                data[collection] = dict.fromkeys(member_ids)

        single_platform = data["platform"]
        if single_platform:
            data["platforms"].add(single_platform)
        return data

    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
        """Fill falsy rule/value/group placeholders from the lookup mappings.

        A placeholder whose ID is missing from its mapping raises ``KeyError``.
        """
        pending = (
            (self.rules, rules_by_id),
            (self.values, values_by_id),
            (self.groups, groups_by_id),
        )
        for owned, catalog in pending:
            for entity_id, current in owned.items():
                if not current:
                    owned[entity_id] = catalog[entity_id]

    def represent_as_dict(self):
        """Return the dictionary form, listing members as sorted ID lists."""
        yaml_contents = super(Group, self).represent_as_dict()

        members = (
            ("rules", self.rules),
            ("groups", self.groups),
            ("values", self.values),
        )
        for key, mapping in members:
            if mapping:
                yaml_contents[key] = sorted(mapping.keys())

        return yaml_contents

    def validate_prodtype(self, yaml_file):
        """Raise ``ValueError`` if any comma-separated prodtype has padding."""
        entries = self.prodtype.split(",")
        if any(entry != entry.strip() for entry in entries):
            raise ValueError(
                "Comma-separated '{prodtype}' prodtype "
                "in {yaml_file} contains whitespace."
                .format(prodtype=self.prodtype, yaml_file=yaml_file))

    def to_xml_element(self):
        """Build the ``Group`` XML element including values, rules and subgroups."""
        element = ET.Element('Group')
        element.set('id', self.id_)
        if self.prodtype != "all":
            element.set("prodtype", self.prodtype)
        ET.SubElement(element, 'title').text = self.title
        add_sub_element(element, 'description', self.description)
        add_warning_elements(element, self.warnings)
        add_nondata_subelements(element, "requires", "id", self.requires)
        add_nondata_subelements(element, "conflicts", "id", self.conflicts)

        for cpe_name in self.cpe_names:
            ET.SubElement(element, "platform").set("idref", cpe_name)

        for child_value in self.values.values():
            element.append(child_value.to_xml_element())

        # Rules that install or remove packages influence the remediation of
        # other rules, so they are emitted first: a package gets installed
        # before it is configured. The resulting order is packages installed,
        # packages removed, services enabled, services disabled.
        rule_pattern = (r'(package_.*_(installed|removed))|'
                        r'(service_.*_(enabled|disabled))|'
                        r'install_smartcard_packages$')
        rule_priority = ["installed", "install_smartcard_packages", "removed",
                         "enabled", "disabled"]
        ordered_rule_ids = reorder_according_to_ordering(
            list(self.rules.keys()), rule_priority, rule_pattern)
        for rule_id in ordered_rule_ids:
            element.append(self.rules.get(rule_id).to_xml_element())

        # Subgroups follow the rules of the current level. Package and service
        # rules usually live in the top-level group, so groups that further
        # configure a package or service end up after the rules that install
        # or remove it.
        group_priority = [
            # rpm_verify_(hashes|permissions|ownership) have to run before any
            # other rule. They conflict with rules applying stricter settings
            # (e.g. file_permissions_grub2_cfg, sudo_dedicated_group), so rules
            # deviating from the system default are evaluated later: contents,
            # permissions and ownership are reset first and the profile's
            # stricter settings are applied afterwards.
            "software", "integrity", "integrity-software", "rpm_verification",

            # The accounts group precedes auditing because the rule
            # package_screen_installed should run before
            # audit_rules_privileged_commands; otherwise that rule misses the
            # newly installed screen binary during remediation and reports fail.
            "accounts", "auditing",

            # FIPS comes before Crypto so that a different (stricter) Crypto
            # Policy than FIPS can be set.
            "fips", "crypto",

            # firewalld_activation must precede ruleset_modifications,
            # otherwise remediations for ruleset_modifications won't work.
            "firewalld_activation", "ruleset_modifications",

            # Rules from disabling_ipv6 must precede rules from
            # configuring_ipv6, otherwise the remediation prints an error even
            # though it succeeded.
            "disabling_ipv6", "configuring_ipv6"
        ]
        ordered_group_ids = reorder_according_to_ordering(
            list(self.groups.keys()), group_priority)
        for group_id in ordered_group_ids:
            element.append(self.groups[group_id].to_xml_element())

        return element

    def to_file(self, file_name):
        """Write the group's XML representation to ``file_name``."""
        ET.ElementTree(self.to_xml_element()).write(file_name)

    def add_value(self, value):
        """Register ``value`` under its ID; ``None`` is ignored."""
        if value is not None:
            self.values[value.id_] = value

    def add_group(self, group, env_yaml=None):
        """Attach ``group`` as a subgroup, letting it inherit our platforms.

        With ``env_yaml`` given, every platform of the subgroup is resolved to
        a CPE name; an unknown platform is reported and ``CPEDoesNotExist``
        is re-raised.
        """
        if group is None:
            return
        if self.platforms and not group.platforms:
            group.platforms = self.platforms
        self.groups[group.id_] = group
        self._pass_our_properties_on_to(group)

        # cpe_names can only be computed once inheritance has been applied
        if not env_yaml:
            return
        for platform in group.platforms:
            try:
                cpe_name = env_yaml["product_cpes"].get_cpe_name(platform)
                group.cpe_names.add(cpe_name)
            except CPEDoesNotExist:
                print("Unsupported platform '%s' in group '%s'." % (platform, group.id_))
                raise

    def _pass_our_properties_on_to(self, obj):
        """Copy each ATTRIBUTES_TO_PASS_ON attribute to ``obj`` where it is None."""
        for name in self.ATTRIBUTES_TO_PASS_ON:
            if not hasattr(obj, name):
                continue
            if getattr(obj, name) is None:
                setattr(obj, name, getattr(self, name))

    def add_rule(self, rule, env_yaml=None):
        """Attach ``rule`` to this group, letting it inherit our platforms.

        With ``env_yaml`` given, every platform of the rule is resolved to a
        CPE name; an unknown platform is reported and ``CPEDoesNotExist`` is
        re-raised.
        """
        if rule is None:
            return
        if self.platforms and not rule.platforms:
            rule.platforms = self.platforms
        self.rules[rule.id_] = rule
        self._pass_our_properties_on_to(rule)

        # cpe_names can only be computed once inheritance has been applied
        if not env_yaml:
            return
        for platform in rule.platforms:
            try:
                cpe_name = env_yaml["product_cpes"].get_cpe_name(platform)
                rule.cpe_names.add(cpe_name)
            except CPEDoesNotExist:
                print("Unsupported platform '%s' in rule '%s'." % (platform, rule.id_))
                raise

    def __str__(self):
        return self.id_
||
| 1138 | |||
| 1139 | |||
def noop_rule_filterfunc(rule):
    """Accept every rule: the filter used when no filter expression is given."""
    accepted = True
    return accepted
||
| 1142 | |||
def rule_filter_from_def(filterdef):
    """Build a rule predicate from the filter expression ``filterdef``.

    ``None`` and the empty string yield ``noop_rule_filterfunc``. Any other
    value is evaluated as a Python expression against the rule's attributes
    each time the returned function is called.
    """
    no_expression = filterdef is None or filterdef == ""
    if no_expression:
        return noop_rule_filterfunc

    def filterfunc(rule):
        # Builtins are disabled and only the rule's own attributes are
        # exposed as names to the expression.
        # NOTE(review): eval() is still used on the filter definition; make
        # sure filter definitions never come from an untrusted source.
        exposed_names = rule.__dict__
        return eval(filterdef, {"__builtins__": None}, exposed_names)

    return filterfunc
||
| 1152 | |||
| 1153 | |||
| 1154 | 2 | class Rule(XCCDFEntity): |
|
| 1155 | """Represents XCCDF Rule |
||
| 1156 | """ |
||
| 1157 | 2 | KEYS = { |
|
| 1158 | "prodtype": lambda: "all", |
||
| 1159 | "title": lambda: "", |
||
| 1160 | "description": lambda: "", |
||
| 1161 | "rationale": lambda: "", |
||
| 1162 | "severity": lambda: "", |
||
| 1163 | "references": lambda: dict(), |
||
| 1164 | "identifiers": lambda: dict(), |
||
| 1165 | "ocil_clause": lambda: None, |
||
| 1166 | "ocil": lambda: None, |
||
| 1167 | "oval_external_content": lambda: None, |
||
| 1168 | "warnings": lambda: list(), |
||
| 1169 | "conflicts": lambda: list(), |
||
| 1170 | "requires": lambda: list(), |
||
| 1171 | "platform": lambda: None, |
||
| 1172 | "platforms": lambda: set(), |
||
| 1173 | "inherited_platforms": lambda: list(), |
||
| 1174 | "template": lambda: None, |
||
| 1175 | "cpe_names": lambda: set(), |
||
| 1176 | ** XCCDFEntity.KEYS |
||
| 1177 | } |
||
| 1178 | |||
| 1179 | 2 | MANDATORY_KEYS = { |
|
| 1180 | "title", |
||
| 1181 | "description", |
||
| 1182 | "rationale", |
||
| 1183 | "severity", |
||
| 1184 | } |
||
| 1185 | |||
| 1186 | 2 | GENERIC_FILENAME = "rule.yml" |
|
| 1187 | 2 | ID_LABEL = "rule_id" |
|
| 1188 | |||
| 1189 | 2 | PRODUCT_REFERENCES = ("stigid", "cis",) |
|
| 1190 | 2 | GLOBAL_REFERENCES = ("srg", "vmmsrg", "disa", "cis-csc",) |
|
| 1191 | |||
def __init__(self, id_):
    """Initialise the rule through the parent constructor.

    The rule starts without SCE metadata; ``from_yaml`` may attach it later.
    """
    super(Rule, self).__init__(id_)
    self.sce_metadata = None
|
| 1195 | |||
def __deepcopy__(self, memo):
    """Deep-copy the rule while sharing the attributes that are hard to copy.

    ``template`` and ``local_env_yaml`` are re-used by reference; every other
    instance attribute is deep-copied.
    """
    shared_by_reference = ("template", "local_env_yaml")

    klass = self.__class__
    duplicate = klass.__new__(klass)
    # Register the copy up front so that reference cycles resolve to it.
    memo[id(self)] = duplicate
    for name, attribute in self.__dict__.items():
        if name in shared_by_reference:
            setattr(duplicate, name, attribute)
        else:
            setattr(duplicate, name, deepcopy(attribute, memo))
    return duplicate
||
| 1207 | |||
@classmethod
def from_yaml(cls, yaml_file, env_yaml=None, sce_metadata=None):
    """Load a rule from ``yaml_file`` and post-process it.

    After the parent loader has run, platforms are normalised to a set,
    warnings are checked, platform names are resolved to CPE names (only when
    ``env_yaml`` is given and the rule applies to the product being built),
    SCE metadata is attached if available, and the prodtype, identifiers and
    references are validated.

    An unknown platform is reported and ``CPEDoesNotExist`` is re-raised.
    """
    rule = super(Rule, cls).from_yaml(yaml_file, env_yaml)

    # The YAML loader hands platforms over as a list; work with a set.
    rule.platforms = set(rule.platforms)

    # NOTE: merging rule.inherited_platforms into rule.platforms is
    # currently disabled:
    # rule.platforms.update(set(rule.inherited_platforms))

    check_warnings(rule)

    # Whatever is in rule.platform has to be part of rule.platforms as well.
    if rule.platform is not None:
        rule.platforms.add(rule.platform)

    # CPE names can be looked up only with an env_yaml (it carries the
    # product CPEs), and only for rules whose prodtype covers the product.
    applies_to_product = env_yaml and (
        env_yaml["product"] in parse_prodtype(rule.prodtype)
        or rule.prodtype == "all")
    if applies_to_product:
        for platform in rule.platforms:
            try:
                cpe_name = env_yaml["product_cpes"].get_cpe_name(platform)
                rule.cpe_names.add(cpe_name)
            except CPEDoesNotExist:
                print("Unsupported platform '%s' in rule '%s'." % (platform, rule.id_))
                raise

    if sce_metadata and rule.id_ in sce_metadata:
        metadata = sce_metadata[rule.id_]
        rule.sce_metadata = metadata
        metadata["relative_path"] = os.path.join(
            env_yaml["product"], "checks/sce", metadata['filename'])

    rule.validate_prodtype(yaml_file)
    rule.validate_identifiers(yaml_file)
    rule.validate_references(yaml_file)
    return rule
|
| 1247 | |||
def _verify_stigid_format(self, product):
    """Raise ``ValueError`` when the ``stigid`` reference lists several IDs.

    A missing or empty ``stigid`` reference is accepted. ``product`` is not
    used by the check.
    """
    stig_id = self.references.get("stigid", None)
    if stig_id and "," in stig_id:
        raise ValueError("Rules can not have multiple STIG IDs.")
|
| 1254 | |||
def _verify_disa_cci_format(self):
    """Check that every comma-separated ``disa`` reference looks like CCI-XXXXXX.

    A missing or empty ``disa`` reference is accepted. The first malformed
    entry raises ``ValueError``.
    """
    cci_list = self.references.get("disa", None)
    if not cci_list:
        return

    well_formed = re.compile(r'^CCI-[0-9]{6}$')
    for candidate in cci_list.split(","):
        if well_formed.match(candidate) is None:
            raise ValueError("CCI '{}' is in the wrong format! "
                             "Format should be similar to: "
                             "CCI-XXXXXX".format(candidate))
    # Store the (unchanged) validated value back under the same key.
    self.references["disa"] = cci_list
||
| 1266 | |||
def normalize(self, product):
    """Make references, identifiers and the template specific to ``product``.

    Any exception raised on the way is converted into a ``RuntimeError``
    whose message names the rule.
    """
    try:
        self.make_refs_and_identifiers_product_specific(product)
        self.make_template_product_specific(product)
    except Exception as exc:
        raise RuntimeError(
            "Error normalizing '{rule}': {msg}"
            .format(rule=self.id_, msg=str(exc)))
|
| 1277 | |||
def _get_product_only_references(self):
    """Return the references whose label is a product reference.

    A label qualifies when it equals an entry of ``Rule.PRODUCT_REFERENCES``
    or starts with such an entry followed by ``@``.
    """
    selected = dict()

    for policy in Rule.PRODUCT_REFERENCES:
        qualified_prefix = "{0}@".format(policy)
        selected.update(
            (label, content)
            for label, content in self.references.items()
            if label == policy or label.startswith(qualified_prefix)
        )
    return selected
|
| 1287 | |||
def make_template_product_specific(self, product):
    """Resolve product-qualified keys in the template's vars and backends.

    Does nothing when the rule has no template. Otherwise both the ``vars``
    and ``backends`` sections (empty when absent) are replaced by their
    product-specific versions, with overwrites allowed.
    """
    product_suffix = "@{0}".format(product)

    if not self.template:
        return

    for section in ("vars", "backends"):
        generic_items = self.template.get(section, dict())
        self.template[section] = self._make_items_product_specific(
            generic_items, product_suffix, True)
|
| 1303 | |||
def make_refs_and_identifiers_product_specific(self, product):
    """Resolve product-qualified identifiers and references for ``product``.

    References are split into product references (see
    ``_get_product_only_references``) and general ones. Each set, as well as
    the identifiers, is run through ``_make_items_product_specific``. The
    result is stored back on the rule, and the ``disa`` and ``stigid``
    reference formats are verified.

    Raises ``ValueError`` with the rule ID in the message when processing of
    any of the three sets fails.
    """
    product_suffix = "@{0}".format(product)

    product_references = self._get_product_only_references()
    general_references = self.references.copy()
    for product_label in product_references:
        general_references.pop(product_label)

    for ref in Rule.PRODUCT_REFERENCES:
        if ref in general_references:
            template = ("Unexpected reference identifier ({0}) without "
                        "product qualifier ({0}@{1}) while building rule "
                        "{2}")
            raise ValueError(template.format(ref, product, self.id_))

    # (name used in error messages, items, whether a qualified item may
    # override a differing unqualified one)
    work_items = (
        ("identifiers", self.identifiers, False),
        ("general_references", general_references, True),
        ("product_references", product_references, False),
    )
    for what, items, allow_overwrites in work_items:
        try:
            specific_items = self._make_items_product_specific(
                items, product_suffix, allow_overwrites)
        except ValueError as exc:
            raise ValueError(
                "Error processing {what} for rule '{rid}': {msg}"
                .format(what=what, rid=self.id_, msg=str(exc)))
        # Replace the contents in place so existing references to the
        # dictionaries (e.g. self.identifiers) see the new items.
        items.clear()
        items.update(specific_items)

    self.references = general_references
    self._verify_disa_cci_format()
    self.references.update(product_references)

    self._verify_stigid_format(product)
|
| 1342 | |||
def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
    """Return ``items_dict`` reduced to the items valid for one product.

    Unqualified items are kept as they are. Items qualified with
    ``product_suffix`` are stored under their unqualified label, and items
    qualified for another product are dropped.

    Raises ``ValueError`` when a label from ``Rule.GLOBAL_REFERENCES`` is
    product-qualified. It also raises when overwrites are not allowed and a
    qualified item differs from its unqualified counterpart.
    """
    resolved = dict()
    for qualified_name, content in items_dict.items():
        is_unqualified = "@" not in qualified_name
        if is_unqualified and qualified_name not in resolved:
            resolved[qualified_name] = content
            continue

        base_name = qualified_name.split("@")[0]

        # Checked before the suffix comparison on purpose, so problems are
        # caught even for products that are not being built right now.
        if base_name in Rule.GLOBAL_REFERENCES:
            raise ValueError(
                "You cannot use product-qualified for the '{item_u}' reference. "
                "Please remove the product-qualifier and merge values with the "
                "existing reference if there is any. Original line: {item_q}: {value_q}"
                .format(item_u=base_name, item_q=qualified_name, value_q=content)
            )

        if not qualified_name.endswith(product_suffix):
            continue

        conflicting = (
            not allow_overwrites
            and base_name in items_dict
            and content != items_dict[base_name]
        )
        if conflicting:
            raise ValueError(
                "There is a product-qualified '{item_q}' item, "
                "but also an unqualified '{item_u}' item "
                "and those two differ in value - "
                "'{value_q}' vs '{value_u}' respectively."
                .format(item_q=qualified_name, item_u=base_name,
                        value_q=content, value_u=items_dict[base_name])
            )
        resolved[base_name] = content
    return resolved
|
| 1379 | |||
def validate_identifiers(self, yaml_file):
    """Validate the rule's identifiers loaded from ``yaml_file``.

    Every identifier type and value must be a string and the value must not
    be blank. Identifiers whose type starts with ``cce`` must also pass the
    CCE format check and the CCE value check.

    Raises ``ValueError`` describing the first problem found, or when the
    identifiers section is ``None``.
    """
    if self.identifiers is None:
        raise ValueError("Empty identifier section in file %s" % yaml_file)

    # Validate all identifiers are non-empty:
    for ident_type, ident_val in self.identifiers.items():
        if not isinstance(ident_type, str) or not isinstance(ident_val, str):
            raise ValueError("Identifiers and values must be strings: %s in file %s"
                             % (ident_type, yaml_file))
        if ident_val.strip() == "":
            raise ValueError("Identifiers must not be empty: %s in file %s"
                             % (ident_type, yaml_file))
        if ident_type.startswith('cce'):
            if not is_cce_format_valid(ident_val):
                raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CCE '%s'"
                                 " in file '%s'" % (ident_val, ident_type, yaml_file))
            # The value check expects the "CCE-" prefix that the YAML value omits.
            if not is_cce_value_valid("CCE-" + ident_val):
                raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CCE '%s'"
                                 " in file '%s'" % (ident_val, ident_type, yaml_file))
||
| 1399 | |||
def validate_references(self, yaml_file):
    """Validate the rule's references loaded from ``yaml_file``.

    All reference types and values must be strings and no value may be
    blank. Once every entry has passed that check, each comma-separated item
    is checked for surrounding whitespace.

    Raises ``ValueError`` describing the first problem found, or when the
    references section is ``None``.
    """
    if self.references is None:
        raise ValueError("Empty references section in file %s" % yaml_file)

    # First pass: types and emptiness of every entry.
    for ref_type, ref_val in self.references.items():
        both_strings = isinstance(ref_type, str) and isinstance(ref_val, str)
        if not both_strings:
            raise ValueError("References and values must be strings: %s in file %s"
                             % (ref_type, yaml_file))
        if ref_val.strip() == "":
            raise ValueError("References must not be empty: %s in file %s"
                             % (ref_type, yaml_file))

    # Second pass: no whitespace around the comma-separated items.
    for ref_type, ref_val in self.references.items():
        if any(item != item.strip() for item in ref_val.split(",")):
            raise ValueError(
                "Comma-separated '{ref_type}' reference "
                "in {yaml_file} contains whitespace."
                .format(ref_type=ref_type, yaml_file=yaml_file))
||
| 1420 | |||
def validate_prodtype(self, yaml_file):
    """Raise ``ValueError`` if any comma-separated prodtype has padding.

    ``yaml_file`` is only used in the error message.
    """
    entries = self.prodtype.split(",")
    if any(entry != entry.strip() for entry in entries):
        raise ValueError(
            "Comma-separated '{prodtype}' prodtype "
            "in {yaml_file} contains whitespace."
            .format(prodtype=self.prodtype, yaml_file=yaml_file))
||
| 1429 | |||
| 1430 | 2 | def to_xml_element(self): |
|
| 1431 | rule = ET.Element('Rule') |
||
| 1432 | rule.set('id', self.id_) |
||
| 1433 | if self.prodtype != "all": |
||
| 1434 | rule.set("prodtype", self.prodtype) |
||
| 1435 | rule.set('severity', self.severity) |
||
| 1436 | add_sub_element(rule, 'title', self.title) |
||
| 1437 | add_sub_element(rule, 'description', self.description) |
||
| 1438 | add_sub_element(rule, 'rationale', self.rationale) |
||
| 1439 | |||
| 1440 | main_ident = ET.Element('ident') |
||
| 1441 | for ident_type, ident_val in self.identifiers.items(): |
||
| 1442 | # This is not true if items were normalized |
||
| 1443 | if '@' in ident_type: |
||
| 1444 | # the ident is applicable only on some product |
||
| 1445 | # format : 'policy@product', eg. 'stigid@product' |
||
| 1446 | # for them, we create a separate <ref> element |
||
| 1447 | policy, product = ident_type.split('@') |
||
| 1448 | ident = ET.SubElement(rule, 'ident') |
||
| 1449 | ident.set(policy, ident_val) |
||
| 1450 | ident.set('prodtype', product) |
||
| 1451 | else: |
||
| 1452 | main_ident.set(ident_type, ident_val) |
||
| 1453 | |||
| 1454 | if main_ident.attrib: |
||
| 1455 | rule.append(main_ident) |
||
| 1456 | |||
| 1457 | main_ref = ET.Element('ref') |
||
| 1458 | for ref_type, ref_val in self.references.items(): |
||
| 1459 | # This is not true if items were normalized |
||
| 1460 | if '@' in ref_type: |
||
| 1461 | # the reference is applicable only on some product |
||
| 1462 | # format : 'policy@product', eg. 'stigid@product' |
||
| 1463 | # for them, we create a separate <ref> element |
||
| 1464 | policy, product = ref_type.split('@') |
||
| 1465 | ref = ET.SubElement(rule, 'ref') |
||
| 1466 | ref.set(policy, ref_val) |
||
| 1467 | ref.set('prodtype', product) |
||
| 1468 | else: |
||
| 1469 | main_ref.set(ref_type, ref_val) |
||
| 1470 | |||
| 1471 | if main_ref.attrib: |
||
| 1472 | rule.append(main_ref) |
||
| 1473 | |||
| 1474 | ocil_parent = rule |
||
| 1475 | check_parent = rule |
||
| 1476 | |||
| 1477 | if self.sce_metadata: |
||
| 1478 | # TODO: This is pretty much another hack, just like the previous OVAL |
||
| 1479 | # one. However, we avoided the external SCE content as I'm not sure it |
||
| 1480 | # is generally useful (unlike say, CVE checking with external OVAL) |
||
| 1481 | # |
||
| 1482 | # Additionally, we build the content (check subelement) here rather |
||
| 1483 | # than in xslt due to the nature of our SCE metadata. |
||
| 1484 | # |
||
| 1485 | # Finally, before we begin, we might have an element with both SCE |
||
| 1486 | # and OVAL. We have no way of knowing (right here) whether that is |
||
| 1487 | # the case (due to a variety of issues, most notably, that linking |
||
| 1488 | # hasn't yet occurred). So we must rely on the content author's |
||
| 1489 | # good will, by annotating SCE content with a complex-check tag |
||
| 1490 | # if necessary. |
||
| 1491 | |||
| 1492 | if 'complex-check' in self.sce_metadata: |
||
| 1493 | # Here we have an issue: XCCDF allows EITHER one or more check |
||
| 1494 | # elements OR a single complex-check. While we have an explicit |
||
| 1495 | # case handling the OVAL-and-SCE interaction, OCIL entries have |
||
| 1496 | # (historically) been alongside OVAL content and been in an |
||
| 1497 | # "OR" manner -- preferring OVAL to SCE. In order to accomplish |
||
| 1498 | # this, we thus need to add _yet another parent_ when OCIL data |
||
| 1499 | # is present, and add update ocil_parent accordingly. |
||
| 1500 | if self.ocil or self.ocil_clause: |
||
| 1501 | ocil_parent = ET.SubElement(ocil_parent, "complex-check") |
||
| 1502 | ocil_parent.set('operator', 'OR') |
||
| 1503 | |||
| 1504 | check_parent = ET.SubElement(ocil_parent, "complex-check") |
||
| 1505 | check_parent.set('operator', self.sce_metadata['complex-check']) |
||
| 1506 | |||
| 1507 | # Now, add the SCE check element to the tree. |
||
| 1508 | check = ET.SubElement(check_parent, "check") |
||
| 1509 | check.set("system", SCE_SYSTEM) |
||
| 1510 | |||
| 1511 | if 'check-import' in self.sce_metadata: |
||
| 1512 | if isinstance(self.sce_metadata['check-import'], str): |
||
| 1513 | self.sce_metadata['check-import'] = [self.sce_metadata['check-import']] |
||
| 1514 | for entry in self.sce_metadata['check-import']: |
||
| 1515 | check_import = ET.SubElement(check, 'check-import') |
||
| 1516 | check_import.set('import-name', entry) |
||
| 1517 | check_import.text = None |
||
| 1518 | |||
| 1519 | if 'check-export' in self.sce_metadata: |
||
| 1520 | if isinstance(self.sce_metadata['check-export'], str): |
||
| 1521 | self.sce_metadata['check-export'] = [self.sce_metadata['check-export']] |
||
| 1522 | for entry in self.sce_metadata['check-export']: |
||
| 1523 | export, value = entry.split('=') |
||
| 1524 | check_export = ET.SubElement(check, 'check-export') |
||
| 1525 | check_export.set('value-id', value) |
||
| 1526 | check_export.set('export-name', export) |
||
| 1527 | check_export.text = None |
||
| 1528 | |||
| 1529 | check_ref = ET.SubElement(check, "check-content-ref") |
||
| 1530 | href = self.sce_metadata['relative_path'] |
||
| 1531 | check_ref.set("href", href) |
||
| 1532 | |||
| 1533 | if self.oval_external_content: |
||
| 1534 | check = ET.SubElement(check_parent, 'check') |
||
| 1535 | check.set("system", "http://oval.mitre.org/XMLSchema/oval-definitions-5") |
||
| 1536 | external_content = ET.SubElement(check, "check-content-ref") |
||
| 1537 | external_content.set("href", self.oval_external_content) |
||
| 1538 | else: |
||
| 1539 | # TODO: This is pretty much a hack, oval ID will be the same as rule ID |
||
| 1540 | # and we don't want the developers to have to keep them in sync. |
||
| 1541 | # Therefore let's just add an OVAL ref of that ID. |
||
| 1542 | oval_ref = ET.SubElement(check_parent, "oval") |
||
| 1543 | oval_ref.set("id", self.id_) |
||
| 1544 | |||
| 1545 | if self.ocil or self.ocil_clause: |
||
| 1546 | ocil = add_sub_element(ocil_parent, 'ocil', self.ocil if self.ocil else "") |
||
| 1547 | if self.ocil_clause: |
||
| 1548 | ocil.set("clause", self.ocil_clause) |
||
| 1549 | |||
| 1550 | add_warning_elements(rule, self.warnings) |
||
| 1551 | add_nondata_subelements(rule, "requires", "id", self.requires) |
||
| 1552 | add_nondata_subelements(rule, "conflicts", "id", self.conflicts) |
||
| 1553 | |||
| 1554 | for cpe_name in self.cpe_names: |
||
| 1555 | platform_el = ET.SubElement(rule, "platform") |
||
| 1556 | platform_el.set("idref", cpe_name) |
||
| 1557 | |||
| 1558 | return rule |
||
| 1559 | |||
def to_file(self, file_name):
    """
    Write the XML form of this object to ``file_name``.

    The element returned by ``to_xml_element()`` becomes the root of an
    ``ElementTree`` which is then written out.
    """
    ET.ElementTree(self.to_xml_element()).write(file_name)
| 1564 | |||
def __hash__(self):
    """
    Hash on the identifier alone.

    IDs are meant to be unique, so ``id_`` is sufficient as the hash key.
    """
    identifier = self.id_
    return hash(identifier)
| 1569 | |||
def __eq__(self, other):
    """
    Two objects are equal when ``other`` is an instance of this object's
    class and both carry the same ``id_``.
    """
    if not isinstance(other, self.__class__):
        return False
    return self.id_ == other.id_
| 1572 | |||
def __ne__(self, other):
    """
    Return the logical negation of ``__eq__``.

    The comparison is deliberately spelled with ``==``: writing ``!=`` here
    would dispatch straight back into this method and recurse without bound.
    """
    return not self == other
| 1575 | |||
def __lt__(self, other):
    """Order objects by comparing their ``id_`` attributes."""
    own_id, their_id = self.id_, other.id_
    return own_id < their_id
| 1578 | |||
def __str__(self):
    """Return the object's ``id_`` as its string form."""
    text = self.id_
    return text
| 1581 | |||
| 1582 | |||
class DirectoryLoader(object):
    """
    Walks a guide directory and sorts its entries into a benchmark file, a
    group file, rule files, value files and subdirectories to recurse into.

    Loaded entities are accumulated in ``all_values``, ``all_rules`` and
    ``all_groups`` (dictionaries keyed by entity ID).

    Subclasses must provide ``_get_new_loader``, ``_process_values`` and
    ``_process_rules``; the versions here raise ``NotImplementedError``.
    """

    def __init__(self, profiles_dir, env_yaml):
        """
        Store the build context and start with empty collections.

        ``env_yaml`` must contain a ``"product"`` key; its value is kept in
        ``self.product``.
        """
        self.profiles_dir = profiles_dir
        self.env_yaml = env_yaml
        self.product = env_yaml["product"]

        # What was found in the directory currently being processed.
        self.benchmark_file = None
        self.group_file = None
        self.loaded_group = None
        self.rule_files = []
        self.value_files = []
        self.subdirectories = []

        # Everything loaded so far, including results merged from subdirs.
        self.all_values = {}
        self.all_rules = {}
        self.all_groups = {}

        self.parent_group = None

    def _collect_items_to_load(self, guide_directory):
        """
        Classify every entry of ``guide_directory`` (in sorted order).

        Raises ``ValueError`` if a second benchmark or group file shows up
        while one is already recorded.  Entries named ``tests`` are ignored;
        other unrecognized non-directory entries are reported on stderr.
        """
        for entry in sorted(os.listdir(guide_directory)):
            entry_path = os.path.join(guide_directory, entry)
            suffix = os.path.splitext(entry)[1]

            if suffix == '.var':
                self.value_files.append(entry_path)
                continue

            if entry == "benchmark.yml":
                if self.benchmark_file:
                    raise ValueError("Multiple benchmarks in one directory")
                self.benchmark_file = entry_path
                continue

            if entry == "group.yml":
                if self.group_file:
                    raise ValueError("Multiple groups in one directory")
                self.group_file = entry_path
                continue

            if suffix == '.rule':
                self.rule_files.append(entry_path)
                continue

            if is_rule_dir(entry_path):
                self.rule_files.append(get_rule_dir_yaml(entry_path))
                continue

            if entry == "tests":
                continue

            if os.path.isdir(entry_path):
                self.subdirectories.append(entry_path)
                continue

            sys.stderr.write(
                "Encountered file '%s' while recursing, extension '%s' "
                "is unknown. Skipping..\n"
                % (entry, suffix)
            )

    def load_benchmark_or_group(self, guide_directory):
        """
        Build the benchmark or group described by the file collected for
        ``guide_directory``.

        A benchmark is treated as a special form of group.  A loaded group
        (not a benchmark) is also registered in ``all_groups``.

        Returns the loaded object, or ``None`` when neither file was found.
        Raises ``ValueError`` when both a benchmark and a group file exist.
        """
        if self.group_file and self.benchmark_file:
            raise ValueError("A .benchmark file and a .group file were found in "
                             "the same directory '%s'" % guide_directory)

        loaded = None
        if self.benchmark_file:
            loaded = Benchmark.from_yaml(
                self.benchmark_file, self.env_yaml, 'product-name'
            )
            loaded.add_value_needed_for_ocil_clauses()
            if self.profiles_dir:
                loaded.add_profiles_from_dir(self.profiles_dir, self.env_yaml)
        elif self.group_file:
            loaded = Group.from_yaml(self.group_file, self.env_yaml)
            self.all_groups[loaded.id_] = loaded

        return loaded

    def _load_group_process_and_recurse(self, guide_directory):
        """
        Load this directory's group and, if there is one, attach it to the
        parent group, then process values, subdirectories and rules in that
        order.
        """
        self.loaded_group = self.load_benchmark_or_group(guide_directory)
        if not self.loaded_group:
            return

        if self.parent_group:
            self.parent_group.add_group(self.loaded_group, env_yaml=self.env_yaml)

        self._process_values()
        self._recurse_into_subdirs()
        self._process_rules()

    def process_directory_tree(self, start_dir, extra_group_dirs=None):
        """
        Process ``start_dir`` recursively.

        ``extra_group_dirs``, when given, are appended to the subdirectories
        discovered in ``start_dir`` and recursed into alongside them.
        """
        self._collect_items_to_load(start_dir)
        if extra_group_dirs:
            self.subdirectories.extend(extra_group_dirs)
        self._load_group_process_and_recurse(start_dir)

    def process_directory_trees(self, directories):
        """
        Process ``directories[0]`` as the start directory and treat the
        remaining entries as extra group directories.
        """
        return self.process_directory_tree(directories[0], directories[1:])

    def _recurse_into_subdirs(self):
        """
        Run a fresh loader over each subdirectory, with the current group as
        its parent, and merge what it loaded into this loader's collections.
        """
        for subdirectory in self.subdirectories:
            child = self._get_new_loader()
            child.parent_group = self.loaded_group
            child.process_directory_tree(subdirectory)

            self.all_values.update(child.all_values)
            self.all_rules.update(child.all_rules)
            self.all_groups.update(child.all_groups)

    def _get_new_loader(self):
        """Return a loader for a subdirectory; must be overridden."""
        raise NotImplementedError()

    def _process_values(self):
        """Handle the collected value files; must be overridden."""
        raise NotImplementedError()

    def _process_rules(self):
        """Handle the collected rule files; must be overridden."""
        raise NotImplementedError()

    def save_all_entities(self, base_dir):
        """
        Dump all loaded rules, groups and values under ``base_dir``.

        The ``rules``, ``groups`` and ``values`` subdirectories are always
        created, even when there is nothing to store in them.
        """
        for subdir, entities in (
                ("rules", self.all_rules),
                ("groups", self.all_groups),
                ("values", self.all_values)):
            destdir = os.path.join(base_dir, subdir)
            mkdir_p(destdir)
            if entities:
                self.save_entities(entities.values(), destdir)

    def save_entities(self, entities, destdir):
        """
        Dump each entity into ``destdir`` as ``<entity.id_>.yml`` by calling
        its ``dump_yaml`` method.  Does nothing for an empty collection.
        """
        if not entities:
            return
        for item in entities:
            target = os.path.join(destdir, item.id_ + ".yml")
            item.dump_yaml(target)
| 1722 | |||
| 1723 | |||
class BuildLoader(DirectoryLoader):
    """
    Directory loader that builds ``Value`` and ``Rule`` objects from the
    collected YAML files and attaches them to the loaded group.

    SCE metadata, when supplied, is parsed once from JSON and handed to
    every rule as it is loaded.
    """

    def __init__(self, profiles_dir, env_yaml,
                 sce_metadata_path=None):
        """
        ``sce_metadata_path`` is an optional path to a JSON file.  It is only
        parsed when it is given and the file is non-empty; otherwise
        ``self.sce_metadata`` stays ``None``.
        """
        super(BuildLoader, self).__init__(profiles_dir, env_yaml)

        self.sce_metadata = None
        if sce_metadata_path and os.path.getsize(sce_metadata_path):
            # Use a context manager so the file handle is closed as soon as
            # parsing finishes instead of lingering until garbage collection.
            with open(sce_metadata_path, 'r') as sce_metadata_file:
                self.sce_metadata = json.load(sce_metadata_file)

    def _process_values(self):
        """Load every collected value file and add it to the loaded group."""
        for value_yaml in self.value_files:
            value = Value.from_yaml(value_yaml, self.env_yaml)
            self.all_values[value.id_] = value
            self.loaded_group.add_value(value)

    def _process_rules(self):
        """
        Load every collected rule file and add applicable rules to the
        loaded group.

        A rule is skipped when loading raises ``DocumentationNotComplete``,
        or when its parsed prodtype contains neither ``"all"`` nor the
        current product.
        """
        for rule_yaml in self.rule_files:
            try:
                rule = Rule.from_yaml(rule_yaml, self.env_yaml, self.sce_metadata)
            except DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                continue
            prodtypes = parse_prodtype(rule.prodtype)
            if "all" not in prodtypes and self.product not in prodtypes:
                continue
            self.all_rules[rule.id_] = rule
            self.loaded_group.add_rule(rule, env_yaml=self.env_yaml)

            # Platforms declared on the group are appended to the rule's
            # inherited platforms.
            if self.loaded_group.platforms:
                rule.inherited_platforms += self.loaded_group.platforms

            rule.normalize(self.env_yaml["product"])

    def _get_new_loader(self):
        """
        Return a ``BuildLoader`` for a subdirectory that shares this
        loader's already-parsed SCE metadata.
        """
        loader = BuildLoader(
            self.profiles_dir, self.env_yaml)
        # Do it this way so we only have to parse the SCE metadata once.
        loader.sce_metadata = self.sce_metadata
        return loader

    def export_group_to_file(self, filename):
        """Write the loaded group to ``filename`` via its ``to_file``."""
        return self.loaded_group.to_file(filename)
| 1766 | |||
| 1767 | |||
class LinearLoader(object):
    """
    Loads entities from the ``rules``, ``profiles``, ``groups`` and
    ``values`` subdirectories of a resolved-content directory, and builds a
    benchmark on top of them.
    """

    def __init__(self, env_yaml, resolved_path):
        """
        Derive the four entity directories from ``resolved_path`` and start
        with empty ID-keyed dictionaries for each entity kind.
        """
        self.env_yaml = env_yaml
        self.benchmark = None

        self.resolved_rules_dir = os.path.join(resolved_path, "rules")
        self.resolved_profiles_dir = os.path.join(resolved_path, "profiles")
        self.resolved_groups_dir = os.path.join(resolved_path, "groups")
        self.resolved_values_dir = os.path.join(resolved_path, "values")

        self.rules = {}
        self.profiles = {}
        self.groups = {}
        self.values = {}

    def find_first_groups_ids(self, start_dir):
        """
        Return the names of the immediate subdirectories of ``start_dir``
        that contain a ``group.yml`` file.
        """
        pattern = os.path.join(start_dir, "*", "group.yml")
        group_ids = []
        for group_file in glob.glob(pattern):
            # The second-to-last path component is the group's directory.
            components = group_file.split(os.path.sep)
            group_ids.append(components[-2])
        return group_ids

    def load_entities_by_id(self, filenames, destination, cls):
        """
        Build an entity with ``cls.from_yaml`` for each file name and store
        it in ``destination`` under its ``id_``.
        """
        for path in filenames:
            loaded = cls.from_yaml(path, self.env_yaml)
            destination[loaded.id_] = loaded

    def load_benchmark(self, directory):
        """
        Load ``benchmark.yml`` from ``directory``, add profiles from the
        resolved profiles directory, and attach the already-loaded groups
        whose directories sit directly under ``directory``.
        """
        benchmark_yaml = os.path.join(directory, "benchmark.yml")
        self.benchmark = Benchmark.from_yaml(
            benchmark_yaml, self.env_yaml, "product-name")
        self.benchmark.add_value_needed_for_ocil_clauses()

        self.benchmark.add_profiles_from_dir(self.resolved_profiles_dir, self.env_yaml)

        for group_id in self.find_first_groups_ids(directory):
            self.benchmark.add_group(self.groups[group_id], self.env_yaml)

    def load_compiled_content(self):
        """
        Load all ``*.yml`` rules, groups, profiles and values from their
        directories, then let every group resolve its entities against the
        loaded rules, values and groups.
        """
        sources = (
            (self.resolved_rules_dir, self.rules, Rule),
            (self.resolved_groups_dir, self.groups, Group),
            (self.resolved_profiles_dir, self.profiles, Profile),
            (self.resolved_values_dir, self.values, Value),
        )
        for source_dir, destination, entity_cls in sources:
            yaml_files = glob.glob(os.path.join(source_dir, "*.yml"))
            self.load_entities_by_id(yaml_files, destination, entity_cls)

        for group in self.groups.values():
            group.load_entities(self.rules, self.values, self.groups)

    def export_benchmark_to_file(self, filename):
        """Write the loaded benchmark to ``filename`` via its ``to_file``."""
        return self.benchmark.to_file(filename)
| 1824 |