Passed
Pull Request — rhel8-branch (#220)
by Matěj
created 02:04

org_fedora_oscap.content_discovery (rated F)

Complexity

Total Complexity 78

Size/Duplication

Total Lines 398
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 78
eloc 273
dl 0
loc 398
rs 2.16
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A is_network() 0 4 1

26 Methods

Rating   Name   Duplication   Size   Complexity  
A ContentBringer._fetch_files() 0 18 5
A ContentBringer.content_uri() 0 3 1
A ContentBringer._verify_fingerprint() 0 14 3
A ContentBringer._start_actual_fetch() 0 26 4
A ContentBringer.fetch_content() 0 16 1
A ContentBringer.__init__() 0 11 1
A ContentBringer.finish_content_fetch() 0 31 3
A ContentBringer.get_content_type() 0 7 3
A ObtainedContent.find_expected_usable_content() 0 14 3
A ContentBringer.use_downloaded_content() 0 11 2
A ContentBringer.reduce_files() 0 13 5
A ObtainedContent._datastream_content() 0 6 3
B ContentBringer._gather_available_files() 0 28 6
A ContentBringer.get_preferred_tailoring() 0 7 3
A ObtainedContent.record_verification() 0 7 1
A ObtainedContent.add_content_archive() 0 7 1
A ObtainedContent._assign_content_type() 0 8 2
A ContentBringer.allow_one_expected_tailoring_or_no_tailoring() 0 11 2
A ObtainedContent._xccdf_content() 0 7 5
A ObtainedContent.add_file() 0 11 5
A ObtainedContent.select_main_usable_content() 0 10 3
A ContentBringer.get_preferred_content() 0 6 2
A ContentBringer.use_system_content() 0 4 1
A ContentBringer.filter_discovered_content() 0 14 3
B ContentBringer._finish_actual_fetch() 0 28 7
A ObtainedContent.__init__() 0 9 1

How to fix

Complexity

Complex classes like org_fedora_oscap.content_discovery often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
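
For instance, the fetch-oriented members of ContentBringer below (content_uri_scheme, content_uri_path, the activity lock, _fetch_files(), _start_actual_fetch()) form such a cohesive group. A minimal sketch of the Extract Class direction follows; the class name ContentFetcher and its reduced interface are illustrative assumptions, not part of this pull request:

import threading


class ContentFetcher:
    """Hypothetical collaborator extracted from ContentBringer (sketch only)."""

    def __init__(self, download_location):
        # URI bookkeeping would move here from ContentBringer.__init__()
        self.content_uri_scheme = ""
        self.content_uri_path = ""
        self.download_location = download_location
        # fetch-thread bookkeeping also belongs to this component
        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False

    @property
    def content_uri(self):
        return self.content_uri_scheme + "://" + self.content_uri_path

    @content_uri.setter
    def content_uri(self, uri):
        scheme, path = uri.split("://", 1)
        self.content_uri_scheme = scheme
        self.content_uri_path = path

    def fetch(self, uri, ca_certs_path, what_if_fail):
        # ContentBringer.fetch_content() would delegate here, keeping only
        # the kickstart-data handling on its own side.
        ...

ContentBringer would then hold a ContentFetcher instance and delegate to it, which moves several methods out of the class and lowers its weighted complexity. The module under review follows.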

import threading
import logging
import pathlib
import shutil
import os
from glob import glob

from pyanaconda.core import constants
from pyanaconda.threading import threadMgr
from pykickstart.errors import KickstartValueError

from org_fedora_oscap import data_fetch, utils
from org_fedora_oscap import common
from org_fedora_oscap import content_handling
from org_fedora_oscap.content_handling import CONTENT_TYPES

from org_fedora_oscap.common import _

log = logging.getLogger("anaconda")


def is_network(scheme):
    return any(
        scheme.startswith(net_prefix)
        for net_prefix in data_fetch.NET_URL_PREFIXES)


class ContentBringer:
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
    DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    def __init__(self, addon_data):
        self.content_uri_scheme = ""
        self.content_uri_path = ""
        self.fetched_content = ""

        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

        self._addon_data = addon_data

    def get_content_type(self, url):
        if url.endswith(".rpm"):
            return "rpm"
        elif any(url.endswith(arch_type) for arch_type in common.SUPPORTED_ARCHIVES):
            return "archive"
        else:
            return "file"

    @property
    def content_uri(self):
        return self.content_uri_scheme + "://" + self.content_uri_path

    @content_uri.setter
    def content_uri(self, uri):
        scheme, path = uri.split("://", 1)
        self.content_uri_path = path
        self.content_uri_scheme = scheme

    def fetch_content(self, what_if_fail, ca_certs_path=""):
        """
        Initiate fetch of the content into an appropriate directory

        Args:
            what_if_fail: Callback accepting an exception as an argument that
                should handle it in the calling layer.
            ca_certs_path: Path to the HTTPS certificate file
        """
        self.content_uri = self._addon_data.content_url
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        fetching_thread_name = self._fetch_files(
            self.content_uri_scheme, self.content_uri_path,
            self.CONTENT_DOWNLOAD_LOCATION, ca_certs_path, what_if_fail)
        return fetching_thread_name

    def _fetch_files(self, scheme, path, destdir, ca_certs_path, what_if_fail):
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = "Strange, it seems that we are already fetching something."
                log.warning(msg)
                return
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch(scheme, path, destdir, ca_certs_path)
        except Exception as exc:
            with self.activity_lock:
                self.now_fetching_or_processing = False
            what_if_fail(exc)

        # We are not finished yet with the fetch
        return fetching_thread_name

    def _start_actual_fetch(self, scheme, path, destdir, ca_certs_path):
        fetching_thread_name = None
        url = scheme + "://" + path

        if "/" not in path:
            msg = f"Missing the path component of the '{url}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{url}' URL"
            raise KickstartValueError(msg)

        dest = destdir / basename

        if is_network(scheme):
            fetching_thread_name = data_fetch.wait_and_fetch_net_data(
                url,
                dest,
                ca_certs_path
            )
        else:  # invalid schemes are handled down the road
            fetching_thread_name = data_fetch.fetch_local_data(
                url,
                dest,
            )
        return fetching_thread_name

    def finish_content_fetch(self, fetching_thread_name, fingerprint, report_callback, dest_filename,
                             what_if_fail):
        """
        Finish any ongoing fetch and analyze what has been fetched.

        After the fetch is completed, it verifies the fetched content if applicable,
        analyzes it and compiles it into an instance of ObtainedContent.

        Args:
            fetching_thread_name: Name of the fetching thread
                or None if we are only after the analysis
            fingerprint: A checksum for downloaded file verification
            report_callback: Means for the method to send user-relevant messages outside
            dest_filename: The target of the fetch operation. Can be falsy -
                in this case there is no content filename defined
            what_if_fail: Callback accepting an exception as an argument
                that should handle it in the calling layer.

        Returns:
            Instance of ObtainedContent if everything went well, or None.
        """
        try:
            content = self._finish_actual_fetch(fetching_thread_name, fingerprint, report_callback, dest_filename)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        finally:
            with self.activity_lock:
                self.now_fetching_or_processing = False

        return content

    def _verify_fingerprint(self, dest_filename, fingerprint=""):
        if not fingerprint:
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(dest_filename,
                                            hash_obj)
        if digest != fingerprint:
            log.error(
                f"File {dest_filename} failed integrity check - assumed a "
                f"{hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            msg = _(f"Integrity check of the content failed - {hash_obj.name} hash didn't match")
            raise content_handling.ContentCheckError(msg)

    def allow_one_expected_tailoring_or_no_tailoring(self, labelled_files):
        expected_tailoring = self._addon_data.tailoring_path
        tailoring_label = CONTENT_TYPES["TAILORING"]
        if expected_tailoring:
            labelled_files = self.reduce_files(labelled_files, expected_tailoring, [tailoring_label])
        else:
            labelled_files = {
                path: label for path, label in labelled_files.items()
                if label != tailoring_label
            }
        return labelled_files

    def filter_discovered_content(self, labelled_files):
        expected_path = self._addon_data.content_path
        categories = (CONTENT_TYPES["DATASTREAM"], CONTENT_TYPES["XCCDF_CHECKLIST"])
        if expected_path:
            labelled_files = self.reduce_files(labelled_files, expected_path, categories)

        labelled_files = self.allow_one_expected_tailoring_or_no_tailoring(labelled_files)

        expected_path = self._addon_data.cpe_path
        categories = (CONTENT_TYPES["CPE_DICT"], )
        if expected_path:
            labelled_files = self.reduce_files(labelled_files, expected_path, categories)

        return labelled_files

    def reduce_files(self, labelled_files, expected_path, categories):
        reduced_files = dict()
        if expected_path not in labelled_files:
            msg = (
                f"Expected a file {expected_path} to be part of the supplied content, "
                f"but it was not the case, got only {list(labelled_files.keys())}"
            )
            raise RuntimeError(msg)
        for path, label in labelled_files.items():
            if label in categories and path != expected_path:
                continue
            reduced_files[path] = label
        return reduced_files

    def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_filename):
        threadMgr.wait(wait_for)
        actually_fetched_content = wait_for is not None

        if fingerprint and dest_filename:
            self._verify_fingerprint(dest_filename, fingerprint)

        fpaths = self._gather_available_files(actually_fetched_content, dest_filename)

        structured_content = ObtainedContent(self.CONTENT_DOWNLOAD_LOCATION)
        content_type = self.get_content_type(str(dest_filename))
        if content_type in ("archive", "rpm"):
            structured_content.add_content_archive(dest_filename)

        labelled_filenames = content_handling.identify_files(fpaths)
        labelled_relative_filenames = {
            os.path.relpath(path, self.CONTENT_DOWNLOAD_LOCATION): label
            for path, label in labelled_filenames.items()}
        labelled_relative_filenames = self.filter_discovered_content(labelled_relative_filenames)

        for rel_fname, label in labelled_relative_filenames.items():
            fname = self.CONTENT_DOWNLOAD_LOCATION / rel_fname
            structured_content.add_file(str(fname), label)

        if fingerprint and dest_filename:
            structured_content.record_verification(dest_filename)

        return structured_content

    def _gather_available_files(self, actually_fetched_content, dest_filename):
        fpaths = []
        if not actually_fetched_content:
            if not dest_filename:  # using scap-security-guide
                fpaths = [self.DEFAULT_SSG_DATA_STREAM_PATH]
            else:  # Using downloaded XCCDF/OVAL/DS/tailoring
                fpaths = pathlib.Path(self.CONTENT_DOWNLOAD_LOCATION).rglob("*")
                fpaths = [str(p) for p in fpaths if p.is_file()]
        else:
            dest_filename = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            content_type = self.get_content_type(str(dest_filename))
            if content_type in ("archive", "rpm"):
                try:
                    fpaths = common.extract_data(
                        str(dest_filename),
                        str(dest_filename.parent)
                    )
                except common.ExtractionError as err:
                    msg = f"Failed to extract the '{dest_filename}' archive: {str(err)}"
                    log.error(msg)
                    raise err

            elif content_type == "file":
                fpaths = [str(dest_filename)]
            else:
                raise common.OSCAPaddonError("Unsupported content type")
        return fpaths

    def use_downloaded_content(self, content):
        preferred_content = self.get_preferred_content(content)

        # We know that we have ended up with a datastream-like content,
        # but if we can't convert an archive to a datastream.
        # self._addon_data.content_type = "datastream"
        self._addon_data.content_path = str(preferred_content.relative_to(content.root))

        preferred_tailoring = self.get_preferred_tailoring(content)
        if content.tailoring:
            self._addon_data.tailoring_path = str(preferred_tailoring.relative_to(content.root))

    def use_system_content(self, content=None):
        self._addon_data.clear_all()
        self._addon_data.content_type = "scap-security-guide"
        self._addon_data.content_path = common.get_ssg_path()

    def get_preferred_content(self, content):
        if self._addon_data.content_path:
            preferred_content = content.find_expected_usable_content(self._addon_data.content_path)
        else:
            preferred_content = content.select_main_usable_content()
        return preferred_content

    def get_preferred_tailoring(self, content):
        tailoring_path = self._addon_data.tailoring_path
        if tailoring_path:
            if tailoring_path != str(content.tailoring.relative_to(content.root)):
                msg = f"Expected a tailoring {tailoring_path}, but it couldn't be found"
                raise content_handling.ContentHandlingError(msg)
        return content.tailoring


class ObtainedContent:
    """
    This class aims to assist the discovery of gathered files -
    the addon can download files directly, or they can be extracted from an archive.
    The class enables the user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    def __init__(self, root):
        self.labelled_files = dict()
        self.datastream = ""
        self.xccdf = ""
        self.ovals = []
        self.tailoring = ""
        self.archive = ""
        self.verified = ""
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)
        """
        path = pathlib.Path(path)
        assert path in self.labelled_files
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        old_value = getattr(self, attribute_name)
        if old_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise content_handling.ContentHandlingError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label):
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self.ovals.append(path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        if not self.xccdf or not self.ovals:
            return None
        some_ovals_exist = any([path.exists() for path in self.ovals])
        if not (self.xccdf.exists() and some_ovals_exist):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        content_path = self.root / relative_expected_content_path
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path
        else:
            if not content_path.exists():
                msg = f"Couldn't find '{content_path}' among the available content"
            else:
                msg = (
                    f"File '{content_path}' is not a valid datastream "
                    "or a valid XCCDF of an XCCDF-OVAL file tuple")
            raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        if self._datastream_content():
            return self._datastream_content()
        elif self._xccdf_content():
            return self._xccdf_content()
        else:
            msg = (
                "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
                "among the available content")
            raise content_handling.ContentHandlingError(msg)
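
The fetch API above is asynchronous: fetch_content() only starts a fetching thread and returns its name, while finish_content_fetch() waits for that thread, verifies and analyzes the result, and returns an ObtainedContent. A hedged usage sketch based on the docstrings above; the addon_data object, its fingerprint attribute, the downloaded_file_path value, and both callbacks are stand-ins for whatever the calling layer provides, not code from this pull request:

from org_fedora_oscap.content_discovery import ContentBringer


def report_to_user(message):
    # stand-in for the calling layer's way of surfacing progress messages
    print(message)


def on_failure(exc):
    # stand-in error handler; the real caller decides how to recover
    report_to_user(f"Content fetch failed: {exc}")


bringer = ContentBringer(addon_data)  # addon_data: the addon's kickstart data object (assumed)
thread_name = bringer.fetch_content(on_failure)
content = bringer.finish_content_fetch(
    thread_name,
    fingerprint=addon_data.fingerprint,   # assumed attribute holding the expected checksum
    report_callback=report_to_user,
    dest_filename=downloaded_file_path,   # assumed path of the downloaded file
    what_if_fail=on_failure,
)
if content:
    bringer.use_downloaded_content(content)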
398