Passed
Pull Request — rhel9-branch (#240)
by Matěj
01:52
created

ContentBringer.content_uri()   A

Complexity

Conditions 4

Size

Total Lines 3
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 17
nop 2
dl 0
loc 3
rs 9.55
c 0
b 0
f 0
1
import threading
2
import logging
3
import pathlib
4
import shutil
5
import os
6
from glob import glob
7
from typing import List
8
9
from pyanaconda.core import constants
10
from pyanaconda.threading import threadMgr, AnacondaThread
11
from pykickstart.errors import KickstartValueError
12
13
from org_fedora_oscap import data_fetch, utils
14
from org_fedora_oscap import common
15
from org_fedora_oscap import content_handling
16
from org_fedora_oscap.content_handling import CONTENT_TYPES
17
18
from org_fedora_oscap.common import _
19
20
log = logging.getLogger("anaconda")
21
22
23
def is_network(scheme):
    """
    Tell whether the given URL scheme starts with one of the prefixes
    listed in data_fetch.NET_URL_PREFIXES.
    """
    for prefix in data_fetch.NET_URL_PREFIXES:
        if scheme.startswith(prefix):
            return True
    return False
27
28
29
def paths_are_equivalent(p1, p2):
    """
    Return True if both paths normalise to the same absolute path.

    The comparison is purely lexical (os.path.abspath), so the paths
    do not have to exist.
    """
    first_absolute = os.path.abspath(p1)
    second_absolute = os.path.abspath(p2)
    return first_absolute == second_absolute
31
32
33
def path_is_present_among_paths(path, paths):
    """
    Return True if `path` is equivalent to at least one item of `paths`.

    Paths are compared by their absolute, normalised form
    (os.path.abspath), so they do not have to exist.

    Args:
        path: The path to look for.
        paths: Iterable of candidate paths.
    """
    # Normalise the looked-up path once instead of once per candidate.
    absolute_path = os.path.abspath(path)
    return any(
        absolute_path == os.path.abspath(candidate)
        for candidate in paths)
39
40
41
class ContentBringer:
    """
    Fetch content from a URI into CONTENT_DOWNLOAD_LOCATION.

    The instance remembers the last accepted content URI together with the
    destination file derived from it, and it uses a lock-guarded flag to
    refuse starting a fetch while a previous one is still in progress.
    Failures are reported by calling the `what_if_fail` callback with the
    exception.
    """
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)

    def __init__(self, what_if_fail):
        """
        Args:
            what_if_fail: Callable that is passed the exception whenever
                an operation of this object fails.
        """
        self._valid_content_uri = ""
        self.dest_file_name = ""

        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False
        self.what_if_fail = what_if_fail

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

    @property
    def content_uri(self):
        """The last URI that passed validation in the setter."""
        return self._valid_content_uri

    @content_uri.setter
    def content_uri(self, uri):
        """
        Validate the URI and derive the destination file name from it.

        Raises:
            KickstartValueError: If the URI lacks the 'scheme://' part,
                the path component, or a basename.
        """
        scheme_and_maybe_path = uri.split("://")
        if len(scheme_and_maybe_path) == 1:
            msg = (
                f"Invalid supplied content URL '{uri}', "
                "use the 'scheme://path' form.")
            raise KickstartValueError(msg)
        path = scheme_and_maybe_path[1]
        if "/" not in path:
            msg = f"Missing the path component of the '{uri}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{uri}' URL"
            raise KickstartValueError(msg)
        self._valid_content_uri = uri
        self.dest_file_name = self.CONTENT_DOWNLOAD_LOCATION / basename

    def fetch_content(self, content_uri, ca_certs_path=""):
        """
        Initiate fetch of the content into an appropriate directory

        Args:
            content_uri: URI location of the content to be fetched
            ca_certs_path: Path to the HTTPS certificate file

        Returns:
            Name of the fetching thread, or None if no fetch was started.
        """
        uri_accepted = True
        try:
            self.content_uri = content_uri
        except Exception as exc:
            self.what_if_fail(exc)
            uri_accepted = False

        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

        if not uri_accepted:
            # The setter rejected the URI, so self.content_uri still holds
            # the previous (or empty) value - don't fetch from that.
            return None
        return self._fetch_files(ca_certs_path)

    def _fetch_files(self, ca_certs_path):
        """
        Start the fetch unless another one is already in progress.

        Returns:
            Name of the fetching thread, or None if the fetch was refused
            or if it failed to start.
        """
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = _(
                    f"Attempting to fetch '{self.content_uri}', "
                    "but the previous fetch is still in progress")
                log.warning(f"OSCAP Addon: {msg}")
                return None
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch(ca_certs_path)
        except Exception as exc:
            with self.activity_lock:
                self.now_fetching_or_processing = False
            self.what_if_fail(exc)

        # We are not finished yet with the fetch
        return fetching_thread_name

    def _start_actual_fetch(self, ca_certs_path):
        """
        Register the thread that fetches self.content_uri.

        If the URI scheme is a network one, wait for the network first.

        Returns:
            Name of the registered fetching thread.

        Raises:
            common.OSCAPaddonNetworkError: If waiting for the network fails.
        """
        fetching_thread_name = common.THREAD_FETCH_DATA

        scheme = self.content_uri.split("://")[0]
        if is_network(scheme):
            try:
                data_fetch.wait_for_network()
            except common.OSCAPaddonNetworkError as exc:
                msg = _(f"Network connection needed to fetch data. {exc}")
                # Keep the original error attached as the cause.
                raise common.OSCAPaddonNetworkError(msg) from exc

        fetch_data_thread = AnacondaThread(
            name=fetching_thread_name,
            target=self.fetch_operation,
            args=(self.content_uri, self.dest_file_name, ca_certs_path),
            fatal=False)

        threadMgr.add(fetch_data_thread)

        return fetching_thread_name

    def fetch_operation(self, uri, out_file, ca_certs_path=None):
        """Fetch `uri` into `out_file` by means of data_fetch.fetch_data."""
        return data_fetch.fetch_data(uri, out_file, ca_certs_path)

    def finish_content_fetch(self, fetching_thread_name, fingerprint=""):
        """
        Wait for the fetching thread and optionally verify the fetched file.

        Any exception is passed to the what_if_fail callback, and the
        "fetch in progress" flag is cleared in every case.

        Args:
            fetching_thread_name: Name of the thread to wait for,
                nothing is awaited if it is empty.
            fingerprint: Expected checksum of the fetched file,
                the check is skipped if it is empty.
        """
        try:
            self._finish_actual_fetch(fetching_thread_name)
            if fingerprint:
                self._verify_fingerprint(fingerprint)
        except Exception as exc:
            self.what_if_fail(exc)
        finally:
            with self.activity_lock:
                self.now_fetching_or_processing = False

    def _finish_actual_fetch(self, wait_for):
        """Block until the thread named `wait_for` finishes, if there is one."""
        if wait_for:
            log.info(f"OSCAP Addon: Waiting for thread {wait_for}")
            threadMgr.wait(wait_for)
            log.info(f"OSCAP Addon: Finished waiting for thread {wait_for}")

    def _verify_fingerprint(self, fingerprint=""):
        """
        Compare the digest of self.dest_file_name with the fingerprint.

        The hashing algorithm is picked by utils.get_hashing_algorithm
        based on the supplied fingerprint.

        Raises:
            content_handling.ContentCheckError: If the digests differ.
        """
        if not fingerprint:
            log.info(
                "OSCAP Addon: No fingerprint provided, skipping integrity "
                "check")
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(self.dest_file_name,
                                            hash_obj)
        if digest != fingerprint:
            log.error(
                "OSCAP Addon: "
                f"File {self.dest_file_name} failed integrity check - assumed "
                f"a {hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            msg = _(
                "OSCAP Addon: Integrity check of the content failed - "
                f"{hash_obj.name} hash didn't match")
            raise content_handling.ContentCheckError(msg)
        log.info(f"Integrity check passed using {hash_obj.name} hash")
177
178
179
class ContentAnalyzer:
    """
    Collection of static helpers that turn fetched (or already present)
    files into an ObtainedContent instance.
    """
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
    DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    @staticmethod
    def __get_content_type(url):
        """Classify the URL as "rpm", "archive" or "file" by its suffix."""
        if url.endswith(".rpm"):
            return "rpm"
        if any(
                url.endswith(arch_type)
                for arch_type in common.SUPPORTED_ARCHIVES):
            return "archive"
        return "file"

    @staticmethod
    def __allow_one_expected_tailoring_or_no_tailoring(
            labelled_files, expected_tailoring):
        """
        Keep only the expected tailoring among tailoring files,
        or drop all tailoring files if no tailoring is expected.
        """
        tailoring_label = CONTENT_TYPES["TAILORING"]
        if expected_tailoring:
            return ContentAnalyzer.reduce_files(
                labelled_files, expected_tailoring, [tailoring_label])
        return {
            path: label for path, label in labelled_files.items()
            if label != tailoring_label
        }

    @staticmethod
    def __filter_discovered_content(
            labelled_files, expected_path, expected_tailoring,
            expected_cpe_path):
        """
        Narrow the labelled files down according to the expected main
        content, tailoring and CPE dictionary paths.

        Returns:
            The filtered path-to-label mapping.
        """
        categories = (
            CONTENT_TYPES["DATASTREAM"],
            CONTENT_TYPES["XCCDF_CHECKLIST"])
        if expected_path:
            labelled_files = ContentAnalyzer.reduce_files(
                labelled_files, expected_path, categories)

        labelled_files = \
            ContentAnalyzer.__allow_one_expected_tailoring_or_no_tailoring(
                labelled_files, expected_tailoring)

        categories = (CONTENT_TYPES["CPE_DICT"], )
        if expected_cpe_path:
            labelled_files = ContentAnalyzer.reduce_files(
                labelled_files, expected_cpe_path, categories)

        return labelled_files

    @staticmethod
    def reduce_files(labelled_files, expected_path, categories):
        """
        Drop files of the given categories that are not the expected file.

        Files whose label is not among `categories` are kept untouched.

        Args:
            labelled_files: Mapping of paths to their labels.
            expected_path: The only path of the given categories to keep.
            categories: Labels that the reduction applies to.

        Returns:
            New dictionary with the reduced path-to-label mapping.

        Raises:
            content_handling.ContentHandlingError: If the expected path
                is not among the keys of `labelled_files`.
        """
        if not path_is_present_among_paths(expected_path, labelled_files):
            msg = (
                f"Expected a file {expected_path} to be part of the supplied "
                f"content, but it was not the case, got only "
                f"{list(labelled_files.keys())}"
            )
            raise content_handling.ContentHandlingError(msg)
        return {
            path: label for path, label in labelled_files.items()
            if label not in categories
            or paths_are_equivalent(path, expected_path)
        }

    @staticmethod
    def analyze(
            fetching_thread_name, fingerprint, dest_filename, what_if_fail,
            expected_path, expected_tailoring, expected_cpe_path):
        """
        Analyze the available content and return it as ObtainedContent.

        Any exception raised during the analysis is passed to
        `what_if_fail`, and None is returned in that case.
        """
        try:
            content = ContentAnalyzer.__analyze_fetched_content(
                fetching_thread_name, fingerprint, dest_filename,
                expected_path, expected_tailoring, expected_cpe_path)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        return content

    @staticmethod
    def __analyze_fetched_content(
                wait_for, fingerprint, dest_filename, expected_path,
                expected_tailoring, expected_cpe_path):
        """
        Gather the files, label them, filter them by the expectations
        and record the result in a new ObtainedContent.
        """
        # A thread name to wait for means that a fetch has taken place.
        actually_fetched_content = wait_for is not None
        fpaths = ContentAnalyzer.__gather_available_files(
            actually_fetched_content, dest_filename)

        structured_content = ObtainedContent(
            ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION)
        content_type = ContentAnalyzer.__get_content_type(str(dest_filename))
        log.info("OSCAP Addon: started to look at the content")
        if content_type in ("archive", "rpm"):
            structured_content.add_content_archive(dest_filename)

        labelled_filenames = content_handling.identify_files(fpaths)
        labelled_filenames = ContentAnalyzer.__filter_discovered_content(
            labelled_filenames, expected_path, expected_tailoring,
            expected_cpe_path)

        for fname, label in labelled_filenames.items():
            structured_content.add_file(str(fname), label)

        if fingerprint and dest_filename:
            structured_content.record_verification(dest_filename)

        log.info("OSCAP Addon: finished looking at the content")
        return structured_content

    @staticmethod
    def __gather_available_files(actually_fetched_content, dest_filename):
        """
        Return the list of paths of files that are available for analysis.

        Without a fetch, it is either the default SSG data stream
        (no dest_filename), or every file under CONTENT_DOWNLOAD_LOCATION.
        After a fetch, it is either the fetched file itself, or the result
        of common.extract_data if the fetched file is an archive or an RPM.

        Raises:
            common.ExtractionError: If the extraction fails.
        """
        if not actually_fetched_content:
            if not dest_filename:  # using scap-security-guide
                return [ContentAnalyzer.DEFAULT_SSG_DATA_STREAM_PATH]
            # Using downloaded XCCDF/OVAL/DS/tailoring
            found = ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION.rglob("*")
            return [str(p) for p in found if p.is_file()]

        dest_filename = pathlib.Path(dest_filename)
        # RPM is an archive at this phase
        content_type = ContentAnalyzer.__get_content_type(
            str(dest_filename))
        if content_type in ("archive", "rpm"):
            try:
                return common.extract_data(
                    str(dest_filename),
                    str(dest_filename.parent)
                )
            except common.ExtractionError as err:
                msg = (
                    f"Failed to extract the '{dest_filename}' "
                    f"archive: {err}")
                log.error(f"OSCAP Addon: {msg}")
                raise

        if content_type == "file":
            return [str(dest_filename)]
        raise common.OSCAPaddonError("Unsupported content type")
323
324
325
class ObtainedContent:
    """
    This class aims to assist the gathered files discovery -
    the addon can download files directly, or they can be extracted
    from an archive.
    The class enables user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    def __init__(self, root):
        """
        Args:
            root: Directory that relative content paths are resolved against.
        """
        self.labelled_files = dict()
        self.datastream = None  # type: pathlib.Path
        self.xccdf = None  # type: pathlib.Path
        self.ovals = []  # type: List[pathlib.Path]
        self.tailoring = None  # type: pathlib.Path
        self.archive = None  # type: pathlib.Path
        self.verified = None  # type: pathlib.Path
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)
        """
        path = pathlib.Path(path)
        assert path in self.labelled_files
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        """
        Set the attribute unless it already holds a different value.

        Raises:
            content_handling.ContentHandlingError: If the attribute is
                already set to a value different from the new one.
        """
        old_value = getattr(self, attribute_name)
        if old_value and old_value != new_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise content_handling.ContentHandlingError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label=None):
        """
        Record the file under its label, and remember it as the tailoring,
        datastream, XCCDF or one of the OVALs if the label says so.

        Args:
            fname: Path to the file.
            label: Content type label of the file, determined by
                content_handling.identify_files if not supplied.
        """
        if not label:
            label = content_handling.identify_files([fname])[fname]
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self.ovals.append(path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        """Return the datastream path if it is known and exists, else None."""
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        """
        Return the XCCDF path if it exists along with at least one
        existing OVAL file, else None.
        """
        if not self.xccdf or not self.ovals:
            return None
        some_ovals_exist = any(path.exists() for path in self.ovals)
        if not (self.xccdf.exists() and some_ovals_exist):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        """
        Return the resolved expected content path if it is the usable
        datastream or the usable XCCDF.

        Args:
            relative_expected_content_path: Content path relative to the root.

        Raises:
            content_handling.ContentHandlingError: If the path is neither
                of the usable main content files.
        """
        content_path = self.root / relative_expected_content_path
        content_path = content_path.resolve()
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path

        if not content_path.exists():
            msg = f"Couldn't find '{content_path}' among the available content"
        else:
            msg = (
                f"File '{content_path}' is not a valid datastream "
                "or a valid XCCDF of a XCCDF-OVAL file tuple")
        raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        """
        Return the usable datastream, or the usable XCCDF as a fallback.

        Raises:
            content_handling.ContentHandlingError: If neither is available.
        """
        # Evaluate each helper at most once, the datastream is preferred.
        usable_content = self._datastream_content() or self._xccdf_content()
        if usable_content:
            return usable_content
        msg = (
            "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
            "among the available content")
        raise content_handling.ContentHandlingError(msg)

    def get_preferred_tailoring(self, tailoring_path):
        """
        Return the recorded tailoring, checking it against the expectation.

        Args:
            tailoring_path: Expected tailoring path relative to the root,
                nothing is checked if it is empty.

        Raises:
            content_handling.ContentHandlingError: If a tailoring is expected,
                but none was recorded or a different one was recorded.
        """
        if tailoring_path:
            # No recorded tailoring also counts as "couldn't be found",
            # don't attempt to call relative_to on None.
            tailoring_matches = (
                self.tailoring is not None
                and tailoring_path == str(self.tailoring.relative_to(self.root))
            )
            if not tailoring_matches:
                msg = f"Expected a tailoring {tailoring_path}, but it couldn't be found"
                raise content_handling.ContentHandlingError(msg)
        return self.tailoring

    def get_preferred_content(self, content_path):
        """
        Return the expected content if a path is given,
        the main usable content otherwise.
        """
        if content_path:
            preferred_content = self.find_expected_usable_content(content_path)
        else:
            preferred_content = self.select_main_usable_content()
        return preferred_content
436