Passed
Pull Request — rhel8-branch (#148)
by Matěj
01:28
created

org_fedora_oscap.model.Model._verify_fingerprint()   A

Complexity

Conditions 3

Size

Total Lines 14
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 14
rs 9.85
c 0
b 0
f 0
cc 3
nop 3
1
import threading
2
import logging
3
import pathlib
4
import shutil
5
from glob import glob
6
7
from pyanaconda.core import constants
8
from pyanaconda.threading import threadMgr
9
from pykickstart.errors import KickstartValueError
10
11
from org_fedora_oscap import data_fetch, utils
12
from org_fedora_oscap import common
13
from org_fedora_oscap import content_handling
14
15
from org_fedora_oscap.common import _
16
17
# Module-wide logger; everything in this file logs through the logger named "anaconda".
log = logging.getLogger("anaconda")
18
19
20
def is_network(scheme):
    """
    Tell whether the given URL scheme is a network one.

    Args:
        scheme: The scheme part of an URL (the text before "://").

    Returns:
        True if the scheme starts with any of data_fetch.NET_URL_PREFIXES,
        False otherwise.
    """
    for known_prefix in data_fetch.NET_URL_PREFIXES:
        if scheme.startswith(known_prefix):
            return True
    return False
24
25
26
class Model:
    """
    Fetch SCAP content, analyze what has been obtained and record the outcome
    into the addon data object passed to the constructor.
    """
    # Directory that receives fetched content; it is wiped by every fetch_content() call.
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR) / "content-download"
    # Content file that is used when nothing was fetched and no file name was given.
    DEFAULT_CONTENT = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    def __init__(self, addon_data):
        """
        Args:
            addon_data: Object holding the addon settings. The methods of this
                class read and write its content_type, content_path
                and tailoring_path attributes, and call its clear_all().
        """
        self.content_uri_scheme = ""
        self.content_uri_path = ""
        self.fetched_content = ""

        # The lock guards the now_fetching_or_processing flag.
        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

        self.addon_data = addon_data

    def get_content_type(self, url):
        """
        Classify the content according to the suffix of its URL or file name.

        Returns:
            "rpm", "archive" (suffix listed in common.SUPPORTED_ARCHIVES), or "file".
        """
        if url.endswith(".rpm"):
            return "rpm"
        if url.endswith(tuple(common.SUPPORTED_ARCHIVES)):
            return "archive"
        return "file"

    @property
    def content_uri(self):
        """The content URI assembled as <scheme>://<path>."""
        return self.content_uri_scheme + "://" + self.content_uri_path

    @content_uri.setter
    def content_uri(self, uri):
        # Only the first "://" separates the scheme; a URI without it raises ValueError.
        scheme, path = uri.split("://", 1)
        self.content_uri_path = path
        self.content_uri_scheme = scheme

    def fetch_content(self, what_if_fail, cert=""):
        """
        Initiate fetch of the content into an appropriate directory

        The download directory is removed and recreated first,
        so leftovers of previous fetches can't mix with the new content.

        Args:
            what_if_fail: Callback accepting exception as an argument that
                should handle them in the calling layer.
            cert: HTTPS certificates

        Returns:
            Name of the fetching thread, or None if the fetch was not started.
        """
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        return self._fetch_files(
            self.content_uri_scheme, self.content_uri_path,
            self.CONTENT_DOWNLOAD_LOCATION, cert, what_if_fail)

    def _fetch_files(self, scheme, path, destdir, cert, what_if_fail):
        """
        Mark the model as busy and start the fetch.

        Returns:
            Name of the fetching thread. None is returned if another fetch
            is already in progress, or if starting of the fetch failed - in the
            latter case, the exception is passed to the what_if_fail callback.
        """
        with self.activity_lock:
            if self.now_fetching_or_processing:
                log.warning("Strange, it seems that we are already fetching something.")
                return None
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch(scheme, path, destdir, cert)
        except Exception as exc:
            # The fetch did not start, so nobody else would clear the busy flag.
            with self.activity_lock:
                self.now_fetching_or_processing = False
            what_if_fail(exc)

        # We are not finished yet with the fetch
        return fetching_thread_name

    def _start_actual_fetch(self, scheme, path, destdir, cert):
        """
        Validate the URL and delegate the fetch to the data_fetch module.

        Returns:
            Whatever the respective data_fetch function returns - it is treated
            as the name of the fetching thread by the callers.

        Raises:
            KickstartValueError: If a file name can't be deduced from the path.
        """
        url = scheme + "://" + path

        if "/" not in path:
            msg = f"Missing the path component of the '{url}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{url}' URL"
            raise KickstartValueError(msg)

        dest = destdir / basename

        if is_network(scheme):
            return data_fetch.wait_and_fetch_net_data(url, dest, cert)
        # invalid schemes are handled down the road
        return data_fetch.fetch_local_data(url, dest)

    def finish_content_fetch(self, fetching_thread_name, fingerprint, report_callback, dest_filename,
                             what_if_fail):
        """
        Finish any ongoing fetch and analyze what has been fetched.

        After the fetch is completed, it verifies the fetched content if applicable,
        analyzes it and compiles into an instance of ObtainedContent.

        Args:
            fetching_thread_name: Name of the fetching thread
                or None if we are only after the analysis
            fingerprint: A checksum for downloaded file verification
            report_callback: Means for the method to send user-relevant messages outside
            dest_filename: The target of the fetch operation. Can be falsy -
                in this case there is no content filename defined
            what_if_fail: Callback accepting exception as an argument
                that should handle them in the calling layer.

        Returns:
            Instance of ObtainedContent if everything went well, or None.
        """
        try:
            content = self._finish_actual_fetch(fetching_thread_name, fingerprint, report_callback, dest_filename)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        finally:
            # The busy flag is cleared regardless of the outcome.
            with self.activity_lock:
                self.now_fetching_or_processing = False

        return content

    def _verify_fingerprint(self, dest_filename, fingerprint=""):
        """
        Check that the file matches the fingerprint; do nothing if it is empty.

        Raises:
            content_handling.ContentCheckError: If the computed digest differs.
        """
        if not fingerprint:
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(dest_filename, hash_obj)
        if digest == fingerprint:
            return

        log.error(
            "File %s failed integrity check - assumed a %s hash and '%s', got '%s'",
            dest_filename, hash_obj.name, fingerprint, digest)
        # The template is translated first and formatted afterwards - an already
        # interpolated string could never be found in a translation catalog.
        msg = _("Integrity check of the content failed - {hash_name} hash didn't match")
        raise content_handling.ContentCheckError(msg.format(hash_name=hash_obj.name))

    def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_filename):
        """
        Wait for the fetching thread, verify the content and compile
        the gathered files into an ObtainedContent instance.
        """
        threadMgr.wait(wait_for)
        actually_fetched_content = wait_for is not None

        if fingerprint and dest_filename:
            self._verify_fingerprint(dest_filename, fingerprint)

        fpaths = self._gather_available_files(actually_fetched_content, dest_filename)

        structured_content = ObtainedContent(self.CONTENT_DOWNLOAD_LOCATION)
        content_type = self.get_content_type(str(dest_filename))
        if content_type in ("archive", "rpm"):
            structured_content.add_content_archive(dest_filename)

        labelled_files = content_handling.identify_files(fpaths)
        for fname, label in labelled_files.items():
            structured_content.add_file(fname, label)

        if fingerprint and dest_filename:
            structured_content.record_verification(dest_filename)

        return structured_content

    def _gather_available_files(self, actually_fetched_content, dest_filename):
        """
        Collect paths of the files that are to be analyzed.

        Args:
            actually_fetched_content: Whether a fetch took place just now.
            dest_filename: The target of the fetch operation, can be falsy.

        Returns:
            List of file paths as strings.

        Raises:
            common.ExtractionError: If the fetched archive can't be extracted.
        """
        if not actually_fetched_content:
            if not dest_filename:  # using scap-security-guide
                return [self.DEFAULT_CONTENT]
            # Using downloaded XCCDF/OVAL/DS/tailoring
            return glob(str(self.CONTENT_DOWNLOAD_LOCATION / "*.xml"))

        dest_filename = pathlib.Path(dest_filename)
        # RPM is an archive at this phase
        content_type = self.get_content_type(str(dest_filename))
        if content_type in ("archive", "rpm"):
            try:
                return common.extract_data(
                    str(dest_filename),
                    str(dest_filename.parent)
                )
            except common.ExtractionError as err:
                log.error("Failed to extract the '%s' archive: %s", dest_filename, err)
                raise
        if content_type == "file":
            return [str(dest_filename)]
        # Defensive - get_content_type() currently returns only the values handled above.
        raise common.OSCAPaddonError("Unsupported content type")

    def use_downloaded_content(self, content):
        """
        Record paths of the preferred content and tailoring into the addon data.

        The paths are stored relative to the root of the content.

        Args:
            content: An ObtainedContent instance.
        """
        preferred_content = self._get_preferred_content(content)

        # NOTE(review): the original code wrote to self.policy_data, which is never
        # assigned in this class. The object passed to the constructor is assumed
        # to be the intended target - confirm.
        self.addon_data.content_type = "datastream"
        self.addon_data.content_path = str(preferred_content.relative_to(content.root))

        preferred_tailoring = self._get_preferred_tailoring(content)
        if preferred_tailoring:
            self.addon_data.tailoring_path = str(preferred_tailoring.relative_to(content.root))

    def use_system_content(self, content=None):
        """
        Reset the addon data and point them to the scap-security-guide content.

        Args:
            content: Not used, accepted for symmetry with use_downloaded_content.
        """
        self.addon_data.clear_all()
        self.addon_data.content_type = "scap-security-guide"
        self.addon_data.content_path = common.get_ssg_path()

    def _get_preferred_content(self, content):
        """
        Return the content file requested by the addon data if there is such request,
        otherwise let the ObtainedContent instance select the main one.
        """
        expected_path = self.addon_data.content_path
        if expected_path:
            return content.find_expected_usable_content(expected_path)
        return content.select_main_usable_content()

    def _get_preferred_tailoring(self, content):
        """
        Return the tailoring of the content after checking it against the addon data.

        Raises:
            content_handling.ContentHandlingError: If the addon data request
                a tailoring that is missing or that is a different file.
        """
        expected_path = self.addon_data.tailoring_path
        if expected_path:
            found = content.tailoring
            # content.tailoring is an empty string if no tailoring was discovered.
            if not found or expected_path != str(found.relative_to(content.root)):
                msg = f"Expected a tailoring {expected_path}, but it couldn't be found"
                raise content_handling.ContentHandlingError(msg)
        return content.tailoring
248
249
250
class ObtainedContent:
    """
    This class aims to assist the gathered files discovery -
    the addon can download files directly, or they can be extracted from an archive.
    The class enables user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    def __init__(self, root):
        """
        Args:
            root: Directory that paths of the content are related to.
        """
        # Maps paths of known files to their labels; the archive has None as the label.
        self.labelled_files = dict()
        # The attributes below hold an empty string until a pathlib.Path is assigned.
        self.datastream = ""
        self.xccdf = ""
        self.oval = ""
        self.tailoring = ""
        self.archive = ""
        self.verified = ""
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)
        """
        path = pathlib.Path(path)
        assert path in self.labelled_files, f"'{path}' is not among the known files"
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        """
        Store the path into the given attribute, which has to be unset so far.

        Raises:
            RuntimeError: If the attribute already holds a value.
        """
        old_value = getattr(self, attribute_name)
        if old_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise RuntimeError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label):
        """
        Register a file and remember it as the tailoring, datastream, OVAL
        or XCCDF file if its label says so.

        Args:
            fname: Path to the file.
            label: One of content_handling.CONTENT_TYPES values, other labels
                are only recorded.
        """
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self._assign_content_type("oval", path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        """Return path to the datastream if it is known and exists, else None."""
        if self.datastream and self.datastream.exists():
            return self.datastream
        return None

    def _xccdf_content(self):
        """
        Return path to the XCCDF file if both XCCDF and OVAL files
        are known and exist, else None.
        """
        if not self.xccdf or not self.oval:
            return None
        if self.xccdf.exists() and self.oval.exists():
            return self.xccdf
        return None

    @staticmethod
    def _unusable_content_message(content_path):
        """Explain why the file at content_path can't serve as the main content."""
        if not content_path.exists():
            return f"Couldn't find '{content_path}' among the available content"
        return (
            f"File '{content_path}' is not a valid datastream "
            "or a valid XCCDF of a XCCDF-OVAL file tuple")

    def find_expected_usable_content(self, relative_expected_content_path):
        """
        Return path to the requested content file if it is usable.

        Args:
            relative_expected_content_path: Path to the file relative to the root.

        Raises:
            content_handling.ContentHandlingError: If the file is neither the usable
                datastream nor the usable XCCDF file.
        """
        content_path = self.root / relative_expected_content_path
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path
        raise content_handling.ContentHandlingError(
            self._unusable_content_message(content_path))

    def select_main_usable_content(self):
        """
        Return path to the datastream, or to the XCCDF file as the second choice.

        Raises:
            content_handling.ContentHandlingError: If none of those is usable.
        """
        main_content = self._datastream_content() or self._xccdf_content()
        if main_content:
            return main_content
        msg = (
            "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
            "among the available content")
        raise content_handling.ContentHandlingError(msg)
343