Passed
Pull Request — rhel8-branch (#148)
by Matěj
01:54
created

org_fedora_oscap.model.ObtainedContent.add_file()   A

Complexity

Conditions 5

Size

Total Lines 11
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 11
rs 9.3333
c 0
b 0
f 0
cc 5
nop 3
1
import threading
2
import logging
3
import pathlib
4
import shutil
5
from glob import glob
6
7
from pyanaconda.core import constants
8
from pyanaconda.threading import threadMgr
9
from pykickstart.errors import KickstartValueError
10
11
from org_fedora_oscap import data_fetch, utils
12
from org_fedora_oscap import common
13
from org_fedora_oscap import content_handling
14
15
from org_fedora_oscap.common import _
16
17
log = logging.getLogger("anaconda")
18
19
20
def is_network(scheme):
    """
    Tell whether the URL scheme starts with one of the network prefixes.

    Args:
        scheme: The scheme part of a content URL (the text before "://").

    Returns:
        True if the scheme begins with any entry of
        data_fetch.NET_URL_PREFIXES, False otherwise.
    """
    # str.startswith accepts a tuple of candidate prefixes.
    known_prefixes = tuple(data_fetch.NET_URL_PREFIXES)
    return scheme.startswith(known_prefixes)
24
25
26
class Model:
    """
    Drives the fetch of the security content and its subsequent analysis.

    The content location is kept as a scheme and a path, a fetch is started
    by fetch_content() and completed by finish_content_fetch(), which
    returns an instance of ObtainedContent describing what is available.
    """
    # Directory that fetched content is stored into.
    CONTENT_DOWNLOAD_LOCATION = pathlib.Path(common.INSTALLATION_CONTENT_DIR) / "content-download"
    # Content that is used when no content filename is defined.
    DEFAULT_CONTENT = f"{common.SSG_DIR}/{common.SSG_CONTENT}"

    def __init__(self, policy_data):
        """
        Set up an empty model and make sure the download directory exists.

        Args:
            policy_data: Accepted for interface compatibility,
                it is not used by this constructor.
        """
        self.content_uri_scheme = ""
        self.content_uri_path = ""
        self.fetched_content = ""

        # Guards the now_fetching_or_processing flag.
        self.activity_lock = threading.Lock()
        self.now_fetching_or_processing = False

        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)

    def get_content_type(self, url):
        """
        Classify the content by the suffix of its URL or filename.

        Returns:
            "rpm", "archive" (suffix listed in common.SUPPORTED_ARCHIVES),
            or "file" for anything else.
        """
        if url.endswith(".rpm"):
            return "rpm"
        if any(url.endswith(arch_type) for arch_type in common.SUPPORTED_ARCHIVES):
            return "archive"
        return "file"

    @property
    def content_uri(self):
        """The content URI assembled as '<scheme>://<path>'."""
        return self.content_uri_scheme + "://" + self.content_uri_path

    @content_uri.setter
    def content_uri(self, uri):
        # Split on the first "://" only; an URI without that separator
        # makes the unpacking fail with ValueError.
        scheme, path = uri.split("://", 1)
        self.content_uri_path = path
        self.content_uri_scheme = scheme

    def fetch_content(self, what_if_fail, cert=""):
        """
        Initiate fetch of the content into an appropriate directory

        The download directory is removed and created again first,
        so leftovers of previous fetches don't get in the way.

        Args:
            what_if_fail: Callback accepting exception as an argument that
                should handle them in the calling layer.
            cert: HTTPS certificates

        Returns:
            The value returned by the started fetch (a fetching thread name),
            or None if the fetch was not started.
        """
        shutil.rmtree(self.CONTENT_DOWNLOAD_LOCATION, ignore_errors=True)
        self.CONTENT_DOWNLOAD_LOCATION.mkdir(parents=True, exist_ok=True)
        fetching_thread_name = self._fetch_files(
            self.content_uri_scheme, self.content_uri_path,
            self.CONTENT_DOWNLOAD_LOCATION, cert, what_if_fail)
        return fetching_thread_name

    def _fetch_files(self, scheme, path, destdir, cert, what_if_fail):
        """
        Start the fetch unless another fetch or processing is in progress.

        Any exception raised while starting the fetch is handed over to
        the what_if_fail callback and the in-progress flag is cleared.

        Returns:
            The fetching thread name, or None if nothing was started.
        """
        with self.activity_lock:
            if self.now_fetching_or_processing:
                msg = "Strange, it seems that we are already fetching something."
                log.warning(msg)
                return
            self.now_fetching_or_processing = True

        fetching_thread_name = None
        try:
            fetching_thread_name = self._start_actual_fetch(scheme, path, destdir, cert)
        except Exception as exc:
            with self.activity_lock:
                self.now_fetching_or_processing = False
            what_if_fail(exc)

        # We are not finished yet with the fetch
        return fetching_thread_name

    def _start_actual_fetch(self, scheme, path, destdir, cert):
        """
        Work out the destination file and start the network or local fetch.

        Raises:
            KickstartValueError: If the path has no "/" in it, or if it
                ends with "/" so no basename can be deduced.
        """
        fetching_thread_name = None
        url = scheme + "://" + path

        if "/" not in path:
            msg = f"Missing the path component of the '{url}' URL"
            raise KickstartValueError(msg)
        basename = path.rsplit("/", 1)[1]
        if not basename:
            msg = f"Unable to deduce basename from the '{url}' URL"
            raise KickstartValueError(msg)

        dest = destdir / basename

        if is_network(scheme):
            fetching_thread_name = data_fetch.wait_and_fetch_net_data(
                url,
                dest,
                cert
            )
        else:  # invalid schemes are handled down the road
            fetching_thread_name = data_fetch.fetch_local_data(
                url,
                dest,
            )
        return fetching_thread_name

    def finish_content_fetch(self, fetching_thread_name, fingerprint, report_callback, dest_filename,
                             what_if_fail):
        """
        Finish any ongoing fetch and analyze what has been fetched.

        After the fetch is completed, it verifies fetched content if applicable,
        analyzes it and compiles into an instance of ObtainedContent.

        Args:
            fetching_thread_name: Name of the fetching thread
                or None if we are only after the analysis
            fingerprint: A checksum for downloaded file verification
            report_callback: Means for the method to send user-relevant messages outside
            dest_filename: The target of the fetch operation. Can be falsy -
                in this case there is no content filename defined
            what_if_fail: Callback accepting exception as an argument
                that should handle them in the calling layer.

        Returns:
            Instance of ObtainedContent if everything went well, or None.
        """
        try:
            content = self._finish_actual_fetch(fetching_thread_name, fingerprint, report_callback, dest_filename)
        except Exception as exc:
            what_if_fail(exc)
            content = None
        finally:
            # Whatever the outcome was, the fetch and processing are over.
            with self.activity_lock:
                self.now_fetching_or_processing = False

        return content

    def _verify_fingerprint(self, dest_filename, fingerprint=""):
        """
        Check the file against the expected fingerprint, if there is any.

        Raises:
            content_handling.ContentCheckError: If the computed digest
                differs from the fingerprint.
        """
        if not fingerprint:
            return

        hash_obj = utils.get_hashing_algorithm(fingerprint)
        digest = utils.get_file_fingerprint(dest_filename,
                                            hash_obj)
        if digest != fingerprint:
            log.error(
                f"File {dest_filename} failed integrity check - assumed a "
                f"{hash_obj.name} hash and '{fingerprint}', got '{digest}'"
            )
            # Translate the constant template first and fill it in afterwards,
            # a string that is already interpolated can't be looked up
            # in the translation catalogue.
            msg = _(
                "Integrity check of the content failed - {hash_obj.name} hash didn't match"
            ).format(hash_obj=hash_obj)
            raise content_handling.ContentCheckError(msg)

    def _finish_actual_fetch(self, wait_for, fingerprint, report_callback, dest_filename):
        """
        Wait for the fetch, verify the result and label the available files.

        Returns:
            Instance of ObtainedContent rooted in the download directory.
        """
        threadMgr.wait(wait_for)
        # No thread to wait for means that nothing has been fetched now.
        actually_fetched_content = wait_for is not None

        if fingerprint and dest_filename:
            self._verify_fingerprint(dest_filename, fingerprint)

        fpaths = self._gather_available_files(actually_fetched_content, dest_filename)

        structured_content = ObtainedContent(self.CONTENT_DOWNLOAD_LOCATION)
        content_type = self.get_content_type(str(dest_filename))
        if content_type in ("archive", "rpm"):
            structured_content.add_content_archive(dest_filename)

        labelled_files = content_handling.identify_files(fpaths)
        for fname, label in labelled_files.items():
            structured_content.add_file(fname, label)

        if fingerprint and dest_filename:
            structured_content.record_verification(dest_filename)

        return structured_content

    def _gather_available_files(self, actually_fetched_content, dest_filename):
        """
        Collect paths of the files that are available for the analysis.

        Args:
            actually_fetched_content: Whether a fetch has just taken place.
            dest_filename: The target of the fetch operation, can be falsy.

        Returns:
            List of file paths as strings.

        Raises:
            common.ExtractionError: If the fetched archive can't be extracted.
        """
        fpaths = []
        if not actually_fetched_content:
            if not dest_filename:  # using scap-security-guide
                fpaths = [self.DEFAULT_CONTENT]
            else:  # Using downloaded XCCDF/OVAL/DS/tailoring
                fpaths = glob(str(self.CONTENT_DOWNLOAD_LOCATION / "*.xml"))
        else:
            dest_filename = pathlib.Path(dest_filename)
            # RPM is an archive at this phase
            content_type = self.get_content_type(str(dest_filename))
            if content_type in ("archive", "rpm"):
                try:
                    fpaths = common.extract_data(
                        str(dest_filename),
                        str(dest_filename.parent)
                    )
                except common.ExtractionError as err:
                    msg = f"Failed to extract the '{dest_filename}' archive: {str(err)}"
                    log.error(msg)
                    raise

            elif content_type == "file":
                fpaths = [str(dest_filename)]
            else:
                raise common.OSCAPaddonError("Unsupported content type")
        return fpaths
217
218
219
class ObtainedContent:
    """
    This class aims to assist the gathered files discovery -
    the addon can download files directly, or they can be extracted from an archive.
    The class enables user to quickly understand what is available,
    and whether the current set of contents is usable for further processing.
    """
    # Pairs of a content_handling.CONTENT_TYPES key and the name of
    # the attribute that holds the path of the file labelled that way.
    _LABEL_ATTRIBUTES = (
        ("TAILORING", "tailoring"),
        ("DATASTREAM", "datastream"),
        ("OVAL", "oval"),
        ("XCCDF_CHECKLIST", "xccdf"),
    )

    def __init__(self, root):
        """
        Args:
            root: Directory that relative content paths are resolved against.
        """
        self.labelled_files = dict()
        self.datastream = ""
        self.xccdf = ""
        self.oval = ""
        self.tailoring = ""
        self.archive = ""
        self.verified = ""
        self.root = pathlib.Path(root)

    def record_verification(self, path):
        """
        Declare a file as verified (typically by means of a checksum)

        The file has to be among the labelled files already.
        """
        path = pathlib.Path(path)
        assert path in self.labelled_files
        self.verified = path

    def add_content_archive(self, fname):
        """
        If files come from an archive, record this information using this function.
        """
        path = pathlib.Path(fname)
        # The archive itself carries no content label.
        self.labelled_files[path] = None
        self.archive = path

    def _assign_content_type(self, attribute_name, new_value):
        """
        Store the path into the given attribute, which has to be empty so far.

        Raises:
            RuntimeError: If the attribute already holds a value, i.e. there
                would be two files of the same content type.
        """
        old_value = getattr(self, attribute_name)
        if old_value:
            msg = (
                f"When dealing with {attribute_name}, "
                f"there was already the {old_value.name} when setting the new {new_value.name}")
            raise RuntimeError(msg)
        setattr(self, attribute_name, new_value)

    def add_file(self, fname, label):
        """
        Record the file and its label.

        If the label is one of the tailoring, datastream, OVAL or XCCDF
        checklist content types, the file is also remembered as the file
        of that type.

        Raises:
            RuntimeError: If a file of the same content type was added before.
        """
        path = pathlib.Path(fname)
        for type_key, attribute_name in self._LABEL_ATTRIBUTES:
            if label == content_handling.CONTENT_TYPES[type_key]:
                self._assign_content_type(attribute_name, path)
                break
        self.labelled_files[path] = label

    def _datastream_content(self):
        """Return the datastream path if it is set and the file exists, else None."""
        if not self.datastream:
            return None
        if not self.datastream.exists():
            return None
        return self.datastream

    def _xccdf_content(self):
        """
        Return the XCCDF path if both the XCCDF and the OVAL are set
        and both files exist, else None.
        """
        if not self.xccdf or not self.oval:
            return None
        if not (self.xccdf.exists() and self.oval.exists()):
            return None
        return self.xccdf

    @staticmethod
    def _describe_unusable_content(content_path):
        """Compose the message explaining why the content path can't be used."""
        if not content_path.exists():
            return f"Couldn't find '{content_path}' among the available content"
        return (
            f"File '{content_path}' is not a valid datastream "
            "or a valid XCCDF of a XCCDF-OVAL file tuple")

    def find_expected_usable_content(self, relative_expected_content_path):
        """
        Confirm that the expected content is the usable main content.

        Args:
            relative_expected_content_path: Content path relative to the root.

        Returns:
            The path joined with the root if it is either the existing
            datastream, or the XCCDF of an existing XCCDF-OVAL file tuple.

        Raises:
            content_handling.ContentHandlingError: If the path is neither.
        """
        content_path = self.root / relative_expected_content_path
        eligible_main_content = (self._datastream_content(), self._xccdf_content())

        if content_path in eligible_main_content:
            return content_path

        msg = self._describe_unusable_content(content_path)
        raise content_handling.ContentHandlingError(msg)

    def select_main_usable_content(self):
        """
        Pick the main content - the datastream takes precedence over the XCCDF.

        Raises:
            content_handling.ContentHandlingError: If there is neither
                an existing datastream nor an existing XCCDF-OVAL file tuple.
        """
        main_content = self._datastream_content() or self._xccdf_content()
        if main_content:
            return main_content

        msg = (
            "Couldn't find a valid datastream or a valid XCCDF-OVAL file tuple "
            "among the available content")
        raise content_handling.ContentHandlingError(msg)

    def get_file_handler(self, path):
        """
        Return the content_handling handler class for the datastream or the XCCDF.

        Raises:
            content_handling.ContentHandlingError: If the path is neither
                the datastream nor the XCCDF.
        """
        if path == self.datastream:
            return content_handling.DataStreamHandler
        elif path == self.xccdf:
            return content_handling.BenchmarkHandler
        else:
            msg = (
                f"We don't know of content '{path}' "
                "so we can't make claims regarding its handler.")
            raise content_handling.ContentHandlingError(msg)
323