Passed
Pull Request — rhel9-branch (#158)
by Matěj
01:15
created

org_fedora_oscap.content_discovery   F

Complexity

Total Complexity 69

Size/Duplication

Total Lines 367
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 247
dl 0
loc 367
rs 2.88
c 0
b 0
f 0
wmc 69

23 Methods

Rating   Name   Duplication   Size   Complexity  
A ContentBringer._fetch_files() 0 18 5
A ContentBringer.content_uri() 0 3 1
A ObtainedContent.find_expected_usable_content() 0 14 3
A ContentBringer.use_downloaded_content() 0 11 2
A ObtainedContent._datastream_content() 0 6 3
B ContentBringer._gather_available_files() 0 27 6
A ObtainedContent.record_verification() 0 7 1
A ContentBringer.get_preferred_tailoring() 0 6 3
A ObtainedContent.add_content_archive() 0 7 1
A ObtainedContent._assign_content_type() 0 8 2
A ContentBringer._verify_fingerprint() 0 14 3
A ObtainedContent._xccdf_content() 0 7 5
A ObtainedContent.add_file() 0 11 5
A ObtainedContent.select_main_usable_content() 0 10 3
A ContentBringer.get_preferred_content() 0 6 2
A ContentBringer._start_actual_fetch() 0 26 4
A ContentBringer.use_system_content() 0 4 1
A ContentBringer.fetch_content() 0 16 1
A ContentBringer.__init__() 0 11 1
A ContentBringer.finish_content_fetch() 0 31 3
B ContentBringer._finish_actual_fetch() 0 22 7
A ContentBringer.get_content_type() 0 7 3
A ObtainedContent.__init__() 0 9 1

2 Functions

Rating   Name   Duplication   Size   Complexity  
A clear_all() 0 17 1
A is_network() 0 4 1

How to fix   Complexity   

Complexity

Complex classes like org_fedora_oscap.content_discovery often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import threading
2
import logging
3
import pathlib
4
import shutil
5
from glob import glob
6
7
from pyanaconda.core import constants
8
from pyanaconda.threading import threadMgr
9
from pykickstart.errors import KickstartValueError
10
11
from org_fedora_oscap import data_fetch, utils
12
from org_fedora_oscap import common
13
from org_fedora_oscap import content_handling
14
from org_fedora_oscap import rule_handling
15
16
from org_fedora_oscap.common import _
17
18
log = logging.getLogger("anaconda")
19
20
21
def is_network(scheme):
    """
    Tell whether the scheme begins with any of the known network URL prefixes.

    Args:
        scheme: The scheme part of an URL, i.e. what precedes the "://"

    Returns:
        True if the scheme starts with one of data_fetch.NET_URL_PREFIXES,
        False otherwise.
    """
    for known_prefix in data_fetch.NET_URL_PREFIXES:
        if scheme.startswith(known_prefix):
            return True
    return False
25
26
27
def clear_all(data):
    """
    Reset the content-related attributes of the passed data object.

    All textual attributes are set to an empty string, the rule data
    are replaced by a fresh rule_handling.RuleData instance
    and the dry run flag is switched off.

    Args:
        data: The object whose attributes get overwritten in place
    """
    textual_attributes = (
        "content_type",
        "content_url",
        "datastream_id",
        "xccdf_id",
        "profile_id",
        "content_path",
        "cpe_path",
        "tailoring_path",
        "fingerprint",
        "certificates",
    )
    for attribute_name in textual_attributes:
        setattr(data, attribute_name, "")

    # internal values
    data.rule_data = rule_handling.RuleData()
    data.dry_run = False
44
45
46
class ContentBringer:
    """
    Fetch the content referenced by the addon data and analyze what was obtained.

    The fetch is split into two phases: fetch_content() starts it,
    and finish_content_fetch() waits for it and compiles the result
    into an instance of ObtainedContent.
    The now_fetching_or_processing flag, guarded by activity_lock,
    is raised when a fetch starts and lowered when it finishes or fails.
    """
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
    DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    def __init__(self, addon_data):
        """
        Args:
            addon_data: Object holding the addon settings. This class reads its
                content_url, content_path and tailoring_path attributes,
                and it updates some of them when the content is selected.
        """
        self.content_uri_scheme = ""
        self.content_uri_path = ""
        self.fetched_content = ""

        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

        self._addon_data = addon_data

    def get_content_type(self, url):
        """
        Classify the URL by its suffix.

        Returns:
            "rpm" for URLs ending with ".rpm", "archive" for URLs ending with
            one of common.SUPPORTED_ARCHIVES, and "file" for anything else.
        """
        if url.endswith(".rpm"):
            return "rpm"
        elif any(url.endswith(arch_type) for arch_type in common.SUPPORTED_ARCHIVES):
            return "archive"
        else:
            return "file"

    @property
    def content_uri(self):
        """
        The URI put together from the stored scheme and path.
        """
        return self.content_uri_scheme + "://" + self.content_uri_path

    @content_uri.setter
    def content_uri(self, uri):
        # Only the first "://" separates the scheme from the path.
        # An URI without any "://" makes the unpacking raise a ValueError.
        scheme, path = uri.split("://", 1)
        self.content_uri_path = path
        self.content_uri_scheme = scheme

    def fetch_content(self, what_if_fail, ca_certs_path=""):
        """
        Initiate fetch of the content into an appropriate directory

        The download directory is removed and created again before the fetch starts.

        Args:
            what_if_fail: Callback accepting exception as an argument that
                should handle them in the calling layer.
            ca_certs_path: Path to the HTTPS certificate file

        Returns:
            Name of the fetching thread as returned by the fetching function,
            or None if the fetch has not been started.
        """
        self.content_uri = self._addon_data.content_url
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        fetching_thread_name = self._fetch_files(
            self.content_uri_scheme, self.content_uri_path,
            self.CONTENT_DOWNLOAD_LOCATION, ca_certs_path, what_if_fail)
        return fetching_thread_name

    def _fetch_files(self, scheme, path, destdir, ca_certs_path, what_if_fail):
        """
        Start the fetch unless another one is already in progress.

        Returns:
            What _start_actual_fetch() returned, or None if a fetch
            was already in progress or if starting the fetch raised an exception
            (the exception is then passed to what_if_fail).
        """
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = "Strange, it seems that we are already fetching something."
                # Logger.warn() is a deprecated alias of Logger.warning()
                log.warning(msg)
                return None
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch(scheme, path, destdir, ca_certs_path)
        except Exception as exc:
            # The fetch didn't start, so nobody is going to call
            # finish_content_fetch() that would lower the flag.
            with self.activity_lock:
                self.now_fetching_or_processing = False
            what_if_fail(exc)

        # We are not finished yet with the fetch
        return fetching_thread_name

    def _start_actual_fetch(self, scheme, path, destdir, ca_certs_path):
        """
        Deduce the destination file name from the URL and start fetching into it.

        Args:
            scheme: The part of the URL preceding the "://"
            path: The part of the URL following the "://"
            destdir: Directory (a pathlib.Path) to fetch the file into
            ca_certs_path: Path to the HTTPS certificate file,
                used only for network fetches

        Returns:
            What the respective data_fetch function returned.

        Raises:
            KickstartValueError: If the path contains no slash,
                or if nothing follows its last slash.
        """
        url = scheme + "://" + path

        if "/" not in path:
            msg = f"Missing the path component of the '{url}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{url}' URL"
            raise KickstartValueError(msg)

        dest = destdir / basename

        if is_network(scheme):
            fetching_thread_name = data_fetch.wait_and_fetch_net_data(
                url,
                dest,
                ca_certs_path
            )
        else:  # invalid schemes are handled down the road
            fetching_thread_name = data_fetch.fetch_local_data(
                url,
                dest,
            )
        return fetching_thread_name

    def finish_content_fetch(self, fetching_thread_name, fingerprint, report_callback, dest_filename,
                             what_if_fail):
        """
        Finish any ongoing fetch and analyze what has been fetched.

        After the fetch is completed, it verifies the fetched content if applicable,
        analyzes it and compiles it into an instance of ObtainedContent.

        Args:
            fetching_thread_name: Name of the fetching thread
                or None if we are only after the analysis
            fingerprint: A checksum for downloaded file verification
            report_callback: Means for the method to send user-relevant messages outside
            dest_filename: The target of the fetch operation. Can be falsy -
                in this case there is no content filename defined
            what_if_fail: Callback accepting exception as an argument
                that should handle them in the calling layer.

        Returns:
            Instance of ObtainedContent if everything went well, or None.
        """
        try:
            content = self._finish_actual_fetch(fetching_thread_name, fingerprint, report_callback, dest_filename)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        finally:
            # Whatever the outcome is, the fetch is over now.
            with self.activity_lock:
                self.now_fetching_or_processing = False

        return content

    def _verify_fingerprint(self, dest_filename, fingerprint=""):
        """
        Check that the file matches the fingerprint. Do nothing if the fingerprint is falsy.

        Raises:
            content_handling.ContentCheckError: If the computed digest
                differs from the fingerprint.
        """
        if not fingerprint:
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(dest_filename,
                                            hash_obj)
        if digest != fingerprint:
            log.error(
                f"File {dest_filename} failed integrity check - assumed a "
                f"{hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            # NOTE(review): the f-string is interpolated before _() gets it,
            # so the translation lookup sees the already formatted text -
            # confirm whether this message is meant to be translatable.
            msg = _(f"Integrity check of the content failed - {hash_obj.name} hash didn't match")
            raise content_handling.ContentCheckError(msg)

    def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_filename):
        """
        Wait for the fetching thread, verify the fetched file and label what is available.

        Args:
            wait_for: Name of the thread to wait for. None means that
                nothing has been fetched by this call.
            fingerprint: A checksum for downloaded file verification
            report_callback: Not used by this method
            dest_filename: The target of the fetch operation, can be falsy

        Returns:
            Instance of ObtainedContent describing the available files.
        """
        threadMgr.wait(wait_for)
        actually_fetched_content = wait_for is not None

        if fingerprint and dest_filename:
            self._verify_fingerprint(dest_filename, fingerprint)

        fpaths = self._gather_available_files(actually_fetched_content, dest_filename)

        structured_content = ObtainedContent(self.CONTENT_DOWNLOAD_LOCATION)
        content_type = self.get_content_type(str(dest_filename))
        if content_type in ("archive", "rpm"):
            structured_content.add_content_archive(dest_filename)

        labelled_files = content_handling.identify_files(fpaths)
        for fname, label in labelled_files.items():
            structured_content.add_file(fname, label)

        if fingerprint and dest_filename:
            structured_content.record_verification(dest_filename)

        return structured_content

    def _gather_available_files(self, actually_fetched_content, dest_filename):
        """
        Collect paths of files that are available for the analysis.

        Args:
            actually_fetched_content: Whether a fetch has just taken place
            dest_filename: The target of the fetch operation, can be falsy

        Returns:
            List of file paths as strings.

        Raises:
            common.ExtractionError: If the fetched archive couldn't be extracted.
        """
        fpaths = []
        if not actually_fetched_content:
            if not dest_filename:  # using scap-security-guide
                fpaths = [self.DEFAULT_SSG_DATA_STREAM_PATH]
            else:  # Using downloaded XCCDF/OVAL/DS/tailoring
                fpaths = glob(str(self.CONTENT_DOWNLOAD_LOCATION / "*.xml"))
        else:
            dest_filename = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            content_type = self.get_content_type(str(dest_filename))
            if content_type in ("archive", "rpm"):
                try:
                    fpaths = common.extract_data(
                        str(dest_filename),
                        str(dest_filename.parent)
                    )
                except common.ExtractionError as err:
                    msg = f"Failed to extract the '{dest_filename}' archive: {str(err)}"
                    log.error(msg)
                    # Re-raise the very same exception with its traceback intact
                    raise

            elif content_type == "file":
                fpaths = [str(dest_filename)]
            else:
                raise common.OSCAPaddonError("Unsupported content type")
        return fpaths

    def use_downloaded_content(self, content):
        """
        Record paths of the preferred content and tailoring in the addon data.

        The paths are stored relative to the root of the content.

        Args:
            content: Instance of ObtainedContent

        Raises:
            content_handling.ContentHandlingError: If the expected content
                or tailoring isn't available.
        """
        preferred_content = self.get_preferred_content(content)

        # We know that we have ended up with a datastream-like content,
        # but if we can't convert an archive to a datastream.
        # self._addon_data.content_type = "datastream"
        self._addon_data.content_path = str(preferred_content.relative_to(content.root))

        preferred_tailoring = self.get_preferred_tailoring(content)
        if content.tailoring:
            self._addon_data.tailoring_path = str(preferred_tailoring.relative_to(content.root))

    def use_system_content(self, content=None):
        """
        Reset the addon data and point them to the scap-security-guide content.

        Args:
            content: Not used by this method
        """
        clear_all(self._addon_data)
        self._addon_data.content_type = "scap-security-guide"
        self._addon_data.content_path = common.get_ssg_path()

    def get_preferred_content(self, content):
        """
        Return the content file the addon data ask for,
        or the main usable content if they don't ask for any.
        """
        if self._addon_data.content_path:
            preferred_content = content.find_expected_usable_content(self._addon_data.content_path)
        else:
            preferred_content = content.select_main_usable_content()
        return preferred_content

    def get_preferred_tailoring(self, content):
        """
        Return the tailoring of the content after checking it against the addon data.

        Args:
            content: Instance of ObtainedContent

        Returns:
            The tailoring attribute of the content, which is falsy
            if the content contains no tailoring.

        Raises:
            content_handling.ContentHandlingError: If the addon data expect
                a tailoring and the content has none, or a different one.
        """
        expected_tailoring = self._addon_data.tailoring_path
        if expected_tailoring:
            # The tailoring attribute stays an empty string when no tailoring
            # has been found, and strings have no relative_to().
            found_tailoring = ""
            if content.tailoring:
                found_tailoring = str(content.tailoring.relative_to(content.root))
            if expected_tailoring != found_tailoring:
                msg = f"Expected a tailoring {expected_tailoring}, but it couldn't be found"
                raise content_handling.ContentHandlingError(msg)
        return content.tailoring
271
272
273
class ObtainedContent:
    """
    This class aims to assist the discovery of the gathered files -
    the addon can download files directly, or they can be extracted from an archive.
    The class enables the user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    def __init__(self, root):
        """
        Args:
            root: Directory that relative content paths are resolved against
        """
        # Maps paths of known files to their labels, archives are labelled by None
        self.labelled_files = {}
        # The following attributes hold an empty string until
        # a pathlib.Path of the respective file is assigned to them.
        self.datastream = ""
        self.xccdf = ""
        self.ovals = []
        self.tailoring = ""
        self.archive = ""
        self.verified = ""
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)

        Raises:
            AssertionError: If the file is not among the known files.
        """
        path = pathlib.Path(path)
        # An explicit raise instead of the assert statement keeps
        # the check in place even if Python runs with the -O switch.
        if path not in self.labelled_files:
            raise AssertionError(f"Can't declare the unknown file '{path}' as verified")
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        """
        Set the attribute to the new value, but refuse to overwrite a value that is already set.

        Raises:
            content_handling.ContentHandlingError: If the attribute
                already holds a truthy value.
        """
        old_value = getattr(self, attribute_name)
        if old_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise content_handling.ContentHandlingError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label):
        """
        Record the file and its label, and remember it as the tailoring,
        datastream, XCCDF or as one of OVALs if the label says so.

        Raises:
            content_handling.ContentHandlingError: If there already is
                another tailoring, datastream or XCCDF file respectively.
        """
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self.ovals.append(path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        """
        Return the datastream path if it is known and if the file exists, None otherwise.
        """
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        """
        Return the XCCDF path if the XCCDF file exists
        and at least one of the known OVAL files exists too, None otherwise.
        """
        if not self.xccdf or not self.ovals:
            return None
        if not self.xccdf.exists():
            return None
        # The generator lets any() stop at the first existing OVAL file
        if not any(path.exists() for path in self.ovals):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        """
        Return the full path of the expected content if it is usable.

        Args:
            relative_expected_content_path: Path of the expected content
                relative to the root

        Raises:
            content_handling.ContentHandlingError: If the path is neither
                the usable datastream nor the usable XCCDF.
        """
        content_path = self.root / relative_expected_content_path
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path

        if not content_path.exists():
            msg = f"Couldn't find '{content_path}' among the available content"
        else:
            msg = (
                f"File '{content_path}' is not a valid datastream "
                "or a valid XCCDF of a XCCDF-OVAL file tuple")
        raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        """
        Return the usable datastream, or the usable XCCDF if there is no usable datastream.

        Raises:
            content_handling.ContentHandlingError: If neither of them is usable.
        """
        # Each check is evaluated just once, so the value that passed
        # the check is the very value that gets returned.
        usable_content = self._datastream_content() or self._xccdf_content()
        if usable_content is None:
            msg = (
                "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
                "among the available content")
            raise content_handling.ContentHandlingError(msg)
        return usable_content
367