Passed
Pull Request — rawhide (#242)
by Jan
02:08
created

ContentAnalyzer.analyze()   A

Complexity

Conditions 2

Size

Total Lines 12
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 12
nop 7
dl 0
loc 12
rs 9.8
c 0
b 0
f 0
1
import threading
2
import logging
3
import pathlib
4
import shutil
5
from glob import glob
6
from typing import List
7
8
from pyanaconda.core import constants
9
from pyanaconda.threading import threadMgr
10
from pykickstart.errors import KickstartValueError
11
12
from org_fedora_oscap import data_fetch, utils
13
from org_fedora_oscap import common
14
from org_fedora_oscap import content_handling
15
from org_fedora_oscap.content_handling import CONTENT_TYPES
16
17
from org_fedora_oscap.common import _
18
19
# Module-wide logger, obtained under the name "anaconda".
log = logging.getLogger("anaconda")
20
21
22
def is_network(scheme):
    """
    Tell whether the scheme begins with one of the network URL prefixes

    Args:
        scheme: The scheme part of a content URI

    Returns:
        True if any item of data_fetch.NET_URL_PREFIXES is a prefix
        of the scheme, False otherwise
    """
    for known_prefix in data_fetch.NET_URL_PREFIXES:
        if scheme.startswith(known_prefix):
            return True
    return False
26
27
28
def paths_are_equivalent(p1, p2):
    """
    Tell whether two paths refer to the same absolute, normalized location

    The comparison is purely textual (os.path.abspath of both arguments),
    so the paths do not have to exist and symlinks are not resolved.

    Args:
        p1: First path
        p2: Second path

    Returns:
        True if both paths normalize to the same absolute path
    """
    # Imported locally because the module-level imports do not bring
    # ``os`` into scope, which made this function fail with a NameError.
    import os

    return os.path.abspath(p1) == os.path.abspath(p2)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable os does not seem to be defined.
Loading history...
30
31
32
def path_is_present_among_paths(path, paths):
    """
    Tell whether a path is equivalent to at least one of the given paths

    Args:
        path: The path to look for
        paths: Iterable of candidate paths

    Returns:
        True if paths_are_equivalent() holds for the path and any of the
        candidates, False otherwise (including when there are no candidates)
    """
    # The former unused local computed through the undefined name ``os``
    # was dropped; the per-pair comparison does all the normalization.
    return any(
        paths_are_equivalent(path, second_path) for second_path in paths)
38
39
40
class ContentBringer:
    """
    Bring content from a URI into the content download location

    The class validates the content URI, hands the actual transfer over to
    the data_fetch module, keeps track of whether a fetch is in progress,
    and can check the fingerprint of the fetched file.
    Failures are reported by calling the what_if_fail callback
    with the exception as its argument.
    """
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)

    def __init__(self, what_if_fail):
        """
        Args:
            what_if_fail: Callable accepting an exception, called whenever
                a step of the fetch fails
        """
        self._valid_content_uri = ""
        self.dest_file_name = ""

        # The lock guards the now_fetching_or_processing flag
        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False
        self.what_if_fail = what_if_fail

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

    @property
    def content_uri(self):
        """
        The last URI that passed validation, or an empty string
        """
        return self._valid_content_uri

    @content_uri.setter
    def content_uri(self, uri):
        """
        Validate the URI, store it, and derive the destination file name

        Raises:
            KickstartValueError: If the URI lacks the 'scheme://' part,
                the path component, or a basename
        """
        scheme_and_maybe_path = uri.split("://")
        if len(scheme_and_maybe_path) == 1:
            msg = (
                f"Invalid supplied content URL '{uri}', "
                "use the 'scheme://path' form.")
            raise KickstartValueError(msg)
        path = scheme_and_maybe_path[1]
        if "/" not in path:
            msg = f"Missing the path component of the '{uri}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{uri}' URL"
            raise KickstartValueError(msg)
        self._valid_content_uri = uri
        self.dest_file_name = self.CONTENT_DOWNLOAD_LOCATION / basename

    def fetch_content(self, content_uri, ca_certs_path=""):
        """
        Initiate fetch of the content into an appropriate directory

        Args:
            content_uri: URI location of the content to be fetched
            ca_certs_path: Path to the HTTPS certificate file

        Returns:
            The fetching thread name as returned by the data_fetch module,
            or None if the URI was rejected or the fetch could not start
        """
        try:
            self.content_uri = content_uri
        except Exception as exc:
            self.what_if_fail(exc)
            # The rejected URI was not stored, so self.content_uri still
            # holds the previous (or empty) value. Don't wipe the download
            # location and fetch that one instead.
            return None
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        fetching_thread_name = self._fetch_files(ca_certs_path)
        return fetching_thread_name

    def _fetch_files(self, ca_certs_path):
        """
        Mark the fetch as in progress and start it

        Returns:
            The fetching thread name, or None if a fetch is already
            in progress or the fetch failed to start
        """
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = "OSCAP Addon: Strange, it seems that we are already " \
                    "fetching something."
                # Logger.warn() is a deprecated alias that newer Python
                # versions no longer provide.
                log.warning(msg)
                return None
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch(ca_certs_path)
        except Exception as exc:
            with self.activity_lock:
                self.now_fetching_or_processing = False
            self.what_if_fail(exc)

        # We are not finished yet with the fetch
        return fetching_thread_name

    def _start_actual_fetch(self, ca_certs_path):
        """
        Dispatch the fetch to the network or local routine of data_fetch
        according to the scheme of the validated URI
        """
        fetching_thread_name = None

        scheme = self.content_uri.split("://")[0]
        if is_network(scheme):
            fetching_thread_name = data_fetch.wait_and_fetch_net_data(
                self.content_uri,
                self.dest_file_name,
                ca_certs_path
            )
        else:  # invalid schemes are handled down the road
            fetching_thread_name = data_fetch.fetch_local_data(
                self.content_uri,
                self.dest_file_name,
            )
        return fetching_thread_name

    def finish_content_fetch(self, fetching_thread_name, fingerprint):
        """
        Wait for the fetch to finish and verify the fetched file

        Args:
            fetching_thread_name: Name of the thread to wait for,
                nothing is waited for if it is empty or None
            fingerprint: Expected fingerprint of the fetched file,
                the check is skipped if it is empty
        """
        try:
            self._finish_actual_fetch(fetching_thread_name)
            if fingerprint:
                self._verify_fingerprint(fingerprint)
        except Exception as exc:
            self.what_if_fail(exc)
        finally:
            # The fetch is over regardless of how it ended
            with self.activity_lock:
                self.now_fetching_or_processing = False

    def _finish_actual_fetch(self, wait_for):
        """
        Block until the thread of the given name finishes, if a name is given
        """
        if wait_for:
            log.info(f"OSCAP Addon: Waiting for thread {wait_for}")
            threadMgr.wait(wait_for)
            log.info(f"OSCAP Addon: Finished waiting for thread {wait_for}")

    def _verify_fingerprint(self, fingerprint=""):
        """
        Compare the fingerprint of the destination file with the expected one

        Raises:
            content_handling.ContentCheckError: If the fingerprints differ
        """
        if not fingerprint:
            log.info(
                "OSCAP Addon: No fingerprint provided, skipping integrity "
                "check")
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(self.dest_file_name,
                                            hash_obj)
        if digest != fingerprint:
            log.error(
                "OSCAP Addon: "
                f"File {self.dest_file_name} failed integrity check - assumed "
                f"a {hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            msg = _(
                f"OSCAP Addon: Integrity check of the content failed - "
                f"{hash_obj.name} hash didn't match")
            raise content_handling.ContentCheckError(msg)
        log.info(f"Integrity check passed using {hash_obj.name} hash")
168
169
170
class ContentAnalyzer:
    """
    Static helpers that describe the available content files
    by means of an ObtainedContent instance
    """
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
    DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    @staticmethod
    def __get_content_type(url):
        """
        Classify the URL as "rpm", "archive" or "file" according to its suffix
        """
        if url.endswith(".rpm"):
            return "rpm"
        for archive_suffix in common.SUPPORTED_ARCHIVES:
            if url.endswith(archive_suffix):
                return "archive"
        return "file"

    @staticmethod
    def __allow_one_expected_tailoring_or_no_tailoring(
            labelled_files, expected_tailoring):
        """
        Keep only the expected tailoring among the tailoring files,
        or drop all tailoring files if no tailoring is expected
        """
        tailoring_label = CONTENT_TYPES["TAILORING"]
        if not expected_tailoring:
            return {
                fpath: flabel for fpath, flabel in labelled_files.items()
                if flabel != tailoring_label
            }
        return ContentAnalyzer.reduce_files(
            labelled_files, expected_tailoring, [tailoring_label])

    @staticmethod
    def __filter_discovered_content(
            labelled_files, expected_path, expected_tailoring,
            expected_cpe_path):
        """
        Narrow the labelled files down to the expected main content,
        tailoring and CPE dictionary; expectations that are empty
        don't restrict the main content and the CPE dictionary
        """
        main_content_labels = (
            CONTENT_TYPES["DATASTREAM"],
            CONTENT_TYPES["XCCDF_CHECKLIST"])
        if expected_path:
            labelled_files = ContentAnalyzer.reduce_files(
                labelled_files, expected_path, main_content_labels)

        labelled_files = \
            ContentAnalyzer.__allow_one_expected_tailoring_or_no_tailoring(
                labelled_files, expected_tailoring)

        cpe_labels = (CONTENT_TYPES["CPE_DICT"], )
        if expected_cpe_path:
            labelled_files = ContentAnalyzer.reduce_files(
                labelled_files, expected_cpe_path, cpe_labels)

        return labelled_files

    @staticmethod
    def reduce_files(labelled_files, expected_path, categories):
        """
        Drop files of the given categories that are not the expected file

        Args:
            labelled_files: Mapping of file paths to their labels
            expected_path: Path of the file to keep
            categories: Labels that the reduction applies to

        Returns:
            New mapping without files labelled by one of the categories,
            except for the file equivalent to the expected path

        Raises:
            content_handling.ContentHandlingError: If the expected path
                is not among the labelled files
        """
        known_paths = labelled_files.keys()
        if not path_is_present_among_paths(expected_path, known_paths):
            msg = (
                f"Expected a file {expected_path} to be part of the supplied "
                f"content, but it was not the case, got only "
                f"{list(labelled_files.keys())}"
            )
            raise content_handling.ContentHandlingError(msg)
        # Files of other categories pass untouched, files of the listed
        # categories pass only if they are the expected one.
        return {
            fpath: flabel for fpath, flabel in labelled_files.items()
            if flabel not in categories
            or paths_are_equivalent(fpath, expected_path)
        }

    @staticmethod
    def analyze(
            fetching_thread_name, fingerprint, dest_filename, what_if_fail,
            expected_path, expected_tailoring, expected_cpe_path):
        """
        Analyze the content and report any failure to the callback

        Returns:
            The ObtainedContent describing the content, or None if the
            analysis failed and the what_if_fail callback returned
        """
        try:
            return ContentAnalyzer.__analyze_fetched_content(
                fetching_thread_name, fingerprint, dest_filename,
                expected_path, expected_tailoring, expected_cpe_path)
        except Exception as exc:
            what_if_fail(exc)
        return None

    @staticmethod
    def __analyze_fetched_content(
                wait_for, fingerprint, dest_filename, expected_path,
                expected_tailoring, expected_cpe_path):
        """
        Gather the available files, label them, filter them by the
        expectations, and record the result in a new ObtainedContent
        """
        # A thread name is present only if something was really fetched
        something_was_fetched = wait_for is not None
        available_paths = ContentAnalyzer.__gather_available_files(
            something_was_fetched, dest_filename)

        result = ObtainedContent(ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION)
        kind_of_content = ContentAnalyzer.__get_content_type(
            str(dest_filename))
        log.info("OSCAP Addon: started to look at the content")
        if kind_of_content in ("archive", "rpm"):
            result.add_content_archive(dest_filename)

        labelled = content_handling.identify_files(available_paths)
        labelled = ContentAnalyzer.__filter_discovered_content(
            labelled, expected_path, expected_tailoring, expected_cpe_path)

        for file_name, file_label in labelled.items():
            result.add_file(str(file_name), file_label)

        if fingerprint and dest_filename:
            result.record_verification(dest_filename)

        log.info("OSCAP Addon: finished looking at the content")
        return result

    @staticmethod
    def __gather_available_files(actually_fetched_content, dest_filename):
        """
        List paths of files that are available for the analysis

        Freshly fetched archives (including RPMs) are extracted next to
        themselves. If nothing was fetched, the result is either the default
        SSG datastream path (no destination file name), or all the files
        present in the content download location.
        """
        if actually_fetched_content:
            fetched_path = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            kind_of_content = ContentAnalyzer.__get_content_type(
                str(fetched_path))
            if kind_of_content in ("archive", "rpm"):
                try:
                    return common.extract_data(
                        str(fetched_path),
                        str(fetched_path.parent)
                    )
                except common.ExtractionError as err:
                    msg = (
                        f"Failed to extract the '{fetched_path}' "
                        f"archive: {str(err)}")
                    log.error("OSCAP Addon: " + msg)
                    raise err
            if kind_of_content == "file":
                return [str(fetched_path)]
            raise common.OSCAPaddonError("Unsupported content type")

        if not dest_filename:  # using scap-security-guide
            return [ContentAnalyzer.DEFAULT_SSG_DATA_STREAM_PATH]

        # Using downloaded XCCDF/OVAL/DS/tailoring
        download_location = pathlib.Path(
            ContentAnalyzer.CONTENT_DOWNLOAD_LOCATION)
        return [
            str(candidate) for candidate in download_location.rglob("*")
            if candidate.is_file()
        ]
314
315
316
class ObtainedContent:
    """
    This class aims to assist the discovery of the gathered files -
    the addon can download files directly, or they can be extracted from an archive.
    The class enables user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    def __init__(self, root):
        """
        Args:
            root: Directory that relative content paths are resolved against
        """
        # Maps paths of all recorded files to their labels
        self.labelled_files = dict()
        self.datastream = None  # type: pathlib.Path
        self.xccdf = None  # type: pathlib.Path
        self.ovals = []  # type: List[pathlib.Path]
        self.tailoring = None  # type: pathlib.Path
        self.archive = None  # type: pathlib.Path
        self.verified = None  # type: pathlib.Path
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)

        The file has to be among the already recorded files.
        """
        path = pathlib.Path(path)
        assert path in self.labelled_files
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        # The archive itself carries no content label
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        """
        Set the attribute to the path, refusing to replace a different
        path that has been assigned earlier

        Raises:
            content_handling.ContentHandlingError: If the attribute already
                holds a different value
        """
        old_value = getattr(self, attribute_name)
        if old_value and old_value != new_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise content_handling.ContentHandlingError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label=None):
        """
        Record a file under its label, and remember it as the tailoring,
        datastream, XCCDF or one of OVALs if the label says so

        Args:
            fname: Path to the file
            label: Content type label of the file, determined by
                content_handling.identify_files if not supplied
        """
        if not label:
            label = content_handling.identify_files([fname])[fname]
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self.ovals.append(path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        """
        Return the datastream path if it is known and the file exists,
        None otherwise
        """
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        """
        Return the XCCDF path if it exists along with at least
        one existing OVAL file, None otherwise
        """
        if not self.xccdf or not self.ovals:
            return None
        some_ovals_exist = any(path.exists() for path in self.ovals)
        if not (self.xccdf.exists() and some_ovals_exist):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        """
        Return the path of the expected content resolved against the root

        Raises:
            content_handling.ContentHandlingError: If the resolved path is
                neither the usable datastream, nor the usable XCCDF
        """
        content_path = self.root / relative_expected_content_path
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path

        if not content_path.exists():
            msg = f"Couldn't find '{content_path}' among the available content"
        else:
            msg = (
                f"File '{content_path}' is not a valid datastream "
                "or a valid XCCDF of a XCCDF-OVAL file tuple")
        raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        """
        Return the usable datastream, or the usable XCCDF as a fallback

        Raises:
            content_handling.ContentHandlingError: If none of those
                is available
        """
        # Evaluate each candidate just once, the checks touch the filesystem
        usable_content = self._datastream_content() or self._xccdf_content()
        if not usable_content:
            msg = (
                "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
                "among the available content")
            raise content_handling.ContentHandlingError(msg)
        return usable_content

    def _tailoring_relative_to_root(self):
        """
        Return the tailoring path relative to the root as a string,
        or None if there is no tailoring or it is located outside of the root
        """
        if self.tailoring is None:
            return None
        try:
            return str(self.tailoring.relative_to(self.root))
        except ValueError:
            return None

    def get_preferred_tailoring(self, tailoring_path):
        """
        Return the recorded tailoring, checking it against the expectation

        Args:
            tailoring_path: Expected path of the tailoring relative to
                the root, no check is performed if it is empty

        Raises:
            content_handling.ContentHandlingError: If a tailoring is expected,
                but the recorded one is missing or located elsewhere
        """
        if tailoring_path:
            # A missing tailoring has to end up as the "not found" error
            # below, not as an AttributeError on None.
            if tailoring_path != self._tailoring_relative_to_root():
                msg = f"Expected a tailoring {tailoring_path}, but it couldn't be found"
                raise content_handling.ContentHandlingError(msg)
        return self.tailoring

    def get_preferred_content(self, content_path):
        """
        Return the expected content if a path is given,
        the main usable content otherwise
        """
        if content_path:
            preferred_content = self.find_expected_usable_content(content_path)
        else:
            preferred_content = self.select_main_usable_content()
        return preferred_content
426