Passed
Push — rhel9-branch ( 8d280e...2a8799 )
by Matěj
02:10 queued 14s
created

ObtainedContent.add_file()   B

Complexity

Conditions 6

Size

Total Lines 13
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 13
nop 3
dl 0
loc 13
rs 8.6666
c 0
b 0
f 0
1
import threading
2
import logging
3
import pathlib
4
import shutil
5
import os
6
from glob import glob
7
from typing import List
8
9
from pyanaconda.core import constants
10
from pyanaconda.threading import threadMgr
11
from pykickstart.errors import KickstartValueError
12
13
from org_fedora_oscap import data_fetch, utils
14
from org_fedora_oscap import common
15
from org_fedora_oscap import content_handling
16
from org_fedora_oscap.content_handling import CONTENT_TYPES
17
18
from org_fedora_oscap.common import _
19
20
# Logger shared by everything in this module; records go to the logger
# registered under the name "anaconda".
log = logging.getLogger("anaconda")
21
22
23
def is_network(scheme):
    """
    Tell whether the URL scheme is one of the network ones.

    Args:
        scheme: The scheme part of an URL (the text before "://")

    Returns:
        True if the scheme starts with any of the prefixes listed in
        data_fetch.NET_URL_PREFIXES, False otherwise.
    """
    for known_prefix in data_fetch.NET_URL_PREFIXES:
        if scheme.startswith(known_prefix):
            return True
    return False
27
28
29
def paths_are_equivalent(p1, p2):
    """
    Tell whether two paths denote the same absolute location.

    Both paths are normalized by os.path.abspath before the comparison,
    so relative paths are interpreted against the current working
    directory and redundant "." / ".." components don't matter.
    """
    first, second = (os.path.abspath(candidate) for candidate in (p1, p2))
    return first == second
31
32
33
def path_is_present_among_paths(path, paths):
    """
    Tell whether a path is equivalent to at least one of the given paths.

    Paths are compared after normalization by os.path.abspath, i.e. using
    the same notion of equivalence as paths_are_equivalent.

    Args:
        path: The path to look for
        paths: Iterable of paths to search through

    Returns:
        True if an equivalent path is present, False otherwise
        (including the case of empty paths).
    """
    # Normalize the searched path just once instead of on every iteration.
    absolute_path = os.path.abspath(path)
    return any(
        os.path.abspath(second_path) == absolute_path
        for second_path in paths)
39
40
41
class ContentBringer:
    """
    Fetch content from an URI into CONTENT_DOWNLOAD_LOCATION.

    Errors are not dealt with here - the exception is handed over to the
    what_if_fail callable that is supplied upon construction.
    """
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)

    def __init__(self, what_if_fail):
        """
        Args:
            what_if_fail: Callable that gets the exception as its only
                argument whenever handling of the URI, the fetch,
                or the integrity check fails
        """
        self._valid_content_uri = ""
        self.dest_file_name = ""

        # The lock guards the now_fetching_or_processing flag
        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False
        self.what_if_fail = what_if_fail

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

    @property
    def content_uri(self):
        """
        The last URI that passed validation, or an empty string.
        """
        return self._valid_content_uri

    @content_uri.setter
    def content_uri(self, uri):
        """
        Validate and store the URI, and derive the destination file name.

        Raises:
            KickstartValueError: If the URI is not in the 'scheme://path'
                form, if the path has no "/", or if it ends with "/"
                so there is no basename to use
        """
        scheme_and_maybe_path = uri.split("://")
        if len(scheme_and_maybe_path) == 1:
            msg = (
                f"Invalid supplied content URL '{uri}', "
                "use the 'scheme://path' form.")
            raise KickstartValueError(msg)
        path = scheme_and_maybe_path[1]
        if "/" not in path:
            msg = f"Missing the path component of the '{uri}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{uri}' URL"
            raise KickstartValueError(msg)
        self._valid_content_uri = uri
        self.dest_file_name = self.CONTENT_DOWNLOAD_LOCATION / basename

    def fetch_content(self, content_uri, ca_certs_path=""):
        """
        Initiate fetch of the content into an appropriate directory

        Args:
            content_uri: URI location of the content to be fetched
            ca_certs_path: Path to the HTTPS certificate file

        Returns:
            Whatever the fetch start produced (expected to be the name of
            the fetching thread), or None if the fetch was not started.
        """
        try:
            self.content_uri = content_uri
        except Exception as exc:
            self.what_if_fail(exc)
            # The URI was rejected, so there is nothing valid to fetch.
            # Going on would wipe the download directory and then fetch
            # the previously accepted URI (or an empty one) instead.
            return None
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        fetching_thread_name = self._fetch_files(ca_certs_path)
        return fetching_thread_name

    def _fetch_files(self, ca_certs_path):
        """
        Start the fetch unless another one is already in progress.

        Returns:
            The result of _start_actual_fetch, or None if a fetch is
            already in progress or if starting of the fetch failed.
        """
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = "OSCAP Addon: Strange, it seems that we are already " \
                    "fetching something."
                log.warning(msg)
                return None
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch(ca_certs_path)
        except Exception as exc:
            # The fetch didn't start, so nothing is in progress any more
            with self.activity_lock:
                self.now_fetching_or_processing = False
            self.what_if_fail(exc)

        # We are not finished yet with the fetch
        return fetching_thread_name

    def _start_actual_fetch(self, ca_certs_path):
        """
        Dispatch the fetch to the network or to the local data_fetch routine.

        The choice is made according to the scheme of the stored content URI.
        """
        fetching_thread_name = None

        scheme = self.content_uri.split("://")[0]
        if is_network(scheme):
            fetching_thread_name = data_fetch.wait_and_fetch_net_data(
                self.content_uri,
                self.dest_file_name,
                ca_certs_path
            )
        else:  # invalid schemes are handled down the road
            fetching_thread_name = data_fetch.fetch_local_data(
                self.content_uri,
                self.dest_file_name,
            )
        return fetching_thread_name

    def finish_content_fetch(self, fetching_thread_name, fingerprint):
        """
        Wait for the fetch to finish, and verify the fetched file.

        Args:
            fetching_thread_name: Name of the thread to wait for,
                nothing is waited for if it is empty
            fingerprint: Expected hash of the fetched file,
                the integrity check is skipped if it is empty

        Any exception is passed to what_if_fail, and the fetch is marked
        as no longer in progress regardless of the outcome.
        """
        try:
            self._finish_actual_fetch(fetching_thread_name)
            if fingerprint:
                self._verify_fingerprint(fingerprint)
        except Exception as exc:
            self.what_if_fail(exc)
        finally:
            with self.activity_lock:
                self.now_fetching_or_processing = False

    def _finish_actual_fetch(self, wait_for):
        """
        Block until the thread of the given name finishes, if a name is given.
        """
        if wait_for:
            log.info(f"OSCAP Addon: Waiting for thread {wait_for}")
            threadMgr.wait(wait_for)
            log.info(f"OSCAP Addon: Finished waiting for thread {wait_for}")

    def _verify_fingerprint(self, fingerprint=""):
        """
        Compare the hash of the destination file with the fingerprint.

        The hashing algorithm is obtained from the fingerprint by
        utils.get_hashing_algorithm.

        Raises:
            content_handling.ContentCheckError: If the hashes don't match
        """
        if not fingerprint:
            log.info(
                "OSCAP Addon: No fingerprint provided, skipping integrity "
                "check")
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(self.dest_file_name,
                                            hash_obj)
        if digest != fingerprint:
            log.error(
                "OSCAP Addon: "
                f"File {self.dest_file_name} failed integrity check - assumed "
                f"a {hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            # Translate the static template first and fill it in afterwards,
            # a string that is already interpolated can't match any
            # translation catalog entry.
            msg = _(
                "OSCAP Addon: Integrity check of the content failed - "
                "{hash_name} hash didn't match"
            ).format(hash_name=hash_obj.name)
            raise content_handling.ContentCheckError(msg)
        log.info(f"Integrity check passed using {hash_obj.name} hash")
169
170
171
class ContentAnalyzer:
    """
    Turn the files that are available after the fetch into ObtainedContent.

    The class only groups static methods, analyze() is the entry point.
    """
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
    DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    @staticmethod
    def __get_content_type(url):
        """
        Classify the URL by its suffix as "rpm", "archive" or "file".
        """
        if url.endswith(".rpm"):
            return "rpm"
        elif any(
                url.endswith(arch_type)
                for arch_type in common.SUPPORTED_ARCHIVES):
            return "archive"
        else:
            return "file"

    @staticmethod
    def __allow_one_expected_tailoring_or_no_tailoring(
            labelled_files, expected_tailoring):
        """
        Keep only the expected tailoring, or no tailoring file at all.

        If no tailoring is expected, all files labelled as tailoring
        are dropped.
        """
        tailoring_label = CONTENT_TYPES["TAILORING"]
        if expected_tailoring:
            labelled_files = ContentAnalyzer.reduce_files(
                labelled_files, expected_tailoring, [tailoring_label])
        else:
            labelled_files = {
                path: label for path, label in labelled_files.items()
                if label != tailoring_label
            }
        return labelled_files

    @staticmethod
    def __filter_discovered_content(
            labelled_files, expected_path, expected_tailoring,
            expected_cpe_path):
        """
        Drop files that compete with the explicitly expected ones.

        The filter is applied to the main content (datastream or XCCDF
        checklist), then to tailoring, and finally to CPE dictionaries.
        Main content and CPE dictionaries are filtered only if
        the respective expected path is supplied.
        """
        categories = (
            CONTENT_TYPES["DATASTREAM"],
            CONTENT_TYPES["XCCDF_CHECKLIST"])
        if expected_path:
            labelled_files = ContentAnalyzer.reduce_files(
                labelled_files, expected_path, categories)

        labelled_files = \
            ContentAnalyzer.__allow_one_expected_tailoring_or_no_tailoring(
                labelled_files, expected_tailoring)

        categories = (CONTENT_TYPES["CPE_DICT"], )
        if expected_cpe_path:
            labelled_files = ContentAnalyzer.reduce_files(
                labelled_files, expected_cpe_path, categories)

        return labelled_files

    @staticmethod
    def reduce_files(labelled_files, expected_path, categories):
        """
        Remove files of given categories that are not the expected file.

        Args:
            labelled_files: Mapping of paths to their labels
            expected_path: Path of the file that has to be present
            categories: Labels that only the expected file may carry

        Returns:
            New dictionary without files that have a label from
            categories, and that are not equivalent to expected_path.

        Raises:
            content_handling.ContentHandlingError: If the expected path
                is not among the labelled files
        """
        reduced_files = dict()
        if not path_is_present_among_paths(expected_path, labelled_files):
            msg = (
                f"Expected a file {expected_path} to be part of the supplied "
                f"content, but it was not the case, got only "
                f"{list(labelled_files)}"
            )
            raise content_handling.ContentHandlingError(msg)
        for path, label in labelled_files.items():
            if label in categories and not paths_are_equivalent(
                    path, expected_path):
                continue
            reduced_files[path] = label
        return reduced_files

    @staticmethod
    def analyze(
            fetching_thread_name, fingerprint, dest_filename, what_if_fail,
            expected_path, expected_tailoring, expected_cpe_path):
        """
        Analyze the available content and return it in a structured form.

        Args:
            fetching_thread_name: Result of the fetch start, None indicates
                that no content was actually fetched
            fingerprint: Expected hash of the fetched file, if any
            dest_filename: Where the fetched file was saved, if any
            what_if_fail: Callable that gets the exception if the analysis
                fails
            expected_path: Path of the expected main content, if any
            expected_tailoring: Path of the expected tailoring, if any
            expected_cpe_path: Path of the expected CPE dictionary, if any

        Returns:
            ObtainedContent instance, or None if the analysis failed
            and what_if_fail returned.
        """
        try:
            content = ContentAnalyzer.__analyze_fetched_content(
                fetching_thread_name, fingerprint, dest_filename,
                expected_path, expected_tailoring, expected_cpe_path)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        return content

    @staticmethod
    def __analyze_fetched_content(
                wait_for, fingerprint, dest_filename, expected_path,
                expected_tailoring, expected_cpe_path):
        """
        Gather, label and filter files, and record them as ObtainedContent.
        """
        actually_fetched_content = wait_for is not None
        fpaths = ContentAnalyzer.__gather_available_files(
            actually_fetched_content, dest_filename)

        structured_content = ObtainedContent(
            ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION)
        content_type = ContentAnalyzer.__get_content_type(str(dest_filename))
        log.info("OSCAP Addon: started to look at the content")
        if content_type in ("archive", "rpm"):
            structured_content.add_content_archive(dest_filename)

        labelled_filenames = content_handling.identify_files(fpaths)
        labelled_filenames = ContentAnalyzer.__filter_discovered_content(
            labelled_filenames, expected_path, expected_tailoring,
            expected_cpe_path)

        for fname, label in labelled_filenames.items():
            structured_content.add_file(str(fname), label)

        if fingerprint and dest_filename:
            structured_content.record_verification(dest_filename)

        log.info("OSCAP Addon: finished looking at the content")
        return structured_content

    @staticmethod
    def __gather_available_files(actually_fetched_content, dest_filename):
        """
        List paths of files that are available for the analysis.

        Args:
            actually_fetched_content: Whether a fetch took place
            dest_filename: Where the fetched file was saved, if any

        Returns:
            List of paths - the default SSG datastream path, all files
            present in the download location, the result of
            common.extract_data for an archive or an RPM, or just
            the fetched file itself.

        Raises:
            common.ExtractionError: If the archive extraction fails
        """
        fpaths = []
        if not actually_fetched_content:
            if not dest_filename:  # using scap-security-guide
                fpaths = [ContentAnalyzer.DEFAULT_SSG_DATA_STREAM_PATH]
            else:  # Using downloaded XCCDF/OVAL/DS/tailoring
                fpaths = ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION.rglob("*")
                fpaths = [str(p) for p in fpaths if p.is_file()]
        else:
            dest_filename = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            content_type = ContentAnalyzer.__get_content_type(
                str(dest_filename))
            if content_type in ("archive", "rpm"):
                try:
                    fpaths = common.extract_data(
                        str(dest_filename),
                        str(dest_filename.parent)
                    )
                except common.ExtractionError as err:
                    msg = (
                        f"Failed to extract the '{dest_filename}' "
                        f"archive: {str(err)}")
                    log.error("OSCAP Addon: " + msg)
                    # Re-raise the very same exception after logging it
                    raise

            elif content_type == "file":
                fpaths = [str(dest_filename)]
            else:
                # Defensive branch for content types other than
                # the ones handled above
                raise common.OSCAPaddonError("Unsupported content type")
        return fpaths
315
316
317
class ObtainedContent:
    """
    This class aims to assist the gathered files discovery -
    the addon can download files directly, or they can be extracted from an archive.
    The class enables user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    def __init__(self, root):
        """
        Args:
            root: Directory that relative content paths are related to
        """
        self.labelled_files = dict()
        self.datastream = None  # type: pathlib.Path
        self.xccdf = None  # type: pathlib.Path
        self.ovals = []  # type: List[pathlib.Path]
        self.tailoring = None  # type: pathlib.Path
        self.archive = None  # type: pathlib.Path
        self.verified = None  # type: pathlib.Path
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)

        The file has to be already recorded among labelled files.
        """
        path = pathlib.Path(path)
        assert path in self.labelled_files
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        # The archive itself has no content label
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        """
        Set the attribute, refusing to replace it with a different value.

        Raises:
            content_handling.ContentHandlingError: If the attribute is
                already set to something else than new_value
        """
        old_value = getattr(self, attribute_name)
        if old_value and old_value != new_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise content_handling.ContentHandlingError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label=None):
        """
        Record the file under its label, and remember it by its role.

        Args:
            fname: Path to the file
            label: One of content_handling.CONTENT_TYPES values, it is
                determined by content_handling.identify_files if not given

        Raises:
            content_handling.ContentHandlingError: If a different tailoring,
                datastream or XCCDF file has already been added
        """
        if not label:
            label = content_handling.identify_files([fname])[fname]
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            # There may be more OVAL files, as opposed to other content
            self.ovals.append(path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        """
        Return the datastream path if it is known and the file exists.
        """
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        """
        Return the XCCDF path if it exists along with at least one OVAL.
        """
        if not self.xccdf or not self.ovals:
            return None
        some_ovals_exist = any(path.exists() for path in self.ovals)
        if not (self.xccdf.exists() and some_ovals_exist):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        """
        Return the resolved expected path if it is the usable main content.

        Args:
            relative_expected_content_path: Path of the expected content,
                it gets joined with the root

        Raises:
            content_handling.ContentHandlingError: If the path is neither
                the usable datastream nor the usable XCCDF
        """
        content_path = self.root / relative_expected_content_path
        content_path = content_path.resolve()
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path
        else:
            if not content_path.exists():
                msg = f"Couldn't find '{content_path}' among the available content"
            else:
                msg = (
                    f"File '{content_path}' is not a valid datastream "
                    "or a valid XCCDF of a XCCDF-OVAL file tuple")
            raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        """
        Return the usable datastream, or else the usable XCCDF.

        Raises:
            content_handling.ContentHandlingError: If neither is usable
        """
        # Both helpers return either a path or None, and the datastream
        # takes precedence.
        main_content = self._datastream_content() or self._xccdf_content()
        if main_content is None:
            msg = (
                "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
                "among the available content")
            raise content_handling.ContentHandlingError(msg)
        return main_content

    def get_preferred_tailoring(self, tailoring_path):
        """
        Return the recorded tailoring, checking it against the expectation.

        Args:
            tailoring_path: Expected path of the tailoring relative to
                the root, no check is made if it is empty

        Raises:
            content_handling.ContentHandlingError: If a tailoring is
                expected, but no tailoring or a different one is recorded
        """
        if tailoring_path:
            available_tailoring = None
            # No tailoring may be recorded at all, that has to end up
            # as the "couldn't be found" error, not as an AttributeError.
            if self.tailoring is not None:
                available_tailoring = str(
                    self.tailoring.relative_to(self.root))
            if tailoring_path != available_tailoring:
                msg = f"Expected a tailoring {tailoring_path}, but it couldn't be found"
                raise content_handling.ContentHandlingError(msg)
        return self.tailoring

    def get_preferred_content(self, content_path):
        """
        Return the expected content if a path is given, or the main content.
        """
        if content_path:
            preferred_content = self.find_expected_usable_content(content_path)
        else:
            preferred_content = self.select_main_usable_content()
        return preferred_content
428