Test Failed
Pull Request — master (#7613)
by Matěj
02:12
created

ssg.build_cpe   F

Complexity

Total Complexity 69

Size/Duplication

Total Lines 389
Duplicated Lines 0 %

Test Coverage

Coverage 55.91%

Importance

Changes 0
Metric Value
eloc 228
dl 0
loc 389
ccs 123
cts 220
cp 0.5591
rs 2.88
c 0
b 0
f 0
wmc 69

29 Methods

Rating   Name   Duplication   Size   Complexity  
A CPEList.to_file() 0 4 1
A CPEList.add() 0 2 1
A CPEItem.__init__() 0 5 1
A CPEList.__init__() 0 2 1
A CPEList.to_xml_element() 0 12 3
B ProductCPEs.load_content_cpes() 0 34 6
A ProductCPEs.__init__() 0 11 1
A ProductCPEs.get_product_cpe_names() 0 2 1
A ProductCPEs.load_product_cpes() 0 8 2
A ProductCPEs._is_name() 0 2 1
A ProductCPEs._load_cpes_list() 0 4 3
A ProductCPEs.get_cpe() 0 8 3
A ProductCPEs.get_cpe_name() 0 3 1
A CPEALLogicalTest.__eq__() 0 11 4
A CPEALPlatform.add_test() 0 2 1
A CPEALLogicalTest.add_object() 0 2 1
A CPEItem.to_xml_element() 0 13 1
A CPEALLogicalTest.to_xml_element() 0 11 2
A CPEALPlatformSpecification.__init__() 0 2 1
A CPEALPlatformSpecification.to_xml_element() 0 6 2
A CPEALFactRef.__eq__() 0 5 2
A CPEALPlatform.to_xml_element() 0 5 1
A CPEALLogicalTest.get_objects() 0 2 1
A CPEALFactRef.to_xml_element() 0 5 1
A CPEALPlatform.__eq__() 0 5 2
A CPEALPlatformSpecification.add_platform() 0 6 2
A CPEALFactRef.__init__() 0 2 1
A CPEALLogicalTest.__init__() 0 4 1
A CPEALPlatform.__init__() 0 3 1

6 Functions

Rating   Name   Duplication   Size   Complexity  
A extract_env_obj() 0 17 3
A extract_subelement() 0 22 5
B extract_referred_nodes() 0 34 7
A parse_platform_line() 0 11 3
A parse_platform_definition() 0 12 1
A convert_platform_to_id() 0 3 1

How to fix   Complexity   

Complexity

Complex classes like ssg.build_cpe often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Common functions for building CPEs
3
"""
4
5 2
from __future__ import absolute_import
6 2
from __future__ import print_function
7 2
import os
8 2
import sys
9
10 2
from .constants import oval_namespace
11 2
from .constants import PREFIX_TO_NS
12 2
from .utils import merge_dicts, required_key
13 2
from .xml import ElementTree as ET
14 2
from .yaml import open_raw
15
16
17 2
class CPEDoesNotExist(Exception):
    """
    Raised when a CPE reference cannot be resolved to a known CPE.
    """
19
20
21 2
class ProductCPEs(object):
    """
    Reads from the disk all the yaml CPEs related to a product
    and provides them in a structured way.

    After construction the following maps are populated:

    * ``product_cpes`` - CPEs listed under the ``cpes`` key of the product yaml
    * ``cpes_by_id`` - content CPEs merged with the product CPEs, keyed by ID
    * ``cpes_by_name`` - the same CPE items, keyed by their CPE name
    """

    def __init__(self, product_yaml):
        """
        Load the product CPEs and the content CPEs described by product_yaml.

        product_yaml is indexed by the keys "cpes", "cpes_root", "product"
        and "product_dir" by the methods of this class.
        """
        self.product_yaml = product_yaml

        self.cpes_by_id = {}
        self.cpes_by_name = {}
        self.product_cpes = {}
        self.cpe_platforms = {}
        self.cpe_platform_specification = CPEALPlatformSpecification()

        self.load_product_cpes()
        self.load_content_cpes()

    def _load_cpes_list(self, map_, cpes_list):
        """
        Store a CPEItem into map_ for every (id, data) pair found in
        the dictionaries of cpes_list.
        """
        for cpe in cpes_list:
            for cpe_id, cpe_data in cpe.items():
                map_[cpe_id] = CPEItem(cpe_data)

    def load_product_cpes(self):
        """
        Fill product_cpes from the "cpes" key of the product yaml.

        Raises KeyError (after printing a hint) when the product
        does not define the "cpes" key.
        """
        # Only the lookup of "cpes" is guarded, so that a KeyError coming from
        # a malformed CPE entry is not misreported as a missing 'cpes' key.
        try:
            product_cpes_list = self.product_yaml["cpes"]
        except KeyError:
            print("Product %s does not define 'cpes'" % (self.product_yaml["product"]))
            raise

        self._load_cpes_list(self.product_cpes, product_cpes_list)

    def load_content_cpes(self):
        """
        Load CPEs from every '.yml' file directly inside the cpes_root directory,
        merge the product CPEs in and build the by-name map.

        Files with another extension are reported on stderr and skipped,
        anything that is not a regular file is skipped silently.
        """

        cpes_root = required_key(self.product_yaml, "cpes_root")
        # we have to "absolutize" the paths the right way, relative to the product_yaml path
        if not os.path.isabs(cpes_root):
            cpes_root = os.path.join(self.product_yaml["product_dir"], cpes_root)

        # sorted() keeps the loading order independent of the listing order
        for dir_item in sorted(os.listdir(cpes_root)):
            dir_item_path = os.path.join(cpes_root, dir_item)
            if not os.path.isfile(dir_item_path):
                continue

            _, ext = os.path.splitext(os.path.basename(dir_item_path))
            if ext != '.yml':
                sys.stderr.write(
                    "Encountered file '%s' while looking for content CPEs, "
                    "extension '%s' is unknown. Skipping..\n"
                    % (dir_item, ext)
                )
                continue

            # Get past "cpes" key, which was added for readability of the content
            cpes_list = open_raw(dir_item_path)["cpes"]
            self._load_cpes_list(self.cpes_by_id, cpes_list)

        # Add product_cpes to map of CPEs by ID
        self.cpes_by_id = merge_dicts(self.cpes_by_id, self.product_cpes)

        # Generate a CPE map by name,
        # so that we can easily reference them by CPE Name
        # Note: After the shorthand is generated,
        # all references to CPEs are by its name
        for cpe in self.cpes_by_id.values():
            self.cpes_by_name[cpe.name] = cpe

    def _is_name(self, ref):
        """
        Return True when ref is a CPE name (starts with "cpe:") rather than a CPE ID.
        """
        return ref.startswith("cpe:")

    def get_cpe(self, ref):
        """
        Return the CPE item referenced either by its CPE name or by its ID.

        Raises CPEDoesNotExist when the reference is unknown.
        """
        try:
            if self._is_name(ref):
                return self.cpes_by_name[ref]
            else:
                return self.cpes_by_id[ref]
        except KeyError:
            raise CPEDoesNotExist(
                "CPE %s is not defined in %s" % (ref, self.product_yaml["cpes_root"]))

    def get_cpe_name(self, cpe_id):
        """
        Return the CPE name of the CPE item referenced by cpe_id.
        """
        cpe = self.get_cpe(cpe_id)
        return cpe.name

    def get_product_cpe_names(self):
        """
        Return the list of CPE names of the CPEs defined by the product.
        """
        return [cpe.name for cpe in self.product_cpes.values()]
108
109
110
111 2
class CPEList(object):
    """
    Container mirroring the cpe-list element from the CPE standard.
    """

    prefix = "cpe-dict"
    ns = PREFIX_TO_NS[prefix]

    def __init__(self):
        # CPE items in insertion order; sorted by name upon serialization
        self.cpe_items = []

    def add(self, cpe_item):
        """
        Append cpe_item to the list.
        """
        self.cpe_items.append(cpe_item)

    def to_xml_element(self, cpe_oval_file):
        """
        Build the cpe-list XML element; every contained item is serialized
        with cpe_oval_file passed on as its OVAL file reference.

        Note that the stored items get sorted by name in place.
        """
        root = ET.Element("{%s}cpe-list" % CPEList.ns)
        schema_location = (
            "http://cpe.mitre.org/dictionary/2.0 "
            "http://cpe.mitre.org/files/cpe-dictionary_2.1.xsd"
        )
        root.set("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
        root.set("xsi:schemaLocation", schema_location)

        self.cpe_items.sort(key=lambda item: item.name)
        for item in self.cpe_items:
            item_element = item.to_xml_element(cpe_oval_file)
            root.append(item_element)

        return root

    def to_file(self, file_name, cpe_oval_file):
        """
        Serialize the list into the file file_name using the utf-8 encoding.
        """
        document = ET.ElementTree(self.to_xml_element(cpe_oval_file))
        document.write(file_name, encoding="utf-8")
142
143
144 2
class CPEItem(object):
    """
    Mirrors the cpe-item element from the CPE standard.
    """

    prefix = "cpe-dict"
    ns = PREFIX_TO_NS[prefix]

    def __init__(self, cpeitem_data):
        """
        Initialize the item from a mapping; the keys "name", "title"
        and "check_id" are all mandatory (KeyError otherwise).
        """
        self.name = cpeitem_data["name"]
        self.title = cpeitem_data["title"]
        self.check_id = cpeitem_data["check_id"]

    def to_xml_element(self, cpe_oval_filename):
        """
        Build the cpe-item XML element with a title and a check child;
        the check refers to check_id within the file cpe_oval_filename.
        """
        item_tag = "{%s}cpe-item" % CPEItem.ns
        title_tag = "{%s}title" % CPEItem.ns
        check_tag = "{%s}check" % CPEItem.ns

        item = ET.Element(item_tag)
        item.set('name', self.name)

        title = ET.SubElement(item, title_tag)
        title.set('xml:lang', "en-us")
        title.text = self.title

        check = ET.SubElement(item, check_tag)
        check.set('system', oval_namespace)
        check.set('href', cpe_oval_filename)
        check.text = self.check_id

        return item
171
172
173 2
class CPEALPlatformSpecification(object):
    """
    Collection of platforms serialized as a platform-specification element.
    """

    prefix = "cpe-lang"
    ns = PREFIX_TO_NS[prefix]

    def __init__(self):
        self.platforms = []

    def add_platform(self, platform):
        """
        Add the platform unless a platform comparing equal to it
        is already present in the list of platforms.
        """
        if platform in self.platforms:
            return
        self.platforms.append(platform)

    def to_xml_element(self):
        """
        Build the platform-specification XML element holding all the platforms.
        """
        spec_tag = "{%s}platform-specification" % CPEALPlatformSpecification.ns
        spec_element = ET.Element(spec_tag)
        for known_platform in self.platforms:
            platform_element = known_platform.to_xml_element()
            spec_element.append(platform_element)
        return spec_element
194
195
196 2
class CPEALPlatform(object):
    """
    A platform serialized as a platform element wrapping a single test.

    Two platforms are considered equal when their tests are equal;
    the platform id does not take part in the comparison.
    """

    prefix = "cpe-lang"
    ns = PREFIX_TO_NS[prefix]

    def __init__(self, id):
        self.id = id
        # set later by add_test(); to_xml_element() needs it to be set
        self.test = None

    def add_test(self, test):
        """
        Set the test of the platform, replacing the previous one if any.
        """
        self.test = test

    def to_xml_element(self):
        """
        Build the platform XML element with the test as its only child.
        """
        cpe_platform = ET.Element("{%s}platform" % CPEALPlatform.ns)
        cpe_platform.set('id', self.id)
        cpe_platform.append(self.test.to_xml_element())
        return cpe_platform

    def __eq__(self, other):
        if not isinstance(other, CPEALPlatform):
            return False
        return self.test == other.test

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, it has to be spelled out
        # so that both operators give consistent answers.
        return not self.__eq__(other)
220
221
222 2
class CPEALLogicalTest(object):
    """
    A logical test serialized as a logical-test element; it holds
    nested logical tests and fact references.

    Two tests are considered equal when they have the same operator,
    the same negate value and each object of one test has an equal
    counterpart among the objects of the other test (order is ignored).
    """

    prefix = "cpe-lang"
    ns = PREFIX_TO_NS[prefix]

    def __init__(self, operator, negate):
        # both values are written as XML attribute values by to_xml_element()
        self.operator = operator
        self.negate = negate
        self.objects = []

    def __eq__(self, other):
        if not isinstance(other, CPEALLogicalTest):
            return False
        if self.operator != other.operator or self.negate != other.negate:
            return False
        # Membership is checked in both directions; the objects are compared
        # using their own __eq__, they are not required to be hashable.
        return (
            all(obj in other.objects for obj in self.objects)
            and all(obj in self.objects for obj in other.objects)
        )

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, it has to be spelled out
        # so that both operators give consistent answers.
        return not self.__eq__(other)

    def to_xml_element(self):
        """
        Build the logical-test XML element including all nested objects.
        """
        cpe_test = ET.Element("{%s}logical-test" % CPEALLogicalTest.ns)
        cpe_test.set('operator', self.operator)
        cpe_test.set('negate', self.negate)
        # logical tests must go first, therefore we separate tests and factrefs
        tests = [t for t in self.objects if isinstance(t, CPEALLogicalTest)]
        factrefs = [f for f in self.objects if isinstance(f, CPEALFactRef)]
        for obj in tests + factrefs:
            cpe_test.append(obj.to_xml_element())

        return cpe_test

    def add_object(self, object):
        """
        Append a nested logical test or a fact reference to the test.
        """
        self.objects.append(object)

    def get_objects(self):
        """
        Return the list of nested objects (the list itself, not a copy).
        """
        return self.objects
261
262
263 2
class CPEALFactRef(object):
    """
    A fact reference serialized as a fact-ref element.

    Two fact references are considered equal when their names are equal.
    """

    prefix = "cpe-lang"
    ns = PREFIX_TO_NS[prefix]

    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        if not isinstance(other, CPEALFactRef):
            return False
        return self.name == other.name

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, it has to be spelled out
        # so that both operators give consistent answers.
        return not self.__eq__(other)

    def to_xml_element(self):
        """
        Build the fact-ref XML element carrying the name attribute.
        """
        cpe_factref = ET.Element("{%s}fact-ref" % CPEALFactRef.ns)
        cpe_factref.set('name', self.name)

        return cpe_factref
282
283 2
def extract_subelement(objects, sub_elem_type):
    """
    From a collection of element objects, return the value of
    the first attribute of name sub_elem_type found.

    This is useful when the object is a single element and
    we wish to query some external reference identifier
    in the subtree of that element.

    The objects are searched in order, each of them including its whole
    subtree. Attributes with an empty value are skipped. None is returned
    when no element carries a non-empty attribute of that name.
    """

    for obj in objects:
        # decide on usage of .iter or .getiterator method of elementtree class.
        # getiterator is deprecated in Python 3.9, but iter is not available in
        # older versions
        iter_method = getattr(obj, "iter", None)
        if iter_method is None:
            iter_method = obj.getiterator
        for subelement in iter_method():
            value = subelement.get(sub_elem_type)
            if value:
                return value

    return None
305
306
307 2
def extract_env_obj(objects, local_var):
    """
    From a collection of objects, return the object with id matching
    the object_ref of the local variable.

    None is returned when the local variable carries no object_ref
    or when none of the objects has the referenced id.

    NOTE: This assumes that a local variable can only reference one object.
    Which is not true, variables can reference multiple objects.
    But this assumption should work for OVAL checks for CPEs,
    as they are not that complicated.
    """

    # The referenced id depends on local_var only, look it up just once.
    env_id = extract_subelement(local_var, 'object_ref')
    if env_id is None:
        # Without a reference there is nothing to match; an object lacking
        # the 'id' attribute must not be mistaken for the referenced one.
        return None

    for obj in objects:
        if obj.get('id') == env_id:
            return obj

    return None
324
325
326 2
def _iter_elements(tree):
    """
    Return an iterator over all the elements of tree.
    """
    # decide on usage of .iter or .getiterator method of elementtree class.
    # getiterator is deprecated in Python 3.9, but iter is not available in
    # older versions
    iter_method = getattr(tree, "iter", None)
    if iter_method is None:
        return tree.getiterator()
    return iter_method()


def extract_referred_nodes(tree_with_refs, tree_with_ids, attrname):
    """
    Return the elements in tree_with_ids which are referenced
    from tree_with_refs via the element attribute 'attrname'.

    The result is a list of elements in the iteration order of
    tree_with_ids; an element is included when the value of its
    "id" attribute equals the value of some 'attrname' attribute
    found in tree_with_refs.
    """

    # a set keeps the membership test below cheap regardless of
    # the number of references
    referenced_ids = set()
    for element in _iter_elements(tree_with_refs):
        value = element.get(attrname)
        if value is not None:
            referenced_ids.add(value)

    return [
        element for element in _iter_elements(tree_with_ids)
        if element.get("id") in referenced_ids
    ]
360
361 2
def parse_platform_line(platform_line, product_cpe):
    """
    Turn one platform line into a fact reference.

    Spaces are removed from the line first. Lines containing the "&" or "!"
    operators are not supported and cause NotImplementedError; any other line
    is handed over to product_cpe.get_cpe_name() and the returned name
    is wrapped into a CPEALFactRef.
    """
    # remove spaces
    compact_line = platform_line.replace(" ", "")

    for unsupported_operator in ("&", "!"):
        if unsupported_operator in compact_line:
            raise NotImplementedError("not implemented yet")

    # the line should contain a CPEAL ref name
    cpe_name = product_cpe.get_cpe_name(compact_line)
    return CPEALFactRef(cpe_name)
372
373 2
def convert_platform_to_id(platform):
    """
    Return the platform string with all the space characters removed.
    """
    return "".join(platform.split(" "))
376
377 2
def parse_platform_definition(platform_line, product_cpes):
    """
    This function takes one line of platform definition from yaml file and returns a
    CPE platform with appropriate tests and factrefs.

    The platform id is "cpe_platform_" followed by the line converted
    by convert_platform_to_id(). The platform gets a single non-negated
    "OR" logical test containing the object parsed from the line.
    """
    # let's construct the platform id
    platform_id = "cpe_platform_" + convert_platform_to_id(platform_line)
    cpe_platform = CPEALPlatform(platform_id)

    root_test = CPEALLogicalTest(operator="OR", negate="false")
    cpe_platform.add_test(root_test)
    root_test.add_object(parse_platform_line(platform_line, product_cpes))

    return cpe_platform
389