Completed
Push — rhel9-branch ( 6651d4...608ad6 )
by Jan
24s queued 13s
created

OSCAPKickstartData.__str__()   C

Complexity

Conditions 11

Size

Total Lines 44
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 11
eloc 26
nop 1
dl 0
loc 44
rs 5.4
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like org_fedora_oscap.service.kickstart.OSCAPKickstartData.__str__() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import re
20
21
from pyanaconda.core.kickstart import KickstartSpecification
22
from pyanaconda.core.kickstart.addon import AddonData
23
from pykickstart.errors import KickstartValueError, KickstartParseError
24
25
from org_fedora_oscap import common, utils
26
from org_fedora_oscap.structures import PolicyData
27
28
log = logging.getLogger("anaconda")
29
30
__all__ = ["OSCAPKickstartSpecification"]
31
32
33
FINGERPRINT_REGEX = re.compile(r'^[a-z0-9]+$')
34
35
36
def key_value_pair(key, value, indent=4):
37
    return "%s%s = %s" % (indent * " ", key, value)
38
39
40
class AdditionalPropertiesMixin:
41
    @property
42
    def content_name(self) -> str:
43
        return common.get_content_name(self.policy_data)
44
45
    @property
46
    def preinst_content_path(self) -> str:
47
        return common.get_preinst_content_path(self.policy_data)
48
49
    @property
50
    def preinst_tailoring_path(self) -> str:
51
        return common.get_preinst_tailoring_path(self.policy_data)
52
53
    @property
54
    def postinst_content_path(self) -> str:
55
        return common.get_postinst_content_path(self.policy_data)
56
57
    @property
58
    def postinst_tailoring_path(self) -> str:
59
        return common.get_postinst_tailoring_path(self.policy_data)
60
61
    @property
62
    def raw_preinst_content_path(self) -> str:
63
        return common.get_raw_preinst_content_path(self.policy_data)
64
65
66
class OSCAPKickstartData(AddonData, AdditionalPropertiesMixin):
67
    """The kickstart data for the add-on."""
68
69
    def __init__(self):
70
        super().__init__()
71
        self.policy_data = PolicyData()
72
73
        """The name of the %addon section."""
74
        self.name = common.ADDON_NAMES[0]
75
        self.addon_section_present = False
76
77
    def handle_header(self, args, line_number=None):
78
        """Handle the arguments of the %addon line.
79
80
        :param args: a list of additional arguments
81
        :param line_number: a line number
82
        :raise: KickstartParseError for invalid arguments
83
        """
84
        self.addon_section_present = True
85
86
    def handle_line(self, line, line_number=None):
87
        """Handle one line of the section.
88
89
        :param line: a line to parse
90
        :param line_number: a line number
91
        :raise: KickstartParseError for invalid lines
92
        """
93
        actions = {
94
            "content-type": self._parse_content_type,
95
            "content-url": self._parse_content_url,
96
            "content-path": self._parse_content_path,
97
            "datastream-id": self._parse_datastream_id,
98
            "profile": self._parse_profile_id,
99
            "xccdf-id": self._parse_xccdf_id,
100
            "xccdf-path": self._parse_content_path,
101
            "cpe-path": self._parse_cpe_path,
102
            "tailoring-path": self._parse_tailoring_path,
103
            "fingerprint": self._parse_fingerprint,
104
            "certificates": self._parse_certificates,
105
        }
106
107
        line = line.strip()
108
        (pre, sep, post) = line.partition("=")
109
        pre = pre.strip()
110
        post = post.strip()
111
        post = post.strip('"')
112
113
        try:
114
            actions[pre](post)
115
        except KeyError:
116
            msg = "Unknown item '%s' for %s addon" % (line, self.name)
117
            raise KickstartParseError(msg)
118
119
    def _parse_content_type(self, value):
120
        value_low = value.lower()
121
        if value_low in common.SUPPORTED_CONTENT_TYPES:
122
            self.policy_data.content_type = value_low
123
        else:
124
            msg = "Unsupported content type '%s' in the %s addon" % (value,
125
                                                                     self.name)
126
            raise KickstartValueError(msg)
127
128
    def _parse_content_url(self, value):
129
        if any(value.startswith(prefix)
130
               for prefix in common.SUPPORTED_URL_PREFIXES):
131
            self.policy_data.content_url = value
132
        else:
133
            msg = "Unsupported url '%s' in the %s addon" % (value, self.name)
134
            raise KickstartValueError(msg)
135
136
    def _parse_datastream_id(self, value):
137
        # need to be checked?
138
        self.policy_data.datastream_id = value
139
140
    def _parse_xccdf_id(self, value):
141
        # need to be checked?
142
        self.policy_data.xccdf_id = value
143
144
    def _parse_profile_id(self, value):
145
        # need to be checked?
146
        self.policy_data.profile_id = value
147
148
    def _parse_content_path(self, value):
149
        # need to be checked?
150
        self.policy_data.content_path = value
151
152
    def _parse_cpe_path(self, value):
153
        # need to be checked?
154
        self.policy_data.cpe_path = value
155
156
    def _parse_tailoring_path(self, value):
157
        # need to be checked?
158
        self.policy_data.tailoring_path = value
159
160
    def _parse_fingerprint(self, value):
161
        if FINGERPRINT_REGEX.match(value) is None:
162
            msg = "Unsupported or invalid fingerprint"
163
            raise KickstartValueError(msg)
164
165
        if utils.get_hashing_algorithm(value) is None:
166
            msg = "Unsupported fingerprint"
167
            raise KickstartValueError(msg)
168
169
        self.policy_data.fingerprint = value
170
171
    def _parse_certificates(self, value):
172
        self.policy_data.certificates = value
173
174
    def handle_end(self):
175
        """Handle the end of the section."""
176
        tmpl = "%s missing for the %s addon"
177
178
        # check provided data
179
        if not self.policy_data.content_type:
180
            raise KickstartValueError(tmpl % ("content-type", self.name))
181
182
        if (
183
                self.policy_data.content_type != "scap-security-guide"
184
                and not self.policy_data.content_url):
185
            raise KickstartValueError(tmpl % ("content-url", self.name))
186
187
        if not self.policy_data.profile_id:
188
            self.policy_data.profile_id = "default"
189
190
        if (
191
                self.policy_data.content_type in ("rpm", "archive")
192
                and not self.policy_data.content_path):
193
            msg = "Path to the XCCDF file has to be given if content in RPM "\
194
                  "or archive is used"
195
            raise KickstartValueError(msg)
196
197
        if (
198
                self.policy_data.content_type == "rpm"
199
                and not self.policy_data.content_url.endswith(".rpm")):
200
            msg = "Content type set to RPM, but the content URL doesn't end "\
201
                  "with '.rpm'"
202
            raise KickstartValueError(msg)
203
204
        if self.policy_data.content_type == "archive":
205
            supported_archive = any(
206
                self.policy_data.content_url.endswith(arch_type)
207
                for arch_type in common.SUPPORTED_ARCHIVES
208
            )
209
            if not supported_archive:
210
                msg = "Unsupported archive type of the content "\
211
                      "file '%s'" % self.policy_data.content_url
212
                raise KickstartValueError(msg)
213
214
        # do some initialization magic in case of SSG
215
        if self.policy_data.content_type == "scap-security-guide":
216
            if not common.ssg_available():
217
                msg = "SCAP Security Guide not found on the system"
218
                raise KickstartValueError(msg)
219
220
            self.policy_data.content_path = common.SSG_DIR + common.SSG_CONTENT
221
222
    def __str__(self):
223
        """Generate the kickstart representation.
224
225
        What should end up in the resulting kickstart file,
226
        i.e. string representation of the stored data.
227
228
        :return: a string
229
        """
230
        if not self.policy_data.profile_id:
231
            return ""
232
233
        ret = "%%addon %s" % self.name
234
        ret += "\n%s" % key_value_pair("content-type", self.policy_data.content_type)
235
236
        if self.policy_data.content_url:
237
            ret += "\n%s" % key_value_pair("content-url", self.policy_data.content_url)
238
239
        if self.policy_data.datastream_id:
240
            ret += "\n%s" % key_value_pair("datastream-id", self.policy_data.datastream_id)
241
242
        if self.policy_data.xccdf_id:
243
            ret += "\n%s" % key_value_pair("xccdf-id", self.policy_data.xccdf_id)
244
245
        if (
246
                self.policy_data.content_path
247
                and self.policy_data.content_type != "scap-security-guide"):
248
            ret += "\n%s" % key_value_pair("content-path", self.policy_data.content_path)
249
250
        if self.policy_data.cpe_path:
251
            ret += "\n%s" % key_value_pair("cpe-path", self.policy_data.cpe_path)
252
253
        if self.policy_data.tailoring_path:
254
            ret += "\n%s" % key_value_pair("tailoring-path", self.policy_data.tailoring_path)
255
256
        ret += "\n%s" % key_value_pair("profile", self.policy_data.profile_id)
257
258
        if self.policy_data.fingerprint:
259
            ret += "\n%s" % key_value_pair("fingerprint", self.policy_data.fingerprint)
260
261
        if self.policy_data.certificates:
262
            ret += "\n%s" % key_value_pair("certificates", self.policy_data.certificates)
263
264
        ret += "\n%end\n\n"
265
        return ret
266
267
268
def get_oscap_kickstart_data(name):
269
    class NamedOSCAPKickstartData(OSCAPKickstartData):
270
        def __init__(self):
271
            super().__init__()
272
            self.name = name
273
274
    return NamedOSCAPKickstartData
275
276
277
class OSCAPKickstartSpecification(KickstartSpecification):
278
    """The kickstart specification of the OSCAP service."""
279
280
    addons = {
281
        name: get_oscap_kickstart_data(name) for name in common.ADDON_NAMES
282
    }
283