1
|
|
|
""" |
2
|
|
|
Common functions for building CPEs |
3
|
|
|
""" |
4
|
|
|
|
5
|
|
|
from __future__ import absolute_import |
6
|
|
|
from __future__ import print_function |
7
|
|
|
import os |
8
|
|
|
import sys |
9
|
|
|
import ssg.id_translate |
10
|
|
|
|
11
|
|
|
from .constants import oval_namespace |
12
|
|
|
from .constants import PREFIX_TO_NS |
13
|
|
|
from .utils import required_key, apply_formatting_on_dict_values |
14
|
|
|
from .xml import ElementTree as ET |
15
|
|
|
from .boolean_expression import Algebra, Symbol, Function |
16
|
|
|
from .entities.common import XCCDFEntity, Templatable |
17
|
|
|
from .yaml import convert_string_to_bool |
18
|
|
|
|
19
|
|
|
|
20
|
|
|
class CPEDoesNotExist(Exception): |
21
|
|
|
pass |
22
|
|
|
|
23
|
|
|
|
24
|
|
|
class ProductCPEs(object): |
25
|
|
|
""" |
26
|
|
|
Reads from the disk all the yaml CPEs related to a product |
27
|
|
|
and provides them in a structured way. |
28
|
|
|
""" |
29
|
|
|
|
30
|
|
|
def __init__(self): |
31
|
|
|
|
32
|
|
|
self.cpes_by_id = {} |
33
|
|
|
self.cpes_by_name = {} |
34
|
|
|
self.product_cpes = {} |
35
|
|
|
self.platforms = {} |
36
|
|
|
self.cpe_oval_href = "" |
37
|
|
|
self.algebra = Algebra( |
38
|
|
|
symbol_cls=CPEALCheckFactRef, function_cls=CPEALLogicalTest) |
39
|
|
|
|
40
|
|
|
def load_product_cpes(self, env_yaml): |
41
|
|
|
self.cpe_oval_href = "ssg-" + env_yaml["product"] + "-cpe-oval.xml" |
42
|
|
|
try: |
43
|
|
|
product_cpes_list = env_yaml["cpes"] |
44
|
|
|
for cpe_dict_repr in product_cpes_list: |
45
|
|
|
for cpe_id, cpe in cpe_dict_repr.items(): |
46
|
|
|
# these product CPEs defined in product.yml are defined |
47
|
|
|
# differently than CPEs in shared/applicability/*.yml |
48
|
|
|
# therefore we have to place the ID at the place where it is expected |
49
|
|
|
cpe["id_"] = cpe_id |
50
|
|
|
cpe_item = CPEItem.get_instance_from_full_dict(cpe) |
51
|
|
|
cpe_item.is_product_cpe = True |
52
|
|
|
self.add_cpe_item(cpe_item) |
53
|
|
|
except KeyError as exc: |
54
|
|
|
raise exc("Product %s does not define 'cpes'" % (env_yaml["product"])) |
55
|
|
|
|
56
|
|
|
def load_content_cpes(self, env_yaml): |
57
|
|
|
cpes_root = required_key(env_yaml, "cpes_root") |
58
|
|
|
if not os.path.isabs(cpes_root): |
59
|
|
|
cpes_root = os.path.join(env_yaml["product_dir"], cpes_root) |
60
|
|
|
self.load_cpes_from_directory_tree(cpes_root, env_yaml) |
61
|
|
|
|
62
|
|
|
def load_cpes_from_directory_tree(self, root_path, env_yaml): |
63
|
|
|
for dir_item in sorted(os.listdir(root_path)): |
64
|
|
|
dir_item_path = os.path.join(root_path, dir_item) |
65
|
|
|
if not os.path.isfile(dir_item_path): |
66
|
|
|
continue |
67
|
|
|
|
68
|
|
|
_, ext = os.path.splitext(os.path.basename(dir_item_path)) |
69
|
|
|
if ext != '.yml': |
70
|
|
|
sys.stderr.write( |
71
|
|
|
"Encountered file '%s' while looking for content CPEs, " |
72
|
|
|
"extension '%s' is unknown. Skipping..\n" |
73
|
|
|
% (dir_item, ext) |
74
|
|
|
) |
75
|
|
|
continue |
76
|
|
|
|
77
|
|
|
cpe_item = CPEItem.from_yaml(dir_item_path, env_yaml) |
78
|
|
|
self.add_cpe_item(cpe_item) |
79
|
|
|
|
80
|
|
|
def add_cpe_item(self, cpe_item): |
81
|
|
|
self.cpes_by_id[cpe_item.id_] = cpe_item |
82
|
|
|
self.cpes_by_name[cpe_item.name] = cpe_item |
83
|
|
|
if cpe_item.is_product_cpe: |
84
|
|
|
self.product_cpes[cpe_item.id_] = cpe_item |
85
|
|
|
|
86
|
|
|
def get_cpe(self, cpe_id_or_name): |
87
|
|
|
try: |
88
|
|
|
if CPEItem.is_cpe_name(cpe_id_or_name): |
89
|
|
|
return self.cpes_by_name[cpe_id_or_name] |
90
|
|
|
else: |
91
|
|
|
if CPEALCheckFactRef.cpe_id_is_parametrized(cpe_id_or_name): |
92
|
|
|
cpe_id_or_name = CPEALCheckFactRef.get_base_name_of_parametrized_cpe_id( |
93
|
|
|
cpe_id_or_name) |
94
|
|
|
return self.cpes_by_id[cpe_id_or_name] |
95
|
|
|
except KeyError: |
96
|
|
|
raise CPEDoesNotExist("CPE %s is not defined" % cpe_id_or_name) |
97
|
|
|
|
98
|
|
|
def add_resolved_cpe_items_from_platform(self, platform): |
99
|
|
|
for fact_ref in platform.get_fact_refs(): |
100
|
|
|
if fact_ref.arg: # the CPE item is parametrized |
101
|
|
|
try: |
102
|
|
|
# if there already exists a CPE item with factref's ID |
103
|
|
|
# we can just use it right away, no new CPE items need to be created |
104
|
|
|
cpe = self.get_cpe_for_fact_ref(fact_ref) |
105
|
|
|
fact_ref.cpe_name = cpe.name |
106
|
|
|
except CPEDoesNotExist: |
107
|
|
|
# if the CPE item with factref's ID does not exist |
108
|
|
|
# it means that we need to create a new CPE item |
109
|
|
|
# which will have parameters in place |
110
|
|
|
cpe = self.get_cpe(fact_ref.cpe_name) |
111
|
|
|
new_cpe = cpe.create_resolved_cpe_item_for_fact_ref(fact_ref) |
112
|
|
|
self.add_cpe_item(new_cpe) |
113
|
|
|
fact_ref.cpe_name = new_cpe.name |
114
|
|
|
|
115
|
|
|
def get_cpe_for_fact_ref(self, fact_ref): |
116
|
|
|
return self.get_cpe(fact_ref.as_id()) |
117
|
|
|
|
118
|
|
|
def get_cpe_name(self, cpe_id): |
119
|
|
|
cpe = self.get_cpe(cpe_id) |
120
|
|
|
return cpe.name |
121
|
|
|
|
122
|
|
|
def get_product_cpe_names(self): |
123
|
|
|
return [cpe.name for cpe in self.product_cpes.values()] |
124
|
|
|
|
125
|
|
|
|
126
|
|
|
class CPEList(object): |
127
|
|
|
""" |
128
|
|
|
Represents the cpe-list element from the CPE standard. |
129
|
|
|
""" |
130
|
|
|
|
131
|
|
|
prefix = "cpe-dict" |
132
|
|
|
ns = PREFIX_TO_NS[prefix] |
133
|
|
|
|
134
|
|
|
def __init__(self): |
135
|
|
|
self.cpe_items = [] |
136
|
|
|
|
137
|
|
|
def add(self, cpe_item): |
138
|
|
|
self.cpe_items.append(cpe_item) |
139
|
|
|
|
140
|
|
|
def to_xml_element(self, cpe_oval_file): |
141
|
|
|
cpe_list = ET.Element("{%s}cpe-list" % CPEList.ns) |
142
|
|
|
cpe_list.set("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance") |
143
|
|
|
cpe_list.set("xsi:schemaLocation", |
144
|
|
|
"http://cpe.mitre.org/dictionary/2.0 " |
145
|
|
|
"http://cpe.mitre.org/files/cpe-dictionary_2.1.xsd") |
146
|
|
|
|
147
|
|
|
self.cpe_items.sort(key=lambda cpe: cpe.name) |
148
|
|
|
for cpe_item in self.cpe_items: |
149
|
|
|
cpe_list.append(cpe_item.to_xml_element(cpe_oval_file)) |
150
|
|
|
|
151
|
|
|
return cpe_list |
152
|
|
|
|
153
|
|
|
def to_file(self, file_name, cpe_oval_file): |
154
|
|
|
root = self.to_xml_element(cpe_oval_file) |
155
|
|
|
tree = ET.ElementTree(root) |
156
|
|
|
tree.write(file_name, encoding="utf-8") |
157
|
|
|
|
158
|
|
|
|
159
|
|
|
class CPEItem(XCCDFEntity, Templatable): |
160
|
|
|
""" |
161
|
|
|
Represents the cpe-item element from the CPE standard. |
162
|
|
|
""" |
163
|
|
|
|
164
|
|
|
KEYS = dict( |
165
|
|
|
name=lambda: "", |
166
|
|
|
check_id=lambda: "", |
167
|
|
|
bash_conditional=lambda: "", |
168
|
|
|
ansible_conditional=lambda: "", |
169
|
|
|
is_product_cpe=lambda: False, |
170
|
|
|
versioned=lambda: False, |
171
|
|
|
args=lambda: {}, |
172
|
|
|
** XCCDFEntity.KEYS |
173
|
|
|
) |
174
|
|
|
KEYS.update(**Templatable.KEYS) |
175
|
|
|
|
176
|
|
|
MANDATORY_KEYS = [ |
177
|
|
|
"name", |
178
|
|
|
] |
179
|
|
|
|
180
|
|
|
prefix = "cpe-dict" |
181
|
|
|
ns = PREFIX_TO_NS[prefix] |
182
|
|
|
|
183
|
|
|
@property |
184
|
|
|
def cpe_oval_short_def_id(self): |
185
|
|
|
return self.check_id or self.id_ |
186
|
|
|
|
187
|
|
|
@property |
188
|
|
|
def cpe_oval_def_id(self): |
189
|
|
|
translator = ssg.id_translate.IDTranslator("ssg") |
190
|
|
|
full_id = translator.generate_id( |
191
|
|
|
"{" + oval_namespace + "}definition", self.cpe_oval_short_def_id) |
192
|
|
|
return full_id |
193
|
|
|
|
194
|
|
|
def to_xml_element(self, cpe_oval_filename): |
195
|
|
|
cpe_item = ET.Element("{%s}cpe-item" % CPEItem.ns) |
196
|
|
|
cpe_item.set('name', self.name) |
197
|
|
|
|
198
|
|
|
cpe_item_title = ET.SubElement(cpe_item, "{%s}title" % CPEItem.ns) |
199
|
|
|
cpe_item_title.set('xml:lang', "en-us") |
200
|
|
|
cpe_item_title.text = self.title |
201
|
|
|
|
202
|
|
|
cpe_item_check = ET.SubElement(cpe_item, "{%s}check" % CPEItem.ns) |
203
|
|
|
cpe_item_check.set('system', oval_namespace) |
204
|
|
|
cpe_item_check.set('href', cpe_oval_filename) |
205
|
|
|
cpe_item_check.text = self.cpe_oval_short_def_id |
206
|
|
|
return cpe_item |
207
|
|
|
|
208
|
|
|
@classmethod |
209
|
|
|
def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None): |
210
|
|
|
cpe_item = super(CPEItem, cls).from_yaml(yaml_file, env_yaml, product_cpes) |
211
|
|
|
if cpe_item.is_product_cpe: |
212
|
|
|
cpe_item.is_product_cpe = convert_string_to_bool(cpe_item.is_product_cpe) |
213
|
|
|
if cpe_item.versioned: |
214
|
|
|
cpe_item.versioned = convert_string_to_bool(cpe_item.versioned) |
215
|
|
|
return cpe_item |
216
|
|
|
|
217
|
|
|
def set_template_variables(self, *sources): |
218
|
|
|
if self.is_templated(): |
219
|
|
|
self.template["vars"] = {} |
220
|
|
|
for source in sources: |
221
|
|
|
self.template["vars"].update(source) |
222
|
|
|
|
223
|
|
|
def create_resolved_cpe_item_for_fact_ref(self, fact_ref): |
224
|
|
|
if fact_ref.has_version_specs(): |
225
|
|
|
if not self.versioned: |
226
|
|
|
raise ValueError("CPE entity '{0}' does not support version specifiers: " |
227
|
|
|
"{1}".format(self.id_, fact_ref.cpe_name)) |
228
|
|
|
try: |
229
|
|
|
resolved_parameters = self.args[fact_ref.arg] |
230
|
|
|
except KeyError: |
231
|
|
|
raise KeyError( |
232
|
|
|
"The {0} CPE item does not support the argument {1}. " |
233
|
|
|
"Following arguments are supported: {2}".format( |
234
|
|
|
self.id_, fact_ref.arg, [a for a in self.args.keys()])) |
235
|
|
|
resolved_parameters.update(fact_ref.as_dict()) |
236
|
|
|
cpe_item_as_dict = self.represent_as_dict() |
237
|
|
|
cpe_item_as_dict["args"] = None |
238
|
|
|
cpe_item_as_dict["id_"] = fact_ref.as_id() |
239
|
|
|
new_associated_cpe_item_as_dict = apply_formatting_on_dict_values( |
240
|
|
|
cpe_item_as_dict, resolved_parameters) |
241
|
|
|
new_associated_cpe_item = CPEItem.get_instance_from_full_dict( |
242
|
|
|
new_associated_cpe_item_as_dict) |
243
|
|
|
new_associated_cpe_item.set_template_variables(resolved_parameters) |
244
|
|
|
return new_associated_cpe_item |
245
|
|
|
|
246
|
|
|
@staticmethod |
247
|
|
|
def is_cpe_name(cpe_id_or_name): |
248
|
|
|
return cpe_id_or_name.startswith("cpe:") |
249
|
|
|
|
250
|
|
|
def set_conditional(self, language, content): |
251
|
|
|
if language == "ansible": |
252
|
|
|
self.ansible_conditional = content |
253
|
|
|
elif language == "bash": |
254
|
|
|
self.bash_conditional = content |
255
|
|
|
else: |
256
|
|
|
raise RuntimeError( |
257
|
|
|
"The language {0} is not supported as conditional for CPE".format(language)) |
258
|
|
|
|
259
|
|
|
|
260
|
|
|
class CPEALLogicalTest(Function): |
261
|
|
|
|
262
|
|
|
prefix = "cpe-lang" |
263
|
|
|
ns = PREFIX_TO_NS[prefix] |
264
|
|
|
|
265
|
|
|
def to_xml_element(self): |
266
|
|
|
cpe_test = ET.Element("{%s}logical-test" % CPEALLogicalTest.ns) |
267
|
|
|
cpe_test.set('operator', ('OR' if self.is_or() else 'AND')) |
268
|
|
|
cpe_test.set('negate', ('true' if self.is_not() else 'false')) |
269
|
|
|
# Logical tests must go first, therefore we separate tests and factrefs |
270
|
|
|
tests = [t for t in self.args if isinstance(t, CPEALLogicalTest)] |
271
|
|
|
factrefs = [f for f in self.args if isinstance(f, CPEALCheckFactRef)] |
272
|
|
|
for obj in tests + factrefs: |
273
|
|
|
cpe_test.append(obj.to_xml_element()) |
274
|
|
|
|
275
|
|
|
return cpe_test |
276
|
|
|
|
277
|
|
|
def enrich_with_cpe_info(self, cpe_products): |
278
|
|
|
for arg in self.args: |
279
|
|
|
arg.enrich_with_cpe_info(cpe_products) |
280
|
|
|
|
281
|
|
|
def to_bash_conditional(self): |
282
|
|
|
child_bash_conds = [ |
283
|
|
|
a.to_bash_conditional() for a in self.args |
284
|
|
|
if a.to_bash_conditional() != ''] |
285
|
|
|
|
286
|
|
|
if not child_bash_conds: |
287
|
|
|
return "" |
288
|
|
|
|
289
|
|
|
cond = "" |
290
|
|
|
if self.is_not(): |
291
|
|
|
cond += "! " |
292
|
|
|
op = " " |
293
|
|
|
cond += "( " |
294
|
|
|
if self.is_or(): |
295
|
|
|
op = " || " |
296
|
|
|
elif self.is_and(): |
297
|
|
|
op = " && " |
298
|
|
|
cond += op.join(child_bash_conds) |
|
|
|
|
299
|
|
|
cond += " )" |
300
|
|
|
return cond |
301
|
|
|
|
302
|
|
|
def to_ansible_conditional(self): |
303
|
|
|
child_ansible_conds = [ |
304
|
|
|
a.to_ansible_conditional() for a in self.args |
305
|
|
|
if a.to_ansible_conditional() != ''] |
306
|
|
|
|
307
|
|
|
if not child_ansible_conds: |
308
|
|
|
return "" |
309
|
|
|
|
310
|
|
|
cond = "" |
311
|
|
|
if self.is_not(): |
312
|
|
|
cond += "not " |
313
|
|
|
op = " " |
314
|
|
|
cond += "( " |
315
|
|
|
if self.is_or(): |
316
|
|
|
op = " or " |
317
|
|
|
elif self.is_and(): |
318
|
|
|
op = " and " |
319
|
|
|
cond += op.join(child_ansible_conds) |
|
|
|
|
320
|
|
|
cond += " )" |
321
|
|
|
return cond |
322
|
|
|
|
323
|
|
|
|
324
|
|
|
class CPEALCheckFactRef(Symbol): |
325
|
|
|
|
326
|
|
|
prefix = "cpe-lang" |
327
|
|
|
ns = PREFIX_TO_NS[prefix] |
328
|
|
|
|
329
|
|
|
def __init__(self, obj): |
330
|
|
|
super(CPEALCheckFactRef, self).__init__(obj) |
331
|
|
|
self.cpe_name = obj # we do not want to modify original name used for platforms |
332
|
|
|
self.bash_conditional = "" |
333
|
|
|
self.ansible_conditional = "" |
334
|
|
|
|
335
|
|
|
def enrich_with_cpe_info(self, cpe_products): |
336
|
|
|
self.cpe_oval_href = cpe_products.cpe_oval_href |
337
|
|
|
cpe_item = cpe_products.get_cpe(self.cpe_name) |
338
|
|
|
self.bash_conditional = cpe_item.bash_conditional |
339
|
|
|
self.ansible_conditional = cpe_item.ansible_conditional |
340
|
|
|
self.cpe_name = cpe_products.get_cpe_name(self.cpe_name) |
341
|
|
|
self.cpe_oval_def_id = cpe_item.cpe_oval_def_id |
342
|
|
|
|
343
|
|
|
def to_xml_element(self): |
344
|
|
|
el = ET.Element("{%s}check-fact-ref" % CPEALCheckFactRef.ns) |
345
|
|
|
el.set("system", oval_namespace) |
346
|
|
|
el.set("href", self.cpe_oval_href) |
347
|
|
|
el.set("id-ref", self.cpe_oval_def_id) |
348
|
|
|
return el |
349
|
|
|
|
350
|
|
|
def to_bash_conditional(self): |
351
|
|
|
return self.bash_conditional |
352
|
|
|
|
353
|
|
|
def to_ansible_conditional(self): |
354
|
|
|
return self.ansible_conditional |
355
|
|
|
|
356
|
|
|
@staticmethod |
357
|
|
|
def cpe_id_is_parametrized(cpe_id): |
358
|
|
|
return Symbol.is_parametrized(cpe_id) |
359
|
|
|
|
360
|
|
|
@staticmethod |
361
|
|
|
def get_base_name_of_parametrized_cpe_id(cpe_id): |
362
|
|
|
""" |
363
|
|
|
If given a parametrized platform name such as package[test], |
364
|
|
|
it returns the package part only. |
365
|
|
|
""" |
366
|
|
|
return Symbol.get_base_of_parametrized_name(cpe_id) |
367
|
|
|
|
368
|
|
|
|
369
|
|
|
def extract_subelement(objects, sub_elem_type): |
370
|
|
|
""" |
371
|
|
|
From a collection of element objects, return the value of |
372
|
|
|
the first attribute of name sub_elem_type found. |
373
|
|
|
|
374
|
|
|
This is useful when the object is a single element and |
375
|
|
|
we wish to query some external reference identifier |
376
|
|
|
in the subtree of that element. |
377
|
|
|
""" |
378
|
|
|
|
379
|
|
|
for obj in objects: |
380
|
|
|
for subelement in obj.iter(): |
381
|
|
|
if subelement.get(sub_elem_type): |
382
|
|
|
sub_element = subelement.get(sub_elem_type) |
383
|
|
|
return sub_element |
384
|
|
|
|
385
|
|
|
|
386
|
|
|
def extract_env_obj(objects, local_var): |
387
|
|
|
""" |
388
|
|
|
From a collection of objects, return the object with id matching |
389
|
|
|
the object_ref of the local variable. |
390
|
|
|
|
391
|
|
|
NOTE: This assumes that a local variable can only reference one object. |
392
|
|
|
Which is not true, variables can reference multiple objects. |
393
|
|
|
But this assumption should work for OVAL checks for CPEs, |
394
|
|
|
as they are not that complicated. |
395
|
|
|
""" |
396
|
|
|
|
397
|
|
|
for obj in objects: |
398
|
|
|
env_id = extract_subelement(local_var, 'object_ref') |
399
|
|
|
if env_id == obj.get('id'): |
400
|
|
|
return obj |
401
|
|
|
|
402
|
|
|
return None |
403
|
|
|
|
404
|
|
|
|
405
|
|
|
def extract_referred_nodes(tree_with_refs, tree_with_ids, attrname): |
406
|
|
|
""" |
407
|
|
|
Return the elements in tree_with_ids which are referenced |
408
|
|
|
from tree_with_refs via the element attribute 'attrname'. |
409
|
|
|
""" |
410
|
|
|
|
411
|
|
|
reflist = [] |
412
|
|
|
elementlist = [] |
413
|
|
|
|
414
|
|
|
for element in tree_with_refs.iter(): |
415
|
|
|
value = element.get(attrname) |
416
|
|
|
if value is not None: |
417
|
|
|
reflist.append(value) |
418
|
|
|
|
419
|
|
|
for element in tree_with_ids.iter(): |
420
|
|
|
if element.get("id") in reflist: |
421
|
|
|
elementlist.append(element) |
422
|
|
|
|
423
|
|
|
return elementlist |
424
|
|
|
|