Passed
Pull Request — rhel8-branch (#148)
by Matěj
01:54
created

org_fedora_oscap.content_handling   F

Complexity

Total Complexity 63

Size/Duplication

Total Lines 498
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 63
eloc 227
dl 0
loc 498
rs 3.36
c 0
b 0
f 0

5 Functions

Rating   Name   Duplication   Size   Complexity  
A oscap_text_itr_get_text() 0 17 2
A identify_files() 0 4 2
A get_doc_type() 0 19 5
D explore_content_files() 0 42 12
A parse_HTML_from_content() 0 13 1

13 Methods

Rating   Name   Duplication   Size   Complexity  
B DataStreamHandler.get_profiles() 0 69 6
A BenchmarkHandler.profiles() 0 5 1
A DataStreamHandler.get_checklists() 0 17 2
A ParseHTMLContent.handle_data() 0 2 1
A ParseHTMLContent.handle_endtag() 0 5 3
A ParseHTMLContent.handle_starttag() 0 7 4
A ParseHTMLContent.__init__() 0 3 1
A DataStreamHandler.__del__() 0 6 2
C BenchmarkHandler.__init__() 0 72 10
A DataStreamHandler.get_data_streams_checklists() 0 14 1
A DataStreamHandler.get_data_streams() 0 11 1
B DataStreamHandler.__init__() 0 62 8
A ParseHTMLContent.get_content() 0 2 1

How to fix   Complexity   

Complexity

Complex classes like org_fedora_oscap.content_handling often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Copyright (C) 2013  Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
# Red Hat Author(s): Vratislav Podzimek <[email protected]>
19
#
20
21
"""
22
Module with various classes for SCAP content processing and retrieving data
23
from it.
24
25
"""
26
27
import os.path
28
29
import multiprocessing
30
from collections import namedtuple, OrderedDict
31
from openscap_api import OSCAP
32
from pyanaconda.core.util import execReadlines
33
try:
34
    from html.parser import HTMLParser
35
except ImportError:
36
    from HTMLParser import HTMLParser
37
38
import logging
39
log = logging.getLogger("anaconda")
40
41
42
# Document type labels of the supported kinds of SCAP content, keyed by a
# short symbolic name.
CONTENT_TYPES = {
    "DATASTREAM": "Source Data Stream",
    "XCCDF_CHECKLIST": "XCCDF Checklist",
    "OVAL": "OVAL Definitions",
    "CPE_DICT": "CPE Dictionary",
    "TAILORING": "XCCDF Tailoring",
}
49
50
51
class ContentHandlingError(Exception):
    """Base exception for errors related to SCAP content handling."""
55
56
57
class DataStreamHandlingError(ContentHandlingError):
    """Exception raised when handling of a data stream fails."""
61
62
63
class BenchmarkHandlingError(ContentHandlingError):
    """Exception raised when handling of an XCCDF benchmark fails."""
67
68
69
class ContentCheckError(ContentHandlingError):
    """Exception for errors related to checking of content (integrity,...)."""
75
76
77
class ParseHTMLContent(HTMLParser):
    """Parser turning content with HTML tags into plain text.

    Text between tags is gathered with the surrounding whitespace stripped.
    A newline is put in place of opening and closing ``html:ul`` and
    ``html:li`` tags and in place of ``html:br`` tags, all the other tags
    are left out.
    """

    # tags replaced with a newline when they are opened
    _NEWLINE_START_TAGS = ("html:ul", "html:li", "html:br")

    # tags replaced with a newline when they are closed
    _NEWLINE_END_TAGS = ("html:ul", "html:li")

    def __init__(self):
        HTMLParser.__init__(self)
        # the plain text gathered so far
        self.content = ""

    def handle_starttag(self, tag, attrs):
        if tag in self._NEWLINE_START_TAGS:
            self.content += "\n"

    def handle_endtag(self, tag):
        if tag in self._NEWLINE_END_TAGS:
            self.content += "\n"

    def handle_data(self, data):
        self.content += data.strip()

    def get_content(self):
        """Return the plain text gathered from the data fed so far."""
        return self.content
103
104
105
def parse_HTML_from_content(content):
    """Strip HTML tags from the given content.

    A very simple HTML to text conversion done by the ParseHTMLContent
    parser, which removes the tags while trying to keep the content
    readable.

    :param content: content whose HTML tags will be parsed
    :return: content without HTML tags
    """

    html_to_text = ParseHTMLContent()
    html_to_text.feed(content)
    return html_to_text.get_content()
118
119
120
# Record with the info about an XCCDF profile. It is a namedtuple class, not
# a constant, so the pylint naming check does not apply here.
# pylint: disable-msg=C0103
ProfileInfo = namedtuple("ProfileInfo", ("id", "title", "description"))
123
124
# Record with the paths of the content files found (again a namedtuple
# class, not a constant).
# pylint: disable-msg=C0103
ContentFiles = namedtuple("ContentFiles", ("xccdf", "cpe", "tailoring"))
127
128
129
def oscap_text_itr_get_text(itr):
    """
    Join the texts of all the items the given oscap_text_iterator provides.

    The iterator is only walked through here, it is not freed.

    :param itr: oscap_text_iterator to get the text from
    :type itr: oscap_text_iterator
    :return: concatenation of the texts of the remaining items, an empty
             string if there are none
    :rtype: str

    """

    pieces = []
    while OSCAP.oscap_text_iterator_has_more(itr):
        item = OSCAP.oscap_text_iterator_next(itr)
        pieces.append(OSCAP.oscap_text_get_text(item))

    return "".join(pieces)
146
147
148
def identify_files(fpaths):
    """
    Identify document types of the given files in parallel.

    get_doc_type() is run for every path in a pool of worker processes, one
    worker per CPU reported by os.cpu_count().

    :param fpaths: paths of the files to identify
    :type fpaths: iterable of str
    :return: a dictionary mapping every path to what get_doc_type() returned
             for it
    :rtype: dict(str -> str or None)

    """

    # a one-shot iterable would be used up by the pool leaving nothing to
    # pair the labels with, so make sure we work with a list
    fpaths = list(fpaths)
    if not fpaths:
        # nothing to identify, no need to spawn any worker processes
        return {}

    with multiprocessing.Pool(os.cpu_count()) as pool:
        labels = pool.map(get_doc_type, fpaths)
    return dict(zip(fpaths, labels))
152
153
154
def get_doc_type(file_path):
    """
    Find out the document type of a file by running 'oscap info' on it.

    :param file_path: path of the file to identify
    :type file_path: str
    :return: the text following "Document type:" in the 'oscap info' output,
             "unknown" if there is no such line or the command failed, None
             if the output could not be decoded
    :rtype: str or None

    """

    type_prefix = "Document type:"
    doc_type = "unknown"
    try:
        for output_line in execReadlines("oscap", ["info", file_path]):
            if not output_line.startswith(type_prefix):
                continue
            # the line starts with the prefix, so everything behind the first
            # colon is the document type
            doc_type = output_line.split(":", 1)[1].strip()
            break
    except OSError:
        # 'oscap info' exited with a non-zero exit code -> unknown doc
        # type
        pass
    except UnicodeDecodeError:
        # 'oscap info' supplied weird output, which happens when it tries
        # to explain why it can't examine e.g. a JPG.
        return None

    message = "OSCAP addon: Identified {file_path} as {content_type}".format(
        file_path=file_path, content_type=doc_type)
    log.info(message)
    return doc_type
173
174
175
def explore_content_files(fpaths):
    """
    Function for finding content files in a list of file paths.

    It simply picks the first usable content file of a particular type and
    just prefers data streams over standalone benchmarks.

    :param fpaths: a list of file paths to search for content files in
    :type fpaths: [str]
    :return: a tuple containing the content handling class (None if neither
             a data stream nor a benchmark was found) and a ContentFiles
             instance containing the file names of the XCCDF file, CPE
             dictionary and tailoring file or "" in place of those items if
             not found
    :rtype: (class, ContentFiles)

    """

    xccdf_path = cpe_path = tailoring_path = ""
    handler_class = None
    have_data_stream = False

    for candidate in fpaths:
        kind = get_doc_type(candidate)
        if not kind:
            continue

        if kind == "Source Data Stream":
            # prefer DS over standalone XCCDF: the first data stream found
            # replaces a checklist that may have been picked before it
            if not (xccdf_path and have_data_stream):
                xccdf_path = candidate
                handler_class = DataStreamHandler
                have_data_stream = True
        elif kind == "XCCDF Checklist":
            if not xccdf_path:
                xccdf_path = candidate
                handler_class = BenchmarkHandler
        elif kind == "CPE Dictionary":
            if not cpe_path:
                cpe_path = candidate
        elif kind == "XCCDF Tailoring":
            if not tailoring_path:
                tailoring_path = candidate

    # TODO: raise exception if no xccdf_file is found?
    return (handler_class, ContentFiles(xccdf_path, cpe_path, tailoring_path))
217
218
219
class DataStreamHandler(object):
    """
    Class for handling data streams in the data stream collection and
    retrieving data from it. For example a list of data stream indices,
    checklists in a given data stream or profiles.

    """

    def __init__(self, dsc_file_path, tailoring_file_path=""):
        """
        Constructor for the DataStreamHandler class.

        :param dsc_file_path: path to a file with a data stream collection
        :type dsc_file_path: str
        :param tailoring_file_path: path to a tailoring file
        :type tailoring_file_path: str
        :raises DataStreamHandlingError: if the file does not exist, cannot be
                                         loaded as SCAP content or is not
                                         a data stream collection

        """

        # is used to speed up getting lists of profiles
        self._profiles_cache = dict()

        if not os.path.exists(dsc_file_path):
            msg = "Invalid file path: '%s'" % dsc_file_path
            raise DataStreamHandlingError(msg)

        self._dsc_file_path = dsc_file_path

        # create an XCCDF session for the file
        self._session = OSCAP.xccdf_session_new(dsc_file_path)
        if not self._session:
            msg = "'%s' is not a valid SCAP content file" % dsc_file_path
            raise DataStreamHandlingError(msg)
        if OSCAP.xccdf_session_load(self._session) != 0:
            raise DataStreamHandlingError(OSCAP.oscap_err_desc())

        if tailoring_file_path:
            OSCAP.xccdf_session_set_user_tailoring_file(self._session,
                                                        tailoring_file_path)

        if not OSCAP.xccdf_session_is_sds(self._session):
            msg = "'%s' is not a data stream collection" % dsc_file_path
            raise DataStreamHandlingError(msg)

        # dictionary holding the items gathered from DSC processing
        self._items = OrderedDict()

        # create an sds index for the content
        self._sds_idx = OSCAP.xccdf_session_get_sds_idx(self._session)

        # iterate over streams and get checklists from each stream
        streams_itr = OSCAP.ds_sds_index_get_streams(self._sds_idx)
        while OSCAP.ds_stream_index_iterator_has_more(streams_itr):
            stream_idx = OSCAP.ds_stream_index_iterator_next(streams_itr)

            # will be used to store the checklists for streams
            stream_id = OSCAP.ds_stream_index_get_id(stream_idx)
            checklists = []

            # iterate over checklists and append their ids to the list
            chklist_itr = OSCAP.ds_stream_index_get_checklists(stream_idx)
            while OSCAP.oscap_string_iterator_has_more(chklist_itr):
                checklists.append(OSCAP.oscap_string_iterator_next(chklist_itr))

            # store the list of checklist for the current stream
            self._items[stream_id] = checklists

            OSCAP.oscap_string_iterator_free(chklist_itr)

        OSCAP.ds_stream_index_iterator_free(streams_itr)

    def __del__(self):
        """Destructor for the DataStreamHandler class."""

        # The session lives in an instance attribute, so looking for it in
        # locals() (which holds just 'self' here) could never find it. The
        # attribute does not exist if the constructor failed before creating
        # the session and it is None if re-creating the session failed.
        session = getattr(self, "_session", None)
        if session:
            # we should free the session
            OSCAP.xccdf_session_free(session)
            self._session = None

    def get_data_streams(self):
        """
        Method to get a list of data streams found in the data stream
        collection.

        :return: list of data stream IDs
        :rtype: list of strings

        """

        return list(self._items.keys())

    def get_data_streams_checklists(self):
        """
        Method to get data streams and their checklists found in the data
        stream collection.

        :return: a dictionary consisting of the IDs of the data streams as keys
                 and lists of their checklists' IDs as values
        :rtype: dict(str -> list of strings)

        """

        # easy, we already have exactly what should be returned, just create a
        # copy, so that the caller cannot modify our internal attributes
        return dict(self._items)

    def get_checklists(self, data_stream_id):
        """
        Method to get a list of checklists found in the data stream given by
        the data_stream_id.

        :param data_stream_id: ID of the data stream to get checklists from
        :type data_stream_id: str
        :return: list of checklist IDs found in the data stream given by the ID
        :rtype: list of strings
        :raises DataStreamHandlingError: if there is no data stream with the
                                         given ID

        """

        if data_stream_id not in self._items:
            msg = "Invalid data stream id given: '%s'" % data_stream_id
            raise DataStreamHandlingError(msg)

        return self._items[data_stream_id]

    def get_profiles(self, data_stream_id, checklist_id):
        """
        Method to get a list of profiles defined in the checklist given by the
        checklist_id that is defined in the data stream given by the
        data_stream_id.

        :param data_stream_id: ID of the data stream to get checklists from
        :type data_stream_id: str
        :param checklist_id: ID of the checklist to get profiles from
        :type checklist_id: str
        :return: list of profiles found in the checklist
        :rtype: list of ProfileInfo instances
        :raises DataStreamHandlingError: if a session for the given data stream
                                         and checklist cannot be created or
                                         loaded

        """

        cache_id = "%s;%s" % (data_stream_id, checklist_id)
        if cache_id in self._profiles_cache:
            # found in cache, return the value
            return self._profiles_cache[cache_id]

        # not found in the cache, needs to be gathered

        # the data stream and component (checklist) are set for a fresh
        # session, get rid of the current one first
        if self._session:
            OSCAP.xccdf_session_free(self._session)
            # never keep a reference to a freed session, the destructor would
            # free it for the second time
            self._session = None

        # NOTE(review): the tailoring file given to the constructor is not set
        # for the new session -- confirm that is intended
        self._session = OSCAP.xccdf_session_new(self._dsc_file_path)
        if not self._session:
            msg = "'%s' is not a valid SCAP content file" % self._dsc_file_path
            raise DataStreamHandlingError(msg)

        OSCAP.xccdf_session_set_datastream_id(self._session, data_stream_id)
        OSCAP.xccdf_session_set_component_id(self._session, checklist_id)
        if OSCAP.xccdf_session_load(self._session) != 0:
            raise DataStreamHandlingError(OSCAP.oscap_err_desc())

        # get the benchmark (checklist)
        policy_model = OSCAP.xccdf_session_get_policy_model(self._session)

        default_policy = OSCAP.xccdf_policy_new(policy_model, None)
        default_rules_count = OSCAP.xccdf_policy_get_selected_rules_count(default_policy)

        # will hold items for the profiles for the specified DS and checklist
        profiles = []

        # the implicit profile is offered only if it selects some rules
        if default_rules_count > 0:
            profiles.append(ProfileInfo("default", "Default",
                            "The implicit XCCDF profile. Usually, the default contains no rules."))

        benchmark = OSCAP.xccdf_policy_model_get_benchmark(policy_model)

        # iterate over the profiles in the benchmark and store them
        profile_itr = OSCAP.xccdf_benchmark_get_profiles(benchmark)
        while OSCAP.xccdf_profile_iterator_has_more(profile_itr):
            profile = OSCAP.xccdf_profile_iterator_next(profile_itr)

            id_ = OSCAP.xccdf_profile_get_id(profile)
            title = oscap_text_itr_get_text(OSCAP.xccdf_profile_get_title(profile))
            desc = parse_HTML_from_content(
                oscap_text_itr_get_text(OSCAP.xccdf_profile_get_description(profile)))
            info = ProfileInfo(id_, title, desc)

            profiles.append(info)

        OSCAP.xccdf_profile_iterator_free(profile_itr)

        # cache the result
        self._profiles_cache[cache_id] = profiles

        return profiles
411
412
413
class BenchmarkHandler(object):
    """
    Class for handling XCCDF benchmark and retrieving data from it (mainly the
    list of profiles).

    """

    def __init__(self, xccdf_file_path, tailoring_file_path=""):
        """
        Constructor for the BenchmarkHandler class.

        All the profiles are gathered here, the XCCDF session used for that
        is freed before the constructor returns or raises.

        :param xccdf_file_path: path to a file with an XCCDF benchmark
        :type xccdf_file_path: str
        :param tailoring_file_path: path to a tailoring file
        :type tailoring_file_path: str
        :raises BenchmarkHandlingError: if the file does not exist, cannot be
                                        loaded as SCAP content or contains no
                                        benchmark
        """

        if not os.path.exists(xccdf_file_path):
            msg = "Invalid file path: '%s'" % xccdf_file_path
            raise BenchmarkHandlingError(msg)

        session = OSCAP.xccdf_session_new(xccdf_file_path)
        if not session:
            msg = "'%s' is not a valid SCAP content file" % xccdf_file_path
            raise BenchmarkHandlingError(msg)

        # stores a list of profiles in the benchmark
        self._profiles = []

        try:
            if tailoring_file_path:
                OSCAP.xccdf_session_set_user_tailoring_file(session,
                                                            tailoring_file_path)
            if OSCAP.xccdf_session_load(session) != 0:
                raise BenchmarkHandlingError(OSCAP.oscap_err_desc())

            # get the benchmark object
            policy_model = OSCAP.xccdf_session_get_policy_model(session)
            benchmark = OSCAP.xccdf_policy_model_get_benchmark(policy_model)

            default_policy = OSCAP.xccdf_policy_new(policy_model, None)
            default_rules_count = OSCAP.xccdf_policy_get_selected_rules_count(default_policy)

            # the implicit profile is offered only if it selects some rules
            if default_rules_count > 0:
                self._profiles.append(
                    ProfileInfo(
                        "default", "Default",
                        "The implicit XCCDF profile. Usually, the default contains no rules."))

            if not benchmark:
                msg = "Not a valid benchmark file: '%s'" % xccdf_file_path
                raise BenchmarkHandlingError(msg)

            # store the profiles defined in the benchmark
            self._profiles.extend(self._gather_profiles(
                OSCAP.xccdf_benchmark_get_profiles(benchmark)))

            if tailoring_file_path:
                # followed by the profiles defined in the tailoring file
                tailoring = OSCAP.xccdf_policy_model_get_tailoring(policy_model)
                self._profiles.extend(self._gather_profiles(
                    OSCAP.xccdf_tailoring_get_profiles(tailoring)))
        finally:
            # the session is not needed once the profiles are gathered and it
            # must not be leaked if gathering them fails
            OSCAP.xccdf_session_free(session)

    @staticmethod
    def _gather_profiles(profile_itr):
        """
        Gather info about all the profiles the given iterator provides and
        free the iterator.

        :param profile_itr: xccdf_profile_iterator to get the profiles from
        :return: list of ProfileInfo instances, one for each profile

        """

        gathered = []
        try:
            while OSCAP.xccdf_profile_iterator_has_more(profile_itr):
                profile = OSCAP.xccdf_profile_iterator_next(profile_itr)

                id_ = OSCAP.xccdf_profile_get_id(profile)
                title = oscap_text_itr_get_text(OSCAP.xccdf_profile_get_title(profile))
                desc = parse_HTML_from_content(
                    oscap_text_itr_get_text(OSCAP.xccdf_profile_get_description(profile)))

                gathered.append(ProfileInfo(id_, title, desc))
        finally:
            # every iterator is freed right after use, so that none is lost
            # when profiles from more sources are gathered
            OSCAP.xccdf_profile_iterator_free(profile_itr)

        return gathered

    @property
    def profiles(self):
        """Property for the list of profiles defined in the benchmark."""

        return self._profiles
498