Passed
Pull Request — rhel9-branch (#159)
by Matěj, created 01:16

org_fedora_oscap.content_discovery (rating: F)

Complexity

Total Complexity 71

Size/Duplication

Total Lines 373
Duplicated Lines 0 %

Importance

Changes 0
Metric   Value
eloc     253
dl       0        (duplicated lines)
loc      373      (lines of code)
rs       2.7199
c        0
b        0
f        0
wmc      71       (weighted method complexity)

23 Methods

Rating   Name   Duplication   Size   Complexity  
A ContentBringer._fetch_files() 0 18 5
A ContentBringer.content_uri() 0 3 1
A ContentBringer.use_downloaded_content() 0 11 2
B ContentBringer._gather_available_files() 0 27 6
A ObtainedContent.record_verification() 0 7 1
A ContentBringer.get_preferred_tailoring() 0 7 3
A ObtainedContent.add_content_archive() 0 7 1
A ContentBringer._verify_fingerprint() 0 16 3
A ContentBringer.get_preferred_content() 0 6 2
A ContentBringer._start_actual_fetch() 0 26 4
A ContentBringer.use_system_content() 0 4 1
A ContentBringer.fetch_content() 0 16 1
A ContentBringer.__init__() 0 11 1
A ContentBringer.finish_content_fetch() 0 31 3
B ContentBringer._finish_actual_fetch() 0 22 7
A ContentBringer.get_content_type() 0 7 3
A ObtainedContent.__init__() 0 9 1
A ObtainedContent.find_expected_usable_content() 0 14 3
A ObtainedContent._datastream_content() 0 6 3
A ObtainedContent._assign_content_type() 0 8 3
A ObtainedContent._xccdf_content() 0 7 5
B ObtainedContent.add_file() 0 13 6
A ObtainedContent.select_main_usable_content() 0 10 3

2 Functions

Rating   Name   Duplication   Size   Complexity  
A clear_all() 0 17 1
A is_network() 0 4 1

How to fix: Complexity

Complex classes like org_fedora_oscap.content_discovery often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
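
As a concrete illustration (a minimal sketch only, not code from this module): the fetch-state bookkeeping that ContentBringer keeps in activity_lock and now_fetching_or_processing could be extracted into a small helper class. The FetchTracker name and its interface below are hypothetical.

import threading
import logging

log = logging.getLogger("anaconda")


class FetchTracker:
    """Hypothetical extracted class: owns the "already fetching?" state."""

    def __init__(self):
        self._lock = threading.Lock()
        self._busy = False

    def try_start(self):
        """Return True if a new fetch may start, False if one is already running."""
        with self._lock:
            if self._busy:
                log.warning("Strange, it seems that we are already fetching something.")
                return False
            self._busy = True
            return True

    def finish(self):
        """Mark the current fetch/processing activity as finished."""
        with self._lock:
            self._busy = False


# ContentBringer._fetch_files() and finish_content_fetch() would then delegate
# to the helper (e.g. self.fetch_tracker.try_start() / .finish()) instead of
# manipulating the lock and the flag directly.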

import threading
import logging
import pathlib
import shutil
from glob import glob
from typing import List

from pyanaconda.core import constants
from pyanaconda.threading import threadMgr
from pykickstart.errors import KickstartValueError

from org_fedora_oscap import data_fetch, utils
from org_fedora_oscap import common
from org_fedora_oscap import content_handling
from org_fedora_oscap import rule_handling

from org_fedora_oscap.common import _

log = logging.getLogger("anaconda")


def is_network(scheme):
    return any(
        scheme.startswith(net_prefix)
        for net_prefix in data_fetch.NET_URL_PREFIXES)


def clear_all(data):
    data.content_type = ""
    data.content_url = ""
    data.datastream_id = ""
    data.xccdf_id = ""
    data.profile_id = ""
    data.content_path = ""
    data.cpe_path = ""
    data.tailoring_path = ""

    data.fingerprint = ""

    data.certificates = ""

    # internal values
    data.rule_data = rule_handling.RuleData()
    data.dry_run = False


class ContentBringer:
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
    DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    def __init__(self, addon_data):
        self.content_uri_scheme = ""
        self.content_uri_path = ""
        self.fetched_content = ""

        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

        self._addon_data = addon_data

    def get_content_type(self, url):
        if url.endswith(".rpm"):
            return "rpm"
        elif any(url.endswith(arch_type) for arch_type in common.SUPPORTED_ARCHIVES):
            return "archive"
        else:
            return "file"

    @property
    def content_uri(self):
        return self.content_uri_scheme + "://" + self.content_uri_path

    @content_uri.setter
    def content_uri(self, uri):
        scheme, path = uri.split("://", 1)
        self.content_uri_path = path
        self.content_uri_scheme = scheme

    def fetch_content(self, what_if_fail, ca_certs_path=""):
        """
        Initiate fetch of the content into an appropriate directory

        Args:
            what_if_fail: Callback accepting an exception as an argument that
                should handle it in the calling layer.
            ca_certs_path: Path to the HTTPS certificate file
        """
        self.content_uri = self._addon_data.content_url
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        fetching_thread_name = self._fetch_files(
            self.content_uri_scheme, self.content_uri_path,
            self.CONTENT_DOWNLOAD_LOCATION, ca_certs_path, what_if_fail)
        return fetching_thread_name

    def _fetch_files(self, scheme, path, destdir, ca_certs_path, what_if_fail):
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = "Strange, it seems that we are already fetching something."
                log.warning(msg)
                return
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch(scheme, path, destdir, ca_certs_path)
        except Exception as exc:
            with self.activity_lock:
                self.now_fetching_or_processing = False
            what_if_fail(exc)

        # We are not finished yet with the fetch
        return fetching_thread_name

    def _start_actual_fetch(self, scheme, path, destdir, ca_certs_path):
        fetching_thread_name = None
        url = scheme + "://" + path

        if "/" not in path:
            msg = f"Missing the path component of the '{url}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{url}' URL"
            raise KickstartValueError(msg)

        dest = destdir / basename

        if is_network(scheme):
            fetching_thread_name = data_fetch.wait_and_fetch_net_data(
                url,
                dest,
                ca_certs_path
            )
        else:  # invalid schemes are handled down the road
            fetching_thread_name = data_fetch.fetch_local_data(
                url,
                dest,
            )
        return fetching_thread_name

    def finish_content_fetch(self, fetching_thread_name, fingerprint, report_callback, dest_filename,
                             what_if_fail):
        """
        Finish any ongoing fetch and analyze what has been fetched.

        After the fetch is completed, it verifies the fetched content if applicable,
        analyzes it, and compiles it into an instance of ObtainedContent.

        Args:
            fetching_thread_name: Name of the fetching thread
                or None if we are only after the analysis
            fingerprint: A checksum for downloaded file verification
            report_callback: Means for the method to send user-relevant messages outside
            dest_filename: The target of the fetch operation. Can be falsy -
                in this case there is no content filename defined
            what_if_fail: Callback accepting an exception as an argument
                that should handle it in the calling layer.

        Returns:
            Instance of ObtainedContent if everything went well, or None.
        """
        try:
            content = self._finish_actual_fetch(fetching_thread_name, fingerprint, report_callback, dest_filename)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        finally:
            with self.activity_lock:
                self.now_fetching_or_processing = False

        return content

    def _verify_fingerprint(self, dest_filename, fingerprint=""):
        if not fingerprint:
            log.info("No fingerprint provided, skipping integrity check")
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(dest_filename,
                                            hash_obj)
        if digest != fingerprint:
            log.error(
                f"File {dest_filename} failed integrity check - assumed a "
                f"{hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            msg = _(f"Integrity check of the content failed - {hash_obj.name} hash didn't match")
            raise content_handling.ContentCheckError(msg)
        log.info(f"Integrity check passed using {hash_obj.name} hash")

    def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_filename):
        threadMgr.wait(wait_for)
        actually_fetched_content = wait_for is not None

        if fingerprint and dest_filename:
            self._verify_fingerprint(dest_filename, fingerprint)

        fpaths = self._gather_available_files(actually_fetched_content, dest_filename)

        structured_content = ObtainedContent(self.CONTENT_DOWNLOAD_LOCATION)
        content_type = self.get_content_type(str(dest_filename))
        if content_type in ("archive", "rpm"):
            structured_content.add_content_archive(dest_filename)

        labelled_files = content_handling.identify_files(fpaths)
        for fname, label in labelled_files.items():
            structured_content.add_file(fname, label)

        if fingerprint and dest_filename:
            structured_content.record_verification(dest_filename)

        return structured_content

    def _gather_available_files(self, actually_fetched_content, dest_filename):
        fpaths = []
        if not actually_fetched_content:
            if not dest_filename:  # using scap-security-guide
                fpaths = [self.DEFAULT_SSG_DATA_STREAM_PATH]
            else:  # Using downloaded XCCDF/OVAL/DS/tailoring
                fpaths = glob(str(self.CONTENT_DOWNLOAD_LOCATION / "*.xml"))
        else:
            dest_filename = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            content_type = self.get_content_type(str(dest_filename))
            if content_type in ("archive", "rpm"):
                try:
                    fpaths = common.extract_data(
                        str(dest_filename),
                        str(dest_filename.parent)
                    )
                except common.ExtractionError as err:
                    msg = f"Failed to extract the '{dest_filename}' archive: {str(err)}"
                    log.error(msg)
                    raise err

            elif content_type == "file":
                fpaths = [str(dest_filename)]
            else:
                raise common.OSCAPaddonError("Unsupported content type")
        return fpaths

    def use_downloaded_content(self, content):
        preferred_content = self.get_preferred_content(content)

        # We know that we have ended up with a datastream-like content,
        # but if we can't convert an archive to a datastream.
        # self._addon_data.content_type = "datastream"
        self._addon_data.content_path = str(preferred_content.relative_to(content.root))

        preferred_tailoring = self.get_preferred_tailoring(content)
        if content.tailoring:
            self._addon_data.tailoring_path = str(preferred_tailoring.relative_to(content.root))

    def use_system_content(self, content=None):
        clear_all(self._addon_data)
        self._addon_data.content_type = "scap-security-guide"
        self._addon_data.content_path = common.get_ssg_path()

    def get_preferred_content(self, content):
        if self._addon_data.content_path:
            preferred_content = content.find_expected_usable_content(self._addon_data.content_path)
        else:
            preferred_content = content.select_main_usable_content()
        return preferred_content

    def get_preferred_tailoring(self, content):
        tailoring_path = self._addon_data.tailoring_path
        if tailoring_path:
            if tailoring_path != str(content.tailoring.relative_to(content.root)):
                msg = f"Expected a tailoring {tailoring_path}, but it couldn't be found"
                raise content_handling.ContentHandlingError(msg)
        return content.tailoring


class ObtainedContent:
    """
    This class aims to assist with discovery of the gathered files -
    the addon can download files directly, or they can be extracted from an archive.
    The class enables the user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    def __init__(self, root):
        self.labelled_files = dict()
        self.datastream = None  # type: pathlib.Path
        self.xccdf = None  # type: pathlib.Path
        self.ovals = []  # type: List[pathlib.Path]
        self.tailoring = None  # type: pathlib.Path
        self.archive = None  # type: pathlib.Path
        self.verified = None  # type: pathlib.Path
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)
        """
        path = pathlib.Path(path)
        assert path in self.labelled_files
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        old_value = getattr(self, attribute_name)
        if old_value and old_value != new_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise content_handling.ContentHandlingError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label=None):
        if not label:
            label = content_handling.identify_files([fname])[fname]
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self.ovals.append(path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        if not self.xccdf or not self.ovals:
            return None
        some_ovals_exist = any([path.exists() for path in self.ovals])
        if not (self.xccdf.exists() and some_ovals_exist):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        content_path = self.root / relative_expected_content_path
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path
        else:
            if not content_path.exists():
                msg = f"Couldn't find '{content_path}' among the available content"
            else:
                msg = (
                    f"File '{content_path}' is not a valid datastream "
                    "or a valid XCCDF of an XCCDF-OVAL file tuple")
            raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        if self._datastream_content():
            return self._datastream_content()
        elif self._xccdf_content():
            return self._xccdf_content()
        else:
            msg = (
                "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
                "among the available content")
            raise content_handling.ContentHandlingError(msg)
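
For reference, a minimal usage sketch of the fetch/finish API documented above, as it might be driven from the calling layer. The addon_data object, the callbacks and the destination filename are placeholders, not taken from the reviewed code.

def report(message):
    # illustrative user-facing reporting callback
    log.info(message)


def on_failure(exc):
    # illustrative error handler passed as what_if_fail
    log.error(f"Content fetch failed: {exc}")


bringer = ContentBringer(addon_data)  # addon_data: hypothetical kickstart data object
thread_name = bringer.fetch_content(on_failure, ca_certs_path="")

# The destination filename below is only illustrative.
dest = bringer.CONTENT_DOWNLOAD_LOCATION / "ssg-ds.xml"
content = bringer.finish_content_fetch(
    thread_name, addon_data.fingerprint, report, dest, on_failure)

if content:
    bringer.use_downloaded_content(content)
else:
    bringer.use_system_content()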