Passed
Pull Request — rhel8-branch (#148)
by Matěj
01:06
created

org_fedora_oscap.model.Model._verify_fingerprint()   A

Complexity

Conditions 3

Size

Total Lines 14
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 14
rs 9.85
c 0
b 0
f 0
cc 3
nop 3
1
import threading
2
import logging
3
import pathlib
4
import shutil
5
from glob import glob
6
7
from pyanaconda.core import constants
8
from pyanaconda.threading import threadMgr
9
from pykickstart.errors import KickstartValueError
10
11
from org_fedora_oscap import data_fetch, utils
12
from org_fedora_oscap import common
13
from org_fedora_oscap import content_handling
14
15
log = logging.getLogger("anaconda")
16
17
18
def is_network(scheme):
    """Tell whether ``scheme`` begins with one of the known network URL prefixes.

    The prefixes are taken from ``data_fetch.NET_URL_PREFIXES``.

    Args:
        scheme: The scheme part of a content URI, as a string.

    Returns:
        True if ``scheme`` starts with at least one of the prefixes,
        False otherwise.
    """
    for known_prefix in data_fetch.NET_URL_PREFIXES:
        if scheme.startswith(known_prefix):
            return True
    return False
22
23
24
class Model:
    """Coordinate fetching of the content and turning it into ObtainedContent.

    The instance remembers the content URI (split into scheme and path) and
    uses ``activity_lock`` together with the ``now_fetching_or_processing``
    flag to refuse starting a second fetch while one is still in progress.
    """

    # Directory that fetch_content() wipes, recreates and downloads into.
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR) / "content-download"

    def __init__(self, policy_data):
        """Initialize an idle model and make sure the download directory exists.

        Args:
            policy_data: Not used by the constructor itself; kept as part of
                the established signature.
        """
        self.content_uri_scheme = ""
        self.content_uri_path = ""
        self.fetched_content = ""

        # Guards reads and writes of now_fetching_or_processing.
        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

    def get_content_type(self, url):
        """Classify ``url`` by its suffix.

        Returns:
            "rpm" for names ending with ".rpm", "archive" for names ending
            with one of ``common.SUPPORTED_ARCHIVES``, "file" otherwise.
        """
        if url.endswith(".rpm"):
            return "rpm"
        elif any(url.endswith(arch_type) for arch_type in common.SUPPORTED_ARCHIVES):
            return "archive"
        else:
            return "file"

    @property
    def content_uri(self):
        """The full content URI assembled as ``<scheme>://<path>``."""
        return self.content_uri_scheme + "://" + self.content_uri_path

    @content_uri.setter
    def content_uri(self, uri):
        # Split on the first "://" only; a URI without that separator makes
        # the tuple unpacking raise ValueError.
        scheme, path = uri.split("://", 1)
        self.content_uri_path = path
        self.content_uri_scheme = scheme

    def fetch_content(self, cert, what_if_fail):
        """Start fetching the content identified by the stored URI.

        The download directory is removed and recreated first, so it only
        holds files belonging to this fetch.

        Args:
            cert: Passed through to the network fetch.
            what_if_fail: Callback accepting exception.

        Returns:
            Whatever fetch_files() returns.
        """
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        return self.fetch_files(self.content_uri_scheme, self.content_uri_path, self.CONTENT_DOWNLOAD_LOCATION, cert, what_if_fail)

    def fetch_files(self, scheme, path, destdir, cert, what_if_fail):
        """Start a fetch of ``scheme://path`` into ``destdir``.

        If a fetch is already flagged as running, a warning is logged and
        nothing is started.

        Args:
            scheme: URI scheme of the content.
            path: URI path of the content.
            destdir: Directory (pathlib.Path) to fetch into.
            cert: Passed through to the network fetch.
            what_if_fail: Callback accepting exception; called when starting
                the fetch raises.

        Returns:
            The value returned by _start_actual_fetch(), or None when another
            fetch is in progress or when starting the fetch failed.
        """
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = "Strange, it seems that we are already fetching something."
                # Logger.warn is a deprecated alias of Logger.warning.
                log.warning(msg)
                return
            self.now_fetching_or_processing = True

        thread_name = None
        try:
            thread_name = self._start_actual_fetch(scheme, path, destdir, cert)
        except Exception as exc:
            # The fetch never started, so release the "busy" flag again.
            with self.activity_lock:
                self.now_fetching_or_processing = False
            what_if_fail(exc)

        # We are not finished yet with the fetch
        return thread_name

    def _start_actual_fetch(self, scheme, path, destdir, cert):
        """Validate the URL and dispatch to the network or local fetcher.

        Returns:
            The value returned by the selected ``common`` fetch function.

        Raises:
            KickstartValueError: If ``path`` contains no "/" or ends with
                one, so no file basename can be derived from it.
        """
        thread_name = None
        url = scheme + "://" + path

        if "/" not in path:
            msg = f"Missing the path component of the '{url}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{url}' URL"
            raise KickstartValueError(msg)

        dest = destdir / basename

        if is_network(scheme):
            thread_name = common.wait_and_fetch_net_data(
                url,
                dest,
                cert
            )
        else:  # invalid schemes are handled down the road
            thread_name = common.fetch_local_data(
                url,
                dest,
            )
        return thread_name

    def finish_content_fetch(self, thread_name, fingerprint, report_callback, dest_filename, after_fetch, what_if_fail):
        """
        Wait for the fetch, verify and analyze the result, notify the caller.

        The "busy" flag is cleared regardless of the outcome, and
        after_fetch is always called (with None on failure).

        Args:
            thread_name: Identifier of the fetch to wait for; None means that
                nothing was actually fetched.
            fingerprint: Expected digest of the fetched file; a false value
                disables the integrity check.
            report_callback: Callback accepting a progress message string.
            dest_filename: Path of the fetched file.
            what_if_fail: Callback accepting exception.
            after_fetch: Callback accepting the content class.

        Returns:
            The ObtainedContent instance, or None if the processing failed.
        """
        try:
            content = self._finish_actual_fetch(thread_name, fingerprint, report_callback, dest_filename)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        finally:
            with self.activity_lock:
                self.now_fetching_or_processing = False

        after_fetch(content)

        return content

    def _verify_fingerprint(self, dest_filename, fingerprint=""):
        """Check that the digest of ``dest_filename`` equals ``fingerprint``.

        Nothing is checked when ``fingerprint`` is empty.

        Raises:
            content_handling.ContentCheckError: If the digests differ.
        """
        if not fingerprint:
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(dest_filename,
                                            hash_obj)
        if digest != fingerprint:
            # These must be f-strings, otherwise the placeholders end up in
            # the log verbatim instead of the actual values.
            log.error(
                f"File {dest_filename} failed integrity check - assumed a "
                f"{hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            msg = f"Integrity check of the content failed - {hash_obj.name} hash didn't match"
            raise content_handling.ContentCheckError(msg)

    def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_filename):
        """Wait for the fetch, verify the file and build the ObtainedContent.

        When ``wait_for`` is None, no content was fetched and the files
        at ``common.SSG_DIR``/``common.SSG_CONTENT`` are analyzed instead.

        Raises:
            content_handling.ContentCheckError: If the fingerprint check fails.
            common.ExtractionError: If the fetched archive can't be extracted.
            common.OSCAPaddonError: If the content type is not supported.
        """
        threadMgr.wait(wait_for)
        actually_fetched_content = wait_for is not None

        self._verify_fingerprint(dest_filename, fingerprint)

        content = ObtainedContent(self.CONTENT_DOWNLOAD_LOCATION)

        report_callback("Analyzing content.")
        if not actually_fetched_content:
            fpaths = [f"{common.SSG_DIR}/{common.SSG_CONTENT}"]
            labelled_files = content_handling.identify_files(fpaths)
        else:
            dest_filename = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            content_type = self.get_content_type(str(dest_filename))
            if content_type in ("archive", "rpm"):
                # extract the content
                content.add_content_archive(dest_filename)
                try:
                    fpaths = common.extract_data(
                        str(dest_filename),
                        str(dest_filename.parent)
                    )
                except common.ExtractionError as err:
                    msg = f"Failed to extract the '{dest_filename}' archive: {str(err)}"
                    log.error(msg)
                    # Re-raise the very same exception with its traceback.
                    raise

                # and populate missing fields
                labelled_files = content_handling.identify_files(fpaths)

            elif content_type == "file":
                labelled_files = content_handling.identify_files([str(dest_filename)])
            else:
                raise common.OSCAPaddonError("Unsupported content type")

        for fname, label in labelled_files.items():
            content.add_file(fname, label)

        if fingerprint:
            content.record_verification(dest_filename)

        return content
184
185
186
class ObtainedContent:
    """Registry of content files together with their detected labels.

    Paths are stored as pathlib.Path objects. At most one file can be
    assigned to each of the datastream, XCCDF, OVAL and tailoring roles.
    """

    def __init__(self, root):
        """Create an empty registry.

        Args:
            root: Directory that relative content paths passed to
                find_expected_usable_content() are resolved against.
        """
        # Maps pathlib.Path of each known file to its label (None for the archive).
        self.labelled_files = {}
        # The role attributes hold "" until a pathlib.Path is assigned.
        self.datastream = ""
        self.xccdf = ""
        self.oval = ""
        self.tailoring = ""
        self.archive = ""
        self.verified = ""
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """Remember ``path`` as the file that has been verified.

        Args:
            path: Path (str or pathlib.Path) of a file that is already
                present among the labelled files.
        """
        # Keys of labelled_files are always pathlib.Path, so normalize the
        # argument first - a plain string would never be found otherwise.
        path = pathlib.Path(path)
        assert path in self.labelled_files
        self.verified = path

    def add_content_archive(self, fname):
        """Register ``fname`` as the archive the content comes from."""
        path = pathlib.Path(fname)
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        """Set the role attribute ``attribute_name`` to the path ``new_value``.

        Raises:
            RuntimeError: If the attribute already holds a value.
        """
        old_value = getattr(self, attribute_name)
        if old_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise RuntimeError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label):
        """Register ``fname`` under ``label`` and fill the matching role.

        Labels other than tailoring, datastream, OVAL and XCCDF checklist
        are only recorded in ``labelled_files``.

        Raises:
            RuntimeError: If the role matching ``label`` is already taken.
        """
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self._assign_content_type("oval", path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        """Return the datastream path if it is set and exists, else None."""
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        """Return the XCCDF path if both XCCDF and OVAL are set and exist, else None."""
        if not self.xccdf or not self.oval:
            return None
        if not (self.xccdf.exists() and self.oval.exists()):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        """Return the expected content path if it is usable as main content.

        Args:
            relative_expected_content_path: Path relative to ``self.root``.

        Returns:
            The absolute-to-root path when it equals the usable datastream or
            the usable XCCDF file.

        Raises:
            content_handling.ContentHandlingError: If the file doesn't exist
                or is neither the datastream nor the XCCDF file.
        """
        content_path = self.root / relative_expected_content_path
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path
        else:
            if not content_path.exists():
                msg = f"Couldn't find '{content_path}' among the available content"
            else:
                # The first literal has to be an f-string so that the actual
                # path is reported instead of the raw placeholder.
                msg = (
                    f"File '{content_path}' is not a valid datastream "
                    "or a valid XCCDF of a XCCDF-OVAL file tuple")
            raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        """Return the usable datastream, or else the usable XCCDF file.

        Raises:
            content_handling.ContentHandlingError: If neither is available.
        """
        eligible_main_content = (self._datastream_content(), self._xccdf_content())
        if not any(eligible_main_content):
            msg = (
                "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
                "among the available content")
            raise content_handling.ContentHandlingError(msg)
        if eligible_main_content[0]:
            return eligible_main_content[0]
        else:
            return eligible_main_content[1]

    def get_file_handler(self, path):
        """Return the handler class matching the role of ``path``.

        Returns:
            content_handling.DataStreamHandler for the datastream,
            content_handling.BenchmarkHandler for the XCCDF file.

        Raises:
            content_handling.ContentHandlingError: If ``path`` is neither.
        """
        if path == self.datastream:
            return content_handling.DataStreamHandler
        elif path == self.xccdf:
            return content_handling.BenchmarkHandler
        else:
            msg = (
                f"We don't know of content '{path}' "
                "so we can't make claims regarding its handler.")
            raise content_handling.ContentHandlingError(msg)
278