Passed
Pull Request — rhel8-branch (#148)
by Matěj
01:06
created

BenchmarkHandler.profiles()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 5
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
#
2
# Copyright (C) 2013  Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
# Red Hat Author(s): Vratislav Podzimek <[email protected]>
19
#
20
21
"""
22
Module with various classes for SCAP content processing and retrieving data
23
from it.
24
25
"""
26
27
import os.path
28
29
import multiprocessing
30
from collections import namedtuple, OrderedDict
31
from openscap_api import OSCAP
32
from pyanaconda.core.util import execReadlines
33
try:
34
    from html.parser import HTMLParser
35
except ImportError:
36
    from HTMLParser import HTMLParser
37
38
39
# Maps symbolic content-type names to the human-readable document type
# strings that are matched against the "Document type:" line of 'oscap info'.
CONTENT_TYPES = {
    "DATASTREAM": "Source Data Stream",
    "XCCDF_CHECKLIST": "XCCDF Checklist",
    "OVAL": "OVAL Definitions",
    "CPE_DICT": "CPE Dictionary",
    "TAILORING": "XCCDF Tailoring",
}
46
47
48
class ContentHandlingError(Exception):
    """Base exception for failures while handling SCAP content."""
52
53
54
class DataStreamHandlingError(ContentHandlingError):
    """Raised when a data stream (collection) cannot be handled."""
58
59
60
class BenchmarkHandlingError(ContentHandlingError):
    """Raised when an XCCDF benchmark cannot be handled."""
64
65
66
class ContentCheckError(ContentHandlingError):
    """Raised when checking the content (integrity, ...) fails."""
72
73
74
class ParseHTMLContent(HTMLParser):
    """HTML parser that reduces tagged content to plain text.

    Text nodes are appended (whitespace-stripped) to ``content``. The
    ``html:ul``, ``html:li`` and ``html:br`` tags are turned into newlines;
    every other tag is dropped without a trace.

    """

    def __init__(self):
        # explicit base-class call keeps this working with the Python 2
        # 'HTMLParser' fallback import as well
        HTMLParser.__init__(self)
        self.content = ""

    def handle_starttag(self, tag, attrs):
        """Emit a newline when a list, list item or line break opens."""

        if tag in ("html:ul", "html:li", "html:br"):
            self.content += "\n"

    def handle_endtag(self, tag):
        """Emit a newline when a list or list item closes."""

        if tag in ("html:ul", "html:li"):
            self.content += "\n"

    def handle_data(self, data):
        """Append the text node with surrounding whitespace removed."""

        self.content = self.content + data.strip()

    def get_content(self):
        """Return the plain text gathered so far."""

        return self.content
100
101
102
def parse_HTML_from_content(content):
    """Strip HTML tags from the given content.

    A deliberately simple HTML-to-text conversion: the tags are removed while
    an attempt is made to keep the text readable.

    :param content: content whose HTML tags will be parsed
    :return: content without HTML tags
    """

    html_to_text = ParseHTMLContent()
    html_to_text.feed(content)
    plain_text = html_to_text.get_content()
    return plain_text
115
116
117
# Record describing a single XCCDF profile. This is a class created by
# namedtuple, not a constant, hence the naming check is silenced.
# pylint: disable-msg=C0103
ProfileInfo = namedtuple("ProfileInfo", ("id", "title", "description"))

# Record holding the paths of the content files that were found.
# pylint: disable-msg=C0103
ContentFiles = namedtuple("ContentFiles", ("xccdf", "cpe", "tailoring"))
124
125
126
def oscap_text_itr_get_text(itr):
    """
    Concatenate the texts of all items provided by an oscap_text_iterator.

    :param itr: oscap_text_iterator to get the text from
    :type itr: oscap_text_iterator
    :return: text gotten from the iterator
    :rtype: str

    """

    # NOTE(review): the iterator is drained but not freed here -- confirm
    # whether callers are expected to free it themselves.
    pieces = []
    while OSCAP.oscap_text_iterator_has_more(itr):
        item = OSCAP.oscap_text_iterator_next(itr)
        pieces.append(OSCAP.oscap_text_get_text(item))

    return "".join(pieces)
143
144
145
def identify_files(fpaths):
    """
    Determine the document types of the given files in parallel.

    :param fpaths: paths of the files to identify (any iterable)
    :type fpaths: iterable of str
    :return: mapping of every given path to what get_doc_type returned for it
    :rtype: dict(str -> str or None)

    """

    # Materialize the paths once: they are walked twice below (by the pool
    # and by zip), so a one-shot iterable would otherwise give an empty dict.
    paths = list(fpaths)
    if not paths:
        # nothing to identify, don't spawn worker processes for no reason
        return {}

    with multiprocessing.Pool(os.cpu_count()) as pool:
        labels = pool.map(get_doc_type, paths)

    return dict(zip(paths, labels))
149
150
151
def get_doc_type(file_path):
    """
    Get the document type of a file as reported by 'oscap info'.

    :param file_path: path of the file to examine
    :type file_path: str
    :return: the text following "Document type:" in the 'oscap info' output
             or None if no such line is found or the file cannot be examined
    :rtype: str or None

    """

    try:
        for line in execReadlines("oscap", ["info", file_path]):
            if not line.startswith("Document type:"):
                continue
            # everything after the first colon is the type itself
            return line.partition(":")[2].strip()
    except (OSError, UnicodeDecodeError):
        # OSError: 'oscap info' exited with a non-zero exit code -> unknown
        # doc type
        # UnicodeDecodeError: 'oscap info' supplied weird output, which
        # happens when it tries to explain why it can't examine e.g. a JPG
        return None

    # no "Document type:" line in the output
    return None
165
166
167
def explore_content_files(fpaths):
    """
    Find content files among the given file paths. Simply picks the first
    usable content file of each type, except that a data stream is preferred
    over a standalone benchmark.

    :param fpaths: a list of file paths to search for content files in
    :type fpaths: [str]
    :return: a tuple containing the content handling class and a ContentFiles
             instance containing the file names of the XCCDF file, CPE
             dictionary and tailoring file or "" in place of those items if not
             found
    :rtype: (class, ContentFiles)

    """

    xccdf_path = cpe_path = tailoring_path = ""
    handler_class = None
    have_data_stream = False

    for candidate in fpaths:
        kind = get_doc_type(candidate)
        if not kind:
            # unknown document type, not a content file
            continue

        if kind == "Source Data Stream":
            # a data stream replaces a previously found standalone XCCDF,
            # but only the first data stream is used
            if not (xccdf_path and have_data_stream):
                xccdf_path = candidate
                handler_class = DataStreamHandler
                have_data_stream = True
        elif kind == "XCCDF Checklist":
            if not xccdf_path:
                xccdf_path = candidate
                handler_class = BenchmarkHandler
        elif kind == "CPE Dictionary":
            if not cpe_path:
                cpe_path = candidate
        elif kind == "XCCDF Tailoring":
            if not tailoring_path:
                tailoring_path = candidate

    # TODO: raise exception if no xccdf_file is found?
    found = ContentFiles(xccdf_path, cpe_path, tailoring_path)
    return (handler_class, found)
209
210
211
class DataStreamHandler(object):
    """
    Class for handling data streams in the data stream collection and
    retrieving data from it. For example a list of data stream indices,
    checklists in a given data stream or profiles.

    """

    def __init__(self, dsc_file_path, tailoring_file_path=""):
        """
        Constructor for the DataStreamHandler class.

        :param dsc_file_path: path to a file with a data stream collection
        :type dsc_file_path: str
        :param tailoring_file_path: path to a tailoring file
        :type tailoring_file_path: str
        :raise DataStreamHandlingError: if the path doesn't exist, the file
                                        cannot be loaded as SCAP content or
                                        it is not a data stream collection

        """

        # is used to speed up getting lists of profiles
        self._profiles_cache = dict()

        # defined before anything can fail so that the destructor always
        # finds the attribute
        self._session = None

        if not os.path.exists(dsc_file_path):
            msg = "Invalid file path: '%s'" % dsc_file_path
            raise DataStreamHandlingError(msg)

        self._dsc_file_path = dsc_file_path

        # create an XCCDF session for the file; if any of the checks below
        # fails, the session is released by the destructor
        self._session = OSCAP.xccdf_session_new(dsc_file_path)
        if not self._session:
            msg = "'%s' is not a valid SCAP content file" % dsc_file_path
            raise DataStreamHandlingError(msg)
        if OSCAP.xccdf_session_load(self._session) != 0:
            raise DataStreamHandlingError(OSCAP.oscap_err_desc())

        if tailoring_file_path:
            OSCAP.xccdf_session_set_user_tailoring_file(self._session,
                                                        tailoring_file_path)

        if not OSCAP.xccdf_session_is_sds(self._session):
            msg = "'%s' is not a data stream collection" % dsc_file_path
            raise DataStreamHandlingError(msg)

        # dictionary holding the items gathered from DSC processing
        # (data stream ID -> list of checklist IDs)
        self._items = OrderedDict()

        # create an sds index for the content
        self._sds_idx = OSCAP.xccdf_session_get_sds_idx(self._session)

        # iterate over streams and get checklists from each stream
        streams_itr = OSCAP.ds_sds_index_get_streams(self._sds_idx)
        while OSCAP.ds_stream_index_iterator_has_more(streams_itr):
            stream_idx = OSCAP.ds_stream_index_iterator_next(streams_itr)

            # will be used to store the checklists for streams
            stream_id = OSCAP.ds_stream_index_get_id(stream_idx)
            checklists = []

            # iterate over checklists and append their ids to the list
            chklist_itr = OSCAP.ds_stream_index_get_checklists(stream_idx)
            while OSCAP.oscap_string_iterator_has_more(chklist_itr):
                checklists.append(OSCAP.oscap_string_iterator_next(chklist_itr))

            # store the list of checklist for the current stream
            self._items[stream_id] = checklists

            OSCAP.oscap_string_iterator_free(chklist_itr)

        OSCAP.ds_stream_index_iterator_free(streams_itr)

    def __del__(self):
        """Destructor for the DataStreamHandler class."""

        # The attribute has to be looked up on the instance; locals() of this
        # method only ever contains 'self'. getattr() with a default covers
        # instances whose constructor has not run at all.
        session = getattr(self, "_session", None)
        if session:
            # forget the session first so that it can never be freed twice
            self._session = None
            OSCAP.xccdf_session_free(session)

    def get_data_streams(self):
        """
        Method to get a list of data streams found in the data stream
        collection.

        :return: list of data stream IDs
        :rtype: list of strings

        """

        return list(self._items.keys())

    def get_data_streams_checklists(self):
        """
        Method to get data streams and their checklists found in the data
        stream collection.

        :return: a dictionary consisting of the IDs of the data streams as keys
                 and lists of their checklists' IDs as values
        :rtype: dict(str -> list of strings)

        """

        # easy, we already have exactly what should be returned, just create a
        # copy, so that the caller cannot modify our internal dictionary (the
        # copy is shallow, the lists of checklists are shared)
        return dict(self._items)

    def get_checklists(self, data_stream_id):
        """
        Method to get a list of checklists found in the data stream given by
        the data_stream_id.

        :param data_stream_id: ID of the data stream to get checklists from
        :type data_stream_id: str
        :return: list of checklist IDs found in the data stream given by the ID
        :rtype: list of strings
        :raise DataStreamHandlingError: if there is no such data stream

        """

        if data_stream_id not in self._items:
            msg = "Invalid data stream id given: '%s'" % data_stream_id
            raise DataStreamHandlingError(msg)

        return self._items[data_stream_id]

    def get_profiles(self, data_stream_id, checklist_id):
        """
        Method to get a list of profiles defined in the checklist given by the
        checklist_id that is defined in the data stream given by the
        data_stream_id.

        :param data_stream_id: ID of the data stream to get checklists from
        :type data_stream_id: str
        :param checklist_id: ID of the checklist to get profiles from
        :type checklist_id: str
        :return: list of profiles found in the checklist
        :rtype: list of ProfileInfo instances
        :raise DataStreamHandlingError: if the session for the given data
                                        stream and checklist cannot be loaded

        """

        cache_id = "%s;%s" % (data_stream_id, checklist_id)
        if cache_id in self._profiles_cache:
            # found in cache, return the value
            return self._profiles_cache[cache_id]

        # not found in the cache, needs to be gathered

        # Replace the session with a fresh one for the requested data stream
        # and component. The old one is forgotten before it is freed so that
        # a failure below cannot leave a dangling session behind for the
        # destructor.
        # NOTE(review): the tailoring file given to the constructor is not
        # applied to the session created here -- confirm that is intended.
        old_session, self._session = self._session, None
        if old_session:
            OSCAP.xccdf_session_free(old_session)

        self._session = OSCAP.xccdf_session_new(self._dsc_file_path)
        if not self._session:
            msg = "'%s' is not a valid SCAP content file" % self._dsc_file_path
            raise DataStreamHandlingError(msg)

        # set the data stream and component (checklist) for the session
        OSCAP.xccdf_session_set_datastream_id(self._session, data_stream_id)
        OSCAP.xccdf_session_set_component_id(self._session, checklist_id)
        if OSCAP.xccdf_session_load(self._session) != 0:
            raise DataStreamHandlingError(OSCAP.oscap_err_desc())

        # get the benchmark (checklist)
        policy_model = OSCAP.xccdf_session_get_policy_model(self._session)

        # a policy created with no profile tells us how many rules are
        # selected by default
        default_policy = OSCAP.xccdf_policy_new(policy_model, None)
        default_rules_count = OSCAP.xccdf_policy_get_selected_rules_count(default_policy)

        # will hold items for the profiles for the specified DS and checklist
        profiles = []

        if default_rules_count > 0:
            profiles.append(ProfileInfo("default", "Default",
                            "The implicit XCCDF profile. Usually, the default contains no rules."))

        benchmark = OSCAP.xccdf_policy_model_get_benchmark(policy_model)

        # iterate over the profiles in the benchmark and store them
        profile_itr = OSCAP.xccdf_benchmark_get_profiles(benchmark)
        try:
            while OSCAP.xccdf_profile_iterator_has_more(profile_itr):
                profile = OSCAP.xccdf_profile_iterator_next(profile_itr)

                id_ = OSCAP.xccdf_profile_get_id(profile)
                title = oscap_text_itr_get_text(OSCAP.xccdf_profile_get_title(profile))
                desc = parse_HTML_from_content(
                    oscap_text_itr_get_text(OSCAP.xccdf_profile_get_description(profile)))

                profiles.append(ProfileInfo(id_, title, desc))
        finally:
            # free the iterator even if reading some profile fails
            OSCAP.xccdf_profile_iterator_free(profile_itr)

        # cache the result
        self._profiles_cache[cache_id] = profiles

        return profiles
403
404
405
class BenchmarkHandler(object):
    """
    Class for handling XCCDF benchmark and retrieving data from it (mainly the
    list of profiles).

    """

    def __init__(self, xccdf_file_path, tailoring_file_path=""):
        """
        Constructor for the BenchmarkHandler class.

        :param xccdf_file_path: path to a file with an XCCDF benchmark
        :type xccdf_file_path: str
        :param tailoring_file_path: path to a tailoring file
        :type tailoring_file_path: str
        :raise BenchmarkHandlingError: if the path doesn't exist or the file
                                       cannot be loaded as a valid benchmark

        """

        if not os.path.exists(xccdf_file_path):
            msg = "Invalid file path: '%s'" % xccdf_file_path
            raise BenchmarkHandlingError(msg)

        session = OSCAP.xccdf_session_new(xccdf_file_path)
        if not session:
            msg = "'%s' is not a valid SCAP content file" % xccdf_file_path
            raise BenchmarkHandlingError(msg)

        # stores a list of profiles in the benchmark
        self._profiles = []

        # The session is needed only for gathering the profiles, so it is
        # freed on the way out no matter whether that succeeds or fails.
        try:
            if tailoring_file_path:
                OSCAP.xccdf_session_set_user_tailoring_file(session,
                                                            tailoring_file_path)
            if OSCAP.xccdf_session_load(session) != 0:
                raise BenchmarkHandlingError(OSCAP.oscap_err_desc())

            # get the benchmark object and validate it before it is used
            policy_model = OSCAP.xccdf_session_get_policy_model(session)
            benchmark = OSCAP.xccdf_policy_model_get_benchmark(policy_model)
            if not benchmark:
                msg = "Not a valid benchmark file: '%s'" % xccdf_file_path
                raise BenchmarkHandlingError(msg)

            # a policy created with no profile tells us how many rules are
            # selected by default
            default_policy = OSCAP.xccdf_policy_new(policy_model, None)
            default_rules_count = OSCAP.xccdf_policy_get_selected_rules_count(default_policy)

            if default_rules_count > 0:
                self._profiles.append(
                    ProfileInfo(
                        "default", "Default",
                        "The implicit XCCDF profile. Usually, the default contains no rules."))

            # profiles defined in the benchmark itself
            self._profiles.extend(
                self._read_profiles(OSCAP.xccdf_benchmark_get_profiles(benchmark)))

            # profiles defined in the tailoring file (if any)
            if tailoring_file_path:
                tailoring = OSCAP.xccdf_policy_model_get_tailoring(policy_model)
                if tailoring:
                    self._profiles.extend(
                        self._read_profiles(OSCAP.xccdf_tailoring_get_profiles(tailoring)))
        finally:
            OSCAP.xccdf_session_free(session)

    @staticmethod
    def _read_profiles(profile_itr):
        """
        Drain the given profile iterator, free it and return the info about
        the profiles it provided.

        :param profile_itr: iterator over XCCDF profiles
        :return: list of ProfileInfo instances, in iteration order

        """

        infos = []
        try:
            while OSCAP.xccdf_profile_iterator_has_more(profile_itr):
                profile = OSCAP.xccdf_profile_iterator_next(profile_itr)

                id_ = OSCAP.xccdf_profile_get_id(profile)
                title = oscap_text_itr_get_text(OSCAP.xccdf_profile_get_title(profile))
                desc = parse_HTML_from_content(
                    oscap_text_itr_get_text(OSCAP.xccdf_profile_get_description(profile)))

                infos.append(ProfileInfo(id_, title, desc))
        finally:
            # every iterator is freed exactly once, even on failure
            OSCAP.xccdf_profile_iterator_free(profile_itr)

        return infos

    @property
    def profiles(self):
        """Property for the list of profiles defined in the benchmark."""

        return self._profiles
490