Passed
Pull Request — rhel9-branch (#240)
by Matěj, created 01:12

ObtainedContent.add_file()   B

Complexity

Conditions 6

Size

Total Lines 13
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 13
nop 3
dl 0
loc 13
rs 8.6666
c 0
b 0
f 0
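
The six conditions reported above most likely come from ObtainedContent.add_file(): the `if not label` fallback plus the if/elif chain over the four CONTENT_TYPES labels (see the listing below). As an optional sketch only (it reuses the names from the listing and is not something this PR has to adopt), the chain could be made table-driven:

    def add_file(self, fname, label=None):
        if not label:
            label = content_handling.identify_files([fname])[fname]
        path = pathlib.Path(fname)
        # Labels that map directly onto a single-file attribute.
        attribute_for_label = {
            content_handling.CONTENT_TYPES["TAILORING"]: "tailoring",
            content_handling.CONTENT_TYPES["DATASTREAM"]: "datastream",
            content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]: "xccdf",
        }
        if label == content_handling.CONTENT_TYPES["OVAL"]:
            self.ovals.append(path)
        elif label in attribute_for_label:
            self._assign_content_type(attribute_for_label[label], path)
        self.labelled_files[path] = label
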
import threading
import logging
import pathlib
import shutil
import os
from glob import glob
from typing import List

from pyanaconda.core import constants
from pyanaconda.threading import threadMgr, AnacondaThread
from pykickstart.errors import KickstartValueError

from org_fedora_oscap import data_fetch, utils
from org_fedora_oscap import common
from org_fedora_oscap import content_handling
from org_fedora_oscap.content_handling import CONTENT_TYPES

from org_fedora_oscap.common import _

log = logging.getLogger("anaconda")


def is_network(scheme):
    return any(
        scheme.startswith(net_prefix)
        for net_prefix in data_fetch.NET_URL_PREFIXES)


def paths_are_equivalent(p1, p2):
    return os.path.abspath(p1) == os.path.abspath(p2)


def path_is_present_among_paths(path, paths):
    for second_path in paths:
        if paths_are_equivalent(path, second_path):
            return True
    return False


class ContentBringer:
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)

    def __init__(self, what_if_fail):
        self._valid_content_uri = ""
        self.ca_certs_path = ""
        self.dest_file_name = ""

        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False
        self.what_if_fail = what_if_fail

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

    @property
    def content_uri(self):
        return self._valid_content_uri

    @content_uri.setter
    def content_uri(self, uri):
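        # Validate the 'scheme://path' form of the URI and derive the local
        # download destination from the URL basename.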
        scheme_and_maybe_path = uri.split("://")
        if len(scheme_and_maybe_path) == 1:
            msg = (
                f"Invalid supplied content URL '{uri}', "
                "use the 'scheme://path' form.")
            raise KickstartValueError(msg)
        path = scheme_and_maybe_path[1]
        if "/" not in path:
            msg = f"Missing the path component of the '{uri}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{uri}' URL"
            raise KickstartValueError(msg)
        self._valid_content_uri = uri
        self.dest_file_name = self.CONTENT_DOWNLOAD_LOCATION / basename

    def fetch_content(self, content_uri, ca_certs_path=""):
        """
        Initiate fetch of the content into an appropriate directory

        Args:
            content_uri: URI location of the content to be fetched
            ca_certs_path: Path to the HTTPS certificate file
        """
        try:
            self.content_uri = content_uri
            self.ca_certs_path = ca_certs_path
        except Exception as exc:
            self.what_if_fail(exc)
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        fetching_thread_name = self._fetch_files()
        return fetching_thread_name

    def _fetch_files(self):
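        # The activity_lock together with the now_fetching_or_processing flag
        # ensures that only one fetch runs at a time.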
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = _(
                    f"Attempting to fetch '{self.content_uri}', "
                    "but the previous fetch is still in progress")
                log.warning(f"OSCAP Addon: {msg}")
                return
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch()
        except Exception as exc:
            with self.activity_lock:
                self.now_fetching_or_processing = False
            self.what_if_fail(exc)

        # We are not finished with the fetch yet
        return fetching_thread_name

    def _start_actual_fetch(self):
        fetching_thread_name = common.THREAD_FETCH_DATA

        scheme = self.content_uri.split("://")[0]
        if is_network(scheme):
            try:
                data_fetch.wait_for_network()
            except common.OSCAPaddonNetworkError as exc:
                msg = _(f"Network connection needed to fetch data. {exc}")
                raise common.OSCAPaddonNetworkError(msg)

        fetch_data_thread = AnacondaThread(
            name=fetching_thread_name,
            target=self.fetch_operation,
            args=(self.dest_file_name,),
            fatal=False)

        threadMgr.add(fetch_data_thread)

        return fetching_thread_name

    def fetch_operation(self, out_file):
        return data_fetch.fetch_data(self.content_uri, out_file, self.ca_certs_path)

    def finish_content_fetch(self, fetching_thread_name, fingerprint=""):
        try:
            self._finish_actual_fetch(fetching_thread_name)
            if fingerprint:
                self._verify_fingerprint(fingerprint)
        except Exception as exc:
            self.what_if_fail(exc)
        finally:
            with self.activity_lock:
                self.now_fetching_or_processing = False

    def _finish_actual_fetch(self, wait_for):
        if wait_for:
            log.info(f"OSCAP Addon: Waiting for thread {wait_for}")
            threadMgr.wait(wait_for)
            log.info(f"OSCAP Addon: Finished waiting for thread {wait_for}")

    def _verify_fingerprint(self, fingerprint=""):
        if not fingerprint:
            log.info(
                "OSCAP Addon: No fingerprint provided, skipping integrity "
                "check")
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(self.dest_file_name,
                                            hash_obj)
        if digest != fingerprint:
            log.error(
                "OSCAP Addon: "
                f"File {self.dest_file_name} failed integrity check - assumed "
                f"a {hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            msg = _(
                f"OSCAP Addon: Integrity check of the content failed - "
                f"{hash_obj.name} hash didn't match")
            raise content_handling.ContentCheckError(msg)
        log.info(f"Integrity check passed using {hash_obj.name} hash")


class ContentAnalyzer:
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
    DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    @staticmethod
    def __get_content_type(url):
        if url.endswith(".rpm"):
            return "rpm"
        elif any(
                url.endswith(arch_type)
                for arch_type in common.SUPPORTED_ARCHIVES):
            return "archive"
        else:
            return "file"

    @staticmethod
    def __allow_one_expected_tailoring_or_no_tailoring(
            labelled_files, expected_tailoring):
        tailoring_label = CONTENT_TYPES["TAILORING"]
        if expected_tailoring:
            labelled_files = ContentAnalyzer.reduce_files(
                labelled_files, expected_tailoring, [tailoring_label])
        else:
            labelled_files = {
                path: label for path, label in labelled_files.items()
                if label != tailoring_label
            }
        return labelled_files

    @staticmethod
    def __filter_discovered_content(
            labelled_files, expected_path, expected_tailoring,
            expected_cpe_path):
        categories = (
            CONTENT_TYPES["DATASTREAM"],
            CONTENT_TYPES["XCCDF_CHECKLIST"])
        if expected_path:
            labelled_files = ContentAnalyzer.reduce_files(
                labelled_files, expected_path, categories)

        labelled_files = \
            ContentAnalyzer.__allow_one_expected_tailoring_or_no_tailoring(
                labelled_files, expected_tailoring)

        categories = (CONTENT_TYPES["CPE_DICT"], )
        if expected_cpe_path:
            labelled_files = ContentAnalyzer.reduce_files(
                labelled_files, expected_cpe_path, categories)

        return labelled_files

    @staticmethod
    def reduce_files(labelled_files, expected_path, categories):
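        # Keep only the expected file within the given categories; files of
        # other categories pass through untouched, and a missing expected
        # file is an error.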
        reduced_files = dict()
        if not path_is_present_among_paths(
                expected_path, labelled_files.keys()):
            msg = (
                f"Expected the file {expected_path} to be part of the supplied "
                f"content, but it was not found; got only "
                f"{list(labelled_files.keys())}"
            )
            raise content_handling.ContentHandlingError(msg)
        for path, label in labelled_files.items():
            if label in categories and not paths_are_equivalent(
                    path, expected_path):
                continue
            reduced_files[path] = label
        return reduced_files

    @staticmethod
    def analyze(
            fetching_thread_name, fingerprint, dest_filename, what_if_fail,
            expected_path, expected_tailoring, expected_cpe_path):
        try:
            content = ContentAnalyzer.__analyze_fetched_content(
                fetching_thread_name, fingerprint, dest_filename,
                expected_path, expected_tailoring, expected_cpe_path)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        return content

    @staticmethod
    def __analyze_fetched_content(
                wait_for, fingerprint, dest_filename, expected_path,
                expected_tailoring, expected_cpe_path):
        actually_fetched_content = wait_for is not None
        fpaths = ContentAnalyzer.__gather_available_files(
            actually_fetched_content, dest_filename)

        structured_content = ObtainedContent(
            ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION)
        content_type = ContentAnalyzer.__get_content_type(str(dest_filename))
        log.info("OSCAP Addon: started to look at the content")
        if content_type in ("archive", "rpm"):
            structured_content.add_content_archive(dest_filename)

        labelled_filenames = content_handling.identify_files(fpaths)
        labelled_filenames = ContentAnalyzer.__filter_discovered_content(
            labelled_filenames, expected_path, expected_tailoring,
            expected_cpe_path)

        for fname, label in labelled_filenames.items():
            structured_content.add_file(str(fname), label)

        if fingerprint and dest_filename:
            structured_content.record_verification(dest_filename)

        log.info("OSCAP Addon: finished looking at the content")
        return structured_content

    @staticmethod
    def __gather_available_files(actually_fetched_content, dest_filename):
        fpaths = []
        if not actually_fetched_content:
            if not dest_filename:  # using scap-security-guide
                fpaths = [ContentAnalyzer.DEFAULT_SSG_DATA_STREAM_PATH]
            else:  # Using downloaded XCCDF/OVAL/DS/tailoring
                fpaths = pathlib.Path(
                    ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION).rglob("*")
                fpaths = [str(p) for p in fpaths if p.is_file()]
        else:
            dest_filename = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            content_type = ContentAnalyzer.__get_content_type(
                str(dest_filename))
            if content_type in ("archive", "rpm"):
                try:
                    fpaths = common.extract_data(
                        str(dest_filename),
                        str(dest_filename.parent)
                    )
                except common.ExtractionError as err:
                    msg = (
                        f"Failed to extract the '{dest_filename}' "
                        f"archive: {str(err)}")
                    log.error("OSCAP Addon: " + msg)
                    raise err

            elif content_type == "file":
                fpaths = [str(dest_filename)]
            else:
                raise common.OSCAPaddonError("Unsupported content type")
        return fpaths


class ObtainedContent:
    """
    This class assists with the discovery of gathered files -
    the addon can download files directly, or they can be extracted from an archive.
    The class enables the user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    def __init__(self, root):
        self.labelled_files = dict()
        self.datastream = None  # type: pathlib.Path
        self.xccdf = None  # type: pathlib.Path
        self.ovals = []  # type: List[pathlib.Path]
        self.tailoring = None  # type: pathlib.Path
        self.archive = None  # type: pathlib.Path
        self.verified = None  # type: pathlib.Path
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)
        """
        path = pathlib.Path(path)
        assert path in self.labelled_files
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
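        # Refuse to silently replace an already discovered file of this
        # content type with a different one.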
        old_value = getattr(self, attribute_name)
        if old_value and old_value != new_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"{old_value.name} was already set when assigning the new {new_value.name}")
            raise content_handling.ContentHandlingError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label=None):
        if not label:
            label = content_handling.identify_files([fname])[fname]
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self.ovals.append(path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        if not self.xccdf or not self.ovals:
            return None
        some_ovals_exist = any([path.exists() for path in self.ovals])
        if not (self.xccdf.exists() and some_ovals_exist):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        content_path = self.root / relative_expected_content_path
        content_path = content_path.resolve()
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path
        else:
            if not content_path.exists():
                msg = f"Couldn't find '{content_path}' among the available content"
            else:
                msg = (
                    f"File '{content_path}' is not a valid datastream "
                    "or a valid XCCDF of an XCCDF-OVAL file tuple")
            raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        if self._datastream_content():
            return self._datastream_content()
        elif self._xccdf_content():
            return self._xccdf_content()
        else:
            msg = (
                "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
                "among the available content")
            raise content_handling.ContentHandlingError(msg)

    def get_preferred_tailoring(self, tailoring_path):
        if tailoring_path:
            if tailoring_path != str(self.tailoring.relative_to(self.root)):
                msg = f"Expected a tailoring {tailoring_path}, but it couldn't be found"
                raise content_handling.ContentHandlingError(msg)
        return self.tailoring

    def get_preferred_content(self, content_path):
        if content_path:
            preferred_content = self.find_expected_usable_content(content_path)
        else:
            preferred_content = self.select_main_usable_content()
        return preferred_content
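
One way these pieces could be wired together (a rough sketch only; the callback, URL, and empty arguments below are made up, and the addon's real GUI/kickstart glue differs):

    def report_failure(exc):
        log.error(f"OSCAP Addon: {exc}")

    bringer = ContentBringer(report_failure)
    thread_name = bringer.fetch_content(
        "https://example.com/security/ssg-ds.xml", ca_certs_path="")
    bringer.finish_content_fetch(thread_name, fingerprint="")

    content = ContentAnalyzer.analyze(
        thread_name, "", bringer.dest_file_name, report_failure,
        expected_path="", expected_tailoring="", expected_cpe_path="")
    if content:
        main_content = content.get_preferred_content("")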