Passed
Pull Request — master (#219)
by Matěj
01:21
created

ContentBringer.fetch_content() (rated A)

Complexity: Conditions 1
Size: Total Lines 16, Code Lines 8
Duplication: Lines 0, Ratio 0 %
Importance: Changes 0
Metric Value
eloc 8
dl 0
loc 16
rs 10
c 0
b 0
f 0
cc 1
nop 3
import threading
import logging
import pathlib
import shutil
from glob import glob
from typing import List

from pyanaconda.core import constants
from pyanaconda.threading import threadMgr
from pykickstart.errors import KickstartValueError

from org_fedora_oscap import data_fetch, utils
from org_fedora_oscap import common
from org_fedora_oscap import content_handling
from org_fedora_oscap import rule_handling

from org_fedora_oscap.common import _

log = logging.getLogger("anaconda")


def is_network(scheme):
    return any(
        scheme.startswith(net_prefix)
        for net_prefix in data_fetch.NET_URL_PREFIXES)


def clear_all(data):
    data.content_type = ""
    data.content_url = ""
    data.datastream_id = ""
    data.xccdf_id = ""
    data.profile_id = ""
    data.content_path = ""
    data.cpe_path = ""
    data.tailoring_path = ""

    data.fingerprint = ""

    data.certificates = ""

    # internal values
    data.rule_data = rule_handling.RuleData()
    data.dry_run = False

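# ContentBringer drives the content acquisition for the addon: it fetches the
# SCAP content from a local or network source, optionally verifies its
# fingerprint, and compiles the discovered files into an ObtainedContent.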
class ContentBringer:
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
    DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    def __init__(self, addon_data):
        self.content_uri_scheme = ""
        self.content_uri_path = ""
        self.fetched_content = ""

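        # The lock guards now_fetching_or_processing, which ensures that only
        # one fetch-and-process operation is active at any time.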
        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

        self._addon_data = addon_data

    def get_content_type(self, url):
        if url.endswith(".rpm"):
            return "rpm"
        elif any(url.endswith(arch_type) for arch_type in common.SUPPORTED_ARCHIVES):
            return "archive"
        else:
            return "file"

    @property
    def content_uri(self):
        return self.content_uri_scheme + "://" + self.content_uri_path

    @content_uri.setter
    def content_uri(self, uri):
        scheme, path = uri.split("://", 1)
        self.content_uri_path = path
        self.content_uri_scheme = scheme

    def fetch_content(self, what_if_fail, ca_certs_path=""):
        """
        Initiate fetch of the content into an appropriate directory.

        Args:
            what_if_fail: Callback accepting an exception as an argument;
                it should handle the exception in the calling layer.
            ca_certs_path: Path to the HTTPS certificate file
        """
        self.content_uri = self._addon_data.content_url
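        # Every fetch starts from a clean slate - remove leftovers of previous
        # downloads and recreate the download directory.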
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        fetching_thread_name = self._fetch_files(
            self.content_uri_scheme, self.content_uri_path,
            self.CONTENT_DOWNLOAD_LOCATION, ca_certs_path, what_if_fail)
        return fetching_thread_name

    def _fetch_files(self, scheme, path, destdir, ca_certs_path, what_if_fail):
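        # The flag is checked and set under the lock, so two concurrent callers
        # cannot both start a fetch.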
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = "OSCAP Addon: Strange, it seems that we are already fetching something."
                log.warning(msg)
                return
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch(scheme, path, destdir, ca_certs_path)
        except Exception as exc:
            with self.activity_lock:
                self.now_fetching_or_processing = False
            what_if_fail(exc)

        # We are not finished yet with the fetch
        return fetching_thread_name

    def _start_actual_fetch(self, scheme, path, destdir, ca_certs_path):
        fetching_thread_name = None
        url = scheme + "://" + path

        if "/" not in path:
            msg = f"Missing the path component of the '{url}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{url}' URL"
            raise KickstartValueError(msg)

        dest = destdir / basename

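        # Dispatch to the network or local fetcher; the result is treated as
        # the name of the fetching thread, which finish_content_fetch() later
        # waits for.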
        if is_network(scheme):
            fetching_thread_name = data_fetch.wait_and_fetch_net_data(
                url,
                dest,
                ca_certs_path
            )
        else:  # invalid schemes are handled down the road
            fetching_thread_name = data_fetch.fetch_local_data(
                url,
                dest,
            )
        return fetching_thread_name

    def finish_content_fetch(self, fetching_thread_name, fingerprint, report_callback, dest_filename,
                             what_if_fail):
        """
        Finish any ongoing fetch and analyze what has been fetched.

        After the fetch is completed, the method verifies the fetched content if applicable,
        analyzes it, and compiles it into an instance of ObtainedContent.

        Args:
            fetching_thread_name: Name of the fetching thread
                or None if we are only after the analysis
            fingerprint: A checksum for downloaded file verification
            report_callback: Means for the method to send user-relevant messages outside
            dest_filename: The target of the fetch operation. Can be falsy -
                in this case there is no content filename defined
            what_if_fail: Callback accepting an exception as an argument;
                it should handle the exception in the calling layer.

        Returns:
            Instance of ObtainedContent if everything went well, or None.
        """
        try:
            content = self._finish_actual_fetch(fetching_thread_name, fingerprint, report_callback, dest_filename)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        finally:
            with self.activity_lock:
                self.now_fetching_or_processing = False

        return content

    def _verify_fingerprint(self, dest_filename, fingerprint=""):
        if not fingerprint:
            log.info("OSCAP Addon: No fingerprint provided, skipping integrity check")
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(dest_filename,
                                            hash_obj)
        if digest != fingerprint:
            log.error(
                "OSCAP Addon: "
                f"File {dest_filename} failed integrity check - assumed a "
                f"{hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            msg = _(f"OSCAP Addon: Integrity check of the content failed - {hash_obj.name} hash didn't match")
            raise content_handling.ContentCheckError(msg)
        log.info(f"Integrity check passed using {hash_obj.name} hash")

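    # When the kickstart names specific datastream/XCCDF, tailoring or CPE
    # files, drop other discovered files of the same category, so that only
    # the expected ones are considered.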
    def filter_discovered_content(self, labelled_files):
        expected_path = self._addon_data.content_path
        categories = (
            content_handling.CONTENT_TYPES["DATASTREAM"],
            content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"])
        if expected_path:
            labelled_files = self.reduce_files(labelled_files, expected_path, categories)

        expected_path = self._addon_data.tailoring_path
        categories = (content_handling.CONTENT_TYPES["TAILORING"], )
        if expected_path:
            labelled_files = self.reduce_files(labelled_files, expected_path, categories)

        expected_path = self._addon_data.cpe_path
        categories = (content_handling.CONTENT_TYPES["CPE_DICT"], )
        if expected_path:
            labelled_files = self.reduce_files(labelled_files, expected_path, categories)

        return labelled_files

    def reduce_files(self, labelled_files, expected_path, categories):
        reduced_files = dict()
        if expected_path not in labelled_files:
            msg = (
                f"Expected a file {expected_path} to be part of the supplied content, "
                "but it was not the case."
            )
            raise RuntimeError(msg)
        for path, label in labelled_files.items():
            if label in categories and path != expected_path:
                continue
            reduced_files[path] = label
        return reduced_files

    def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_filename):
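        # If a fetch was started, block until its thread finishes; with
        # wait_for set to None, only the already available content is analyzed.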
        if wait_for:
            log.info(f"OSCAP Addon: Waiting for thread {wait_for}")
            threadMgr.wait(wait_for)
            log.info(f"OSCAP Addon: Finished waiting for thread {wait_for}")
        actually_fetched_content = wait_for is not None

        if fingerprint and dest_filename:
            self._verify_fingerprint(dest_filename, fingerprint)

        fpaths = self._gather_available_files(actually_fetched_content, dest_filename)

        structured_content = ObtainedContent(self.CONTENT_DOWNLOAD_LOCATION)
        content_type = self.get_content_type(str(dest_filename))
        log.info("OSCAP Addon: started to look at the content")
        if content_type in ("archive", "rpm"):
            structured_content.add_content_archive(dest_filename)

        labelled_files = content_handling.identify_files(fpaths)
        labelled_files = self.filter_discovered_content(labelled_files)

        for fname, label in labelled_files.items():
            structured_content.add_file(fname, label)

        if fingerprint and dest_filename:
            structured_content.record_verification(dest_filename)

        log.info("OSCAP Addon: finished looking at the content")
        return structured_content

    def _gather_available_files(self, actually_fetched_content, dest_filename):
        fpaths = []
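        # Files can come from the system scap-security-guide content, from
        # content already present in the download directory, or from the
        # freshly fetched file (which may be an archive to extract first).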
        if not actually_fetched_content:
            if not dest_filename:  # using scap-security-guide
                fpaths = [self.DEFAULT_SSG_DATA_STREAM_PATH]
            else:  # Using downloaded XCCDF/OVAL/DS/tailoring
                fpaths = pathlib.Path(self.CONTENT_DOWNLOAD_LOCATION).rglob("*")
                fpaths = [str(p) for p in fpaths if p.is_file()]
        else:
            dest_filename = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            content_type = self.get_content_type(str(dest_filename))
            if content_type in ("archive", "rpm"):
                try:
                    fpaths = common.extract_data(
                        str(dest_filename),
                        str(dest_filename.parent)
                    )
                except common.ExtractionError as err:
                    msg = f"Failed to extract the '{dest_filename}' archive: {str(err)}"
                    log.error("OSCAP Addon: " + msg)
                    raise err

            elif content_type == "file":
                fpaths = [str(dest_filename)]
            else:
                raise common.OSCAPaddonError("Unsupported content type")
        return fpaths

    def use_downloaded_content(self, content):
        preferred_content = self.get_preferred_content(content)

        # We know that we have ended up with datastream-like content,
        # but we can't be sure that an archive was converted to a datastream,
        # so the content type is left unchanged here.
        # self._addon_data.content_type = "datastream"
        self._addon_data.content_path = str(preferred_content.relative_to(content.root))

        preferred_tailoring = self.get_preferred_tailoring(content)
        if content.tailoring:
            self._addon_data.tailoring_path = str(preferred_tailoring.relative_to(content.root))

    def use_system_content(self, content=None):
        clear_all(self._addon_data)
        self._addon_data.content_type = "scap-security-guide"
        self._addon_data.content_path = common.get_ssg_path()

    def get_preferred_content(self, content):
        if self._addon_data.content_path:
            preferred_content = content.find_expected_usable_content(self._addon_data.content_path)
        else:
            preferred_content = content.select_main_usable_content()
        return preferred_content

    def get_preferred_tailoring(self, content):
        tailoring_path = self._addon_data.tailoring_path
        if tailoring_path:
            if tailoring_path != str(content.tailoring.relative_to(content.root)):
                msg = f"Expected a tailoring {tailoring_path}, but it couldn't be found"
                raise content_handling.ContentHandlingError(msg)
        return content.tailoring


class ObtainedContent:
    """
    This class aims to assist the discovery of gathered files -
    the addon can download files directly, or they can be extracted from an archive.
    The class enables the user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    def __init__(self, root):
        self.labelled_files = dict()
        self.datastream = None  # type: pathlib.Path
        self.xccdf = None  # type: pathlib.Path
        self.ovals = []  # type: List[pathlib.Path]
        self.tailoring = None  # type: pathlib.Path
        self.archive = None  # type: pathlib.Path
        self.verified = None  # type: pathlib.Path
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)
        """
        path = pathlib.Path(path)
        assert path in self.labelled_files
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        self.labelled_files[path] = None
        self.archive = path

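    # Datastream, XCCDF and tailoring content is tracked as a single file each;
    # assigning a second, different file of the same kind is an error.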
    def _assign_content_type(self, attribute_name, new_value):
        old_value = getattr(self, attribute_name)
        if old_value and old_value != new_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise content_handling.ContentHandlingError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label=None):
        if not label:
            label = content_handling.identify_files([fname])[fname]
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self.ovals.append(path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        if not self.xccdf or not self.ovals:
            return None
        some_ovals_exist = any([path.exists() for path in self.ovals])
        if not (self.xccdf.exists() and some_ovals_exist):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        content_path = self.root / relative_expected_content_path
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path
        else:
            if not content_path.exists():
                msg = f"Couldn't find '{content_path}' among the available content"
            else:
                msg = (
                    f"File '{content_path}' is not a valid datastream "
                    "or a valid XCCDF of an XCCDF-OVAL file tuple")
            raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        if self._datastream_content():
            return self._datastream_content()
        elif self._xccdf_content():
            return self._xccdf_content()
        else:
            msg = (
                "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
                "among the available content")
            raise content_handling.ContentHandlingError(msg)
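
# A rough usage sketch (illustrative only, not part of the reviewed module):
# the calling layer is expected to drive ContentBringer roughly like this,
# where `addon_data` holds the kickstart values, `on_error` is the failure
# callback, and `dest_filename` is whatever download target the caller chose:
#
#     bringer = ContentBringer(addon_data)
#     thread_name = bringer.fetch_content(on_error, ca_certs_path="")
#     content = bringer.finish_content_fetch(
#         thread_name, addon_data.fingerprint, log.info, dest_filename, on_error)
#     if content:
#         bringer.use_downloaded_content(content)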