1 | """ |
||
2 | Common functions for building CPEs |
||
3 | """ |
||
4 | |||
5 | from __future__ import absolute_import |
||
6 | from __future__ import print_function |
||
7 | import os |
||
8 | import sys |
||
9 | import ssg.id_translate |
||
10 | |||
11 | from .constants import oval_namespace |
||
12 | from .constants import PREFIX_TO_NS |
||
13 | from .utils import required_key, apply_formatting_on_dict_values |
||
14 | from .xml import ElementTree as ET |
||
15 | from .boolean_expression import Algebra, Symbol, Function |
||
16 | from .entities.common import XCCDFEntity, Templatable |
||
17 | from .yaml import convert_string_to_bool |
||
18 | |||
19 | |||
20 | class CPEDoesNotExist(Exception): |
||
21 | pass |
||
22 | |||
23 | |||
24 | class ProductCPEs(object): |
||
25 | """ |
||
26 | Reads from the disk all the yaml CPEs related to a product |
||
27 | and provides them in a structured way. |
||
28 | """ |
||
29 | |||
30 | def __init__(self): |
||
31 | |||
32 | self.cpes_by_id = {} |
||
33 | self.cpes_by_name = {} |
||
34 | self.product_cpes = {} |
||
35 | self.platforms = {} |
||
36 | self.cpe_oval_href = "" |
||
37 | self.algebra = Algebra( |
||
38 | symbol_cls=CPEALCheckFactRef, function_cls=CPEALLogicalTest) |
||
39 | |||
40 | def load_product_cpes(self, env_yaml): |
||
41 | self.cpe_oval_href = "ssg-" + env_yaml["product"] + "-cpe-oval.xml" |
||
42 | try: |
||
43 | product_cpes_list = env_yaml["cpes"] |
||
44 | for cpe_dict_repr in product_cpes_list: |
||
45 | for cpe_id, cpe in cpe_dict_repr.items(): |
||
46 | # these product CPEs defined in product.yml are defined |
||
47 | # differently than CPEs in shared/applicability/*.yml |
||
48 | # therefore we have to place the ID at the place where it is expected |
||
49 | cpe["id_"] = cpe_id |
||
50 | cpe_item = CPEItem.get_instance_from_full_dict(cpe) |
||
51 | cpe_item.is_product_cpe = True |
||
52 | self.add_cpe_item(cpe_item) |
||
53 | except KeyError as exc: |
||
54 | raise exc("Product %s does not define 'cpes'" % (env_yaml["product"])) |
||
55 | |||
56 | def load_content_cpes(self, env_yaml): |
||
57 | cpes_root = required_key(env_yaml, "cpes_root") |
||
58 | if not os.path.isabs(cpes_root): |
||
59 | cpes_root = os.path.join(env_yaml["product_dir"], cpes_root) |
||
60 | self.load_cpes_from_directory_tree(cpes_root, env_yaml) |
||
61 | |||
62 | def load_cpes_from_directory_tree(self, root_path, env_yaml): |
||
63 | for dir_item in sorted(os.listdir(root_path)): |
||
64 | dir_item_path = os.path.join(root_path, dir_item) |
||
65 | if not os.path.isfile(dir_item_path): |
||
66 | continue |
||
67 | |||
68 | _, ext = os.path.splitext(os.path.basename(dir_item_path)) |
||
69 | if ext != '.yml': |
||
70 | sys.stderr.write( |
||
71 | "Encountered file '%s' while looking for content CPEs, " |
||
72 | "extension '%s' is unknown. Skipping..\n" |
||
73 | % (dir_item, ext) |
||
74 | ) |
||
75 | continue |
||
76 | |||
77 | cpe_item = CPEItem.from_yaml(dir_item_path, env_yaml) |
||
78 | self.add_cpe_item(cpe_item) |
||
79 | |||
80 | def add_cpe_item(self, cpe_item): |
||
81 | self.cpes_by_id[cpe_item.id_] = cpe_item |
||
82 | self.cpes_by_name[cpe_item.name] = cpe_item |
||
83 | if cpe_item.is_product_cpe: |
||
84 | self.product_cpes[cpe_item.id_] = cpe_item |
||
85 | |||
86 | def get_cpe(self, cpe_id_or_name): |
||
87 | try: |
||
88 | if CPEItem.is_cpe_name(cpe_id_or_name): |
||
89 | return self.cpes_by_name[cpe_id_or_name] |
||
90 | else: |
||
91 | if CPEALCheckFactRef.cpe_id_is_parametrized(cpe_id_or_name): |
||
92 | cpe_id_or_name = CPEALCheckFactRef.get_base_name_of_parametrized_cpe_id( |
||
93 | cpe_id_or_name) |
||
94 | return self.cpes_by_id[cpe_id_or_name] |
||
95 | except KeyError: |
||
96 | raise CPEDoesNotExist("CPE %s is not defined" % cpe_id_or_name) |
||
97 | |||
98 | def add_resolved_cpe_items_from_platform(self, platform): |
||
99 | for fact_ref in platform.get_fact_refs(): |
||
100 | if fact_ref.arg: # the CPE item is parametrized |
||
101 | try: |
||
102 | # if there already exists a CPE item with factref's ID |
||
103 | # we can just use it right away, no new CPE items need to be created |
||
104 | cpe = self.get_cpe_for_fact_ref(fact_ref) |
||
105 | fact_ref.cpe_name = cpe.name |
||
106 | except CPEDoesNotExist: |
||
107 | # if the CPE item with factref's ID does not exist |
||
108 | # it means that we need to create a new CPE item |
||
109 | # which will have parameters in place |
||
110 | cpe = self.get_cpe(fact_ref.cpe_name) |
||
111 | new_cpe = cpe.create_resolved_cpe_item_for_fact_ref(fact_ref) |
||
112 | self.add_cpe_item(new_cpe) |
||
113 | fact_ref.cpe_name = new_cpe.name |
||
114 | |||
115 | def get_cpe_for_fact_ref(self, fact_ref): |
||
116 | return self.get_cpe(fact_ref.as_id()) |
||
117 | |||
118 | def get_cpe_name(self, cpe_id): |
||
119 | cpe = self.get_cpe(cpe_id) |
||
120 | return cpe.name |
||
121 | |||
122 | def get_product_cpe_names(self): |
||
123 | return [cpe.name for cpe in self.product_cpes.values()] |
||
124 | |||
125 | |||
126 | class CPEList(object): |
||
127 | """ |
||
128 | Represents the cpe-list element from the CPE standard. |
||
129 | """ |
||
130 | |||
131 | prefix = "cpe-dict" |
||
132 | ns = PREFIX_TO_NS[prefix] |
||
133 | |||
134 | def __init__(self): |
||
135 | self.cpe_items = [] |
||
136 | |||
137 | def add(self, cpe_item): |
||
138 | self.cpe_items.append(cpe_item) |
||
139 | |||
140 | def to_xml_element(self, cpe_oval_file): |
||
141 | cpe_list = ET.Element("{%s}cpe-list" % CPEList.ns) |
||
142 | cpe_list.set("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance") |
||
143 | cpe_list.set("xsi:schemaLocation", |
||
144 | "http://cpe.mitre.org/dictionary/2.0 " |
||
145 | "http://cpe.mitre.org/files/cpe-dictionary_2.1.xsd") |
||
146 | |||
147 | self.cpe_items.sort(key=lambda cpe: cpe.name) |
||
148 | for cpe_item in self.cpe_items: |
||
149 | cpe_list.append(cpe_item.to_xml_element(cpe_oval_file)) |
||
150 | |||
151 | return cpe_list |
||
152 | |||
153 | def to_file(self, file_name, cpe_oval_file): |
||
154 | root = self.to_xml_element(cpe_oval_file) |
||
155 | tree = ET.ElementTree(root) |
||
156 | tree.write(file_name, encoding="utf-8") |
||
157 | |||
158 | |||
159 | class CPEItem(XCCDFEntity, Templatable): |
||
160 | """ |
||
161 | Represents the cpe-item element from the CPE standard. |
||
162 | """ |
||
163 | |||
164 | KEYS = dict( |
||
165 | name=lambda: "", |
||
166 | check_id=lambda: "", |
||
167 | bash_conditional=lambda: "", |
||
168 | ansible_conditional=lambda: "", |
||
169 | is_product_cpe=lambda: False, |
||
170 | versioned=lambda: False, |
||
171 | args=lambda: {}, |
||
172 | ** XCCDFEntity.KEYS |
||
173 | ) |
||
174 | KEYS.update(**Templatable.KEYS) |
||
175 | |||
176 | MANDATORY_KEYS = [ |
||
177 | "name", |
||
178 | ] |
||
179 | |||
180 | prefix = "cpe-dict" |
||
181 | ns = PREFIX_TO_NS[prefix] |
||
182 | |||
183 | @property |
||
184 | def cpe_oval_short_def_id(self): |
||
185 | return self.check_id or self.id_ |
||
186 | |||
187 | @property |
||
188 | def cpe_oval_def_id(self): |
||
189 | translator = ssg.id_translate.IDTranslator("ssg") |
||
190 | full_id = translator.generate_id( |
||
191 | "{" + oval_namespace + "}definition", self.cpe_oval_short_def_id) |
||
192 | return full_id |
||
193 | |||
194 | def to_xml_element(self, cpe_oval_filename): |
||
195 | cpe_item = ET.Element("{%s}cpe-item" % CPEItem.ns) |
||
196 | cpe_item.set('name', self.name) |
||
197 | |||
198 | cpe_item_title = ET.SubElement(cpe_item, "{%s}title" % CPEItem.ns) |
||
199 | cpe_item_title.set('xml:lang', "en-us") |
||
200 | cpe_item_title.text = self.title |
||
201 | |||
202 | cpe_item_check = ET.SubElement(cpe_item, "{%s}check" % CPEItem.ns) |
||
203 | cpe_item_check.set('system', oval_namespace) |
||
204 | cpe_item_check.set('href', cpe_oval_filename) |
||
205 | cpe_item_check.text = self.cpe_oval_short_def_id |
||
206 | return cpe_item |
||
207 | |||
208 | @classmethod |
||
209 | def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None): |
||
210 | cpe_item = super(CPEItem, cls).from_yaml(yaml_file, env_yaml, product_cpes) |
||
211 | if cpe_item.is_product_cpe: |
||
212 | cpe_item.is_product_cpe = convert_string_to_bool(cpe_item.is_product_cpe) |
||
213 | if cpe_item.versioned: |
||
214 | cpe_item.versioned = convert_string_to_bool(cpe_item.versioned) |
||
215 | return cpe_item |
||
216 | |||
217 | def set_template_variables(self, *sources): |
||
218 | if self.is_templated(): |
||
219 | self.template["vars"] = {} |
||
220 | for source in sources: |
||
221 | self.template["vars"].update(source) |
||
222 | |||
223 | def create_resolved_cpe_item_for_fact_ref(self, fact_ref): |
||
224 | if fact_ref.has_version_specs(): |
||
225 | if not self.versioned: |
||
226 | raise ValueError("CPE entity '{0}' does not support version specifiers: " |
||
227 | "{1}".format(self.id_, fact_ref.cpe_name)) |
||
228 | try: |
||
229 | resolved_parameters = self.args[fact_ref.arg] |
||
230 | except KeyError: |
||
231 | raise KeyError( |
||
232 | "The {0} CPE item does not support the argument {1}. " |
||
233 | "Following arguments are supported: {2}".format( |
||
234 | self.id_, fact_ref.arg, [a for a in self.args.keys()])) |
||
235 | resolved_parameters.update(fact_ref.as_dict()) |
||
236 | cpe_item_as_dict = self.represent_as_dict() |
||
237 | cpe_item_as_dict["args"] = None |
||
238 | cpe_item_as_dict["id_"] = fact_ref.as_id() |
||
239 | new_associated_cpe_item_as_dict = apply_formatting_on_dict_values( |
||
240 | cpe_item_as_dict, resolved_parameters) |
||
241 | new_associated_cpe_item = CPEItem.get_instance_from_full_dict( |
||
242 | new_associated_cpe_item_as_dict) |
||
243 | new_associated_cpe_item.set_template_variables(resolved_parameters) |
||
244 | return new_associated_cpe_item |
||
245 | |||
246 | @staticmethod |
||
247 | def is_cpe_name(cpe_id_or_name): |
||
248 | return cpe_id_or_name.startswith("cpe:") |
||
249 | |||
250 | def set_conditional(self, language, content): |
||
251 | if language == "ansible": |
||
252 | self.ansible_conditional = content |
||
253 | elif language == "bash": |
||
254 | self.bash_conditional = content |
||
255 | else: |
||
256 | raise RuntimeError( |
||
257 | "The language {0} is not supported as conditional for CPE".format(language)) |
||
258 | |||
259 | |||
260 | class CPEALLogicalTest(Function): |
||
261 | |||
262 | prefix = "cpe-lang" |
||
263 | ns = PREFIX_TO_NS[prefix] |
||
264 | |||
265 | def to_xml_element(self): |
||
266 | cpe_test = ET.Element("{%s}logical-test" % CPEALLogicalTest.ns) |
||
267 | cpe_test.set('operator', ('OR' if self.is_or() else 'AND')) |
||
268 | cpe_test.set('negate', ('true' if self.is_not() else 'false')) |
||
269 | # Logical tests must go first, therefore we separate tests and factrefs |
||
270 | tests = [t for t in self.args if isinstance(t, CPEALLogicalTest)] |
||
271 | factrefs = [f for f in self.args if isinstance(f, CPEALCheckFactRef)] |
||
272 | for obj in tests + factrefs: |
||
273 | cpe_test.append(obj.to_xml_element()) |
||
274 | |||
275 | return cpe_test |
||
276 | |||
277 | def enrich_with_cpe_info(self, cpe_products): |
||
278 | for arg in self.args: |
||
279 | arg.enrich_with_cpe_info(cpe_products) |
||
280 | |||
281 | def to_bash_conditional(self): |
||
282 | child_bash_conds = [ |
||
283 | a.to_bash_conditional() for a in self.args |
||
284 | if a.to_bash_conditional() != ''] |
||
285 | |||
286 | if not child_bash_conds: |
||
287 | return "" |
||
288 | |||
289 | cond = "" |
||
290 | if self.is_not(): |
||
291 | cond += "! " |
||
292 | op = " " |
||
293 | cond += "( " |
||
294 | if self.is_or(): |
||
295 | op = " || " |
||
296 | elif self.is_and(): |
||
297 | op = " && " |
||
298 | cond += op.join(child_bash_conds) |
||
0 ignored issues
–
show
introduced
by
![]() |
|||
299 | cond += " )" |
||
300 | return cond |
||
301 | |||
302 | def to_ansible_conditional(self): |
||
303 | child_ansible_conds = [ |
||
304 | a.to_ansible_conditional() for a in self.args |
||
305 | if a.to_ansible_conditional() != ''] |
||
306 | |||
307 | if not child_ansible_conds: |
||
308 | return "" |
||
309 | |||
310 | cond = "" |
||
311 | if self.is_not(): |
||
312 | cond += "not " |
||
313 | op = " " |
||
314 | cond += "( " |
||
315 | if self.is_or(): |
||
316 | op = " or " |
||
317 | elif self.is_and(): |
||
318 | op = " and " |
||
319 | cond += op.join(child_ansible_conds) |
||
0 ignored issues
–
show
|
|||
320 | cond += " )" |
||
321 | return cond |
||
322 | |||
323 | |||
324 | class CPEALCheckFactRef(Symbol): |
||
325 | |||
326 | prefix = "cpe-lang" |
||
327 | ns = PREFIX_TO_NS[prefix] |
||
328 | |||
329 | def __init__(self, obj): |
||
330 | super(CPEALCheckFactRef, self).__init__(obj) |
||
331 | self.cpe_name = obj # we do not want to modify original name used for platforms |
||
332 | self.bash_conditional = "" |
||
333 | self.ansible_conditional = "" |
||
334 | |||
335 | def enrich_with_cpe_info(self, cpe_products): |
||
336 | self.cpe_oval_href = cpe_products.cpe_oval_href |
||
337 | cpe_item = cpe_products.get_cpe(self.cpe_name) |
||
338 | self.bash_conditional = cpe_item.bash_conditional |
||
339 | self.ansible_conditional = cpe_item.ansible_conditional |
||
340 | self.cpe_name = cpe_products.get_cpe_name(self.cpe_name) |
||
341 | self.cpe_oval_def_id = cpe_item.cpe_oval_def_id |
||
342 | |||
343 | def to_xml_element(self): |
||
344 | el = ET.Element("{%s}check-fact-ref" % CPEALCheckFactRef.ns) |
||
345 | el.set("system", oval_namespace) |
||
346 | el.set("href", self.cpe_oval_href) |
||
347 | el.set("id-ref", self.cpe_oval_def_id) |
||
348 | return el |
||
349 | |||
350 | def to_bash_conditional(self): |
||
351 | return self.bash_conditional |
||
352 | |||
353 | def to_ansible_conditional(self): |
||
354 | return self.ansible_conditional |
||
355 | |||
356 | @staticmethod |
||
357 | def cpe_id_is_parametrized(cpe_id): |
||
358 | return Symbol.is_parametrized(cpe_id) |
||
359 | |||
360 | @staticmethod |
||
361 | def get_base_name_of_parametrized_cpe_id(cpe_id): |
||
362 | """ |
||
363 | If given a parametrized platform name such as package[test], |
||
364 | it returns the package part only. |
||
365 | """ |
||
366 | return Symbol.get_base_of_parametrized_name(cpe_id) |
||
367 | |||
368 | |||
369 | def extract_subelement(objects, sub_elem_type): |
||
370 | """ |
||
371 | From a collection of element objects, return the value of |
||
372 | the first attribute of name sub_elem_type found. |
||
373 | |||
374 | This is useful when the object is a single element and |
||
375 | we wish to query some external reference identifier |
||
376 | in the subtree of that element. |
||
377 | """ |
||
378 | |||
379 | for obj in objects: |
||
380 | for subelement in obj.iter(): |
||
381 | if subelement.get(sub_elem_type): |
||
382 | sub_element = subelement.get(sub_elem_type) |
||
383 | return sub_element |
||
384 | |||
385 | |||
386 | def extract_env_obj(objects, local_var): |
||
387 | """ |
||
388 | From a collection of objects, return the object with id matching |
||
389 | the object_ref of the local variable. |
||
390 | |||
391 | NOTE: This assumes that a local variable can only reference one object. |
||
392 | Which is not true, variables can reference multiple objects. |
||
393 | But this assumption should work for OVAL checks for CPEs, |
||
394 | as they are not that complicated. |
||
395 | """ |
||
396 | |||
397 | for obj in objects: |
||
398 | env_id = extract_subelement(local_var, 'object_ref') |
||
399 | if env_id == obj.get('id'): |
||
400 | return obj |
||
401 | |||
402 | return None |
||
403 | |||
404 | |||
405 | def extract_referred_nodes(tree_with_refs, tree_with_ids, attrname): |
||
406 | """ |
||
407 | Return the elements in tree_with_ids which are referenced |
||
408 | from tree_with_refs via the element attribute 'attrname'. |
||
409 | """ |
||
410 | |||
411 | reflist = [] |
||
412 | elementlist = [] |
||
413 | |||
414 | for element in tree_with_refs.iter(): |
||
415 | value = element.get(attrname) |
||
416 | if value is not None: |
||
417 | reflist.append(value) |
||
418 | |||
419 | for element in tree_with_ids.iter(): |
||
420 | if element.get("id") in reflist: |
||
421 | elementlist.append(element) |
||
422 | |||
423 | return elementlist |
||
424 |