Passed
Pull Request — rhel8-branch (#148)
by Matěj
02:13
created

ContentBringer.content_uri()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
import threading
2
import logging
3
import pathlib
4
import shutil
5
from glob import glob
6
7
from pyanaconda.core import constants
8
from pyanaconda.threading import threadMgr
9
from pykickstart.errors import KickstartValueError
10
11
from org_fedora_oscap import data_fetch, utils
12
from org_fedora_oscap import common
13
from org_fedora_oscap import content_handling
14
15
from org_fedora_oscap.common import _
16
17
# Module-level logger; records are emitted through the logger named "anaconda".
log = logging.getLogger("anaconda")
18
19
20
def is_network(scheme):
    """
    Tell whether the URL scheme begins with one of the network prefixes.

    Args:
        scheme: The scheme part of a content URL.

    Returns:
        True if the scheme starts with any entry of
        data_fetch.NET_URL_PREFIXES, False otherwise.
    """
    network_prefixes = tuple(data_fetch.NET_URL_PREFIXES)
    return scheme.startswith(network_prefixes)
24
25
26
class ContentBringer:
    """
    Fetch the content referenced by the addon data and organize what was obtained.

    The content is fetched into CONTENT_DOWNLOAD_LOCATION, optionally verified
    against a fingerprint, extracted if it is an archive or an RPM,
    and finally described by an instance of ObtainedContent.
    """
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR)
    DEFAULT_SSG_DATA_STREAM_PATH = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    def __init__(self, addon_data):
        """
        Args:
            addon_data: Object holding the addon settings. This class reads its
                content_url, content_path and tailoring_path attributes,
                writes content_path, tailoring_path and content_type,
                and calls its clear_all() method.
        """
        self.content_uri_scheme = ""
        self.content_uri_path = ""
        self.fetched_content = ""

        # Guards now_fetching_or_processing
        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

        self._addon_data = addon_data

    def get_content_type(self, url):
        """
        Classify the URL or filename by its suffix.

        Returns:
            "rpm" for names ending with ".rpm", "archive" for names ending with
            one of common.SUPPORTED_ARCHIVES, and "file" for anything else.
        """
        if url.endswith(".rpm"):
            return "rpm"
        if url.endswith(tuple(common.SUPPORTED_ARCHIVES)):
            return "archive"
        return "file"

    @property
    def content_uri(self):
        """The content URI in the 'scheme://path' form, assembled from the stored parts."""
        return self.content_uri_scheme + "://" + self.content_uri_path

    @content_uri.setter
    def content_uri(self, uri):
        """
        Split the URI at the first '://' and store the scheme and the path.

        Raises:
            ValueError: If the URI doesn't contain the '://' separator.
        """
        scheme, separator, path = uri.partition("://")
        if not separator:
            # Without the separator there is no way to tell the scheme from the path.
            msg = f"Invalid content URL '{uri}', expected the 'scheme://path' form"
            raise ValueError(msg)
        self.content_uri_path = path
        self.content_uri_scheme = scheme

    def fetch_content(self, what_if_fail, ca_certs_path=""):
        """
        Initiate fetch of the content into an appropriate directory

        The download directory is removed and created again before the fetch starts,
        so leftovers of previous fetches don't survive.

        Args:
            what_if_fail: Callback accepting exception as an argument that
                should handle them in the calling layer.
            ca_certs_path: Path to the HTTPS certificate file

        Returns:
            Whatever the underlying data_fetch call returned (used as the name
            of the fetching thread), or None if the fetch couldn't be started.
        """
        self.content_uri = self._addon_data.content_url
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        return self._fetch_files(
            self.content_uri_scheme, self.content_uri_path,
            self.CONTENT_DOWNLOAD_LOCATION, ca_certs_path, what_if_fail)

    def _fetch_files(self, scheme, path, destdir, ca_certs_path, what_if_fail):
        """
        Start the fetch unless another fetch or processing is already in progress.

        On success, the now_fetching_or_processing flag stays set -
        it is cleared by finish_content_fetch.
        If starting of the fetch fails, the flag is cleared
        and the exception is passed to the what_if_fail callback.

        Returns:
            Result of _start_actual_fetch, or None if the fetch was not started.
        """
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = "Strange, it seems that we are already fetching something."
                log.warning(msg)
                return None
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch(scheme, path, destdir, ca_certs_path)
        except Exception as exc:
            with self.activity_lock:
                self.now_fetching_or_processing = False
            what_if_fail(exc)

        # We are not finished yet with the fetch
        return fetching_thread_name

    def _start_actual_fetch(self, scheme, path, destdir, ca_certs_path):
        """
        Validate the URL and delegate the fetch to the data_fetch module.

        The destination is destdir joined with the last component of the path.

        Raises:
            KickstartValueError: If the path contains no "/",
                or if it ends with "/", so there is no basename.
        """
        url = scheme + "://" + path

        if "/" not in path:
            msg = f"Missing the path component of the '{url}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{url}' URL"
            raise KickstartValueError(msg)

        dest = destdir / basename

        if is_network(scheme):
            return data_fetch.wait_and_fetch_net_data(
                url,
                dest,
                ca_certs_path
            )
        # invalid schemes are handled down the road
        return data_fetch.fetch_local_data(
            url,
            dest,
        )

    def finish_content_fetch(self, fetching_thread_name, fingerprint, report_callback, dest_filename,
                             what_if_fail):
        """
        Finish any ongoing fetch and analyze what has been fetched.

        After the fetch is completed, it verifies the fetched content if applicable,
        analyzes it and compiles it into an instance of ObtainedContent.
        The now_fetching_or_processing flag is cleared regardless of the outcome.

        Args:
            fetching_thread_name: Name of the fetching thread
                or None if we are only after the analysis
            fingerprint: A checksum for downloaded file verification
            report_callback: Means for the method to send user-relevant messages outside
            dest_filename: The target of the fetch operation. Can be falsy -
                in this case there is no content filename defined
            what_if_fail: Callback accepting exception as an argument
                that should handle them in the calling layer.

        Returns:
            Instance of ObtainedContent if everything went well, or None.
        """
        try:
            content = self._finish_actual_fetch(fetching_thread_name, fingerprint, report_callback, dest_filename)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        finally:
            with self.activity_lock:
                self.now_fetching_or_processing = False

        return content

    def _verify_fingerprint(self, dest_filename, fingerprint=""):
        """
        Check that the file's digest matches the fingerprint.

        An empty fingerprint means that there is nothing to verify.

        Raises:
            content_handling.ContentCheckError: If the digests don't match.
        """
        if not fingerprint:
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(dest_filename,
                                            hash_obj)
        if digest != fingerprint:
            log.error(
                f"File {dest_filename} failed integrity check - assumed a "
                f"{hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            msg = _(f"Integrity check of the content failed - {hash_obj.name} hash didn't match")
            raise content_handling.ContentCheckError(msg)

    def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_filename):
        """
        Wait for the fetching thread, then verify, gather and label the files.

        Args:
            wait_for: Name of the thread to wait for. None means that
                no content was fetched in this round.
            fingerprint: Expected checksum of dest_filename, can be falsy.
            report_callback: Not used by this method.
            dest_filename: The fetched file, can be falsy.

        Returns:
            Instance of ObtainedContent describing the available files.
        """
        threadMgr.wait(wait_for)
        actually_fetched_content = wait_for is not None

        verification_requested = bool(fingerprint and dest_filename)
        if verification_requested:
            self._verify_fingerprint(dest_filename, fingerprint)

        fpaths = self._gather_available_files(actually_fetched_content, dest_filename)

        structured_content = ObtainedContent(self.CONTENT_DOWNLOAD_LOCATION)
        content_type = self.get_content_type(str(dest_filename))
        if content_type in ("archive", "rpm"):
            structured_content.add_content_archive(dest_filename)

        labelled_files = content_handling.identify_files(fpaths)
        for fname, label in labelled_files.items():
            structured_content.add_file(fname, label)

        if verification_requested:
            structured_content.record_verification(dest_filename)

        return structured_content

    def _gather_available_files(self, actually_fetched_content, dest_filename):
        """
        Collect paths of files that are available for the analysis.

        Args:
            actually_fetched_content: Whether a fetch took place in this round.
            dest_filename: The fetched file, can be falsy if nothing was fetched.

        Returns:
            List of paths - the default SSG datastream, XML files found
            in the download directory, extracted archive contents,
            or just the fetched file.

        Raises:
            common.ExtractionError: If the archive extraction fails.
        """
        fpaths = []
        if not actually_fetched_content:
            if not dest_filename:  # using scap-security-guide
                fpaths = [self.DEFAULT_SSG_DATA_STREAM_PATH]
            else:  # Using downloaded XCCDF/OVAL/DS/tailoring
                fpaths = glob(str(self.CONTENT_DOWNLOAD_LOCATION / "*.xml"))
        else:
            dest_filename = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            content_type = self.get_content_type(str(dest_filename))
            if content_type in ("archive", "rpm"):
                try:
                    fpaths = common.extract_data(
                        str(dest_filename),
                        str(dest_filename.parent)
                    )
                except common.ExtractionError as err:
                    msg = f"Failed to extract the '{dest_filename}' archive: {str(err)}"
                    log.error(msg)
                    # Bare raise keeps the original traceback intact
                    raise

            elif content_type == "file":
                fpaths = [str(dest_filename)]
            else:
                raise common.OSCAPaddonError("Unsupported content type")
        return fpaths

    def use_downloaded_content(self, content):
        """
        Record paths of the preferred content and tailoring into the addon data.

        The paths are stored relative to the content root.
        The tailoring path is updated only if the content has a tailoring.

        Args:
            content: Instance of ObtainedContent.
        """
        preferred_content = self.get_preferred_content(content)

        # NOTE: the addon data's content_type is not touched here -
        # an assignment of "datastream" used to be here, but it was disabled.
        self._addon_data.content_path = str(preferred_content.relative_to(content.root))

        preferred_tailoring = self.get_preferred_tailoring(content)
        if preferred_tailoring:
            self._addon_data.tailoring_path = str(preferred_tailoring.relative_to(content.root))

    def use_system_content(self, content=None):
        """
        Reset the addon data and point them to the scap-security-guide content.

        Args:
            content: Not used by this method.
        """
        self._addon_data.clear_all()
        self._addon_data.content_type = "scap-security-guide"
        self._addon_data.content_path = common.get_ssg_path()

    def get_preferred_content(self, content):
        """
        Return the content file that should be used.

        If the addon data define a content path, that file is looked up,
        otherwise the main usable content is selected.
        """
        expected_path = self._addon_data.content_path
        if expected_path:
            return content.find_expected_usable_content(expected_path)
        return content.select_main_usable_content()

    def get_preferred_tailoring(self, content):
        """
        Return the tailoring of the content, checking it against the addon data.

        Raises:
            content_handling.ContentHandlingError: If the addon data expect
                a tailoring that is missing, or that is located elsewhere.
        """
        expected_path = self._addon_data.tailoring_path
        if expected_path:
            found = content.tailoring
            # A missing tailoring is stored as a falsy value that has no relative_to()
            if not found or expected_path != str(found.relative_to(content.root)):
                msg = f"Expected a tailoring {expected_path}, but it couldn't be found"
                raise content_handling.ContentHandlingError(msg)
        return content.tailoring
251
252
253
class ObtainedContent:
    """
    This class aims to assist the gathered files discovery -
    the addon can download files directly, or they can be extracted from an archive.
    The class enables user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    def __init__(self, root):
        """
        Args:
            root: Directory that relative content paths are resolved against.
        """
        # Maps paths of known files to their labels, the archive is labelled None
        self.labelled_files = {}
        # Empty strings stand for "not set", real values are pathlib.Path instances
        self.datastream = ""
        self.xccdf = ""
        self.ovals = []
        self.tailoring = ""
        self.archive = ""
        self.verified = ""
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)

        Raises:
            AssertionError: If the file is not among the labelled files.
        """
        path = pathlib.Path(path)
        if path not in self.labelled_files:
            # Raised explicitly, so the check is not stripped when running with -O
            raise AssertionError(f"{path} is not among the labelled files")
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        """
        Set the attribute, refusing to overwrite a value that is already set.

        Raises:
            content_handling.ContentHandlingError: If the attribute already
                holds a truthy value.
        """
        old_value = getattr(self, attribute_name)
        if old_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise content_handling.ContentHandlingError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label):
        """
        Register the file under its label and remember it according to the label.

        Tailoring, datastream and XCCDF checklist can be present only once,
        any number of OVAL files is accepted.
        Files with other labels are only recorded among the labelled files.
        """
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self.ovals.append(path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        """Return the datastream path if it is set and the file exists, else None."""
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        """
        Return the XCCDF path if it can be used, else None.

        The XCCDF is usable if the file exists,
        and if at least one of the recorded OVAL files exists as well.
        """
        if not self.xccdf or not self.ovals:
            return None
        some_ovals_exist = any(path.exists() for path in self.ovals)
        if not (self.xccdf.exists() and some_ovals_exist):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        """
        Return path to the expected content if it is the usable datastream or XCCDF.

        Args:
            relative_expected_content_path: Path relative to the root.

        Raises:
            content_handling.ContentHandlingError: If the file doesn't exist,
                or if it is neither the usable datastream nor the usable XCCDF.
        """
        content_path = self.root / relative_expected_content_path
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path

        if not content_path.exists():
            msg = f"Couldn't find '{content_path}' among the available content"
        else:
            msg = (
                f"File '{content_path}' is not a valid datastream "
                "or a valid XCCDF of a XCCDF-OVAL file tuple")
        raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        """
        Return the usable datastream, or the usable XCCDF if there is no datastream.

        Raises:
            content_handling.ContentHandlingError: If neither of them is usable.
        """
        # Each helper checks the filesystem, so call it just once and reuse the result -
        # a repeated call could return None if the file disappeared in the meantime.
        usable_content = self._datastream_content() or self._xccdf_content()
        if usable_content:
            return usable_content
        msg = (
            "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
            "among the available content")
        raise content_handling.ContentHandlingError(msg)
347