Completed
Push — rhel9-branch ( 055f96...bb3c5f )
by Jan
16s queued 13s
created

ContentBringer.reduce_files()   A

Complexity

Conditions 5

Size

Total Lines 13
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 11
nop 4
dl 0
loc 13
rs 9.3333
c 0
b 0
f 0
1
import threading
2
import logging
3
import pathlib
4
import shutil
5
import os
6
from glob import glob
7
from typing import List
8
9
from pyanaconda.core import constants
10
from pyanaconda.threading import threadMgr
11
from pykickstart.errors import KickstartValueError
12
13
from org_fedora_oscap import data_fetch, utils
14
from org_fedora_oscap import common
15
from org_fedora_oscap import content_handling
16
from org_fedora_oscap.content_handling import CONTENT_TYPES
17
from org_fedora_oscap import rule_handling
18
19
from org_fedora_oscap.common import _
20
21
# Module-wide logger; every message of this module goes to the logger named "anaconda".
log = logging.getLogger("anaconda")
22
23
24
def is_network(scheme):
    """
    Tell whether the given URL scheme is a network one.

    Args:
        scheme: The scheme part of a URL, i.e. the text in front of "://".

    Returns:
        True if the scheme starts with any of the prefixes listed in
        data_fetch.NET_URL_PREFIXES, False otherwise.
    """
    for known_prefix in data_fetch.NET_URL_PREFIXES:
        if scheme.startswith(known_prefix):
            return True
    return False
28
29
30
def clear_all(data):
    """
    Reset the passed addon data object to its blank state.

    All the string-valued settings are set to an empty string,
    the rule data are replaced by a fresh rule_handling.RuleData instance
    and the dry run flag is switched off.

    Args:
        data: The object whose attributes are reset in place.
    """
    # The order matches the order of the individual assignments, so that
    # any attribute setters are triggered in the same sequence.
    blank_string_attributes = (
        "content_type",
        "content_url",
        "datastream_id",
        "xccdf_id",
        "profile_id",
        "content_path",
        "cpe_path",
        "tailoring_path",
        "fingerprint",
        "certificates",
    )
    for attribute_name in blank_string_attributes:
        setattr(data, attribute_name, "")

    # internal values
    data.rule_data = rule_handling.RuleData()
    data.dry_run = False
47
48
49
class ContentBringer:
    """
    Fetch security content and analyze what has been obtained.

    The content referenced by the addon data is fetched into
    CONTENT_DOWNLOAD_LOCATION, and the files found there are labelled,
    filtered and compiled into an instance of ObtainedContent.
    The addon data object passed to the constructor is read for the expected
    paths and updated by the use_* methods.
    """
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
    DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    def __init__(self, addon_data):
        """
        Args:
            addon_data: Object holding the addon settings (content_url,
                content_path, tailoring_path, cpe_path, ...).
        """
        self.content_uri_scheme = ""
        self.content_uri_path = ""
        self.fetched_content = ""

        # Guards the now_fetching_or_processing flag.
        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

        self._addon_data = addon_data

    def get_content_type(self, url):
        """
        Classify the content according to the suffix of its URL or filename.

        Returns:
            "rpm" if the name ends with ".rpm", "archive" if it ends with
            one of common.SUPPORTED_ARCHIVES, and "file" otherwise.
        """
        if url.endswith(".rpm"):
            return "rpm"
        if any(url.endswith(arch_type) for arch_type in common.SUPPORTED_ARCHIVES):
            return "archive"
        return "file"

    @property
    def content_uri(self):
        """The content URI assembled as "<scheme>://<path>"."""
        return self.content_uri_scheme + "://" + self.content_uri_path

    @content_uri.setter
    def content_uri(self, uri):
        """
        Split the URI on the first "://" into the scheme and the path.

        Raises:
            ValueError: If the URI doesn't contain the "://" separator.
        """
        scheme, separator, path = uri.partition("://")
        if not separator:
            # Still a ValueError as with a failed tuple unpacking,
            # but with a message that tells what is actually wrong.
            raise ValueError(f"The content URI '{uri}' lacks the '://' scheme separator")
        self.content_uri_path = path
        self.content_uri_scheme = scheme

    def fetch_content(self, what_if_fail, ca_certs_path=""):
        """
        Initiate fetch of the content into an appropriate directory

        The download directory is wiped and recreated before the fetch starts.

        Args:
            what_if_fail: Callback accepting exception as an argument that
                should handle them in the calling layer.
            ca_certs_path: Path to the HTTPS certificate file

        Returns:
            The value returned by the fetch backend (treated as the name
            of the fetching thread), or None if the fetch was not started.

        Raises:
            ValueError: If the content URL of the addon data lacks "://".
        """
        self.content_uri = self._addon_data.content_url
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        fetching_thread_name = self._fetch_files(
            self.content_uri_scheme, self.content_uri_path,
            self.CONTENT_DOWNLOAD_LOCATION, ca_certs_path, what_if_fail)
        return fetching_thread_name

    def _fetch_files(self, scheme, path, destdir, ca_certs_path, what_if_fail):
        """
        Start the fetch unless another fetch or processing is in progress.

        Any exception raised while starting the fetch is passed
        to what_if_fail and the activity flag is cleared again.

        Returns:
            The result of _start_actual_fetch, or None if there is an activity
            in progress already or if starting the fetch has failed.
        """
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = "OSCAP Addon: Strange, it seems that we are already fetching something."
                # Logger.warn is a deprecated alias of Logger.warning
                log.warning(msg)
                return
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch(scheme, path, destdir, ca_certs_path)
        except Exception as exc:
            with self.activity_lock:
                self.now_fetching_or_processing = False
            what_if_fail(exc)

        # We are not finished yet with the fetch
        return fetching_thread_name

    def _start_actual_fetch(self, scheme, path, destdir, ca_certs_path):
        """
        Validate the URL and hand the fetch over to the data_fetch module.

        The destination is destdir joined with the last component of the path.

        Raises:
            KickstartValueError: If the path contains no "/" or ends with it,
                so no file basename can be deduced.
        """
        fetching_thread_name = None
        url = scheme + "://" + path

        if "/" not in path:
            msg = f"Missing the path component of the '{url}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{url}' URL"
            raise KickstartValueError(msg)

        dest = destdir / basename

        if is_network(scheme):
            fetching_thread_name = data_fetch.wait_and_fetch_net_data(
                url,
                dest,
                ca_certs_path
            )
        else:  # invalid schemes are handled down the road
            fetching_thread_name = data_fetch.fetch_local_data(
                url,
                dest,
            )
        return fetching_thread_name

    def finish_content_fetch(self, fetching_thread_name, fingerprint, report_callback, dest_filename,
                             what_if_fail):
        """
        Finish any ongoing fetch and analyze what has been fetched.

        After the fetch is completed, it verifies the fetched content if applicable,
        analyzes it and compiles it into an instance of ObtainedContent.
        The activity flag is cleared regardless of the outcome.

        Args:
            fetching_thread_name: Name of the fetching thread
                or None if we are only after the analysis
            fingerprint: A checksum for downloaded file verification
            report_callback: Means for the method to send user-relevant messages outside
            dest_filename: The target of the fetch operation. Can be falsy -
                in this case there is no content filename defined
            what_if_fail: Callback accepting exception as an argument
                that should handle them in the calling layer.

        Returns:
            Instance of ObtainedContent if everything went well, or None.
        """
        try:
            content = self._finish_actual_fetch(fetching_thread_name, fingerprint, report_callback, dest_filename)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        finally:
            with self.activity_lock:
                self.now_fetching_or_processing = False

        return content

    def _verify_fingerprint(self, dest_filename, fingerprint=""):
        """
        Compare the fingerprint of the file with the expected one.

        Nothing is checked if the expected fingerprint is empty.

        Raises:
            content_handling.ContentCheckError: If the fingerprints differ.
        """
        if not fingerprint:
            log.info("OSCAP Addon: No fingerprint provided, skipping integrity check")
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(dest_filename,
                                            hash_obj)
        if digest != fingerprint:
            log.error(
                "OSCAP Addon: "
                f"File {dest_filename} failed integrity check - assumed a "
                f"{hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            # The hash name is filled in only after the _ call, so that _ gets
            # a constant message (presumably _ is the translation function -
            # an already interpolated string couldn't match any catalog entry).
            msg = _(
                "OSCAP Addon: Integrity check of the content failed - {hash_name} hash didn't match"
            ).format(hash_name=hash_obj.name)
            raise content_handling.ContentCheckError(msg)
        log.info(f"Integrity check passed using {hash_obj.name} hash")

    def allow_one_expected_tailoring_or_no_tailoring(self, labelled_files):
        """
        Keep only the expected tailoring, or drop all tailorings if none is expected.

        Args:
            labelled_files: Mapping of relative paths to content labels.

        Returns:
            New mapping with the undesired tailoring files left out.
        """
        expected_tailoring = self._addon_data.tailoring_path
        tailoring_label = CONTENT_TYPES["TAILORING"]
        if expected_tailoring:
            labelled_files = self.reduce_files(labelled_files, expected_tailoring, [tailoring_label])
        else:
            labelled_files = {
                path: label for path, label in labelled_files.items()
                if label != tailoring_label
            }
        return labelled_files

    def filter_discovered_content(self, labelled_files):
        """
        Reduce the discovered files according to the paths expected by the addon data.

        The main content (datastream or XCCDF checklist), the tailoring
        and the CPE dictionary are handled in this order.

        Args:
            labelled_files: Mapping of relative paths to content labels.

        Returns:
            Mapping without the files that compete with the expected ones.
        """
        expected_path = self._addon_data.content_path
        categories = (CONTENT_TYPES["DATASTREAM"], CONTENT_TYPES["XCCDF_CHECKLIST"])
        if expected_path:
            labelled_files = self.reduce_files(labelled_files, expected_path, categories)

        labelled_files = self.allow_one_expected_tailoring_or_no_tailoring(labelled_files)

        expected_path = self._addon_data.cpe_path
        categories = (CONTENT_TYPES["CPE_DICT"], )
        if expected_path:
            labelled_files = self.reduce_files(labelled_files, expected_path, categories)

        return labelled_files

    def reduce_files(self, labelled_files, expected_path, categories):
        """
        Drop files of the given categories unless they are the expected file.

        Args:
            labelled_files: Mapping of paths to content labels.
            expected_path: The path that has to be present and that is kept.
            categories: Labels of which only the expected file may remain.

        Returns:
            New mapping - files with labels outside of the categories
            are kept untouched.

        Raises:
            RuntimeError: If the expected path is not among the labelled files.
        """
        if expected_path not in labelled_files:
            msg = (
                f"Expected a file {expected_path} to be part of the supplied content, "
                f"but it was not the case, got only {list(labelled_files.keys())}"
            )
            raise RuntimeError(msg)
        return {
            path: label for path, label in labelled_files.items()
            if label not in categories or path == expected_path
        }

    def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_filename):
        """
        Wait for the fetch, verify the result, and compile the ObtainedContent.

        Args:
            wait_for: Name of the thread to wait for, falsy if there is none.
            fingerprint: Expected checksum, checked only if dest_filename is set as well.
            report_callback: Not used by this method.
            dest_filename: The target of the fetch operation, can be falsy.

        Returns:
            Instance of ObtainedContent with the filtered files registered.
        """
        if wait_for:
            log.info(f"OSCAP Addon: Waiting for thread {wait_for}")
            threadMgr.wait(wait_for)
            log.info(f"OSCAP Addon: Finished waiting for thread {wait_for}")
        actually_fetched_content = wait_for is not None

        if fingerprint and dest_filename:
            self._verify_fingerprint(dest_filename, fingerprint)

        fpaths = self._gather_available_files(actually_fetched_content, dest_filename)

        structured_content = ObtainedContent(self.CONTENT_DOWNLOAD_LOCATION)
        content_type = self.get_content_type(str(dest_filename))
        log.info("OSCAP Addon: started to look at the content")
        if content_type in ("archive", "rpm"):
            structured_content.add_content_archive(dest_filename)

        labelled_filenames = content_handling.identify_files(fpaths)
        # The filtering works with paths relative to the download location,
        # as the expected paths of the addon data are compared against them.
        labelled_relative_filenames = {
            os.path.relpath(path, self.CONTENT_DOWNLOAD_LOCATION): label
            for path, label in labelled_filenames.items()}
        labelled_relative_filenames = self.filter_discovered_content(labelled_relative_filenames)

        for rel_fname, label in labelled_relative_filenames.items():
            fname = self.CONTENT_DOWNLOAD_LOCATION / rel_fname
            structured_content.add_file(str(fname), label)

        if fingerprint and dest_filename:
            structured_content.record_verification(dest_filename)

        log.info("OSCAP Addon: finished looking at the content")
        return structured_content

    def _gather_available_files(self, actually_fetched_content, dest_filename):
        """
        Compile the list of paths of the files that are available for the analysis.

        Args:
            actually_fetched_content: Whether a fetch has taken place.
            dest_filename: The target of the fetch operation, can be falsy
                if nothing has been fetched.

        Returns:
            List of paths - the default SSG data stream, all files in the
            download location, the extracted archive contents
            or the single fetched file.

        Raises:
            common.ExtractionError: If the archive extraction fails.
        """
        fpaths = []
        if not actually_fetched_content:
            if not dest_filename:  # using scap-security-guide
                fpaths = [self.DEFAULT_SSG_DATA_STREAM_PATH]
            else:  # Using downloaded XCCDF/OVAL/DS/tailoring
                fpaths = pathlib.Path(self.CONTENT_DOWNLOAD_LOCATION).rglob("*")
                fpaths = [str(p) for p in fpaths if p.is_file()]
        else:
            dest_filename = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            content_type = self.get_content_type(str(dest_filename))
            if content_type in ("archive", "rpm"):
                try:
                    fpaths = common.extract_data(
                        str(dest_filename),
                        str(dest_filename.parent)
                    )
                except common.ExtractionError as err:
                    msg = f"Failed to extract the '{dest_filename}' archive: {str(err)}"
                    log.error("OSCAP Addon: " + msg)
                    raise

            elif content_type == "file":
                fpaths = [str(dest_filename)]
            else:
                raise common.OSCAPaddonError("Unsupported content type")
        return fpaths

    def use_downloaded_content(self, content):
        """
        Record paths of the preferred content and tailoring in the addon data.

        The paths are stored relative to the root of the content.

        Args:
            content: Instance of ObtainedContent.
        """
        preferred_content = self.get_preferred_content(content)

        # The content type of the addon data is left as it is - the original
        # note suggests that an archive may not be convertible to a datastream.
        self._addon_data.content_path = str(preferred_content.relative_to(content.root))

        preferred_tailoring = self.get_preferred_tailoring(content)
        if content.tailoring:
            self._addon_data.tailoring_path = str(preferred_tailoring.relative_to(content.root))

    def use_system_content(self, content=None):
        """
        Reset the addon data and point them to the scap-security-guide content.

        Args:
            content: Not used.
        """
        clear_all(self._addon_data)
        self._addon_data.content_type = "scap-security-guide"
        self._addon_data.content_path = common.get_ssg_path()

    def get_preferred_content(self, content):
        """
        Return the content expected by the addon data, or the main usable one.

        Args:
            content: Instance of ObtainedContent.
        """
        if self._addon_data.content_path:
            preferred_content = content.find_expected_usable_content(self._addon_data.content_path)
        else:
            preferred_content = content.select_main_usable_content()
        return preferred_content

    def get_preferred_tailoring(self, content):
        """
        Return the tailoring of the content after checking it against the expected one.

        Args:
            content: Instance of ObtainedContent.

        Returns:
            The tailoring path of the content, None if there is no tailoring
            and none is expected.

        Raises:
            content_handling.ContentHandlingError: If the addon data expect
                a tailoring and the content has none, or a different one.
        """
        tailoring_path = self._addon_data.tailoring_path
        if tailoring_path:
            available_tailoring = content.tailoring
            # A content without any tailoring has to end up with the same error
            # as a content with a wrong one, not with an AttributeError on None.
            if (available_tailoring is None
                    or tailoring_path != str(available_tailoring.relative_to(content.root))):
                msg = f"Expected a tailoring {tailoring_path}, but it couldn't be found"
                raise content_handling.ContentHandlingError(msg)
        return content.tailoring
331
332
333
class ObtainedContent:
    """
    This class aims to assist the discovery of gathered files -
    the addon can download files directly, or they can be extracted from an archive.
    The class enables the user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    def __init__(self, root):
        """
        Args:
            root: Directory that relative content paths are resolved against.
        """
        # Maps pathlib.Path of every registered file to its label
        # (None in case of the archive itself).
        self.labelled_files = dict()
        self.datastream = None  # pathlib.Path or None
        self.xccdf = None  # pathlib.Path or None
        self.ovals = []  # type: List[pathlib.Path]
        self.tailoring = None  # pathlib.Path or None
        self.archive = None  # pathlib.Path or None
        self.verified = None  # pathlib.Path or None
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)

        Raises:
            AssertionError: If the file has not been registered before.
        """
        path = pathlib.Path(path)
        if path not in self.labelled_files:
            # Raised explicitly instead of using the assert statement,
            # so the check is not stripped when Python runs with -O.
            raise AssertionError(f"Can't declare an unknown file '{path}' as verified")
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        """
        Set the attribute, refusing to replace an already set, different value.

        Raises:
            content_handling.ContentHandlingError: If the attribute already
                holds another path.
        """
        old_value = getattr(self, attribute_name)
        if old_value and old_value != new_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise content_handling.ContentHandlingError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label=None):
        """
        Register a file and remember it according to its label.

        Args:
            fname: Path to the file.
            label: Content label of the file, it is determined by
                content_handling.identify_files if not supplied.

        Raises:
            content_handling.ContentHandlingError: If there already is
                a different tailoring, datastream or XCCDF file.
        """
        if not label:
            label = content_handling.identify_files([fname])[fname]
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self.ovals.append(path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        """Return the datastream path if it is set and the file exists, else None."""
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        """
        Return the XCCDF path if it can be used together with OVAL files.

        That requires the XCCDF file and at least one of the registered
        OVAL files to exist, None is returned otherwise.
        """
        if not self.xccdf or not self.ovals:
            return None
        some_ovals_exist = any(path.exists() for path in self.ovals)
        if not (self.xccdf.exists() and some_ovals_exist):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        """
        Return the expected content if it is the usable datastream or XCCDF.

        Args:
            relative_expected_content_path: Path relative to the root.

        Returns:
            The root joined with the expected path.

        Raises:
            content_handling.ContentHandlingError: If the file doesn't exist,
                or if it is neither the usable datastream nor the usable XCCDF.
        """
        content_path = self.root / relative_expected_content_path
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path

        if not content_path.exists():
            msg = f"Couldn't find '{content_path}' among the available content"
        else:
            msg = (
                f"File '{content_path}' is not a valid datastream "
                "or a valid XCCDF of a XCCDF-OVAL file tuple")
        raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        """
        Return the usable datastream, or the usable XCCDF as the second choice.

        Raises:
            content_handling.ContentHandlingError: If neither is available.
        """
        # Each candidate is examined just once, so the returned value is
        # the very one that has passed the existence check.
        usable_content = self._datastream_content() or self._xccdf_content()
        if usable_content:
            return usable_content
        msg = (
            "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
            "among the available content")
        raise content_handling.ContentHandlingError(msg)
429