Passed
Pull Request — rawhide (#242)
by Jan
01:59
created

ObtainedContent.add_file()   B

Complexity

Conditions 6

Size

Total Lines 13
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 13
nop 3
dl 0
loc 13
rs 8.6666
c 0
b 0
f 0
1
import threading
2
import logging
3
import os
4
import pathlib
5
import shutil
6
from glob import glob
7
from typing import List
8
9
from pyanaconda.core import constants
10
from pyanaconda.core.threads import thread_manager
11
from pykickstart.errors import KickstartValueError
12
13
from org_fedora_oscap import data_fetch, utils
14
from org_fedora_oscap import common
15
from org_fedora_oscap import content_handling
16
from org_fedora_oscap.content_handling import CONTENT_TYPES
17
18
from org_fedora_oscap.common import _
19
20
log = logging.getLogger("anaconda")
21
22
23
def is_network(scheme):
    """
    Tell whether a URL scheme denotes a network location.

    Args:
        scheme: The scheme part of a URL (the text preceding "://").

    Returns:
        True if the scheme starts with any of data_fetch.NET_URL_PREFIXES,
        False otherwise.
    """
    for net_prefix in data_fetch.NET_URL_PREFIXES:
        if scheme.startswith(net_prefix):
            return True
    return False
27
28
29
def paths_are_equivalent(p1, p2):
    """
    Compare two paths after converting both with os.path.abspath.

    Args:
        p1: First path.
        p2: Second path.

    Returns:
        True if both paths have the same normalized absolute form.
    """
    first_absolute = os.path.abspath(p1)
    second_absolute = os.path.abspath(p2)
    return first_absolute == second_absolute
31
32
33
def path_is_present_among_paths(path, paths):
    """
    Check whether a path is equivalent to any of the supplied paths.

    Paths are compared by their normalized absolute form (os.path.abspath).

    Args:
        path: The path to look for.
        paths: Iterable of paths to search in.

    Returns:
        True if at least one member of paths is equivalent to path,
        False otherwise (including when paths is empty).
    """
    # Absolutize the needle once instead of on every comparison.
    absolute_path = os.path.abspath(path)
    return any(
        absolute_path == os.path.abspath(second_path)
        for second_path in paths)
39
40
41
class ContentBringer:
    """
    Fetches content from a URI into the local content download directory.

    Failures are reported by passing the caught exception to the
    ``what_if_fail`` callback supplied to the constructor.
    """
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)

    def __init__(self, what_if_fail):
        """
        Args:
            what_if_fail: Callable that is invoked with the exception
                whenever fetching or verification fails.
        """
        self._valid_content_uri = ""
        self.dest_file_name = ""

        # Guards the now_fetching_or_processing flag.
        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False
        self.what_if_fail = what_if_fail

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

    @property
    def content_uri(self):
        """The last URI that passed validation, or "" if there is none."""
        return self._valid_content_uri

    @content_uri.setter
    def content_uri(self, uri):
        """
        Validate the URI, store it and derive the destination file name.

        Raises:
            KickstartValueError: If the URI is not of the 'scheme://path'
                form, lacks a path component, or has an empty basename.
        """
        scheme_and_maybe_path = uri.split("://")
        if len(scheme_and_maybe_path) == 1:
            msg = (
                f"Invalid supplied content URL '{uri}', "
                "use the 'scheme://path' form.")
            raise KickstartValueError(msg)
        path = scheme_and_maybe_path[1]
        if "/" not in path:
            msg = f"Missing the path component of the '{uri}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{uri}' URL"
            raise KickstartValueError(msg)
        self._valid_content_uri = uri
        self.dest_file_name = self.CONTENT_DOWNLOAD_LOCATION / basename

    def fetch_content(self, content_uri, ca_certs_path=""):
        """
        Initiate fetch of the content into an appropriate directory

        The download directory is removed and recreated before the fetch.

        Args:
            content_uri: URI location of the content to be fetched
            ca_certs_path: Path to the HTTPS certificate file

        Returns:
            The value returned by the data_fetch helper (later passed to
            finish_content_fetch as the thread to wait for), or None if
            the fetch could not be started.
        """
        try:
            self.content_uri = content_uri
        except Exception as exc:
            # NOTE: if what_if_fail returns instead of raising, the fetch
            # below proceeds with the previously stored URI.
            self.what_if_fail(exc)
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        fetching_thread_name = self._fetch_files(ca_certs_path)
        return fetching_thread_name

    def _fetch_files(self, ca_certs_path):
        """
        Start the fetch unless another one is already in progress.

        Returns:
            The value returned by _start_actual_fetch, or None if a fetch
            is already running or starting the fetch failed.
        """
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = "OSCAP Addon: Strange, it seems that we are already " \
                    "fetching something."
                # Logger.warn is a deprecated alias of Logger.warning.
                log.warning(msg)
                return None
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch(ca_certs_path)
        except Exception as exc:
            with self.activity_lock:
                self.now_fetching_or_processing = False
            self.what_if_fail(exc)

        # We are not finished yet with the fetch
        return fetching_thread_name

    def _start_actual_fetch(self, ca_certs_path):
        """
        Dispatch the fetch to the network or the local data_fetch helper.

        Returns:
            Whatever the selected data_fetch helper returns.
        """
        fetching_thread_name = None

        scheme = self.content_uri.split("://")[0]
        if is_network(scheme):
            fetching_thread_name = data_fetch.wait_and_fetch_net_data(
                self.content_uri,
                self.dest_file_name,
                ca_certs_path
            )
        else:  # invalid schemes are handled down the road
            fetching_thread_name = data_fetch.fetch_local_data(
                self.content_uri,
                self.dest_file_name,
            )
        return fetching_thread_name

    def finish_content_fetch(self, fetching_thread_name, fingerprint):
        """
        Wait for the fetch to complete and optionally verify the result.

        The "fetching in progress" flag is cleared in every case.

        Args:
            fetching_thread_name: Value returned by fetch_content.
            fingerprint: Expected digest of the fetched file; a falsy value
                skips the integrity check.
        """
        try:
            self._finish_actual_fetch(fetching_thread_name)
            if fingerprint:
                self._verify_fingerprint(fingerprint)
        except Exception as exc:
            self.what_if_fail(exc)
        finally:
            with self.activity_lock:
                self.now_fetching_or_processing = False

    def _finish_actual_fetch(self, wait_for):
        """Wait for the given thread via thread_manager if one is given."""
        if wait_for:
            log.info(f"OSCAP Addon: Waiting for thread {wait_for}")
            thread_manager.wait(wait_for)
            log.info(f"OSCAP Addon: Finished waiting for thread {wait_for}")

    def _verify_fingerprint(self, fingerprint=""):
        """
        Compare the digest of the fetched file with the expected fingerprint.

        Args:
            fingerprint: Expected digest; a falsy value skips the check.

        Raises:
            content_handling.ContentCheckError: If the digests differ.
        """
        if not fingerprint:
            log.info(
                "OSCAP Addon: No fingerprint provided, skipping integrity "
                "check")
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(self.dest_file_name,
                                            hash_obj)
        if digest != fingerprint:
            log.error(
                "OSCAP Addon: "
                f"File {self.dest_file_name} failed integrity check - assumed "
                f"a {hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            # Look up the constant template first and interpolate afterwards;
            # an already-interpolated string can never match a catalog entry.
            # NOTE(review): assumes `_` is a gettext-style lookup returning
            # a str - confirm against org_fedora_oscap.common.
            msg = _(
                "OSCAP Addon: Integrity check of the content failed - "
                "{hash_name} hash didn't match"
            ).format(hash_name=hash_obj.name)
            raise content_handling.ContentCheckError(msg)
        log.info(f"Integrity check passed using {hash_obj.name} hash")
169
170
171
class ContentAnalyzer:
    """
    Static helpers that gather the available content files and describe
    them by means of an ObtainedContent instance.
    """
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
    DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    @staticmethod
    def __get_content_type(url):
        """
        Classify the content by the suffix of its name.

        Returns:
            "rpm" for names ending with ".rpm", "archive" for names ending
            with one of common.SUPPORTED_ARCHIVES, "file" otherwise.
        """
        if url.endswith(".rpm"):
            return "rpm"
        elif any(
                url.endswith(arch_type)
                for arch_type in common.SUPPORTED_ARCHIVES):
            return "archive"
        else:
            return "file"

    @staticmethod
    def analyze(
            fetching_thread_name, fingerprint, dest_filename, what_if_fail,
            expected_path, expected_tailoring, expected_cpe_path):
        """
        Analyze the content and report any failure through what_if_fail.

        Args:
            fetching_thread_name: Value returned when the fetch was
                initiated; None means that nothing was fetched now.
            fingerprint: Fingerprint of the content; if it is truthy (and
                dest_filename is as well), dest_filename gets recorded as
                verified.
            dest_filename: Path the content was (or would be) saved to.
            what_if_fail: Callable invoked with the exception on failure.
            expected_path: Passed through; not used by the analysis.
            expected_tailoring: Passed through; not used by the analysis.
            expected_cpe_path: Passed through; not used by the analysis.

        Returns:
            The ObtainedContent instance, or None if the analysis failed
            and what_if_fail returned.
        """
        try:
            content = ContentAnalyzer.__analyze_fetched_content(
                fetching_thread_name, fingerprint, dest_filename,
                expected_path, expected_tailoring, expected_cpe_path)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        return content

    @staticmethod
    def __analyze_fetched_content(
                wait_for, fingerprint, dest_filename, expected_path,
                expected_tailoring, expected_cpe_path):
        """
        Gather the available files, label them and record them in a new
        ObtainedContent rooted at CONTENT_DOWNLOAD_LOCATION.
        """
        actually_fetched_content = wait_for is not None
        fpaths = ContentAnalyzer.__gather_available_files(
            actually_fetched_content, dest_filename)

        structured_content = ObtainedContent(
            ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION)
        content_type = ContentAnalyzer.__get_content_type(str(dest_filename))
        log.info("OSCAP Addon: started to look at the content")
        if content_type in ("archive", "rpm"):
            structured_content.add_content_archive(dest_filename)

        labelled_filenames = content_handling.identify_files(fpaths)

        for fname, label in labelled_filenames.items():
            structured_content.add_file(str(fname), label)

        if fingerprint and dest_filename:
            structured_content.record_verification(dest_filename)

        log.info("OSCAP Addon: finished looking at the content")
        return structured_content

    @staticmethod
    def __gather_available_files(actually_fetched_content, dest_filename):
        """
        Collect paths of the files that are available for the analysis.

        Args:
            actually_fetched_content: Whether content has just been fetched.
            dest_filename: Path of the fetched content; a falsy value
                together with nothing fetched selects the default
                scap-security-guide data stream.

        Returns:
            List of file paths.

        Raises:
            common.ExtractionError: If extraction of an archive fails.
            common.OSCAPaddonError: If the content type is not supported.
        """
        fpaths = []
        if not actually_fetched_content:
            if not dest_filename:  # using scap-security-guide
                fpaths = [ContentAnalyzer.DEFAULT_SSG_DATA_STREAM_PATH]
            else:  # Using downloaded XCCDF/OVAL/DS/tailoring
                fpaths = pathlib.Path(
                    ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION).rglob("*")
                fpaths = [str(p) for p in fpaths if p.is_file()]
        else:
            dest_filename = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            content_type = ContentAnalyzer.__get_content_type(
                str(dest_filename))
            if content_type in ("archive", "rpm"):
                try:
                    fpaths = common.extract_data(
                        str(dest_filename),
                        str(dest_filename.parent)
                    )
                except common.ExtractionError as err:
                    msg = (
                        f"Failed to extract the '{dest_filename}' "
                        f"archive: {err}")
                    log.error("OSCAP Addon: " + msg)
                    # Re-raise the active exception unchanged.
                    raise

            elif content_type == "file":
                fpaths = [str(dest_filename)]
            else:
                raise common.OSCAPaddonError("Unsupported content type")
        return fpaths
258
259
260
class ObtainedContent:
    """
    This class aims to assist the discovery of the gathered files -
    the addon can download files directly, or they can be extracted from an archive.
    The class enables the user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    def __init__(self, root):
        """
        Args:
            root: Directory that relative content paths are resolved against.
        """
        self.labelled_files = dict()
        self.datastream = None  # type: pathlib.Path
        self.xccdf = None  # type: pathlib.Path
        self.ovals = []  # type: List[pathlib.Path]
        self.tailoring = None  # type: pathlib.Path
        self.archive = None  # type: pathlib.Path
        self.verified = None  # type: pathlib.Path
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)

        The file has to be among the already recorded files.
        """
        path = pathlib.Path(path)
        assert path in self.labelled_files
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        """
        Set the attribute, refusing to replace an already set different value.

        Raises:
            content_handling.ContentHandlingError: If the attribute already
                holds a different value.
        """
        old_value = getattr(self, attribute_name)
        if old_value and old_value != new_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise content_handling.ContentHandlingError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label=None):
        """
        Record a file under its label and remember its role, if it has one.

        Args:
            fname: Path of the file.
            label: One of the content_handling.CONTENT_TYPES values; when it
                is falsy, the label is determined by
                content_handling.identify_files.

        Raises:
            content_handling.ContentHandlingError: If a different tailoring,
                datastream or XCCDF file has already been recorded.
        """
        if not label:
            label = content_handling.identify_files([fname])[fname]
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self.ovals.append(path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        """Return the datastream path if it is recorded and exists, else None."""
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        """
        Return the XCCDF path if it exists and at least one recorded OVAL
        file exists as well, else None.
        """
        if not self.xccdf or not self.ovals:
            return None
        some_ovals_exist = any(path.exists() for path in self.ovals)
        if not (self.xccdf.exists() and some_ovals_exist):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        """
        Return the expected content path if it is usable main content.

        Args:
            relative_expected_content_path: Content path relative to the root.

        Raises:
            content_handling.ContentHandlingError: If the path is neither
                the usable datastream nor the usable XCCDF file.
        """
        content_path = self.root / relative_expected_content_path
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path
        else:
            if not content_path.exists():
                msg = f"Couldn't find '{content_path}' among the available content"
            else:
                msg = (
                    f"File '{content_path}' is not a valid datastream "
                    "or a valid XCCDF of a XCCDF-OVAL file tuple")
            raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        """
        Return the usable datastream, or else the usable XCCDF file.

        Raises:
            content_handling.ContentHandlingError: If neither is usable.
        """
        # Evaluate each check once, so the decision and the returned value
        # come from the same look at the filesystem.
        datastream = self._datastream_content()
        if datastream:
            return datastream
        xccdf = self._xccdf_content()
        if xccdf:
            return xccdf
        msg = (
            "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
            "among the available content")
        raise content_handling.ContentHandlingError(msg)

    def get_preferred_tailoring(self, tailoring_path):
        """
        Return the recorded tailoring, checking it against the expected path.

        Args:
            tailoring_path: Expected tailoring path relative to the root;
                a falsy value disables the check.

        Returns:
            The recorded tailoring path, or None if there is none and none
            was expected.

        Raises:
            content_handling.ContentHandlingError: If a tailoring is expected
                and no tailoring is recorded, or the recorded one has
                a different path.
        """
        if tailoring_path:
            available_tailoring = None
            # Without this guard, a missing tailoring would crash on
            # None.relative_to instead of reporting the missing file.
            if self.tailoring is not None:
                available_tailoring = str(
                    self.tailoring.relative_to(self.root))
            if tailoring_path != available_tailoring:
                msg = f"Expected a tailoring {tailoring_path}, but it couldn't be found"
                raise content_handling.ContentHandlingError(msg)
        return self.tailoring

    def get_preferred_content(self, content_path):
        """
        Return the expected content if a path is given, or else the main
        usable content.

        Raises:
            content_handling.ContentHandlingError: If no suitable content
                is available.
        """
        if content_path:
            preferred_content = self.find_expected_usable_content(content_path)
        else:
            preferred_content = self.select_main_usable_content()
        return preferred_content
370