Passed
Pull Request — rhel8-branch (#148)
by Matěj
01:18
created

org_fedora_oscap.model.ObtainedContent.add_file()   A

Complexity

Conditions 5

Size

Total Lines 11
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 11
rs 9.3333
c 0
b 0
f 0
cc 5
nop 3
1
import threading
2
import logging
3
import pathlib
4
import shutil
5
from glob import glob
6
7
from pyanaconda.core import constants
8
from pyanaconda.threading import threadMgr
9
from pykickstart.errors import KickstartValueError
10
11
from org_fedora_oscap import data_fetch, utils
12
from org_fedora_oscap import common
13
from org_fedora_oscap import content_handling
14
15
# Module-level logger; everything in this module logs under the "anaconda" name.
log = logging.getLogger("anaconda")
16
17
18
def is_network(scheme):
    """Tell whether *scheme* starts with one of ``data_fetch.NET_URL_PREFIXES``."""
    # str.startswith accepts a tuple of candidate prefixes, which is the same
    # test as checking every prefix one by one.
    network_prefixes = tuple(data_fetch.NET_URL_PREFIXES)
    return scheme.startswith(network_prefixes)
22
23
24
class Model:
    """Fetch security content and analyze it into an ``ObtainedContent``.

    The content URI is kept split into its scheme and path parts.  Overlapping
    fetches are prevented by the ``now_fetching_or_processing`` flag, which is
    only read and written while holding ``activity_lock``.
    """

    # Directory that fetched content is stored in; fetch_content() wipes and
    # recreates it before every fetch.
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR) / "content-download"

    def __init__(self, policy_data):
        """Initialize empty URI state and create the download directory.

        Args:
            policy_data: Accepted for interface compatibility; the constructor
                does not read it.
        """
        self.content_uri_scheme = ""
        self.content_uri_path = ""
        self.fetched_content = ""

        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

    def get_content_type(self, url):
        """Classify *url* by its suffix as ``"rpm"``, ``"archive"`` or ``"file"``."""
        if url.endswith(".rpm"):
            return "rpm"
        # str.endswith accepts a tuple of candidate suffixes.
        if url.endswith(tuple(common.SUPPORTED_ARCHIVES)):
            return "archive"
        return "file"

    @property
    def content_uri(self):
        """The full content URI, i.e. ``<scheme>://<path>``."""
        return self.content_uri_scheme + "://" + self.content_uri_path

    @content_uri.setter
    def content_uri(self, uri):
        # Split on the first "://" only, the path may legitimately contain more.
        # A URI without "://" raises ValueError from the unpacking.
        scheme, path = uri.split("://", 1)
        self.content_uri_path = path
        self.content_uri_scheme = scheme

    def fetch_content(self, cert, what_if_fail):
        """Start fetching ``content_uri`` into a freshly emptied download directory.

        Args:
            cert: Passed through to the network fetch.
            what_if_fail: Callback accepting the exception raised when the
                fetch could not be started.

        Returns:
            Whatever :meth:`fetch_files` returns.
        """
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        return self.fetch_files(
            self.content_uri_scheme,
            self.content_uri_path,
            self.CONTENT_DOWNLOAD_LOCATION,
            cert,
            what_if_fail,
        )

    def fetch_files(self, scheme, path, destdir, cert, what_if_fail):
        """Start fetching ``<scheme>://<path>`` into *destdir*.

        Args:
            scheme: URI scheme of the content.
            path: URI path of the content.
            destdir: ``pathlib.Path`` of the directory to store the file in.
            cert: Passed through to the network fetch.
            what_if_fail: Callback accepting the exception raised when the
                fetch could not be started.

        Returns:
            The value returned by the ``data_fetch`` helper (later handed to
            ``threadMgr.wait``), or ``None`` when another fetch is already in
            progress or when starting the fetch failed.
        """
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = "Strange, it seems that we are already fetching something."
                log.warning(msg)
                return
            self.now_fetching_or_processing = True

        thread_name = None
        try:
            thread_name = self._start_actual_fetch(scheme, path, destdir, cert)
        except Exception as exc:
            # The fetch never started, so release the "busy" flag right away.
            with self.activity_lock:
                self.now_fetching_or_processing = False
            what_if_fail(exc)

        # We are not finished yet with the fetch
        return thread_name

    def _start_actual_fetch(self, scheme, path, destdir, cert):
        """Validate the URL and hand it over to the matching ``data_fetch`` helper.

        Raises:
            KickstartValueError: If *path* contains no ``/`` or ends with one,
                so that no file basename can be derived from it.
        """
        thread_name = None
        url = scheme + "://" + path

        if "/" not in path:
            msg = f"Missing the path component of the '{url}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{url}' URL"
            raise KickstartValueError(msg)

        dest = destdir / basename

        if is_network(scheme):
            thread_name = data_fetch.wait_and_fetch_net_data(
                url,
                dest,
                cert
            )
        else:  # invalid schemes are handled down the road
            thread_name = data_fetch.fetch_local_data(
                url,
                dest,
            )
        return thread_name

    def finish_content_fetch(self, thread_name, fingerprint, report_callback, dest_filename, after_fetch, what_if_fail):
        """Wait for the fetch, analyze the result and report it via callbacks.

        The "busy" flag is cleared whether or not the analysis succeeded, and
        ``after_fetch`` is called in both cases.

        Args:
            thread_name: Value returned by :meth:`fetch_files`.
            fingerprint: Expected digest of *dest_filename*; falsy to skip
                the integrity check.
            report_callback: Callback accepting a progress message string.
            dest_filename: Path of the fetched file.
            what_if_fail: Callback accepting exception.
            after_fetch: Callback accepting the content class.

        Returns:
            The ``ObtainedContent`` instance, or ``None`` if anything failed.
        """
        try:
            content = self._finish_actual_fetch(thread_name, fingerprint, report_callback, dest_filename)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        finally:
            with self.activity_lock:
                self.now_fetching_or_processing = False

        after_fetch(content)

        return content

    def _verify_fingerprint(self, dest_filename, fingerprint=""):
        """Check that the digest of *dest_filename* equals *fingerprint*.

        Does nothing when *fingerprint* is falsy.

        Raises:
            content_handling.ContentCheckError: If the digests differ.
        """
        if not fingerprint:
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(dest_filename,
                                            hash_obj)
        if digest != fingerprint:
            # These must be f-strings, otherwise the placeholders get logged
            # verbatim instead of the actual values.
            log.error(
                f"File {dest_filename} failed integrity check - assumed a "
                f"{hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            msg = f"Integrity check of the content failed - {hash_obj.name} hash didn't match"
            raise content_handling.ContentCheckError(msg)

    def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_filename):
        """Wait for the fetch and build an ``ObtainedContent`` from its result.

        Args:
            wait_for: Passed to ``threadMgr.wait``; ``None`` means that nothing
                was actually fetched.
            fingerprint: Expected digest of *dest_filename*; falsy to skip
                the integrity check.
            report_callback: Callback accepting a progress message string.
            dest_filename: Path of the fetched file; falsy selects the
                scap-security-guide content.

        Returns:
            The populated ``ObtainedContent`` instance.

        Raises:
            content_handling.ContentCheckError: If the integrity check fails.
            common.ExtractionError: If the fetched archive can't be extracted.
            common.OSCAPaddonError: If the content type is not supported.
        """
        threadMgr.wait(wait_for)
        actually_fetched_content = wait_for is not None

        self._verify_fingerprint(dest_filename, fingerprint)

        content = ObtainedContent(self.CONTENT_DOWNLOAD_LOCATION)

        report_callback("Analyzing content.")
        if not actually_fetched_content:
            if not dest_filename:  # using scap-security-guide
                fpaths = [f"{common.SSG_DIR}/{common.SSG_CONTENT}"]
            else:  # Using downloaded XCCDF/OVAL/DS/tailoring
                fpaths = glob(str(self.CONTENT_DOWNLOAD_LOCATION / "*.xml"))
            labelled_files = content_handling.identify_files(fpaths)
        else:
            dest_filename = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            content_type = self.get_content_type(str(dest_filename))
            if content_type in ("archive", "rpm"):
                # extract the content
                content.add_content_archive(dest_filename)
                try:
                    fpaths = common.extract_data(
                        str(dest_filename),
                        str(dest_filename.parent)
                    )
                except common.ExtractionError as err:
                    msg = f"Failed to extract the '{dest_filename}' archive: {str(err)}"
                    log.error(msg)
                    raise

                # and populate missing fields
                labelled_files = content_handling.identify_files(fpaths)

            elif content_type == "file":
                labelled_files = content_handling.identify_files([str(dest_filename)])
            else:
                raise common.OSCAPaddonError("Unsupported content type")

        for fname, label in labelled_files.items():
            content.add_file(fname, label)

        if fingerprint:
            # ObtainedContent keys its files by pathlib.Path, so a plain string
            # (the "nothing was fetched" branch) has to be converted first.
            content.record_verification(pathlib.Path(dest_filename))

        return content
188
189
190
class ObtainedContent:
    """Record of the content files that were obtained, and what they are.

    ``labelled_files`` maps ``pathlib.Path`` objects to their labels.  The
    ``datastream``, ``xccdf``, ``oval`` and ``tailoring`` attributes hold the
    path of the file of the respective kind, or an empty string when no such
    file was added.
    """

    def __init__(self, root):
        """Create an empty record whose files are expected below *root*."""
        self.labelled_files = {}
        self.datastream = ""
        self.xccdf = ""
        self.oval = ""
        self.tailoring = ""
        self.archive = ""
        self.verified = ""
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """Remember *path* as the file that was verified.

        Args:
            path: String or ``pathlib.Path`` of a file that was already added.
        """
        # Keys of labelled_files are pathlib.Path objects, so a plain string
        # would never be found among them.
        path = pathlib.Path(path)
        assert path in self.labelled_files
        self.verified = path

    def add_content_archive(self, fname):
        """Register *fname* as the content archive; archives carry no label."""
        path = pathlib.Path(fname)
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        """Set the attribute *attribute_name* to *new_value*, at most once.

        Raises:
            RuntimeError: If the attribute already holds a truthy value.
        """
        old_value = getattr(self, attribute_name)
        if old_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise RuntimeError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label):
        """Register the file *fname* under *label*.

        Files labelled as tailoring, datastream, OVAL or XCCDF checklist are
        also stored in the matching attribute.

        Raises:
            RuntimeError: If a file of the same kind was already added.
        """
        path = pathlib.Path(fname)
        if label == content_handling.CONTENT_TYPES["TAILORING"]:
            self._assign_content_type("tailoring", path)
        elif label == content_handling.CONTENT_TYPES["DATASTREAM"]:
            self._assign_content_type("datastream", path)
        elif label == content_handling.CONTENT_TYPES["OVAL"]:
            self._assign_content_type("oval", path)
        elif label == content_handling.CONTENT_TYPES["XCCDF_CHECKLIST"]:
            self._assign_content_type("xccdf", path)
        self.labelled_files[path] = label

    def _datastream_content(self):
        """Return the datastream path if it is set and exists, else ``None``."""
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        """Return the XCCDF path if both XCCDF and OVAL are set and exist.

        Returns ``None`` otherwise.
        """
        if not self.xccdf or not self.oval:
            return None
        if not (self.xccdf.exists() and self.oval.exists()):
            return None
        return self.xccdf

    def find_expected_usable_content(self, relative_expected_content_path):
        """Return the path of the expected content if it is usable.

        Args:
            relative_expected_content_path: Path of the content relative to
                ``root``.

        Raises:
            content_handling.ContentHandlingError: If the file is neither the
                usable datastream nor the usable XCCDF.
        """
        content_path = self.root / relative_expected_content_path
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path

        if not content_path.exists():
            msg = f"Couldn't find '{content_path}' among the available content"
        else:
            # The first part must be an f-string, otherwise the message would
            # contain the placeholder instead of the actual path.
            msg = (
                f"File '{content_path}' is not a valid datastream "
                "or a valid XCCDF of a XCCDF-OVAL file tuple")
        raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        """Return the usable datastream, or else the usable XCCDF.

        Raises:
            content_handling.ContentHandlingError: If neither is usable.
        """
        # The datastream comes first, so it wins when both are available.
        eligible_main_content = (self._datastream_content(), self._xccdf_content())
        for candidate in eligible_main_content:
            if candidate:
                return candidate
        msg = (
            "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
            "among the available content")
        raise content_handling.ContentHandlingError(msg)

    def get_file_handler(self, path):
        """Return the ``content_handling`` handler that matches *path*.

        Returns:
            ``content_handling.DataStreamHandler`` for the datastream,
            ``content_handling.BenchmarkHandler`` for the XCCDF.

        Raises:
            content_handling.ContentHandlingError: If *path* is neither.
        """
        if path == self.datastream:
            return content_handling.DataStreamHandler
        elif path == self.xccdf:
            return content_handling.BenchmarkHandler
        else:
            msg = (
                f"We don't know of content '{path}' "
                "so we can't make claims regarding its handler.")
            raise content_handling.ContentHandlingError(msg)
282