Passed
Branch master (165750)
by Matěj
01:42
created

modules.ssgcommon.open_environment_yamls()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 2
dl 0
loc 5
rs 9.4285
c 0
b 0
f 0
1
import datetime
2
import platform
3
import subprocess
4
import re
5
import codecs
6
import jinja2
7
import os.path
8
import yaml
9
10
try:
11
    from yaml import CSafeLoader as yaml_SafeLoader
12
except ImportError:
13
    from yaml import SafeLoader as yaml_SafeLoader
14
15
16
def bool_constructor(self, node):
    """
    YAML constructor callback for boolean-tagged nodes.

    Delegates to ``self.construct_scalar`` and returns whatever that
    yields for ``node``.
    """
    scalar_value = self.construct_scalar(node)
    return scalar_value
18
19
20
# Keep YAML booleans as their scalar text instead of converting them to Python bool
21
yaml_SafeLoader.add_constructor(u'tag:yaml.org,2002:bool', bool_constructor)
22
23
24
# Pick the fastest ElementTree implementation that can be imported.
try:
    from xml.etree import cElementTree as ElementTree
except ImportError:
    try:
        import cElementTree as ElementTree
    except ImportError:
        # xml.etree.cElementTree was removed in Python 3.9; the plain
        # ElementTree module offers the same API there.
        from xml.etree import ElementTree
28
29
30
# Jinja macro definitions live in "macros.jinja" next to this module.
JINJA_MACROS_DEFINITIONS = os.path.join(os.path.dirname(__file__), "macros.jinja")

xml_version = '<?xml version="1.0" encoding="UTF-8"?>'

# Namespaces, document fragments and identifiers used across the module.
datastream_namespace = "http://scap.nist.gov/schema/scap/source/1.2"
ocil_namespace = "http://scap.nist.gov/schema/ocil/2.0"
oval_footer = "</oval_definitions>"
oval_namespace = "http://oval.mitre.org/XMLSchema/oval-definitions-5"
ocil_cs = "http://scap.nist.gov/schema/ocil/2"
xccdf_header = xml_version + "<xccdf>"
xccdf_footer = "</xccdf>"
bash_system = "urn:xccdf:fix:script:sh"
ansible_system = "urn:xccdf:fix:script:ansible"
puppet_system = "urn:xccdf:fix:script:puppet"
anaconda_system = "urn:redhat:anaconda:pre"
cce_uri = "https://nvd.nist.gov/cce/index.cfm"
stig_ns = "http://iase.disa.mil/stigs/Pages/stig-viewing-guidance.aspx"
ssg_version_uri = (
    "https://github.com/OpenSCAP/scap-security-guide/releases/latest"
)

# XCCDF identifier prefixes, all derived from the vendor string.
OSCAP_VENDOR = "org.ssgproject"
OSCAP_DS_STRING = "xccdf_{0}.content_benchmark_".format(OSCAP_VENDOR)
OSCAP_GROUP = "xccdf_{0}.content_group_".format(OSCAP_VENDOR)
OSCAP_GROUP_PCIDSS = "xccdf_{0}.content_group_pcidss-req".format(OSCAP_VENDOR)
OSCAP_GROUP_VAL = "xccdf_{0}.content_group_values".format(OSCAP_VENDOR)
OSCAP_GROUP_NON_PCI = "xccdf_{0}.content_group_non-pci-dss".format(OSCAP_VENDOR)
XCCDF11_NS = "http://checklists.nist.gov/xccdf/1.1"
XCCDF12_NS = "http://checklists.nist.gov/xccdf/1.2"

min_ansible_version = "2.3"
ansible_version_requirement_pre_task_name = (
    "Verify Ansible meets SCAP-Security-Guide version requirements."
)

# Opening <oval_definitions> tag; "{0}" stands for the OVAL definitions
# namespace.  The leading empty entry keeps the initial newline.
oval_header = "\n".join([
    '',
    '<oval_definitions',
    '    xmlns="{0}"',
    '    xmlns:oval="http://oval.mitre.org/XMLSchema/oval-common-5"',
    '    xmlns:ind="{0}#independent"',
    '    xmlns:unix="{0}#unix"',
    '    xmlns:linux="{0}#linux"',
    '    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"',
    '    xsi:schemaLocation="http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd',
    '        {0} oval-definitions-schema.xsd',
    '        {0}#independent independent-definitions-schema.xsd',
    '        {0}#unix unix-definitions-schema.xsd',
    '        {0}#linux linux-definitions-schema.xsd">',
]).format(oval_namespace)


# Computed once, when the module is imported (UTC).
timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")


# Package manager name -> package system name.
PKG_MANAGER_TO_SYSTEM = dict(
    yum="rpm",
    zypper="rpm",
    dnf="rpm",
    apt_get="dpkg",
)
87
88
89
class SSGError(RuntimeError):
    """Error type defined by this module; a plain RuntimeError subclass."""
91
92
93
def oval_generated_header(product_name, schema_version, ssg_version):
    """
    Build the opening part of a generated OVAL document.

    The result is the XML declaration, the ``<oval_definitions>`` opening
    tag and a ``<generator>`` element filled in with the product name, the
    SSG and Python versions, the schema version and the module timestamp.
    """
    generator_values = (
        product_name,
        ssg_version,
        platform.python_version(),
        schema_version,
        timestamp,
    )
    generator = """
    <generator>
        <oval:product_name>%s from SCAP Security Guide</oval:product_name>
        <oval:product_version>ssg: %s, python: %s</oval:product_version>
        <oval:schema_version>%s</oval:schema_version>
        <oval:timestamp>%s</oval:timestamp>
    </generator>""" % generator_values
    return xml_version + oval_header + generator
103
104
105
def subprocess_check_output(*popenargs, **kwargs):
    """
    Run a command, capture its standard output and return it.

    Raises ``subprocess.CalledProcessError`` (with the captured output
    stored in its ``output`` attribute) when the exit status is non-zero.
    """
    # Backport of subprocess.check_output taken from
    # https://gist.github.com/edufelipe/1027906
    #
    # Originally from Python 2.7 stdlib under PSF, compatible with BSD-3
    # Copyright (c) 2003-2005 by Peter Astrand <[email protected]>
    # Changes by Eduardo Felipe

    child = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
    output = child.communicate()[0]
    status = child.poll()
    if not status:
        return output

    # Report the command the same way it was handed to Popen.
    failed_cmd = kwargs.get("args")
    if failed_cmd is None:
        failed_cmd = popenargs[0]
    error = subprocess.CalledProcessError(status, failed_cmd)
    error.output = output
    raise error


# if available we just use the real function
subprocess_check_output = getattr(
    subprocess, "check_output", subprocess_check_output)
129
130
131
def get_check_content_ref_if_exists_and_not_remote(check):
    """
    Look up the XCCDF 1.1 ``check-content-ref`` child of a check element.

    Returns that child when it exists and does not point to a remote
    location; returns None otherwise.

    ..see-also:: check_content_href_is_remote
    """
    content_ref = check.find("./{%s}check-content-ref" % XCCDF11_NS)
    if content_ref is not None and not check_content_href_is_remote(content_ref):
        return content_ref
    return None
147
148
149
def check_content_href_is_remote(check_content_ref):
    """
    Tell whether a check-content-ref element points to a remote resource.

    Returns True when the ``href`` attribute starts with 'http://' or
    'https://', False otherwise.

    Raises RuntimeError if the ``href`` attribute is missing.
    """
    href = check_content_ref.get("href")
    if href is not None:
        return href.startswith(("http://", "https://"))

    # @href attribute of <check-content-ref> is required by XCCDF standard
    raise RuntimeError(
        "Invalid OVAL <check-content-ref> detected - missing the 'href' attribute!")
164
165
166
def parse_xml_file(filename):
    """
    Parse the XML file at ``filename`` and return its root element.

    The file is read as bytes so the XML parser decodes it according to
    the document's own encoding declaration, instead of the content being
    decoded beforehand with the platform's default text encoding.
    """
    with open(filename, 'rb') as xml_file:
        filestring = xml_file.read()
    return ElementTree.fromstring(filestring)
174
175
176
def cce_is_valid(cceid):
    """
    Return True if the CCE ID is in valid form (either 'CCE-XXXX-X' or
    'CCE-XXXXX-X', where each X is a digit and the final X is a
    check-digit), based on Requirement A17:

    http://people.redhat.com/swells/nist-scap-validation/scap-val-requirements-1.2.html

    The whole string has to match; surrounding text or extra digits make
    the ID invalid.
    """
    # re.match anchors at the start; \Z anchors at the very end of the
    # string (unlike "$", it does not tolerate a trailing newline).
    match = re.match(r'CCE-\d{4,5}-\d\Z', cceid)
    return match is not None
186
187
188
def map_elements_to_their_ids(tree, xpath_expr):
    """
    Collect the elements of ``tree`` matched by ``xpath_expr`` into a
    dictionary keyed by each element's ``id`` attribute.

    Raises AssertionError if a matching element doesn't have the ``id`` attribute.

    Returns the id -> element dictionary.
    """
    elements_by_id = {}
    for node in tree.findall(xpath_expr):
        node_id = node.get("id")
        assert node_id is not None
        elements_by_id[node_id] = node
    return elements_by_id
203
204
205
class AbsolutePathFileSystemLoader(jinja2.BaseLoader):
    """Template loader that reads templates straight from the file system.
    Only absolute paths are accepted; a relative path is reported as a
    missing template.

    >>> loader = AbsolutePathFileSystemLoader()

    Templates are decoded as ``'utf-8'`` unless a different `encoding`
    is passed to the constructor.
    """

    def __init__(self, encoding='utf-8'):
        self.encoding = encoding

    def get_source(self, environment, template):
        # Relative paths are rejected the same way as missing files.
        if not os.path.isabs(template):
            raise jinja2.TemplateNotFound(template)

        template_file = jinja2.utils.open_if_exists(template)
        if template_file is None:
            raise jinja2.TemplateNotFound(template)
        try:
            raw_contents = template_file.read()
            contents = raw_contents.decode(self.encoding)
        finally:
            template_file.close()

        mtime_at_load = os.path.getmtime(template)

        def uptodate():
            # The source counts as current while the modification time is
            # unchanged; a file that can no longer be stat'ed is stale.
            try:
                current_mtime = os.path.getmtime(template)
            except OSError:
                return False
            return current_mtime == mtime_at_load

        return contents, template, uptodate
238
239
240
def get_jinja_environment():
    """
    Return the shared Jinja environment, creating it on first use.

    The environment is stored in the ``env`` attribute of this function
    and uses the custom ``{{% %}}`` / ``{{{ }}}`` / ``{{# #}}`` delimiters.
    """
    if get_jinja_environment.env is not None:
        return get_jinja_environment.env

    # TODO: Choose better syntax?
    environment = jinja2.Environment(
        block_start_string="{{%",
        block_end_string="%}}",
        variable_start_string="{{{",
        variable_end_string="}}}",
        comment_start_string="{{#",
        comment_end_string="#}}",
        loader=AbsolutePathFileSystemLoader()
    )
    get_jinja_environment.env = environment
    return environment


# No environment exists until the first call.
get_jinja_environment.env = None
257
258
259
def process_file_with_jinja(filepath, substitutions_dict):
    """
    Load the template at ``filepath`` through the shared Jinja environment
    and return it rendered with ``substitutions_dict``.
    """
    environment = get_jinja_environment()
    return environment.get_template(filepath).render(substitutions_dict)
262
263
264
def _open_yaml(stream):
    """
    Parse the given YAML source with the safe loader and return the result.

    The "documentation_complete" key is removed from the parsed contents;
    when its value is "false", None is returned instead of the contents.
    """
    parsed = yaml.load(stream, Loader=yaml_SafeLoader)

    documentation_complete = parsed.pop("documentation_complete", "true")
    return None if documentation_complete == "false" else parsed
275
276
277
def open_and_expand_yaml(yaml_file, substitutions_dict=None):
    """
    Render the file as a Jinja template with substitutions_dict (an empty
    dictionary when omitted), then parse the rendered text as YAML.

    See also: _open_yaml
    """
    substitutions = dict() if substitutions_dict is None else substitutions_dict
    rendered = process_file_with_jinja(yaml_file, substitutions)
    return _open_yaml(rendered)
290
291
292
def _extract_substitutions_dict_from_template(filename):
    """
    Load the template at ``filename`` and return the symbols of its module
    as a dictionary, leaving out names that start with an underscore.
    """
    template = get_jinja_environment().get_template(filename)
    module_namespace = template.make_module().__dict__
    return dict(
        (name, symbol)
        for name, symbol in module_namespace.items()
        if not name.startswith("_")
    )
301
302
303
def rename_items(original_dict, renames):
    """
    Return a new dictionary holding the values of ``original_dict`` under
    new keys.

    ``renames`` maps an existing key to its new name; entries whose
    existing key is absent from ``original_dict`` are skipped.
    """
    return dict(
        (new_name, original_dict[old_name])
        for old_name, new_name in renames.items()
        if old_name in original_dict
    )
309
310
311
def get_implied_properties(existing_properties):
    """
    Return the properties that follow from the ones already present.

    When "pkg_manager" is set and "pkg_system" is not, "pkg_system" is
    looked up in PKG_MANAGER_TO_SYSTEM; otherwise the result is empty.
    """
    implied = dict()
    if "pkg_manager" not in existing_properties:
        return implied
    if "pkg_system" in existing_properties:
        return implied

    pkg_manager = existing_properties["pkg_manager"]
    implied["pkg_system"] = PKG_MANAGER_TO_SYSTEM[pkg_manager]
    return implied
317
318
319
def _save_rename(result, stem, prefix):
    """Store ``stem`` in ``result`` under the key "<prefix>_<stem>"."""
    prefixed_name = "{0}_{1}".format(prefix, stem)
    result[prefixed_name] = stem
321
322
323
def _identify_special_macro_mapping(existing_properties):
    """
    Build the mapping from property-specific macro names to generic ones.

    For each of "pkg_manager", "pkg_system" and "init_system" that is set
    (not None) in ``existing_properties``, every related macro stem gets
    an entry "<property value>_<stem>" -> "<stem>".
    """
    stems_by_property = (
        ("pkg_manager", (
            "describe_package_install",
            "describe_package_remove",
        )),
        ("pkg_system", (
            "ocil_package",
            "complete_ocil_entry_package",
        )),
        ("init_system", (
            "describe_service_enable",
            "describe_service_disable",
            "ocil_service_enabled",
            "ocil_service_disabled",
            "describe_socket_enable",
            "describe_socket_disable",
            "complete_ocil_entry_socket_and_service_disabled",
        )),
    )

    result = dict()
    for property_name, stems in stems_by_property:
        prefix = existing_properties.get(property_name)
        if prefix is None:
            continue
        for stem in stems:
            _save_rename(result, stem, prefix)

    return result
347
348
349
def open_and_macro_expand_yaml(yaml_file, substitutions_dict=None):
    """
    Same as open_and_expand_yaml, except that the macros defined in
    JINJA_MACROS_DEFINITIONS are made available to the template first.

    The macros are added to substitutions_dict itself, so a dictionary
    passed by the caller is updated in place.

    Raises RuntimeError if the macro definitions cannot be extracted.
    """
    if substitutions_dict is None:
        substitutions_dict = dict()

    try:
        all_macros = _extract_substitutions_dict_from_template(JINJA_MACROS_DEFINITIONS)
    except Exception as exc:
        raise RuntimeError(
            "Error extracting macro definitions from {0}: {1}"
            .format(JINJA_MACROS_DEFINITIONS, str(exc)))

    # The renames depend on the properties present before the macros
    # are merged in.
    renames = _identify_special_macro_mapping(substitutions_dict)
    substitutions_dict.update(all_macros)
    substitutions_dict.update(rename_items(all_macros, renames))
    return open_and_expand_yaml(yaml_file, substitutions_dict)
368
369
370
def open_yaml(yaml_file):
    """
    Open the file at path ``yaml_file`` as UTF-8 text and parse it as YAML
    without performing any kind of template processing.

    See also: _open_yaml
    """
    with codecs.open(yaml_file, "r", "utf8") as stream:
        return _open_yaml(stream)
380
381
382
def open_environment_yamls(build_config_yaml, product_yaml):
    """
    Load the build configuration YAML, overlay the product YAML on top of
    it, add the implied properties, and return the combined dictionary.
    """
    environment = open_yaml(build_config_yaml)
    product_properties = open_yaml(product_yaml)
    environment.update(product_properties)

    implied_properties = get_implied_properties(environment)
    environment.update(implied_properties)
    return environment
387
388
389
def required_yaml_key(yaml_contents, key):
    """
    Return ``yaml_contents[key]``.

    Raises ValueError, quoting the whole contents, when the key is absent.
    """
    if key not in yaml_contents:
        message = "%s is required but was not found in:\n%s" % (
            key, repr(yaml_contents))
        raise ValueError(message)

    return yaml_contents[key]
395