Passed
Pull Request — rhel9-branch (#240)
created by Matěj, 02:36

ContentBringer.content_uri()   A

Complexity

Conditions 4

Size

Total Lines 3
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 17
nop 2
dl 0
loc 3
rs 9.55
c 0
b 0
f 0
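
The graded member is the content_uri setter in the listing below; its three URI-validation branches plus the success path account for the reported four conditions. A minimal sketch of its accept/reject behaviour (the module path and the print handler are illustrative, not taken from the listing):

    from org_fedora_oscap import content_discovery  # module name assumed

    bringer = content_discovery.ContentBringer(what_if_fail=print)
    bringer.content_uri = "https://example.com/content/ssg-ds.xml"  # accepted
    print(bringer.dest_file_name)      # CONTENT_DOWNLOAD_LOCATION / "ssg-ds.xml"
    bringer.content_uri = "ssg-ds.xml" # no scheme: raises KickstartValueError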
import threading
import logging
import pathlib
import shutil
import os
from glob import glob
from typing import List

from pyanaconda.core import constants
from pyanaconda.threading import threadMgr, AnacondaThread
from pykickstart.errors import KickstartValueError

from org_fedora_oscap import data_fetch, utils
from org_fedora_oscap import common
from org_fedora_oscap import content_handling
from org_fedora_oscap.content_handling import CONTENT_TYPES

from org_fedora_oscap.common import _

log = logging.getLogger("anaconda")


def is_network(scheme):
    return any(
        scheme.startswith(net_prefix)
        for net_prefix in data_fetch.NET_URL_PREFIXES)


def paths_are_equivalent(p1, p2):
    return os.path.abspath(p1) == os.path.abspath(p2)


def path_is_present_among_paths(path, paths):
    absolute_path = os.path.abspath(path)
    for second_path in paths:
        if paths_are_equivalent(path, second_path):
            return True
    return False


class ContentBringer:
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)

    def __init__(self, what_if_fail):
        self._valid_content_uri = ""
        self.ca_certs_path = ""
        self.dest_file_name = ""

        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False
        self.what_if_fail = what_if_fail

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

    @property
    def content_uri(self):
        return self._valid_content_uri

    @content_uri.setter
    def content_uri(self, uri):
        scheme_and_maybe_path = uri.split("://")
        if len(scheme_and_maybe_path) == 1:
            msg = (
                f"Invalid supplied content URL '{uri}', "
                "use the 'scheme://path' form.")
            raise KickstartValueError(msg)
        path = scheme_and_maybe_path[1]
        if "/" not in path:
            msg = f"Missing the path component of the '{uri}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{uri}' URL"
            raise KickstartValueError(msg)
        self._valid_content_uri = uri
        self.dest_file_name = self.CONTENT_DOWNLOAD_LOCATION / basename

    def fetch_content(self, content_uri, ca_certs_path=""):
        """
        Initiate fetch of the content into an appropriate directory

        Args:
            content_uri: URI location of the content to be fetched
            ca_certs_path: Path to the HTTPS certificate file
        """
        try:
            self.content_uri = content_uri
            self.ca_certs_path = ca_certs_path
        except Exception as exc:
            self.what_if_fail(exc)
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        fetching_thread_name = self._fetch_files()
        return fetching_thread_name

    def _fetch_files(self):
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = _(
                    f"Attempting to fetch '{self.content_uri}', "
                    "but the previous fetch is still in progress")
                log.warning(f"OSCAP Addon: {msg}")
                return
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch()
        except Exception as exc:
            with self.activity_lock:
                self.now_fetching_or_processing = False
            self.what_if_fail(exc)

        # We are not finished with the fetch yet
        return fetching_thread_name

    def _start_actual_fetch(self):
        fetching_thread_name = common.THREAD_FETCH_DATA

        scheme = self.content_uri.split("://")[0]
        if is_network(scheme):
            try:
                data_fetch.wait_for_network()
            except common.OSCAPaddonNetworkError as exc:
                msg = _(f"Network connection needed to fetch data. {exc}")
                raise common.OSCAPaddonNetworkError(msg)

        fetch_data_thread = AnacondaThread(
            name=fetching_thread_name,
            target=self.fetch_operation,
            args=(self.dest_file_name,),
            fatal=False)

        threadMgr.add(fetch_data_thread)

        return fetching_thread_name

    def fetch_operation(self, out_file):
        return data_fetch.fetch_data(self.content_uri, out_file, self.ca_certs_path)

    def finish_content_fetch(self, fetching_thread_name, fingerprint=""):
        try:
            self._finish_actual_fetch(fetching_thread_name)
            if fingerprint:
                self._verify_fingerprint(fingerprint)
        except Exception as exc:
            self.what_if_fail(exc)
        finally:
            with self.activity_lock:
                self.now_fetching_or_processing = False

    def _finish_actual_fetch(self, wait_for):
        if wait_for:
            log.info(f"OSCAP Addon: Waiting for thread {wait_for}")
            threadMgr.wait(wait_for)
            log.info(f"OSCAP Addon: Finished waiting for thread {wait_for}")

    def _verify_fingerprint(self, fingerprint=""):
        if not fingerprint:
            log.info(
                "OSCAP Addon: No fingerprint provided, skipping integrity "
                "check")
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(self.dest_file_name, hash_obj)
        if digest != fingerprint:
            log.error(
                "OSCAP Addon: "
                f"File {self.dest_file_name} failed integrity check - assumed "
                f"a {hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            msg = _(
                "OSCAP Addon: Integrity check of the content failed - "
                f"{hash_obj.name} hash didn't match")
            raise content_handling.ContentCheckError(msg)
        log.info(f"Integrity check passed using {hash_obj.name} hash")


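# Usage sketch for the fetch flow above; `report_error` is a placeholder
# handler, not something this module defines:
#
#     def report_error(exc):
#         log.error(f"OSCAP Addon: {exc}")
#
#     bringer = ContentBringer(what_if_fail=report_error)
#     thread_name = bringer.fetch_content(
#         "https://example.com/content/ssg-ds.xml", ca_certs_path="")
#     ...  # let the installer UI continue, then block when the data is needed:
#     bringer.finish_content_fetch(thread_name, fingerprint="")
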
class ContentAnalyzer:
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
    DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    @staticmethod
    def __get_content_type(url):
        if url.endswith(".rpm"):
            return "rpm"
        elif any(
                url.endswith(arch_type)
                for arch_type in common.SUPPORTED_ARCHIVES):
            return "archive"
        else:
            return "file"

    @staticmethod
    def __allow_one_expected_tailoring_or_no_tailoring(
            labelled_files, expected_tailoring):
        tailoring_label = CONTENT_TYPES["TAILORING"]
        if expected_tailoring:
            labelled_files = ContentAnalyzer.reduce_files(
                labelled_files, expected_tailoring, [tailoring_label])
        else:
            labelled_files = {
                path: label for path, label in labelled_files.items()
                if label != tailoring_label
            }
        return labelled_files

    @staticmethod
    def __filter_discovered_content(
            labelled_files, expected_path, expected_tailoring,
            expected_cpe_path):
        categories = (
            CONTENT_TYPES["DATASTREAM"],
            CONTENT_TYPES["XCCDF_CHECKLIST"])
        if expected_path:
            labelled_files = ContentAnalyzer.reduce_files(
                labelled_files, expected_path, categories)

        labelled_files = \
            ContentAnalyzer.__allow_one_expected_tailoring_or_no_tailoring(
                labelled_files, expected_tailoring)

        categories = (CONTENT_TYPES["CPE_DICT"], )
        if expected_cpe_path:
            labelled_files = ContentAnalyzer.reduce_files(
                labelled_files, expected_cpe_path, categories)

        return labelled_files

    @staticmethod
    def reduce_files(labelled_files, expected_path, categories):
        reduced_files = dict()
        if not path_is_present_among_paths(
                expected_path, labelled_files.keys()):
            msg = (
                f"Expected a file {expected_path} to be part of the supplied "
                f"content, but it was not the case, got only "
                f"{list(labelled_files.keys())}"
            )
            raise content_handling.ContentHandlingError(msg)
        for path, label in labelled_files.items():
            if label in categories and not paths_are_equivalent(
                    path, expected_path):
                continue
            reduced_files[path] = label
        return reduced_files

    @staticmethod
    def analyze(
            fetching_thread_name, fingerprint, dest_filename, what_if_fail,
            expected_path, expected_tailoring, expected_cpe_path):
        try:
            content = ContentAnalyzer.__analyze_fetched_content(
                fetching_thread_name, fingerprint, dest_filename,
                expected_path, expected_tailoring, expected_cpe_path)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        return content

    @staticmethod
    def __analyze_fetched_content(
                wait_for, fingerprint, dest_filename, expected_path,
                expected_tailoring, expected_cpe_path):
        actually_fetched_content = wait_for is not None
        fpaths = ContentAnalyzer.__gather_available_files(
            actually_fetched_content, dest_filename)

        structured_content = ObtainedContent(
            ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION)
        content_type = ContentAnalyzer.__get_content_type(str(dest_filename))
        log.info("OSCAP Addon: started to look at the content")
        if content_type in ("archive", "rpm"):
            structured_content.add_content_archive(dest_filename)

        labelled_filenames = content_handling.identify_files(fpaths)
        labelled_filenames = ContentAnalyzer.__filter_discovered_content(
            labelled_filenames, expected_path, expected_tailoring,
            expected_cpe_path)

        for fname, label in labelled_filenames.items():
            structured_content.add_file(str(fname), label)

        if fingerprint and dest_filename:
            structured_content.record_verification(dest_filename)

        log.info("OSCAP Addon: finished looking at the content")
        return structured_content

    @staticmethod
    def __gather_available_files(actually_fetched_content, dest_filename):
        fpaths = []
        if not actually_fetched_content:
            if not dest_filename:  # using scap-security-guide
                fpaths = [ContentAnalyzer.DEFAULT_SSG_DATA_STREAM_PATH]
            else:  # Using downloaded XCCDF/OVAL/DS/tailoring
                fpaths = pathlib.Path(
                    ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION).rglob("*")
                fpaths = [str(p) for p in fpaths if p.is_file()]
        else:
            dest_filename = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            content_type = ContentAnalyzer.__get_content_type(
                str(dest_filename))
            if content_type in ("archive", "rpm"):
                try:
                    fpaths = common.extract_data(
                        str(dest_filename),
                        str(dest_filename.parent)
                    )
                except common.ExtractionError as err:
                    msg = (
                        f"Failed to extract the '{dest_filename}' "
                        f"archive: {str(err)}")
                    log.error("OSCAP Addon: " + msg)
                    raise err

            elif content_type == "file":
                fpaths = [str(dest_filename)]
            else:
                raise common.OSCAPaddonError("Unsupported content type")
        return fpaths


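# Usage sketch for the analysis step; empty expected_* values mean
# "no specific files requested", and `report_error` is again a placeholder:
#
#     content = ContentAnalyzer.analyze(
#         fetching_thread_name=thread_name,
#         fingerprint="",
#         dest_filename=bringer.dest_file_name,
#         what_if_fail=report_error,
#         expected_path="",
#         expected_tailoring="",
#         expected_cpe_path="")
#     # `content` is an ObtainedContent instance, or None if the analysis failed.
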
class ObtainedContent:
    """
    This class aims to assist with the discovery of the gathered files -
    the addon can download files directly, or they can be extracted from an archive.
    The class enables the user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    def __init__(self, root):
        self.labelled_files = dict()
        self.datastream = None  # type: pathlib.Path
        self.xccdf = None  # type: pathlib.Path
        self.ovals = []  # type: List[pathlib.Path]
        self.tailoring = None  # type: pathlib.Path
        self.archive = None  # type: pathlib.Path
        self.verified = None  # type: pathlib.Path
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)
        """
        path = pathlib.Path(path)
        assert path in self.labelled_files
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        old_value = getattr(self, attribute_name)
        if old_value and old_value != new_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"{old_value.name} was already set when assigning the new {new_value.name}")
            raise content_handling.ContentHandlingError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label=None):
        if not label:
            label = content_handling.identify_files([fname])[fname]
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self.ovals.append(path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        if not self.xccdf or not self.ovals:
            return None
        some_ovals_exist = any([path.exists() for path in self.ovals])
        if not (self.xccdf.exists() and some_ovals_exist):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        content_path = self.root / relative_expected_content_path
        content_path = content_path.resolve()
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path
        else:
            if not content_path.exists():
                msg = f"Couldn't find '{content_path}' among the available content"
            else:
                msg = (
                    f"File '{content_path}' is not a valid datastream "
                    "or a valid XCCDF of an XCCDF-OVAL file tuple")
            raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        if self._datastream_content():
            return self._datastream_content()
        elif self._xccdf_content():
            return self._xccdf_content()
        else:
            msg = (
                "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
                "among the available content")
            raise content_handling.ContentHandlingError(msg)

    def get_preferred_tailoring(self, tailoring_path):
        if tailoring_path:
            if tailoring_path != str(self.tailoring.relative_to(self.root)):
                msg = f"Expected a tailoring {tailoring_path}, but it couldn't be found"
                raise content_handling.ContentHandlingError(msg)
        return self.tailoring

    def get_preferred_content(self, content_path):
        if content_path:
            preferred_content = self.find_expected_usable_content(content_path)
        else:
            preferred_content = self.select_main_usable_content()
        return preferred_content
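
Once the analysis succeeds, the resulting ObtainedContent instance is what downstream code queries for usable files. A minimal sketch of that last step (paths are illustrative only):

    content = ObtainedContent("/tmp/oscap-content")      # root directory assumed
    content.add_file("/tmp/oscap-content/ssg-ds.xml")     # label auto-detected
    main = content.get_preferred_content("ssg-ds.xml")    # datastream or XCCDF path
    tailoring = content.get_preferred_tailoring("")       # None unless a tailoring was added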