Test Failed
Push — master ( 36ef11...81e955 )
by Jan
03:06 queued 24s
created

CPEALLogicalTest.enrich_with_cpe_info()   A

Complexity

Conditions 2

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 3
nop 2
dl 0
loc 3
ccs 3
cts 3
cp 1
crap 2
rs 10
c 0
b 0
f 0
1
"""
2
Common functions for building CPEs
3
"""
4
5 2
from __future__ import absolute_import
6 2
from __future__ import print_function
7 2
import os
8 2
import sys
9
10 2
from .constants import oval_namespace
11 2
from .constants import PREFIX_TO_NS
12 2
from .utils import merge_dicts, required_key
13 2
from .xml import ElementTree as ET
14 2
from .yaml import open_and_macro_expand
15 2
from .boolean_expression import Algebra, Symbol, Function
16
17 2
class CPEDoesNotExist(Exception):
18 2
    pass
19
20 2
class ProductCPEs(object):
21
    """
22
    Reads from the disk all the yaml CPEs related to a product
23
    and provides them in a structured way.
24
    """
25
26 2
    def __init__(self, product_yaml):
27 2
        self.product_yaml = product_yaml
28
29 2
        self.cpes_by_id = {}
30 2
        self.cpes_by_name = {}
31 2
        self.product_cpes = {}
32 2
        self.platforms = {}
33 2
        self.algebra = Algebra(symbol_cls=CPEALFactRef, function_cls=CPEALLogicalTest)
34
35 2
        self.load_product_cpes()
36 2
        self.load_content_cpes()
37
38 2
    def _load_cpes_list(self, map_, cpes_list):
39 2
        for cpe in cpes_list:
40 2
            for cpe_id in cpe.keys():
41 2
                map_[cpe_id] = CPEItem(cpe[cpe_id])
42
43 2
    def load_product_cpes(self):
44 2
        try:
45 2
            product_cpes_list = self.product_yaml["cpes"]
46 2
            self._load_cpes_list(self.product_cpes, product_cpes_list)
47
48
        except KeyError:
49
            print("Product %s does not define 'cpes'" % (self.product_yaml["product"]))
50
            raise
51
52 2
    def load_content_cpes(self):
53
54 2
        cpes_root = required_key(self.product_yaml, "cpes_root")
55
        # we have to "absolutize" the paths the right way, relative to the product_yaml path
56 2
        if not os.path.isabs(cpes_root):
57 2
            cpes_root = os.path.join(self.product_yaml["product_dir"], cpes_root)
58
59 2
        for dir_item in sorted(os.listdir(cpes_root)):
60 2
            dir_item_path = os.path.join(cpes_root, dir_item)
61 2
            if not os.path.isfile(dir_item_path):
62
                continue
63
64 2
            _, ext = os.path.splitext(os.path.basename(dir_item_path))
65 2
            if ext != '.yml':
66
                sys.stderr.write(
67
                    "Encountered file '%s' while looking for content CPEs, "
68
                    "extension '%s' is unknown. Skipping..\n"
69
                    % (dir_item, ext)
70
                )
71
                continue
72
73
            # Get past "cpes" key, which was added for readability of the content
74 2
            cpes_list = open_and_macro_expand(dir_item_path, self.product_yaml)["cpes"]
75 2
            self._load_cpes_list(self.cpes_by_id, cpes_list)
76
77
        # Add product_cpes to map of CPEs by ID
78 2
        self.cpes_by_id = merge_dicts(self.cpes_by_id, self.product_cpes)
79
80
        # Generate a CPE map by name,
81
        # so that we can easily reference them by CPE Name
82
        # Note: After the shorthand is generated,
83
        # all references to CPEs are by its name
84 2
        for cpe_id, cpe in self.cpes_by_id.items():
85 2
            self.cpes_by_name[cpe.name] = cpe
86
87
88 2
    def _is_name(self, ref):
89 2
        return ref.startswith("cpe:")
90
91 2
    def get_cpe(self, ref):
92 2
        try:
93 2
            if self._is_name(ref):
94 2
                return self.cpes_by_name[ref]
95
            else:
96 2
                return self.cpes_by_id[ref]
97 2
        except KeyError:
98 2
            raise CPEDoesNotExist("CPE %s is not defined in %s" %(ref, self.product_yaml["cpes_root"]))
99
100
101 2
    def get_cpe_name(self, cpe_id):
102 2
        cpe = self.get_cpe(cpe_id)
103 2
        return cpe.name
104
105 2
    def get_product_cpe_names(self):
106 1
        return [ cpe.name for cpe in self.product_cpes.values() ]
107
108
109
110 2
class CPEList(object):
111
    """
112
    Represents the cpe-list element from the CPE standard.
113
    """
114
115 2
    prefix = "cpe-dict"
116 2
    ns = PREFIX_TO_NS[prefix]
117
118 2
    def __init__(self):
119
        self.cpe_items = []
120
121 2
    def add(self, cpe_item):
122
        self.cpe_items.append(cpe_item)
123
124 2
    def to_xml_element(self, cpe_oval_file):
125
        cpe_list = ET.Element("{%s}cpe-list" % CPEList.ns)
126
        cpe_list.set("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
127
        cpe_list.set("xsi:schemaLocation",
128
                     "http://cpe.mitre.org/dictionary/2.0 "
129
                     "http://cpe.mitre.org/files/cpe-dictionary_2.1.xsd")
130
131
        self.cpe_items.sort(key=lambda cpe: cpe.name)
132
        for cpe_item in self.cpe_items:
133
            cpe_list.append(cpe_item.to_xml_element(cpe_oval_file))
134
135
        return cpe_list
136
137 2
    def to_file(self, file_name, cpe_oval_file):
138
        root = self.to_xml_element(cpe_oval_file)
139
        tree = ET.ElementTree(root)
140
        tree.write(file_name, encoding="utf-8")
141
142
143 2
class CPEItem(object):
144
    """
145
    Represents the cpe-item element from the CPE standard.
146
    """
147
148 2
    prefix = "cpe-dict"
149 2
    ns = PREFIX_TO_NS[prefix]
150
151 2
    def __init__(self, cpeitem_data):
152
153 2
        self.name = cpeitem_data["name"]
154 2
        self.title = cpeitem_data["title"]
155 2
        self.check_id = cpeitem_data["check_id"]
156 2
        self.bash_conditional = cpeitem_data.get("bash_conditional", "")
157 2
        self.ansible_conditional = cpeitem_data.get("ansible_conditional", "")
158
159 2
    def to_xml_element(self, cpe_oval_filename):
160
        cpe_item = ET.Element("{%s}cpe-item" % CPEItem.ns)
161
        cpe_item.set('name', self.name)
162
163
        cpe_item_title = ET.SubElement(cpe_item, "{%s}title" % CPEItem.ns)
164
        cpe_item_title.set('xml:lang', "en-us")
165
        cpe_item_title.text = self.title
166
167
        cpe_item_check = ET.SubElement(cpe_item, "{%s}check" % CPEItem.ns)
168
        cpe_item_check.set('system', oval_namespace)
169
        cpe_item_check.set('href', cpe_oval_filename)
170
        cpe_item_check.text = self.check_id
171
        return cpe_item
172
173
174 2
class CPEALLogicalTest(Function):
175
176 2
    prefix = "cpe-lang"
177 2
    ns = PREFIX_TO_NS[prefix]
178
179
180 2
    def to_xml_element(self):
181 2
        cpe_test = ET.Element("{%s}logical-test" % CPEALLogicalTest.ns)
182 2
        cpe_test.set('operator', ('OR' if self.is_or() else 'AND'))
183 2
        cpe_test.set('negate', ('true' if self.is_not() else 'false'))
184
        # logical tests must go first, therefore we separate tests and factrefs
185 2
        tests = [t for t in self.args if isinstance(t, CPEALLogicalTest)]
186 2
        factrefs = [f for f in self.args if isinstance(f, CPEALFactRef)]
187 2
        for obj in tests + factrefs:
188 2
            cpe_test.append(obj.to_xml_element())
189
190 2
        return cpe_test
191
192 2
    def enrich_with_cpe_info(self, cpe_products):
193 2
        for arg in self.args:
194 2
            arg.enrich_with_cpe_info(cpe_products)
195
196 2
    def to_bash_conditional(self):
197 2
        cond = ""
198 2
        if self.is_not():
199 2
            cond += "! "
200 2
            op = " "
201 2
        cond += "( "
202 2
        child_bash_conds = [
203
            a.to_bash_conditional() for a in self.args
204
            if a.to_bash_conditional() != '']
205 2
        if self.is_or():
206 2
            op = " || "
207 2
        elif self.is_and():
208 2
            op = " && "
209 2
        cond += op.join(child_bash_conds)
0 ignored issues
show
introduced by
The variable op does not seem to be defined in case self.is_not() on line 198 is False. Are you sure this can never be the case?
Loading history...
210 2
        cond += " )"
211 2
        return cond
212
213 2
    def to_ansible_conditional(self):
214 2
        cond = ""
215 2
        if self.is_not():
216 2
            cond += "not "
217 2
            op = " "
218 2
        cond += "( "
219 2
        child_ansible_conds = [
220
            a.to_ansible_conditional() for a in self.args
221
            if a.to_ansible_conditional() != '']
222 2
        if self.is_or():
223 2
            op = " or "
224 2
        elif self.is_and():
225 2
            op = " and "
226 2
        cond += op.join(child_ansible_conds)
0 ignored issues
show
introduced by
The variable op does not seem to be defined in case self.is_not() on line 215 is False. Are you sure this can never be the case?
Loading history...
227 2
        cond += " )"
228 2
        return cond
229
230
231 2
class CPEALFactRef (Symbol):
232
233 2
    prefix = "cpe-lang"
234 2
    ns = PREFIX_TO_NS[prefix]
235
236 2
    def __init__(self, obj):
237 2
        super(CPEALFactRef, self).__init__(obj)
238 2
        self.cpe_name = obj  # we do not want to modify original name used for platforms
239 2
        self.bash_conditional = ""
240 2
        self.ansible_conditional = ""
241
242 2
    def enrich_with_cpe_info(self, cpe_products):
243 2
        self.bash_conditional = cpe_products.get_cpe(self.cpe_name).bash_conditional
244 2
        self.ansible_conditional = cpe_products.get_cpe(self.cpe_name).ansible_conditional
245 2
        self.cpe_name = cpe_products.get_cpe_name(self.cpe_name)
246
247 2
    def to_xml_element(self):
248 2
        cpe_factref = ET.Element("{%s}fact-ref" % CPEALFactRef.ns)
249 2
        cpe_factref.set('name', self.cpe_name)
250
251 2
        return cpe_factref
252
253 2
    def to_bash_conditional(self):
254 2
        return self.bash_conditional
255
256 2
    def to_ansible_conditional(self):
257 2
        return self.ansible_conditional
258
259 2
def extract_subelement(objects, sub_elem_type):
260
    """
261
    From a collection of element objects, return the value of
262
    the first attribute of name sub_elem_type found.
263
264
    This is useful when the object is a single element and
265
    we wish to query some external reference identifier
266
    in the subtree of that element.
267
    """
268
269 2
    for obj in objects:
270
        # decide on usage of .iter or .getiterator method of elementtree class.
271
        # getiterator is deprecated in Python 3.9, but iter is not available in
272
        # older versions
273 2
        if getattr(obj, "iter", None) == None:
274
            obj_iterator = obj.getiterator()
275
        else:
276 2
            obj_iterator = obj.iter()
277 2
        for subelement in obj_iterator:
278 2
            if subelement.get(sub_elem_type):
279 2
                sub_element = subelement.get(sub_elem_type)
280 2
                return sub_element
281
282
283 2
def extract_env_obj(objects, local_var):
284
    """
285
    From a collection of objects, return the object with id matching
286
    the object_ref of the local variable.
287
288
    NOTE: This assumes that a local variable can only reference one object.
289
    Which is not true, variables can reference multiple objects.
290
    But this assumption should work for OVAL checks for CPEs,
291
    as they are not that complicated.
292
    """
293
294 2
    for obj in objects:
295 2
        env_id = extract_subelement(local_var, 'object_ref')
296 2
        if env_id == obj.get('id'):
297 2
            return obj
298
299 2
    return None
300
301
302 2
def extract_referred_nodes(tree_with_refs, tree_with_ids, attrname):
303
    """
304
    Return the elements in tree_with_ids which are referenced
305
    from tree_with_refs via the element attribute 'attrname'.
306
    """
307
308 2
    reflist = []
309 2
    elementlist = []
310
311
312
    # decide on usage of .iter or .getiterator method of elementtree class.
313
    # getiterator is deprecated in Python 3.9, but iter is not available in
314
    # older versions
315 2
    if getattr(tree_with_refs, "iter", None) == None:
316
        tree_with_refs_iterator = tree_with_refs.getiterator()
317
    else:
318 2
        tree_with_refs_iterator = tree_with_refs.iter()
319 2
    for element in tree_with_refs_iterator:
320 2
        value = element.get(attrname)
321 2
        if value is not None:
322 2
            reflist.append(value)
323
324
    # decide on usage of .iter or .getiterator method of elementtree class.
325
    # getiterator is deprecated in Python 3.9, but iter is not available in
326
    # older versions
327 2
    if getattr(tree_with_ids, "iter", None) == None:
328
        tree_with_ids_iterator = tree_with_ids.getiterator()
329
    else:
330 2
        tree_with_ids_iterator = tree_with_ids.iter()
331 2
    for element in tree_with_ids_iterator:
332 2
        if element.get("id") in reflist:
333 2
            elementlist.append(element)
334
335
    return elementlist
336