OSCAPKickstartData._parse_content_url()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 2
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import re
20
21
from pyanaconda.core.kickstart import KickstartSpecification
22
from pyanaconda.core.kickstart.addon import AddonData
23
from pykickstart.errors import KickstartValueError, KickstartParseError
24
25
from org_fedora_oscap import common, utils
26
from org_fedora_oscap.structures import PolicyData
27
28
# Public API of this module.
__all__ = ["OSCAPKickstartSpecification"]

# Logger registered under the "anaconda" name.
log = logging.getLogger("anaconda")

# A fingerprint must consist solely of lowercase letters and digits.
FINGERPRINT_REGEX = re.compile(r"^[a-z0-9]+$")
34
35
36
def key_value_pair(key, value, indent=4):
    """Format one ``key = value`` line of the addon section.

    :param key: the option name
    :param value: the option value (converted with ``str()``)
    :param indent: the number of spaces to put in front of the key
    :return: the indented ``key = value`` string
    """
    return f"{' ' * indent}{key} = {value}"
38
39
40
class AdditionalPropertiesMixin:
    """Read-only properties derived from ``self.policy_data``.

    Every property forwards ``self.policy_data`` to the helper of the
    matching name in ``org_fedora_oscap.common``. The class using this
    mixin has to provide the ``policy_data`` attribute.
    """

    @property
    def content_name(self) -> str:
        """Return ``common.get_content_name()`` for the policy data."""
        return common.get_content_name(self.policy_data)

    @property
    def preinst_content_path(self) -> str:
        """Return ``common.get_preinst_content_path()`` for the policy data."""
        return common.get_preinst_content_path(self.policy_data)

    @property
    def preinst_tailoring_path(self) -> str:
        """Return ``common.get_preinst_tailoring_path()`` for the policy data."""
        return common.get_preinst_tailoring_path(self.policy_data)

    @property
    def postinst_content_path(self) -> str:
        """Return ``common.get_postinst_content_path()`` for the policy data."""
        return common.get_postinst_content_path(self.policy_data)

    @property
    def postinst_tailoring_path(self) -> str:
        """Return ``common.get_postinst_tailoring_path()`` for the policy data."""
        return common.get_postinst_tailoring_path(self.policy_data)

    @property
    def raw_preinst_content_path(self) -> str:
        """Return ``common.get_raw_preinst_content_path()`` for the policy data."""
        return common.get_raw_preinst_content_path(self.policy_data)
64
65
66
class OSCAPKickstartData(AddonData, AdditionalPropertiesMixin):
    """The kickstart data for the add-on.

    Parses the lines of the ``%addon`` section into ``self.policy_data``,
    validates the collected values when the section ends and renders the
    stored data back into kickstart syntax via ``str()``.
    """

    # Values accepted by the "remediate" option.
    _REMEDIATE_VALUES = ("none", "post", "firstboot", "both")

    def __init__(self):
        super().__init__()
        self.policy_data = PolicyData()

        # The name of the %addon section.
        self.name = common.ADDON_NAMES[0]
        # Switched to True by handle_header().
        self.addon_section_present = False

    def handle_header(self, args, line_number=None):
        """Handle the arguments of the %addon line.

        The arguments are not inspected; the call only records that the
        section is present.

        :param args: a list of additional arguments
        :param line_number: a line number
        """
        self.addon_section_present = True

    def handle_line(self, line, line_number=None):
        """Handle one line of the section.

        The line is split on the first ``=``; the left part selects the
        handler and the right part (with surrounding whitespace and double
        quotes removed) is passed to it.

        :param line: a line to parse
        :param line_number: a line number
        :raise: KickstartParseError for lines with an unknown item
        :raise: KickstartValueError for unsupported values
        """
        actions = {
            "content-type": self._parse_content_type,
            "content-url": self._parse_content_url,
            "content-path": self._parse_content_path,
            "datastream-id": self._parse_datastream_id,
            "profile": self._parse_profile_id,
            "xccdf-id": self._parse_xccdf_id,
            # "xccdf-path" is handled exactly like "content-path"
            "xccdf-path": self._parse_content_path,
            "cpe-path": self._parse_cpe_path,
            "tailoring-path": self._parse_tailoring_path,
            "fingerprint": self._parse_fingerprint,
            "certificates": self._parse_certificates,
            "remediate": self._parse_remediate,
        }

        line = line.strip()
        key, _sep, value = line.partition("=")
        key = key.strip()
        value = value.strip().strip('"')

        # Guard the lookup only, so that a KeyError raised inside a handler
        # is not misreported as an unknown item.
        try:
            action = actions[key]
        except KeyError:
            msg = "Unknown item '%s' for %s addon" % (line, self.name)
            raise KickstartParseError(msg, lineno=line_number) from None

        action(value)

    def _parse_content_type(self, value):
        """Store the lowercased content type if it is a supported one.

        :raise: KickstartValueError for an unsupported content type
        """
        value_low = value.lower()
        if value_low not in common.SUPPORTED_CONTENT_TYPES:
            msg = "Unsupported content type '%s' in the %s addon" % (value,
                                                                     self.name)
            raise KickstartValueError(msg)

        self.policy_data.content_type = value_low

    def _parse_content_url(self, value):
        """Store the content URL if it starts with a supported prefix.

        :raise: KickstartValueError for an unsupported URL
        """
        if not value.startswith(tuple(common.SUPPORTED_URL_PREFIXES)):
            msg = "Unsupported url '%s' in the %s addon" % (value, self.name)
            raise KickstartValueError(msg)

        self.policy_data.content_url = value

    def _parse_datastream_id(self, value):
        """Store the datastream ID without any validation."""
        # need to be checked?
        self.policy_data.datastream_id = value

    def _parse_xccdf_id(self, value):
        """Store the XCCDF ID without any validation."""
        # need to be checked?
        self.policy_data.xccdf_id = value

    def _parse_profile_id(self, value):
        """Store the profile ID without any validation."""
        # need to be checked?
        self.policy_data.profile_id = value

    def _parse_content_path(self, value):
        """Store the content path without any validation."""
        # need to be checked?
        self.policy_data.content_path = value

    def _parse_cpe_path(self, value):
        """Store the CPE path without any validation."""
        # need to be checked?
        self.policy_data.cpe_path = value

    def _parse_tailoring_path(self, value):
        """Store the tailoring path without any validation."""
        # need to be checked?
        self.policy_data.tailoring_path = value

    def _parse_fingerprint(self, value):
        """Store the fingerprint after validating it.

        The value has to match FINGERPRINT_REGEX and
        utils.get_hashing_algorithm() has to return something other
        than None for it.

        :raise: KickstartValueError for a rejected fingerprint
        """
        if FINGERPRINT_REGEX.match(value) is None:
            msg = "Unsupported or invalid fingerprint"
            raise KickstartValueError(msg)

        if utils.get_hashing_algorithm(value) is None:
            msg = "Unsupported fingerprint"
            raise KickstartValueError(msg)

        self.policy_data.fingerprint = value

    def _parse_certificates(self, value):
        """Store the certificates value without any validation."""
        self.policy_data.certificates = value

    def _parse_remediate(self, value):
        """Store the remediate value if it is one of the accepted values.

        :raise: KickstartValueError for any other value
        """
        # An explicit check instead of an assert: asserts are removed when
        # Python runs with -O and would let invalid input through.
        if value not in self._REMEDIATE_VALUES:
            msg = "Unsupported remediate value '%s' in the %s addon" % (
                value, self.name)
            raise KickstartValueError(msg)

        self.policy_data.remediate = value

    def handle_end(self):
        """Handle the end of the section.

        Checks that the collected values are complete and consistent,
        sets the profile ID to "default" when none was given and points
        the content path to the SSG content for the "scap-security-guide"
        content type.

        :raise: KickstartValueError for missing or inconsistent data
        """
        data = self.policy_data
        tmpl = "%s missing for the %s addon"

        # check provided data
        if not data.content_type:
            raise KickstartValueError(tmpl % ("content-type", self.name))

        if data.content_type != "scap-security-guide" and not data.content_url:
            raise KickstartValueError(tmpl % ("content-url", self.name))

        if not data.profile_id:
            data.profile_id = "default"

        if data.content_type in ("rpm", "archive") and not data.content_path:
            msg = "Path to the XCCDF file has to be given if content in RPM "\
                  "or archive is used"
            raise KickstartValueError(msg)

        if data.content_type == "rpm" and not data.content_url.endswith(".rpm"):
            msg = "Content type set to RPM, but the content URL doesn't end "\
                  "with '.rpm'"
            raise KickstartValueError(msg)

        if (
                data.content_type == "archive"
                and not data.content_url.endswith(
                    tuple(common.SUPPORTED_ARCHIVES))):
            msg = "Unsupported archive type of the content "\
                  "file '%s'" % data.content_url
            raise KickstartValueError(msg)

        # do some initialization magic in case of SSG
        if data.content_type == "scap-security-guide":
            if not common.ssg_available():
                msg = "SCAP Security Guide not found on the system"
                raise KickstartValueError(msg)

            data.content_path = common.SSG_DIR + common.SSG_CONTENT

    def __str__(self):
        """Generate the kickstart representation.

        What should end up in the resulting kickstart file,
        i.e. string representation of the stored data. An empty string
        is returned when no profile ID is set.

        :return: a string
        """
        data = self.policy_data
        if not data.profile_id:
            return ""

        lines = ["%%addon %s" % self.name]

        def emit(key, value):
            lines.append(key_value_pair(key, value))

        def emit_if_set(key, value):
            if value:
                emit(key, value)

        emit("content-type", data.content_type)
        emit_if_set("content-url", data.content_url)
        emit_if_set("datastream-id", data.datastream_id)
        emit_if_set("xccdf-id", data.xccdf_id)
        # The content path is not written out for the SSG content type.
        if data.content_type != "scap-security-guide":
            emit_if_set("content-path", data.content_path)
        emit_if_set("cpe-path", data.cpe_path)
        emit_if_set("tailoring-path", data.tailoring_path)
        emit("profile", data.profile_id)
        emit_if_set("fingerprint", data.fingerprint)
        emit_if_set("certificates", data.certificates)
        emit_if_set("remediate", data.remediate)

        lines.append("%end")
        return "\n".join(lines) + "\n\n"
274
275
276
def get_oscap_kickstart_data(name):
    """Create a kickstart data class bound to the given addon name.

    :param name: the name of the %addon section
    :return: a subclass of OSCAPKickstartData whose instances have
             their ``name`` attribute set to the given name
    """
    class NamedOSCAPKickstartData(OSCAPKickstartData):
        def __init__(self):
            super().__init__()
            # Replace the default name assigned by the parent constructor.
            self.name = name

    return NamedOSCAPKickstartData
283
284
285
class OSCAPKickstartSpecification(KickstartSpecification):
    """The kickstart specification of the OSCAP service."""

    # Maps every name in common.ADDON_NAMES to a data class that is
    # bound to that name.
    addons = {
        name: get_oscap_kickstart_data(name) for name in common.ADDON_NAMES
    }
291