ssg.build_cpe   F
last analyzed

Complexity

Total Complexity 82

Size/Duplication

Total Lines 424
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 293
dl 0
loc 424
ccs 0
cts 260
cp 0
rs 2
c 0
b 0
f 0
wmc 82

33 Methods

Rating   Name   Duplication   Size   Complexity  
A CPEALCheckFactRef.to_bash_conditional() 0 2 1
A ProductCPEs.__init__() 0 9 1
A ProductCPEs.get_product_cpe_names() 0 2 1
A CPEALCheckFactRef.__init__() 0 5 1
A CPEList.to_file() 0 4 1
A ProductCPEs.load_content_cpes() 0 5 2
A CPEItem.set_template_variables() 0 5 3
A CPEItem.to_xml_element() 0 13 1
A CPEItem.cpe_oval_def_id() 0 6 1
A CPEALLogicalTest.to_xml_element() 0 11 4
A CPEALCheckFactRef.to_ansible_conditional() 0 2 1
A CPEALCheckFactRef.get_base_name_of_parametrized_cpe_id() 0 7 1
A CPEItem.cpe_oval_short_def_id() 0 3 1
A CPEALCheckFactRef.enrich_with_cpe_info() 0 7 1
A CPEList.add() 0 2 1
A ProductCPEs.load_product_cpes() 0 15 4
A CPEItem.is_cpe_name() 0 3 1
A CPEALLogicalTest.enrich_with_cpe_info() 0 3 2
A ProductCPEs.get_cpe() 0 11 4
A ProductCPEs.get_cpe_name() 0 3 1
A ProductCPEs.load_cpes_from_directory_tree() 0 17 4
A ProductCPEs.add_cpe_item() 0 5 2
A ProductCPEs.get_cpe_for_fact_ref() 0 2 1
A CPEALCheckFactRef.to_xml_element() 0 6 1
A CPEALLogicalTest.to_ansible_conditional() 0 20 5
A CPEList.__init__() 0 2 1
A CPEList.to_xml_element() 0 12 3
A CPEItem.set_conditional() 0 8 3
A CPEItem.create_resolved_cpe_item_for_fact_ref() 0 22 4
A CPEItem.from_yaml() 0 8 3
A ProductCPEs.add_resolved_cpe_items_from_platform() 0 16 4
A CPEALLogicalTest.to_bash_conditional() 0 20 5
A CPEALCheckFactRef.cpe_id_is_parametrized() 0 3 1

3 Functions

Rating   Name   Duplication   Size   Complexity  
A extract_env_obj() 0 17 3
A extract_subelement() 0 15 4
A extract_referred_nodes() 0 19 5

How to fix   Complexity   

Complexity

Complex classes like ssg.build_cpe often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Common functions for building CPEs
3
"""
4
5
from __future__ import absolute_import
6
from __future__ import print_function
7
import os
8
import sys
9
import ssg.id_translate
10
11
from .constants import oval_namespace
12
from .constants import PREFIX_TO_NS
13
from .utils import required_key, apply_formatting_on_dict_values
14
from .xml import ElementTree as ET
15
from .boolean_expression import Algebra, Symbol, Function
16
from .entities.common import XCCDFEntity, Templatable
17
from .yaml import convert_string_to_bool
18
19
20
class CPEDoesNotExist(Exception):
    """Signals that a CPE was requested by an ID or name that is not defined."""
22
23
24
class ProductCPEs(object):
    """
    Reads from the disk all the yaml CPEs related to a product
    and provides them in a structured way.

    CPE items are registered through ``add_cpe_item`` and are indexed both by
    their ID (``cpes_by_id``) and by their CPE name (``cpes_by_name``).  Items
    flagged as product CPEs are additionally kept in ``product_cpes``.
    """

    def __init__(self):
        self.cpes_by_id = {}
        self.cpes_by_name = {}
        self.product_cpes = {}
        self.platforms = {}
        self.cpe_oval_href = ""
        self.algebra = Algebra(
            symbol_cls=CPEALCheckFactRef, function_cls=CPEALLogicalTest)

    def load_product_cpes(self, env_yaml):
        """
        Register the CPEs listed under the ``cpes`` key of the product data.

        Also derives ``cpe_oval_href`` from the ``product`` key.

        Raises:
            KeyError: if ``env_yaml`` has no ``product`` or no ``cpes`` key.
        """
        self.cpe_oval_href = "ssg-" + env_yaml["product"] + "-cpe-oval.xml"
        # Only the lookup itself is guarded, so that a KeyError raised while
        # building the individual CPE items is not misreported as a missing
        # 'cpes' key.
        try:
            product_cpes_list = env_yaml["cpes"]
        except KeyError:
            raise KeyError(
                "Product %s does not define 'cpes'" % (env_yaml["product"]))

        for cpe_dict_repr in product_cpes_list:
            for cpe_id, cpe in cpe_dict_repr.items():
                # these product CPEs defined in product.yml are defined
                # differently than CPEs in shared/applicability/*.yml
                # therefore we have to place the ID at the place where it is expected
                cpe["id_"] = cpe_id
                cpe_item = CPEItem.get_instance_from_full_dict(cpe)
                cpe_item.is_product_cpe = True
                self.add_cpe_item(cpe_item)

    def load_content_cpes(self, env_yaml):
        """
        Register the CPEs stored as files in the ``cpes_root`` directory.

        A relative ``cpes_root`` is resolved against ``env_yaml["product_dir"]``.
        """
        cpes_root = required_key(env_yaml, "cpes_root")
        if not os.path.isabs(cpes_root):
            cpes_root = os.path.join(env_yaml["product_dir"], cpes_root)
        self.load_cpes_from_directory_tree(cpes_root, env_yaml)

    def load_cpes_from_directory_tree(self, root_path, env_yaml):
        """
        Register a CPE item for every ``.yml`` file directly in ``root_path``.

        Entries are processed in sorted order.  Entries that are not regular
        files are skipped silently; files with another extension are skipped
        with a message written to standard error.
        """
        for dir_item in sorted(os.listdir(root_path)):
            dir_item_path = os.path.join(root_path, dir_item)
            if not os.path.isfile(dir_item_path):
                continue

            _, ext = os.path.splitext(os.path.basename(dir_item_path))
            if ext != '.yml':
                sys.stderr.write(
                    "Encountered file '%s' while looking for content CPEs, "
                    "extension '%s' is unknown. Skipping..\n"
                    % (dir_item, ext)
                )
                continue

            cpe_item = CPEItem.from_yaml(dir_item_path, env_yaml)
            self.add_cpe_item(cpe_item)

    def add_cpe_item(self, cpe_item):
        """
        Index ``cpe_item`` by its ID and by its name.

        An item already registered under the same ID or name is replaced.
        """
        self.cpes_by_id[cpe_item.id_] = cpe_item
        self.cpes_by_name[cpe_item.name] = cpe_item
        if cpe_item.is_product_cpe:
            self.product_cpes[cpe_item.id_] = cpe_item

    def get_cpe(self, cpe_id_or_name):
        """
        Return the CPE item registered under the given CPE name or ID.

        A parametrized ID is reduced to its base name before the lookup.

        Raises:
            CPEDoesNotExist: if no such CPE item is registered.
        """
        try:
            if CPEItem.is_cpe_name(cpe_id_or_name):
                return self.cpes_by_name[cpe_id_or_name]
            else:
                if CPEALCheckFactRef.cpe_id_is_parametrized(cpe_id_or_name):
                    cpe_id_or_name = CPEALCheckFactRef.get_base_name_of_parametrized_cpe_id(
                        cpe_id_or_name)
                return self.cpes_by_id[cpe_id_or_name]
        except KeyError:
            raise CPEDoesNotExist("CPE %s is not defined" % cpe_id_or_name)

    def add_resolved_cpe_items_from_platform(self, platform):
        """
        Make sure a CPE item exists for every parametrized fact ref of ``platform``.

        The ``cpe_name`` of each parametrized fact ref is updated to the name
        of the CPE item that was found or newly created for it.
        """
        for fact_ref in platform.get_fact_refs():
            if fact_ref.arg:  # the CPE item is parametrized
                try:
                    # if there already exists a CPE item with factref's ID
                    # we can just use it right away, no new CPE items need to be created
                    cpe = self.get_cpe_for_fact_ref(fact_ref)
                    fact_ref.cpe_name = cpe.name
                except CPEDoesNotExist:
                    # if the CPE item with factref's ID does not exist
                    # it means that we need to create a new CPE item
                    # which will have parameters in place
                    cpe = self.get_cpe(fact_ref.cpe_name)
                    new_cpe = cpe.create_resolved_cpe_item_for_fact_ref(fact_ref)
                    self.add_cpe_item(new_cpe)
                    fact_ref.cpe_name = new_cpe.name

    def get_cpe_for_fact_ref(self, fact_ref):
        """Return the CPE item registered under the ID of ``fact_ref``."""
        return self.get_cpe(fact_ref.as_id())

    def get_cpe_name(self, cpe_id):
        """Return the name of the CPE item found by ``get_cpe(cpe_id)``."""
        cpe = self.get_cpe(cpe_id)
        return cpe.name

    def get_product_cpe_names(self):
        """Return the names of all registered product CPEs as a list."""
        return [cpe.name for cpe in self.product_cpes.values()]
124
125
126
class CPEList(object):
    """
    Collection of CPE items that can be serialized as the cpe-list
    element from the CPE standard.
    """

    prefix = "cpe-dict"
    ns = PREFIX_TO_NS[prefix]

    def __init__(self):
        self.cpe_items = []

    def add(self, cpe_item):
        """Append ``cpe_item`` to the collection."""
        self.cpe_items.append(cpe_item)

    def to_xml_element(self, cpe_oval_file):
        """
        Build the cpe-list element containing all collected CPE items.

        The stored items are sorted in place by their name, and each of them
        is serialized with ``cpe_oval_file`` passed to its ``to_xml_element``.
        """
        def by_name(item):
            return item.name

        root = ET.Element("{%s}cpe-list" % CPEList.ns)
        root_attributes = (
            ("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance"),
            ("xsi:schemaLocation",
             "http://cpe.mitre.org/dictionary/2.0 "
             "http://cpe.mitre.org/files/cpe-dictionary_2.1.xsd"),
        )
        for attr_name, attr_value in root_attributes:
            root.set(attr_name, attr_value)

        self.cpe_items.sort(key=by_name)
        for item in self.cpe_items:
            item_element = item.to_xml_element(cpe_oval_file)
            root.append(item_element)

        return root

    def to_file(self, file_name, cpe_oval_file):
        """Serialize the cpe-list element into ``file_name`` as UTF-8 XML."""
        document = ET.ElementTree(self.to_xml_element(cpe_oval_file))
        document.write(file_name, encoding="utf-8")
157
158
159
class CPEItem(XCCDFEntity, Templatable):
    """
    Represents the cpe-item element from the CPE standard.
    """

    KEYS = dict(
        name=lambda: "",
        check_id=lambda: "",
        bash_conditional=lambda: "",
        ansible_conditional=lambda: "",
        is_product_cpe=lambda: False,
        versioned=lambda: False,
        args=lambda: {},
        ** XCCDFEntity.KEYS
    )
    KEYS.update(**Templatable.KEYS)

    MANDATORY_KEYS = [
        "name",
    ]

    prefix = "cpe-dict"
    ns = PREFIX_TO_NS[prefix]

    @property
    def cpe_oval_short_def_id(self):
        """The ``check_id`` of the item, or its ``id_`` when ``check_id`` is empty."""
        return self.check_id or self.id_

    @property
    def cpe_oval_def_id(self):
        """The OVAL definition ID generated from ``cpe_oval_short_def_id``."""
        translator = ssg.id_translate.IDTranslator("ssg")
        full_id = translator.generate_id(
            "{" + oval_namespace + "}definition", self.cpe_oval_short_def_id)
        return full_id

    def to_xml_element(self, cpe_oval_filename):
        """
        Build the cpe-item element with its title and check sub-elements.

        ``cpe_oval_filename`` is stored as the ``href`` of the check element.
        """
        cpe_item = ET.Element("{%s}cpe-item" % CPEItem.ns)
        cpe_item.set('name', self.name)

        cpe_item_title = ET.SubElement(cpe_item, "{%s}title" % CPEItem.ns)
        cpe_item_title.set('xml:lang', "en-us")
        cpe_item_title.text = self.title

        cpe_item_check = ET.SubElement(cpe_item, "{%s}check" % CPEItem.ns)
        cpe_item_check.set('system', oval_namespace)
        cpe_item_check.set('href', cpe_oval_filename)
        cpe_item_check.text = self.cpe_oval_short_def_id
        return cpe_item

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
        """
        Load a CPE item from ``yaml_file`` using the parent class loader.

        Truthy ``is_product_cpe`` and ``versioned`` values are passed through
        ``convert_string_to_bool`` afterwards.
        """
        cpe_item = super(CPEItem, cls).from_yaml(yaml_file, env_yaml, product_cpes)
        if cpe_item.is_product_cpe:
            cpe_item.is_product_cpe = convert_string_to_bool(cpe_item.is_product_cpe)
        if cpe_item.versioned:
            cpe_item.versioned = convert_string_to_bool(cpe_item.versioned)
        return cpe_item

    def set_template_variables(self, *sources):
        """
        Replace the template variables with the merged content of ``sources``.

        Later sources override earlier ones.  Nothing happens when the item
        is not templated.
        """
        if self.is_templated():
            self.template["vars"] = {}
            for source in sources:
                self.template["vars"].update(source)

    def create_resolved_cpe_item_for_fact_ref(self, fact_ref):
        """
        Create a new CPE item with the parameters of ``fact_ref`` filled in.

        The new item takes its ID from ``fact_ref.as_id()``, has no ``args``
        and has its values formatted with the resolved parameters.

        Raises:
            ValueError: if ``fact_ref`` has version specifiers but this item
                is not versioned.
            KeyError: if this item does not define the argument of ``fact_ref``.
        """
        if fact_ref.has_version_specs() and not self.versioned:
            raise ValueError("CPE entity '{0}' does not support version specifiers: "
                             "{1}".format(self.id_, fact_ref.cpe_name))
        try:
            argument_parameters = self.args[fact_ref.arg]
        except KeyError:
            raise KeyError(
                "The {0} CPE item does not support the argument {1}. "
                "Following arguments are supported: {2}".format(
                    self.id_, fact_ref.arg, list(self.args.keys())))
        # Work on a copy: the dictionary stored in self.args belongs to this
        # item and must not be modified by resolving one particular fact ref.
        resolved_parameters = dict(argument_parameters)
        resolved_parameters.update(fact_ref.as_dict())
        cpe_item_as_dict = self.represent_as_dict()
        cpe_item_as_dict["args"] = None
        cpe_item_as_dict["id_"] = fact_ref.as_id()
        new_associated_cpe_item_as_dict = apply_formatting_on_dict_values(
            cpe_item_as_dict, resolved_parameters)
        new_associated_cpe_item = CPEItem.get_instance_from_full_dict(
            new_associated_cpe_item_as_dict)
        new_associated_cpe_item.set_template_variables(resolved_parameters)
        return new_associated_cpe_item

    @staticmethod
    def is_cpe_name(cpe_id_or_name):
        """Return True if the given string starts with ``cpe:``."""
        return cpe_id_or_name.startswith("cpe:")

    def set_conditional(self, language, content):
        """
        Store ``content`` as the conditional for ``language``.

        Raises:
            RuntimeError: if ``language`` is neither "ansible" nor "bash".
        """
        if language == "ansible":
            self.ansible_conditional = content
        elif language == "bash":
            self.bash_conditional = content
        else:
            raise RuntimeError(
                "The language {0} is not supported as conditional for CPE".format(language))
258
259
260
class CPEALLogicalTest(Function):
    """
    Boolean function of a platform expression that can be serialized as a
    logical-test element and rendered as a Bash or Ansible conditional.
    """

    prefix = "cpe-lang"
    ns = PREFIX_TO_NS[prefix]

    def to_xml_element(self):
        """Build the logical-test element including all of its operands."""
        cpe_test = ET.Element("{%s}logical-test" % CPEALLogicalTest.ns)
        cpe_test.set('operator', ('OR' if self.is_or() else 'AND'))
        cpe_test.set('negate', ('true' if self.is_not() else 'false'))
        # Logical tests must go first, therefore we separate tests and factrefs
        tests = [t for t in self.args if isinstance(t, CPEALLogicalTest)]
        factrefs = [f for f in self.args if isinstance(f, CPEALCheckFactRef)]
        for obj in tests + factrefs:
            cpe_test.append(obj.to_xml_element())

        return cpe_test

    def enrich_with_cpe_info(self, cpe_products):
        """Pass ``cpe_products`` to ``enrich_with_cpe_info`` of every operand."""
        for arg in self.args:
            arg.enrich_with_cpe_info(cpe_products)

    def _join_child_conditionals(self, child_conds, negation, or_op, and_op):
        """
        Combine already rendered operand conditionals into one expression.

        Returns an empty string when ``child_conds`` is empty.

        Raises:
            ValueError: if this test is neither a negation, a disjunction nor
                a conjunction, so no joining operator can be chosen.
        """
        if not child_conds:
            return ""

        if self.is_or():
            op = or_op
        elif self.is_and():
            op = and_op
        elif self.is_not():
            # a bare negation joins its operands with a plain space
            op = " "
        else:
            raise ValueError(
                "The logical test is neither NOT, OR nor AND, "
                "its conditional cannot be built")

        cond = negation if self.is_not() else ""
        cond += "( " + op.join(child_conds) + " )"
        return cond

    def to_bash_conditional(self):
        """
        Render this test as a Bash conditional.

        Operands with an empty conditional are left out; an empty string is
        returned when no operand provides a conditional.
        """
        # Render every operand exactly once; rendering it twice would double
        # the work on each level of nested tests.
        rendered = [a.to_bash_conditional() for a in self.args]
        child_bash_conds = [c for c in rendered if c != '']
        return self._join_child_conditionals(
            child_bash_conds, "! ", " || ", " && ")

    def to_ansible_conditional(self):
        """
        Render this test as an Ansible conditional.

        Operands with an empty conditional are left out; an empty string is
        returned when no operand provides a conditional.
        """
        # Render every operand exactly once; rendering it twice would double
        # the work on each level of nested tests.
        rendered = [a.to_ansible_conditional() for a in self.args]
        child_ansible_conds = [c for c in rendered if c != '']
        return self._join_child_conditionals(
            child_ansible_conds, "not ", " or ", " and ")
322
323
324
class CPEALCheckFactRef(Symbol):
    """
    Symbol of a platform expression that can be serialized as a
    check-fact-ref element and carries Bash and Ansible conditionals.
    """

    prefix = "cpe-lang"
    ns = PREFIX_TO_NS[prefix]

    def __init__(self, obj):
        super(CPEALCheckFactRef, self).__init__(obj)
        # we do not want to modify original name used for platforms
        self.cpe_name = obj
        self.bash_conditional = self.ansible_conditional = ""

    def enrich_with_cpe_info(self, cpe_products):
        """
        Copy the data of the matching CPE item from ``cpe_products``.

        Sets the OVAL href, both conditionals, the CPE name and the OVAL
        definition ID of this fact ref.
        """
        self.cpe_oval_href = cpe_products.cpe_oval_href
        matching_item = cpe_products.get_cpe(self.cpe_name)
        self.bash_conditional = matching_item.bash_conditional
        self.ansible_conditional = matching_item.ansible_conditional
        resolved_name = cpe_products.get_cpe_name(self.cpe_name)
        self.cpe_name = resolved_name
        self.cpe_oval_def_id = matching_item.cpe_oval_def_id

    def to_xml_element(self):
        """Build the check-fact-ref element pointing to the OVAL definition."""
        fact_ref = ET.Element("{%s}check-fact-ref" % CPEALCheckFactRef.ns)
        fact_ref.set("system", oval_namespace)
        fact_ref.set("href", self.cpe_oval_href)
        fact_ref.set("id-ref", self.cpe_oval_def_id)
        return fact_ref

    def to_bash_conditional(self):
        """Return the stored Bash conditional."""
        return self.bash_conditional

    def to_ansible_conditional(self):
        """Return the stored Ansible conditional."""
        return self.ansible_conditional

    @staticmethod
    def cpe_id_is_parametrized(cpe_id):
        """Return the result of ``Symbol.is_parametrized`` for ``cpe_id``."""
        return Symbol.is_parametrized(cpe_id)

    @staticmethod
    def get_base_name_of_parametrized_cpe_id(cpe_id):
        """
        For a parametrized platform name such as package[test],
        return only the package part.
        """
        return Symbol.get_base_of_parametrized_name(cpe_id)
367
368
369
def extract_subelement(objects, sub_elem_type):
    """
    Find the first non-empty attribute called sub_elem_type in the
    given collection of elements and return its value.

    Each element is searched together with its whole subtree, which helps
    when a single element is given and an external reference identifier
    stored somewhere below it is wanted.

    Returns None when no such attribute is found.
    """
    for root in objects:
        for node in root.iter():
            attribute_value = node.get(sub_elem_type)
            if attribute_value:
                return attribute_value
    return None
384
385
386
def extract_env_obj(objects, local_var):
    """
    From a collection of objects, return the object with id matching
    the object_ref of the local variable.

    Returns None when the local variable has no object_ref or when no
    object has the referenced id.

    NOTE: This assumes that a local variable can only reference one object.
    Which is not true, variables can reference multiple objects.
    But this assumption should work for OVAL checks for CPEs,
    as they are not that complicated.
    """
    # The referenced id does not depend on the inspected object,
    # so it is looked up only once.
    env_id = extract_subelement(local_var, 'object_ref')
    if env_id is None:
        # Without a reference nothing can match; in particular an object
        # lacking the 'id' attribute must not be treated as a match.
        return None

    for obj in objects:
        if env_id == obj.get('id'):
            return obj

    return None
403
404
405
def extract_referred_nodes(tree_with_refs, tree_with_ids, attrname):
    """
    Return the elements in tree_with_ids which are referenced
    from tree_with_refs via the element attribute 'attrname'.

    The elements are returned as a list in the order in which
    tree_with_ids.iter() yields them.
    """
    # A set makes the membership test below independent of the number
    # of collected references.
    referenced_ids = set()
    for element in tree_with_refs.iter():
        value = element.get(attrname)
        if value is not None:
            referenced_ids.add(value)

    return [
        element for element in tree_with_ids.iter()
        if element.get("id") in referenced_ids
    ]
424