Passed
Pull Request — rhel8-branch (#220)
by Matěj
created 01:24

org_fedora_oscap.content_discovery   F

Complexity

Total Complexity 77

Size/Duplication

Total Lines 389
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 77
eloc 267
dl 0
loc 389
rs 2.24
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A is_network() 0 4 1

25 Methods

Rating   Name   Duplication   Size   Complexity  
A ContentBringer._fetch_files() 0 18 5
A ContentBringer.content_uri() 0 3 1
A ObtainedContent.find_expected_usable_content() 0 14 3
A ContentBringer.use_downloaded_content() 0 11 2
A ContentBringer.reduce_files() 0 13 5
A ObtainedContent._datastream_content() 0 6 3
B ContentBringer._gather_available_files() 0 28 6
A ObtainedContent.record_verification() 0 7 1
A ContentBringer.get_preferred_tailoring() 0 7 3
A ObtainedContent.add_content_archive() 0 7 1
A ObtainedContent._assign_content_type() 0 8 2
A ContentBringer._verify_fingerprint() 0 14 3
A ObtainedContent._xccdf_content() 0 7 5
A ObtainedContent.add_file() 0 11 5
A ObtainedContent.select_main_usable_content() 0 10 3
A ContentBringer.get_preferred_content() 0 6 2
A ContentBringer._start_actual_fetch() 0 26 4
A ContentBringer.use_system_content() 0 4 1
A ContentBringer.fetch_content() 0 16 1
A ContentBringer.__init__() 0 11 1
A ContentBringer.filter_discovered_content() 0 17 4
A ContentBringer.finish_content_fetch() 0 31 3
B ContentBringer._finish_actual_fetch() 0 28 7
A ContentBringer.get_content_type() 0 7 3
A ObtainedContent.__init__() 0 9 1

How to fix Complexity

Complex classes like org_fedora_oscap.content_discovery often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often faster.
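For orientation only, here is a minimal sketch of what Extract Class could look like in this module: the activity_lock/now_fetching_or_processing bookkeeping that guards against concurrent fetches could hypothetically move into its own small component that ContentBringer delegates to. The names below (FetchActivityGuard, try_start, finish, ContentBringerSketch) are made up for illustration and are not part of the addon.

import threading


class FetchActivityGuard:
    """Hypothetical extracted component owning the 'is a fetch in flight?' state."""

    def __init__(self):
        self._lock = threading.Lock()
        self._busy = False

    def try_start(self):
        # Atomically claim the right to start a fetch; refuse if one is already running.
        with self._lock:
            if self._busy:
                return False
            self._busy = True
            return True

    def finish(self):
        # Mark the fetch/processing activity as finished.
        with self._lock:
            self._busy = False


class ContentBringerSketch:
    """Sketch of the original class delegating its concurrency bookkeeping."""

    def __init__(self):
        self.activity = FetchActivityGuard()

    def fetch_files(self, start_fetch):
        if not self.activity.try_start():
            return None  # somebody is already fetching
        try:
            return start_fetch()
        except Exception:
            self.activity.finish()
            raise

In the real module, the same grouping heuristic points at _fetch_files(), _start_actual_fetch() and _finish_actual_fetch() together with the two concurrency fields; the discovery side is already separated out into ObtainedContent, whose methods all score an A rating in the table above.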

import threading
import logging
import pathlib
import shutil
import os
from glob import glob

from pyanaconda.core import constants
from pyanaconda.threading import threadMgr
from pykickstart.errors import KickstartValueError

from org_fedora_oscap import data_fetch, utils
from org_fedora_oscap import common
from org_fedora_oscap import content_handling
from org_fedora_oscap.content_handling import CONTENT_TYPES

from org_fedora_oscap.common import _

log = logging.getLogger("anaconda")


def is_network(scheme):
    return any(
        scheme.startswith(net_prefix)
        for net_prefix in data_fetch.NET_URL_PREFIXES)


class ContentBringer:
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
    DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    def __init__(self, addon_data):
        self.content_uri_scheme = ""
        self.content_uri_path = ""
        self.fetched_content = ""

        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

        self._addon_data = addon_data

    def get_content_type(self, url):
        if url.endswith(".rpm"):
            return "rpm"
        elif any(url.endswith(arch_type) for arch_type in common.SUPPORTED_ARCHIVES):
            return "archive"
        else:
            return "file"

    @property
    def content_uri(self):
        return self.content_uri_scheme + "://" + self.content_uri_path

    @content_uri.setter
    def content_uri(self, uri):
        scheme, path = uri.split("://", 1)
        self.content_uri_path = path
        self.content_uri_scheme = scheme

    def fetch_content(self, what_if_fail, ca_certs_path=""):
        """
        Initiate fetch of the content into an appropriate directory

        Args:
            what_if_fail: Callback accepting exception as an argument that
                should handle them in the calling layer.
            ca_certs_path: Path to the HTTPS certificate file
        """
        self.content_uri = self._addon_data.content_url
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        fetching_thread_name = self._fetch_files(
            self.content_uri_scheme, self.content_uri_path,
            self.CONTENT_DOWNLOAD_LOCATION, ca_certs_path, what_if_fail)
        return fetching_thread_name

    def _fetch_files(self, scheme, path, destdir, ca_certs_path, what_if_fail):
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = "Strange, it seems that we are already fetching something."
                log.warn(msg)
                return
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch(scheme, path, destdir, ca_certs_path)
        except Exception as exc:
            with self.activity_lock:
                self.now_fetching_or_processing = False
            what_if_fail(exc)

        # We are not finished yet with the fetch
        return fetching_thread_name

    def _start_actual_fetch(self, scheme, path, destdir, ca_certs_path):
        fetching_thread_name = None
        url = scheme + "://" + path

        if "/" not in path:
            msg = f"Missing the path component of the '{url}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{url}' URL"
            raise KickstartValueError(msg)

        dest = destdir / basename

        if is_network(scheme):
            fetching_thread_name = data_fetch.wait_and_fetch_net_data(
                url,
                dest,
                ca_certs_path
            )
        else:  # invalid schemes are handled down the road
            fetching_thread_name = data_fetch.fetch_local_data(
                url,
                dest,
            )
        return fetching_thread_name

    def finish_content_fetch(self, fetching_thread_name, fingerprint, report_callback, dest_filename,
                             what_if_fail):
        """
        Finish any ongoing fetch and analyze what has been fetched.

        After the fetch is completed, the fetched content is verified if applicable,
        then analyzed and compiled into an instance of ObtainedContent.

        Args:
            fetching_thread_name: Name of the fetching thread
                or None if we are only after the analysis
            fingerprint: A checksum for downloaded file verification
            report_callback: Means for the method to send user-relevant messages outside
            dest_filename: The target of the fetch operation. Can be falsy -
                in this case there is no content filename defined
            what_if_fail: Callback accepting exception as an argument
                that should handle them in the calling layer.

        Returns:
            Instance of ObtainedContent if everything went well, or None.
        """
        try:
            content = self._finish_actual_fetch(fetching_thread_name, fingerprint, report_callback, dest_filename)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        finally:
            with self.activity_lock:
                self.now_fetching_or_processing = False

        return content

    def _verify_fingerprint(self, dest_filename, fingerprint=""):
        if not fingerprint:
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(dest_filename,
                                            hash_obj)
        if digest != fingerprint:
            log.error(
                f"File {dest_filename} failed integrity check - assumed a "
                f"{hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            msg = _(f"Integrity check of the content failed - {hash_obj.name} hash didn't match")
            raise content_handling.ContentCheckError(msg)

    def filter_discovered_content(self, labelled_files):
        expected_path = self._addon_data.content_path
        categories = (CONTENT_TYPES["DATASTREAM"], CONTENT_TYPES["XCCDF_CHECKLIST"])
        if expected_path:
            labelled_files = self.reduce_files(labelled_files, expected_path, categories)

        expected_path = self._addon_data.tailoring_path
        categories = (CONTENT_TYPES["TAILORING"], )
        if expected_path:
            labelled_files = self.reduce_files(labelled_files, expected_path, categories)

        expected_path = self._addon_data.cpe_path
        categories = (CONTENT_TYPES["CPE_DICT"], )
        if expected_path:
            labelled_files = self.reduce_files(labelled_files, expected_path, categories)

        return labelled_files

    def reduce_files(self, labelled_files, expected_path, categories):
        reduced_files = dict()
        if expected_path not in labelled_files:
            msg = (
                f"Expected a file {expected_path} to be part of the supplied content, "
                f"but it was not the case, got only {list(labelled_files.keys())}"
            )
            raise RuntimeError(msg)
        for path, label in labelled_files.items():
            if label in categories and path != expected_path:
                continue
            reduced_files[path] = label
        return reduced_files

    def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_filename):
        threadMgr.wait(wait_for)
        actually_fetched_content = wait_for is not None

        if fingerprint and dest_filename:
            self._verify_fingerprint(dest_filename, fingerprint)

        fpaths = self._gather_available_files(actually_fetched_content, dest_filename)

        structured_content = ObtainedContent(self.CONTENT_DOWNLOAD_LOCATION)
        content_type = self.get_content_type(str(dest_filename))
        if content_type in ("archive", "rpm"):
            structured_content.add_content_archive(dest_filename)

        labelled_filenames = content_handling.identify_files(fpaths)
        labelled_relative_filenames = {
            os.path.relpath(path, self.CONTENT_DOWNLOAD_LOCATION): label
            for path, label in labelled_filenames.items()}
        labelled_relative_filenames = self.filter_discovered_content(labelled_relative_filenames)

        for rel_fname, label in labelled_relative_filenames.items():
            fname = self.CONTENT_DOWNLOAD_LOCATION / rel_fname
            structured_content.add_file(str(fname), label)

        if fingerprint and dest_filename:
            structured_content.record_verification(dest_filename)

        return structured_content

    def _gather_available_files(self, actually_fetched_content, dest_filename):
        fpaths = []
        if not actually_fetched_content:
            if not dest_filename:  # using scap-security-guide
                fpaths = [self.DEFAULT_SSG_DATA_STREAM_PATH]
            else:  # Using downloaded XCCDF/OVAL/DS/tailoring
                fpaths = pathlib.Path(self.CONTENT_DOWNLOAD_LOCATION).rglob("*")
                fpaths = [str(p) for p in fpaths if p.is_file()]
        else:
            dest_filename = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            content_type = self.get_content_type(str(dest_filename))
            if content_type in ("archive", "rpm"):
                try:
                    fpaths = common.extract_data(
                        str(dest_filename),
                        str(dest_filename.parent)
                    )
                except common.ExtractionError as err:
                    msg = f"Failed to extract the '{dest_filename}' archive: {str(err)}"
                    log.error(msg)
                    raise err

            elif content_type == "file":
                fpaths = [str(dest_filename)]
            else:
                raise common.OSCAPaddonError("Unsupported content type")
        return fpaths

    def use_downloaded_content(self, content):
        preferred_content = self.get_preferred_content(content)

        # We know that we have ended up with datastream-like content,
        # but we can't convert an archive to a datastream.
        # self._addon_data.content_type = "datastream"
        self._addon_data.content_path = str(preferred_content.relative_to(content.root))

        preferred_tailoring = self.get_preferred_tailoring(content)
        if content.tailoring:
            self._addon_data.tailoring_path = str(preferred_tailoring.relative_to(content.root))

    def use_system_content(self, content=None):
        self._addon_data.clear_all()
        self._addon_data.content_type = "scap-security-guide"
        self._addon_data.content_path = common.get_ssg_path()

    def get_preferred_content(self, content):
        if self._addon_data.content_path:
            preferred_content = content.find_expected_usable_content(self._addon_data.content_path)
        else:
            preferred_content = content.select_main_usable_content()
        return preferred_content

    def get_preferred_tailoring(self, content):
        tailoring_path = self._addon_data.tailoring_path
        if tailoring_path:
            if tailoring_path != str(content.tailoring.relative_to(content.root)):
                msg = f"Expected a tailoring {tailoring_path}, but it couldn't be found"
                raise content_handling.ContentHandlingError(msg)
        return content.tailoring


class ObtainedContent:
    """
    This class aims to assist the discovery of gathered files -
    the addon can download files directly, or they can be extracted from an archive.
    The class enables the user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    def __init__(self, root):
        self.labelled_files = dict()
        self.datastream = ""
        self.xccdf = ""
        self.ovals = []
        self.tailoring = ""
        self.archive = ""
        self.verified = ""
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)
        """
        path = pathlib.Path(path)
        assert path in self.labelled_files
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        old_value = getattr(self, attribute_name)
        if old_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise content_handling.ContentHandlingError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label):
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self.ovals.append(path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        if not self.xccdf or not self.ovals:
            return None
        some_ovals_exist = any([path.exists() for path in self.ovals])
        if not (self.xccdf.exists() and some_ovals_exist):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        content_path = self.root / relative_expected_content_path
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path
        else:
            if not content_path.exists():
                msg = f"Couldn't find '{content_path}' among the available content"
            else:
                msg = (
                    f"File '{content_path}' is not a valid datastream "
                    "or a valid XCCDF of a XCCDF-OVAL file tuple")
            raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        if self._datastream_content():
            return self._datastream_content()
        elif self._xccdf_content():
            return self._xccdf_content()
        else:
            msg = (
                "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
                "among the available content")
            raise content_handling.ContentHandlingError(msg)