Completed
Push — rhel8-branch ( 3645fb...e3e780 )
by Matěj
18s queued 15s
created

ContentBringer.fetch_content()   A

Complexity

Conditions 2

Size

Total Lines 16
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 9
nop 3
dl 0
loc 16
rs 9.95
c 0
b 0
f 0
1
import threading
2
import logging
3
import pathlib
4
import shutil
5
import os
6
from glob import glob
7
8
from pyanaconda.core import constants
9
from pyanaconda.threading import threadMgr
10
from pykickstart.errors import KickstartValueError
11
12
from org_fedora_oscap import data_fetch, utils
13
from org_fedora_oscap import common
14
from org_fedora_oscap import content_handling
15
from org_fedora_oscap.content_handling import CONTENT_TYPES
16
17
from org_fedora_oscap.common import _
18
19
# Module-level logger; all messages of this module go to the "anaconda" logger.
log = logging.getLogger("anaconda")
20
21
22
def is_network(scheme):
    """
    Tell whether the scheme begins with one of the network URL prefixes.

    Args:
        scheme: The scheme part of a content URI (text before "://").

    Returns:
        True if the scheme starts with any entry of
        data_fetch.NET_URL_PREFIXES, False otherwise.
    """
    for known_prefix in data_fetch.NET_URL_PREFIXES:
        if scheme.startswith(known_prefix):
            return True
    return False
26
27
28
def paths_are_equivalent(p1, p2):
    """
    Compare two paths after converting both to normalized absolute form.

    Relative paths are interpreted against the current working directory;
    symbolic links are not resolved.

    Returns:
        True if both absolute forms are identical strings, False otherwise.
    """
    first_absolute = os.path.abspath(p1)
    second_absolute = os.path.abspath(p2)
    return first_absolute == second_absolute
30
31
32
def path_is_present_among_paths(path, paths):
    """
    Tell whether a path is equivalent to at least one of the given paths.

    Two paths are considered equivalent when their normalized absolute forms
    are equal, which is the same notion paths_are_equivalent() uses.

    Args:
        path: The path to look for.
        paths: Iterable of paths to search in; may be empty.

    Returns:
        True if an equivalent path is found, False otherwise
        (including when ``paths`` is empty).
    """
    # The absolute form of the searched path does not change during the
    # search, so it is computed only once.
    absolute_path = os.path.abspath(path)
    return any(
        absolute_path == os.path.abspath(candidate)
        for candidate in paths)
38
39
40
class ContentBringer:
    """
    Fetch security content from a URI into CONTENT_DOWNLOAD_LOCATION.

    Errors that occur in the public methods are not propagated directly;
    they are handed over to the ``what_if_fail`` callback that was supplied
    to the constructor.
    """
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)

    def __init__(self, what_if_fail):
        """
        Args:
            what_if_fail: Callable that is invoked with the exception object
                whenever URI validation, fetching or verification fails.
        """
        self._valid_content_uri = ""
        self.dest_file_name = ""

        # The lock guards the now_fetching_or_processing flag.
        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False
        self.what_if_fail = what_if_fail

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

    @property
    def content_uri(self):
        """The last URI that passed validation, or "" if there is none."""
        return self._valid_content_uri

    @content_uri.setter
    def content_uri(self, uri):
        """
        Validate the URI and derive the destination file name from it.

        The stored URI and dest_file_name are updated only if the validation
        succeeds.

        Raises:
            KickstartValueError: If the URI lacks the "://" separator,
                has no "/" in its path part, or ends with "/".
        """
        # Split on the first separator only, so a later "://" occurrence
        # (e.g. inside the path) doesn't truncate the path part.
        _scheme, separator, path = uri.partition("://")
        if not separator:
            msg = (
                f"Invalid supplied content URL '{uri}', "
                "use the 'scheme://path' form.")
            raise KickstartValueError(msg)
        if "/" not in path:
            msg = f"Missing the path component of the '{uri}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{uri}' URL"
            raise KickstartValueError(msg)
        self._valid_content_uri = uri
        self.dest_file_name = self.CONTENT_DOWNLOAD_LOCATION / basename

    def fetch_content(self, content_uri, ca_certs_path=""):
        """
        Initiate fetch of the content into an appropriate directory

        Args:
            content_uri: URI location of the content to be fetched
            ca_certs_path: Path to the HTTPS certificate file

        Returns:
            Whatever the underlying fetch function returned (used later as the
            name of the thread to wait for), or None if the URI was rejected,
            the fetch could not be started, or another fetch is in progress.
        """
        try:
            self.content_uri = content_uri
        except Exception as exc:
            self.what_if_fail(exc)
            # The URI was rejected, so there is nothing valid to fetch.
            # Don't wipe the download directory and don't start a fetch of
            # a stale (or empty) previously stored URI.
            return None
        # Start from an empty download directory.
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        return self._fetch_files(ca_certs_path)

    def _fetch_files(self, ca_certs_path):
        """
        Mark the fetch as in progress and start it.

        Returns:
            The value returned by _start_actual_fetch, or None if a fetch is
            already in progress or if starting the fetch failed.
        """
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = "OSCAP Addon: Strange, it seems that we are already " \
                    "fetching something."
                # Logger.warn is a deprecated alias of Logger.warning
                log.warning(msg)
                return None
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch(ca_certs_path)
        except Exception as exc:
            # The fetch didn't start, so nothing will call
            # finish_content_fetch to clear the flag.
            with self.activity_lock:
                self.now_fetching_or_processing = False
            self.what_if_fail(exc)

        # We are not finished yet with the fetch
        return fetching_thread_name

    def _start_actual_fetch(self, ca_certs_path):
        """
        Dispatch the fetch according to the scheme of the stored URI.

        Returns:
            The value returned by the selected data_fetch function.
        """
        scheme = self.content_uri.split("://")[0]
        if is_network(scheme):
            return data_fetch.wait_and_fetch_net_data(
                self.content_uri,
                self.dest_file_name,
                ca_certs_path
            )
        # invalid schemes are handled down the road
        return data_fetch.fetch_local_data(
            self.content_uri,
            self.dest_file_name,
        )

    def finish_content_fetch(self, fetching_thread_name, fingerprint):
        """
        Wait for the fetch to finish and optionally verify the fetched file.

        Args:
            fetching_thread_name: Name of the thread to wait for; a false
                value means there is nothing to wait for.
            fingerprint: Expected digest of the fetched file; a false value
                skips the verification.

        Any exception is passed to the what_if_fail callback, and the
        in-progress flag is cleared in every case.
        """
        try:
            self._finish_actual_fetch(fetching_thread_name)
            if fingerprint:
                self._verify_fingerprint(fingerprint)
        except Exception as exc:
            self.what_if_fail(exc)
        finally:
            with self.activity_lock:
                self.now_fetching_or_processing = False

    def _finish_actual_fetch(self, wait_for):
        """Block until the named thread finishes, if a name was given."""
        if wait_for:
            log.info("OSCAP Addon: Waiting for thread %s", wait_for)
            threadMgr.wait(wait_for)
            log.info("OSCAP Addon: Finished waiting for thread %s", wait_for)

    def _verify_fingerprint(self, fingerprint=""):
        """
        Compare the digest of the destination file with the fingerprint.

        Args:
            fingerprint: Expected digest; a false value makes this a no-op.

        Raises:
            content_handling.ContentCheckError: If the digests differ.
        """
        if not fingerprint:
            return

        # NOTE(review): presumably the hashing algorithm is deduced from the
        # fingerprint itself (e.g. its length) - confirm in utils.
        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(self.dest_file_name,
                                            hash_obj)
        if digest != fingerprint:
            log.error(
                "OSCAP Addon: "
                f"File {self.dest_file_name} failed integrity check - assumed "
                f"a {hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            # Translate the constant template first and substitute the value
            # afterwards - an already interpolated string can't be found in
            # a translation catalogue.
            msg = _(
                "OSCAP Addon: Integrity check of the content failed - "
                "{hash_name} hash didn't match"
            ).format(hash_name=hash_obj.name)
            raise content_handling.ContentCheckError(msg)
164
165
166
class ContentAnalyzer:
    """
    Collection of static helpers that examine the available content files
    and describe them by means of an ObtainedContent instance.
    """
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
    DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    @staticmethod
    def __get_content_type(url):
        """
        Classify the URL by its suffix.

        Returns:
            "rpm", "archive" (suffix listed in common.SUPPORTED_ARCHIVES),
            or "file" for anything else.
        """
        if url.endswith(".rpm"):
            return "rpm"
        for archive_suffix in common.SUPPORTED_ARCHIVES:
            if url.endswith(archive_suffix):
                return "archive"
        return "file"

    @staticmethod
    def __allow_one_expected_tailoring_or_no_tailoring(
            labelled_files, expected_tailoring):
        """
        Keep only the expected tailoring file among tailoring files, or
        drop all tailoring files if no tailoring is expected.

        Returns:
            A new dictionary mapping paths to labels.
        """
        tailoring_label = CONTENT_TYPES["TAILORING"]
        if expected_tailoring:
            return ContentAnalyzer.reduce_files(
                labelled_files, expected_tailoring, [tailoring_label])
        return {
            fpath: flabel for fpath, flabel in labelled_files.items()
            if flabel != tailoring_label
        }

    @staticmethod
    def __filter_discovered_content(
            labelled_files, expected_path, expected_tailoring,
            expected_cpe_path):
        """
        Narrow the labelled files down according to the expected main content,
        tailoring and CPE dictionary paths.

        An expected path that is false (e.g. empty) doesn't restrict the
        respective category, with the exception of the tailoring, which is
        then removed completely.
        """
        main_content_labels = (
            CONTENT_TYPES["DATASTREAM"],
            CONTENT_TYPES["XCCDF_CHECKLIST"])
        if expected_path:
            labelled_files = ContentAnalyzer.reduce_files(
                labelled_files, expected_path, main_content_labels)

        labelled_files = \
            ContentAnalyzer.__allow_one_expected_tailoring_or_no_tailoring(
                labelled_files, expected_tailoring)

        cpe_labels = (CONTENT_TYPES["CPE_DICT"], )
        if expected_cpe_path:
            labelled_files = ContentAnalyzer.reduce_files(
                labelled_files, expected_cpe_path, cpe_labels)

        return labelled_files

    @staticmethod
    def reduce_files(labelled_files, expected_path, categories):
        """
        Remove files of the given categories except for the expected one.

        Args:
            labelled_files: Dictionary mapping paths to labels.
            expected_path: Path that has to be present among the keys.
            categories: Labels that are subject to the reduction.

        Returns:
            A new dictionary that contains all files whose label is not among
            the categories, and those files of the categories whose path is
            equivalent to the expected path.

        Raises:
            RuntimeError: If the expected path is not among the keys.
        """
        expected_is_available = path_is_present_among_paths(
            expected_path, labelled_files.keys())
        if not expected_is_available:
            msg = (
                f"Expected a file {expected_path} to be part of the supplied "
                f"content, but it was not the case, got only "
                f"{list(labelled_files.keys())}"
            )
            raise RuntimeError(msg)
        return {
            fpath: flabel for fpath, flabel in labelled_files.items()
            if flabel not in categories
            or paths_are_equivalent(fpath, expected_path)
        }

    @staticmethod
    def analyze(
            fetching_thread_name, fingerprint, dest_filename, what_if_fail,
            expected_path, expected_tailoring, expected_cpe_path):
        """
        Analyze the content, and report any exception using a callback.

        Returns:
            The ObtainedContent instance describing the content, or None
            if the analysis raised an exception (which has been passed to
            what_if_fail in that case).
        """
        try:
            return ContentAnalyzer.__analyze_fetched_content(
                fetching_thread_name, fingerprint, dest_filename,
                expected_path, expected_tailoring, expected_cpe_path)
        except Exception as exc:
            what_if_fail(exc)
            return None

    @staticmethod
    def __analyze_fetched_content(
                wait_for, fingerprint, dest_filename, expected_path,
                expected_tailoring, expected_cpe_path):
        """
        Gather the files, label them, filter them according to expectations,
        and record the result in a new ObtainedContent instance.

        A wait_for of None is understood as "no content has been fetched".
        """
        available_paths = ContentAnalyzer.__gather_available_files(
            wait_for is not None, dest_filename)

        obtained = ObtainedContent(ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION)
        if ContentAnalyzer.__get_content_type(
                str(dest_filename)) in ("archive", "rpm"):
            obtained.add_content_archive(dest_filename)

        labelled = ContentAnalyzer.__filter_discovered_content(
            content_handling.identify_files(available_paths),
            expected_path, expected_tailoring, expected_cpe_path)

        for file_name, file_label in labelled.items():
            obtained.add_file(str(file_name), file_label)

        if fingerprint and dest_filename:
            obtained.record_verification(dest_filename)

        return obtained

    @staticmethod
    def __gather_available_files(actually_fetched_content, dest_filename):
        """
        Produce the list of paths of files that are available for analysis.

        Args:
            actually_fetched_content: Whether a fetch has taken place.
            dest_filename: Where the fetched file was saved, can be false.

        Returns:
            List of paths as strings, or whatever common.extract_data returns
            if the fetched file is an archive or an RPM.

        Raises:
            common.ExtractionError: If the extraction of an archive fails.
        """
        if actually_fetched_content:
            fetched_file = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            kind = ContentAnalyzer.__get_content_type(str(fetched_file))
            if kind in ("archive", "rpm"):
                try:
                    return common.extract_data(
                        str(fetched_file),
                        str(fetched_file.parent)
                    )
                except common.ExtractionError as err:
                    msg = (
                        f"Failed to extract the '{fetched_file}' "
                        f"archive: {str(err)}")
                    log.error("OSCAP Addon: " + msg)
                    raise err
            if kind == "file":
                return [str(fetched_file)]
            raise common.OSCAPaddonError("Unsupported content type")

        if not dest_filename:  # using scap-security-guide
            return [ContentAnalyzer.DEFAULT_SSG_DATA_STREAM_PATH]

        # Using downloaded XCCDF/OVAL/DS/tailoring
        download_dir = pathlib.Path(ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION)
        return [
            str(entry) for entry in download_dir.rglob("*")
            if entry.is_file()
        ]
308
309
310
class ObtainedContent:
    """
    This class aims to assist the discovery of gathered files -
    the addon can download files directly, or they can be extracted from an archive.
    The class enables user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    def __init__(self, root):
        """
        Args:
            root: Directory that relative expected paths are related to.
        """
        # Maps paths of known files to their labels (None for the archive).
        self.labelled_files = dict()
        # Unset single-file slots hold "", set ones hold pathlib.Path objects.
        self.datastream = ""
        self.xccdf = ""
        self.ovals = []
        self.tailoring = ""
        self.archive = ""
        self.verified = ""
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)

        The file has to be known, i.e. added beforehand by add_file
        or add_content_archive.
        """
        path = pathlib.Path(path)
        assert path in self.labelled_files, \
            f"Can't record verification of an unknown file '{path}'"
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        """
        Set a single-file attribute, and refuse to overwrite a set one.

        Raises:
            content_handling.ContentHandlingError: If the attribute
                already holds a value.
        """
        old_value = getattr(self, attribute_name)
        if old_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise content_handling.ContentHandlingError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label):
        """
        Record a file and its label, and remember it in the respective slot.

        Raises:
            content_handling.ContentHandlingError: If a second tailoring,
                datastream or XCCDF checklist is being added.
        """
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self.ovals.append(path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        """Return the datastream path if it is set and exists, else None."""
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        """
        Return the XCCDF path if it is set and exists, and at least one
        of the recorded OVAL files exists as well; return None otherwise.
        """
        if not self.xccdf or not self.ovals:
            return None
        some_ovals_exist = any(path.exists() for path in self.ovals)
        if not (self.xccdf.exists() and some_ovals_exist):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        """
        Return the resolved path of the expected content if it is usable.

        Args:
            relative_expected_content_path: Path relative to the root.

        Raises:
            content_handling.ContentHandlingError: If the path is neither
                the usable datastream nor the usable XCCDF file.
        """
        content_path = self.root / relative_expected_content_path
        content_path = content_path.resolve()
        # The expected path is resolved, so resolve the candidates as well,
        # otherwise a symlink in the way would make equal files look different.
        eligible_main_content = {
            candidate.resolve()
            for candidate in (
                self._datastream_content(), self._xccdf_content())
            if candidate is not None
        }

        if content_path in eligible_main_content:
            return content_path

        if not content_path.exists():
            msg = f"Couldn't find '{content_path}' among the available content"
        else:
            msg = (
                f"File '{content_path}' is not a valid datastream "
                "or a valid XCCDF of a XCCDF-OVAL file tuple")
        raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        """
        Return the usable datastream, or the usable XCCDF as the second choice.

        Raises:
            content_handling.ContentHandlingError: If none of them is usable.
        """
        # Evaluate each candidate once, the evaluation touches the filesystem
        main_content = self._datastream_content() or self._xccdf_content()
        if main_content:
            return main_content
        msg = (
            "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
            "among the available content")
        raise content_handling.ContentHandlingError(msg)

    def get_preferred_tailoring(self, tailoring_path):
        """
        Return the recorded tailoring, checking that it is the expected one.

        Args:
            tailoring_path: Expected tailoring path relative to the root,
                a false value disables the check.

        Returns:
            The recorded tailoring, which is "" if there is none.

        Raises:
            content_handling.ContentHandlingError: If a tailoring is expected,
                and it doesn't match the recorded one, or none is recorded.
        """
        if tailoring_path:
            # No tailoring may have been recorded (the attribute is then ""),
            # or it may be located outside of the root - neither can match.
            matches = False
            if self.tailoring:
                try:
                    relative_tailoring = self.tailoring.relative_to(self.root)
                except ValueError:
                    pass
                else:
                    matches = tailoring_path == str(relative_tailoring)
            if not matches:
                msg = f"Expected a tailoring {tailoring_path}, but it couldn't be found"
                raise content_handling.ContentHandlingError(msg)
        return self.tailoring

    def get_preferred_content(self, content_path):
        """
        Return the expected content if a path is given, or the best
        usable content that is available otherwise.
        """
        if content_path:
            return self.find_expected_usable_content(content_path)
        return self.select_main_usable_content()
419