ssg.entities.common   F
last analyzed

Complexity

Total Complexity 81

Size/Duplication

Total Lines 472
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 282
dl 0
loc 472
ccs 0
cts 239
cp 0
rs 2
c 0
b 0
f 0
wmc 81

5 Functions

Rating   Name   Duplication   Size   Complexity  
A dump_yaml_preferably_in_original_order() 0 8 3
A extract_reference_from_product_specific_label() 0 24 5
A make_items_product_specific() 0 37 5
A derive_id_from_file_name() 0 2 1
A add_sub_element() 0 36 3

22 Methods

Rating   Name   Duplication   Size   Complexity  
A Templatable.is_templated() 0 2 1
A XCCDFEntity.__init__() 0 4 1
A XCCDFEntity._assign_defaults() 0 6 3
A SelectionHandler.apply_selection() 0 20 5
A XCCDFEntity.to_xml_element() 0 2 1
A SelectionHandler.selections() 0 14 2
A Templatable.make_template_product_specific() 0 15 2
A Templatable.get_template_vars() 0 16 2
A SelectionHandler._subtract_refinements() 0 11 4
A Templatable.get_template_context() 0 8 1
A XCCDFEntity.represent_as_dict() 0 14 5
A XCCDFEntity.get_instance_from_full_dict() 0 12 2
A Templatable.extract_configured_backend_lang() 0 17 5
B XCCDFEntity.parse_yaml_into_processed_dict() 0 40 6
A XCCDFEntity.from_yaml() 0 22 4
A SelectionHandler.update_with() 0 13 1
A SelectionHandler.__init__() 0 6 1
A Templatable.__init__() 0 2 1
A Templatable.get_template_name() 0 8 3
A XCCDFEntity.process_input_dict() 0 27 5
A XCCDFEntity.to_file() 0 4 1
A XCCDFEntity.dump_yaml() 0 5 2

How to fix   Complexity   

Complexity

Complex classes like ssg.entities.common often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from __future__ import absolute_import
2
from __future__ import print_function
3
4
import os
5
import yaml
6
from collections import defaultdict
7
from copy import deepcopy
8
9
from ..xml import ElementTree as ET, add_xhtml_namespace
10
from ..yaml import DocumentationNotComplete, open_and_macro_expand
11
from ..shims import unicode_func
12
13
from ..constants import (
14
    xhtml_namespace,
15
    XCCDF_REFINABLE_PROPERTIES,
16
    XCCDF12_NS,
17
    OSCAP_VALUE,
18
    GLOBAL_REFERENCES
19
)
20
21
22
def extract_reference_from_product_specific_label(items_dict, full_label, value, allow_overwrites):
23
    label = full_label.split("@")[0]
24
25
    if label in GLOBAL_REFERENCES:
26
        msg = (
27
            "You cannot use product-qualified for the '{item_u}' reference. "
28
            "Please remove the product-qualifier and merge values with the "
29
            "existing reference if there is any. Original line: {item_q}: {value_q}"
30
            .format(item_u=label, item_q=full_label, value_q=value)
31
        )
32
        raise ValueError(msg)
33
34
    if not allow_overwrites and label in items_dict and value != items_dict[label]:
35
        msg = (
36
            "There is a product-qualified '{item_q}' item, "
37
            "but also an unqualified '{item_u}' item "
38
            "and those two differ in value - "
39
            "'{value_q}' vs '{value_u}' respectively."
40
            .format(item_q=full_label, item_u=label,
41
                    value_q=value, value_u=items_dict[label])
42
        )
43
        raise ValueError(msg)
44
45
    return label
46
47
48
def make_items_product_specific(items_dict, product_suffix, allow_overwrites=False):
49
    """
50
    Function will normalize dictionary values for a specific product, by either
51
    removing product qualifier from the key (reference@product: value -> reference: value),
52
    or by dropping irrelevant entries (reference@other_product: value).
53
54
    Qualified entries always take precedence over generic ones.
55
56
    In case when `allow_overwrites` is set to False even qualified entry won't be allowed
57
    to replace generic one and Exception will be thrown.
58
59
    :param items_dict: Input dictionary.
60
    :param product_suffix: The product to be normalized against.
61
    :param allow_overwrites: Controls if the function should replace value from a non-qualified
62
                             label with a qualified one.
63
    :return: New, normalized dictionary.
64
    """
65
    new_items = dict()
66
    for full_label, value in items_dict.items():
67
        if "@" not in full_label:
68
            # Current full_label is a generic reference, and we should NEVER overwrite an entry
69
            # that came from a product-qualified reference earlier.
70
            if full_label not in new_items:
71
                new_items[full_label] = value
72
            continue
73
74
        # This procedure should occur before matching product_suffix with the product qualifier
75
        # present in the reference, so it catches problems even for products that are not
76
        # being built at the moment
77
        label = extract_reference_from_product_specific_label(items_dict, full_label, value,
78
                                                              allow_overwrites)
79
80
        if not full_label.endswith(product_suffix):
81
            continue
82
83
        new_items[label] = value
84
    return new_items
85
86
87
def add_sub_element(parent, tag, ns, data):
88
    """
89
    Creates a new child element under parent with tag tag, and sets
90
    data as the content under the tag. In particular, data is a string
91
    to be parsed as an XML tree, allowing sub-elements of children to be
92
    added.
93
94
    If data should not be parsed as an XML tree, either escape the contents
95
    before passing into this function, or use ElementTree.SubElement().
96
97
    Returns the newly created subelement of type tag.
98
    """
99
    namespaced_data = add_xhtml_namespace(data)
100
    # This is used because our YAML data contain XML and XHTML elements
101
    # ET.SubElement() escapes the < > characters by &lt; and &gt;
102
    # and therefore it does not add child elements
103
    # we need to do a hack instead
104
    # TODO: Remove this function after we move to Markdown everywhere in SSG
105
    ustr = unicode_func('<{0} xmlns="{3}" xmlns:xhtml="{2}">{1}</{0}>').format(
106
        tag, namespaced_data, xhtml_namespace, ns)
107
108
    try:
109
        element = ET.fromstring(ustr.encode("utf-8"))
110
    except Exception:
111
        msg = ("Error adding subelement to an element '{0}' from string: '{1}'"
112
               .format(parent.tag, ustr))
113
        raise RuntimeError(msg)
114
115
    # Apart from HTML and XML elements the rule descriptions and similar
116
    # also contain <xccdf:sub> elements, where we need to add the prefix
117
    # to create a full reference.
118
    for x in element.findall(".//{%s}sub" % XCCDF12_NS):
119
        x.set("idref", OSCAP_VALUE + x.get("idref"))
120
        x.set("use", "legacy")
121
    parent.append(element)
122
    return element
123
124
125
def derive_id_from_file_name(filename):
126
    return os.path.splitext(filename)[0]
127
128
129
def dump_yaml_preferably_in_original_order(dictionary, file_object):
130
    try:
131
        return yaml.dump(dictionary, file_object, indent=4, sort_keys=False)
132
    except TypeError as exc:
133
        # Older versions of libyaml don't understand the sort_keys kwarg
134
        if "sort_keys" not in str(exc):
135
            raise exc
136
        return yaml.dump(dictionary, file_object, indent=4)
137
138
139
class XCCDFEntity(object):
140
    """
141
    This class can load itself from a YAML with Jinja macros,
142
    and it can also save itself to YAML.
143
144
    It is supposed to work with the content in the project,
145
    when entities are defined in the benchmark tree,
146
    and they are compiled into flat YAMLs to the build directory.
147
    """
148
    KEYS = dict(
149
            id_=lambda: "",
150
            title=lambda: "",
151
            definition_location=lambda: "",
152
    )
153
154
    MANDATORY_KEYS = set()
155
156
    GENERIC_FILENAME = ""
157
    ID_LABEL = "id"
158
159
    def __init__(self, id_):
160
        super(XCCDFEntity, self).__init__()
161
        self._assign_defaults()
162
        self.id_ = id_
163
164
    def _assign_defaults(self):
165
        for key, default in self.KEYS.items():
166
            default_val = default()
167
            if isinstance(default_val, RuntimeError):
168
                default_val = None
169
            setattr(self, key, default_val)
170
171
    @classmethod
172
    def get_instance_from_full_dict(cls, data):
173
        """
174
        Given a defining dictionary, produce an instance
175
        by treating all dict elements as attributes.
176
177
        Extend this if you want tight control over the instance creation process.
178
        """
179
        entity = cls(data["id_"])
180
        for key, value in data.items():
181
            setattr(entity, key, value)
182
        return entity
183
184
    @classmethod
185
    def process_input_dict(cls, input_contents, env_yaml, product_cpes=None):
186
        """
187
        Take the contents of the definition as a dictionary, and
188
        add defaults or raise errors if a required member is not present.
189
190
        Extend this if you want to add, remove or alter the result
191
        that will constitute the new instance.
192
        """
193
        data = dict()
194
195
        for key, default in cls.KEYS.items():
196
            if key in input_contents:
197
                if input_contents[key] is not None:
198
                    data[key] = input_contents[key]
199
                del input_contents[key]
200
                continue
201
202
            if key not in cls.MANDATORY_KEYS:
203
                data[key] = cls.KEYS[key]()
204
            else:
205
                msg = (
206
                    "Key '{key}' is mandatory for definition of '{class_name}'."
207
                    .format(key=key, class_name=cls.__name__))
208
                raise ValueError(msg)
209
210
        return data
211
212
    @classmethod
213
    def parse_yaml_into_processed_dict(cls, yaml_file, env_yaml=None, product_cpes=None):
214
        """
215
        Given yaml filename and environment info, produce a dictionary
216
        that defines the instance to be created.
217
        This wraps :meth:`process_input_dict` and it adds generic keys on the top:
218
219
        - `id_` as the entity ID that is deduced either from the file name,
220
          or from the parent directory name.
221
        - `definition_location` is the original location where the entity got defined.
222
        """
223
        file_basename = os.path.basename(yaml_file)
224
        entity_id = derive_id_from_file_name(file_basename)
225
        if file_basename == cls.GENERIC_FILENAME:
226
            entity_id = os.path.basename(os.path.dirname(yaml_file))
227
228
        if env_yaml:
229
            env_yaml[cls.ID_LABEL] = entity_id
230
        yaml_data = open_and_macro_expand(yaml_file, env_yaml)
231
232
        try:
233
            processed_data = cls.process_input_dict(yaml_data, env_yaml, product_cpes)
234
        except ValueError as exc:
235
            msg = (
236
                "Error processing {yaml_file}: {exc}"
237
                .format(yaml_file=yaml_file, exc=str(exc)))
238
            raise ValueError(msg)
239
240
        if yaml_data:
241
            msg = (
242
                "Unparsed YAML data in '{yaml_file}': {keys}"
243
                .format(yaml_file=yaml_file, keys=list(yaml_data.keys())))
244
            raise RuntimeError(msg)
245
246
        if not processed_data.get("definition_location", ""):
247
            processed_data["definition_location"] = yaml_file
248
249
        processed_data["id_"] = entity_id
250
251
        return processed_data
252
253
    @classmethod
254
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
255
        yaml_file = os.path.normpath(yaml_file)
256
257
        local_env_yaml = None
258
        if env_yaml:
259
            local_env_yaml = dict()
260
            local_env_yaml.update(env_yaml)
261
262
        try:
263
            data_dict = cls.parse_yaml_into_processed_dict(yaml_file, local_env_yaml, product_cpes)
264
        except DocumentationNotComplete as exc:
265
            raise
266
        except Exception as exc:
267
            msg = (
268
                "Error loading a {class_name} from {filename}: {error}"
269
                .format(class_name=cls.__name__, filename=yaml_file, error=str(exc)))
270
            raise RuntimeError(msg)
271
272
        result = cls.get_instance_from_full_dict(data_dict)
273
274
        return result
275
276
    def represent_as_dict(self):
277
        """
278
        Produce a dict representation of the class.
279
280
        Extend this method if you need the representation to be different from the object.
281
        """
282
        data = dict()
283
        for key in self.KEYS:
284
            value = getattr(self, key)
285
            if value or True:
286
                data[key] = getattr(self, key)
287
        if "id_" in data:
288
            del data["id_"]
289
        return data
290
291
    def dump_yaml(self, file_name, documentation_complete=True):
292
        to_dump = self.represent_as_dict()
293
        to_dump["documentation_complete"] = documentation_complete
294
        with open(file_name, "w+") as f:
295
            dump_yaml_preferably_in_original_order(to_dump, f)
296
297
    def to_xml_element(self):
298
        raise NotImplementedError()
299
300
    def to_file(self, file_name):
301
        root = self.to_xml_element()
302
        tree = ET.ElementTree(root)
303
        tree.write(file_name)
304
305
306
class SelectionHandler(object):
307
    def __init__(self):
308
        self.refine_rules = defaultdict(list)
309
        self.variables = dict()
310
        self.unselected = []
311
        self.unselected_groups = []
312
        self.selected = []
313
314
    @property
315
    def selections(self):
316
        selections = []
317
        for item in self.selected:
318
            selections.append(str(item))
319
        for item in self.unselected:
320
            selections.append("!"+str(item))
321
        for varname in self.variables.keys():
322
            selections.append(varname+"="+self.variables.get(varname))
323
        for rule, refinements in self.refine_rules.items():
324
            for prop, val in refinements:
325
                selections.append("{rule}.{property}={value}"
326
                                  .format(rule=rule, property=prop, value=val))
327
        return selections
328
329
    @selections.setter
330
    def selections(self, entries):
331
        for item in entries:
332
            self.apply_selection(item)
333
334
    def apply_selection(self, item):
335
        if "." in item:
336
            rule, refinement = item.split(".", 1)
337
            property_, value = refinement.split("=", 1)
338
            if property_ not in XCCDF_REFINABLE_PROPERTIES:
339
                msg = ("Property '{property_}' cannot be refined. "
340
                       "Rule properties that can be refined are {refinables}. "
341
                       "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
342
                       .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
343
                               rule_id=rule, value=value, profile=self.id_)
344
                       )
345
                raise ValueError(msg)
346
            self.refine_rules[rule].append((property_, value))
347
        elif "=" in item:
348
            varname, value = item.split("=", 1)
349
            self.variables[varname] = value
350
        elif item.startswith("!"):
351
            self.unselected.append(item[1:])
352
        else:
353
            self.selected.append(item)
354
355
    def _subtract_refinements(self, extended_refinements):
356
        """
357
        Given a dict of rule refinements from the extended profile,
358
        "undo" every refinement prefixed with '!' in this profile.
359
        """
360
        for rule, refinements in list(self.refine_rules.items()):
361
            if rule.startswith("!"):
362
                for prop, val in refinements:
363
                    extended_refinements[rule[1:]].remove((prop, val))
364
                del self.refine_rules[rule]
365
        return extended_refinements
366
367
    def update_with(self, rhs):
368
        extended_selects = set(rhs.selected)
369
        extra_selections = extended_selects.difference(set(self.selected))
370
        self.selected.extend(list(extra_selections))
371
372
        updated_variables = dict(rhs.variables)
373
        updated_variables.update(self.variables)
374
        self.variables = updated_variables
375
376
        extended_refinements = deepcopy(rhs.refine_rules)
377
        updated_refinements = self._subtract_refinements(extended_refinements)
378
        updated_refinements.update(self.refine_rules)
379
        self.refine_rules = updated_refinements
380
381
382
class Templatable(object):
383
    """
384
    The Templatable is a mix-in sidekick for XCCDFEntity-based classes
385
    that have templates. It contains methods used by the template Builder
386
    class.
387
388
    Methods `get_template_context` and `get_template_vars` are subject for
389
    overloading by XCCDFEntity subclasses that want to customize template
390
    input.
391
    """
392
393
    KEYS = dict(
394
        template=lambda: None,
395
    )
396
397
    def __init__(self):
398
        pass
399
400
    def is_templated(self):
401
        return isinstance(self.template, dict)
402
403
    def get_template_name(self):
404
        if not self.is_templated():
405
            return None
406
        try:
407
            return self.template["name"]
408
        except KeyError:
409
            raise ValueError(
410
                "Templatable {0} is missing template name under template key".format(self))
411
412
    def get_template_context(self, env_yaml):
413
        # TODO: The first two variables, 'rule_id' and 'rule_title' are expected by some
414
        #       templates and macros even if they are not rendered in a rule context.
415
        #       Better name for these variables are 'entity_id' and 'entity_title'.
416
        return {
417
            "rule_id": self.id_,
418
            "rule_title": self.title,
419
            "products": env_yaml["product"],
420
        }
421
422
    def get_template_vars(self, env_yaml):
423
        if "vars" not in self.template:
424
            raise ValueError(
425
                "Templatable {0} does not contain mandatory 'vars:' key under "
426
                "'template:' key.".format(self))
427
        template_vars = self.template["vars"]
428
429
        # Add the rule ID which will be used in template preprocessors (template.py)
430
        # as a unique sub-element for a variety of composite IDs.
431
        # TODO: The name _rule_id is a legacy from the era when rule was the only
432
        #       context for a template. Preprocessors implicitly depend on this name.
433
        #       A better name is '_entity_id' (as in XCCDF Entity).
434
        template_vars["_rule_id"] = self.id_
435
436
        return make_items_product_specific(template_vars, env_yaml["product"],
437
                                           allow_overwrites=True)
438
439
    def extract_configured_backend_lang(self, avail_langs):
440
        """
441
        Returns list of languages that should be generated
442
        based on the Templatable's template option `template.backends`.
443
        """
444
        if not self.is_templated():
445
            return []
446
447
        if "backends" in self.template:
448
            backends = self.template["backends"]
449
            for lang in backends:
450
                if lang not in avail_langs:
451
                    raise RuntimeError("Templatable {0} wants to generate unknown language '{1}"
452
                                       .format(self, lang))
453
            return [lang for name, lang in avail_langs.items() if backends.get(name, "on") == "on"]
454
455
        return avail_langs.values()
456
457
    def make_template_product_specific(self, product):
458
        if not self.is_templated():
459
            return
460
461
        product_suffix = "@{0}".format(product)
462
463
        not_specific_vars = self.template.get("vars", dict())
464
        specific_vars = make_items_product_specific(
465
            not_specific_vars, product_suffix, True)
466
        self.template["vars"] = specific_vars
467
468
        not_specific_backends = self.template.get("backends", dict())
469
        specific_backends = make_items_product_specific(
470
            not_specific_backends, product_suffix, True)
471
        self.template["backends"] = specific_backends
472