Passed
Pull Request — rhel8-branch (#178)
by Matěj
01:59
created

OSCAPKickstartData.__str__()   C

Complexity

Conditions 11

Size

Total Lines 44
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 11
eloc 26
nop 1
dl 0
loc 44
rs 5.4
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like org_fedora_oscap.service.kickstart.OSCAPKickstartData.__str__() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import re
20
21
from pyanaconda.core.kickstart import KickstartSpecification
22
from pyanaconda.core.kickstart.addon import AddonData
23
from pykickstart.errors import KickstartValueError, KickstartParseError
24
25
from org_fedora_oscap import common, utils
26
from org_fedora_oscap.structures import PolicyData
27
28
log = logging.getLogger("anaconda")
29
30
__all__ = ["OSCAPKickstartSpecification"]
31
32
33
FINGERPRINT_REGEX = re.compile(r'^[a-z0-9]+$')
34
35
36
def key_value_pair(key, value, indent=4):
37
    return "%s%s = %s" % (indent * " ", key, value)
38
39
40
class OSCAPKickstartData(AddonData):
41
    """The kickstart data for the add-on."""
42
43
    def __init__(self):
44
        super().__init__()
45
        self.policy_data = PolicyData()
46
47
        """The name of the %addon section."""
48
        self.name = common.ADDON_NAMES[0]
49
        self.addon_section_present = False
50
51
    def handle_header(self, args, line_number=None):
52
        """Handle the arguments of the %addon line.
53
54
        :param args: a list of additional arguments
55
        :param line_number: a line number
56
        :raise: KickstartParseError for invalid arguments
57
        """
58
        self.addon_section_present = True
59
60
    def handle_line(self, line, line_number=None):
61
        """Handle one line of the section.
62
63
        :param line: a line to parse
64
        :param line_number: a line number
65
        :raise: KickstartParseError for invalid lines
66
        """
67
        actions = {
68
            "content-type": self._parse_content_type,
69
            "content-url": self._parse_content_url,
70
            "content-path": self._parse_content_path,
71
            "datastream-id": self._parse_datastream_id,
72
            "profile": self._parse_profile_id,
73
            "xccdf-id": self._parse_xccdf_id,
74
            "xccdf-path": self._parse_content_path,
75
            "cpe-path": self._parse_cpe_path,
76
            "tailoring-path": self._parse_tailoring_path,
77
            "fingerprint": self._parse_fingerprint,
78
            "certificates": self._parse_certificates,
79
        }
80
81
        line = line.strip()
82
        (pre, sep, post) = line.partition("=")
83
        pre = pre.strip()
84
        post = post.strip()
85
        post = post.strip('"')
86
87
        try:
88
            actions[pre](post)
89
        except KeyError:
90
            msg = "Unknown item '%s' for %s addon" % (line, self.name)
91
            raise KickstartParseError(msg)
92
93
    def _parse_content_type(self, value):
94
        value_low = value.lower()
95
        if value_low in common.SUPPORTED_CONTENT_TYPES:
96
            self.policy_data.content_type = value_low
97
        else:
98
            msg = "Unsupported content type '%s' in the %s addon" % (value,
99
                                                                     self.name)
100
            raise KickstartValueError(msg)
101
102
    def _parse_content_url(self, value):
103
        if any(value.startswith(prefix)
104
               for prefix in common.SUPPORTED_URL_PREFIXES):
105
            self.policy_data.content_url = value
106
        else:
107
            msg = "Unsupported url '%s' in the %s addon" % (value, self.name)
108
            raise KickstartValueError(msg)
109
110
    def _parse_datastream_id(self, value):
111
        # need to be checked?
112
        self.policy_data.datastream_id = value
113
114
    def _parse_xccdf_id(self, value):
115
        # need to be checked?
116
        self.policy_data.xccdf_id = value
117
118
    def _parse_profile_id(self, value):
119
        # need to be checked?
120
        self.policy_data.profile_id = value
121
122
    def _parse_content_path(self, value):
123
        # need to be checked?
124
        self.policy_data.content_path = value
125
126
    def _parse_cpe_path(self, value):
127
        # need to be checked?
128
        self.policy_data.cpe_path = value
129
130
    def _parse_tailoring_path(self, value):
131
        # need to be checked?
132
        self.policy_data.tailoring_path = value
133
134
    def _parse_fingerprint(self, value):
135
        if FINGERPRINT_REGEX.match(value) is None:
136
            msg = "Unsupported or invalid fingerprint"
137
            raise KickstartValueError(msg)
138
139
        if utils.get_hashing_algorithm(value) is None:
140
            msg = "Unsupported fingerprint"
141
            raise KickstartValueError(msg)
142
143
        self.policy_data.fingerprint = value
144
145
    def _parse_certificates(self, value):
146
        self.policy_data.certificates = value
147
148
    def handle_end(self):
149
        """Handle the end of the section."""
150
        tmpl = "%s missing for the %s addon"
151
152
        # check provided data
153
        if not self.policy_data.content_type:
154
            raise KickstartValueError(tmpl % ("content-type", self.name))
155
156
        if (
157
                self.policy_data.content_type != "scap-security-guide"
158
                and not self.policy_data.content_url):
159
            raise KickstartValueError(tmpl % ("content-url", self.name))
160
161
        if not self.policy_data.profile_id:
162
            self.policy_data.profile_id = "default"
163
164
        if (
165
                self.policy_data.content_type in ("rpm", "archive")
166
                and not self.policy_data.content_path):
167
            msg = "Path to the XCCDF file has to be given if content in RPM "\
168
                  "or archive is used"
169
            raise KickstartValueError(msg)
170
171
        if (
172
                self.policy_data.content_type == "rpm"
173
                and not self.policy_data.content_url.endswith(".rpm")):
174
            msg = "Content type set to RPM, but the content URL doesn't end "\
175
                  "with '.rpm'"
176
            raise KickstartValueError(msg)
177
178
        if self.policy_data.content_type == "archive":
179
            supported_archive = any(
180
                self.policy_data.content_url.endswith(arch_type)
181
                for arch_type in common.SUPPORTED_ARCHIVES
182
            )
183
            if not supported_archive:
184
                msg = "Unsupported archive type of the content "\
185
                      "file '%s'" % self.policy_data.content_url
186
                raise KickstartValueError(msg)
187
188
        # do some initialization magic in case of SSG
189
        if self.policy_data.content_type == "scap-security-guide":
190
            if not common.ssg_available():
191
                msg = "SCAP Security Guide not found on the system"
192
                raise KickstartValueError(msg)
193
194
            self.policy_data.content_path = common.SSG_DIR + common.SSG_CONTENT
195
196
    def __str__(self):
197
        """Generate the kickstart representation.
198
199
        What should end up in the resulting kickstart file,
200
        i.e. string representation of the stored data.
201
202
        :return: a string
203
        """
204
        if not self.policy_data.profile_id:
205
            return ""
206
207
        ret = "%%addon %s" % self.name
208
        ret += "\n%s" % key_value_pair("content-type", self.policy_data.content_type)
209
210
        if self.policy_data.content_url:
211
            ret += "\n%s" % key_value_pair("content-url", self.policy_data.content_url)
212
213
        if self.policy_data.datastream_id:
214
            ret += "\n%s" % key_value_pair("datastream-id", self.policy_data.datastream_id)
215
216
        if self.policy_data.xccdf_id:
217
            ret += "\n%s" % key_value_pair("xccdf-id", self.policy_data.xccdf_id)
218
219
        if (
220
                self.policy_data.content_path
221
                and self.policy_data.content_type != "scap-security-guide"):
222
            ret += "\n%s" % key_value_pair("content-path", self.policy_data.content_path)
223
224
        if self.policy_data.cpe_path:
225
            ret += "\n%s" % key_value_pair("cpe-path", self.policy_data.cpe_path)
226
227
        if self.policy_data.tailoring_path:
228
            ret += "\n%s" % key_value_pair("tailoring-path", self.policy_data.tailoring_path)
229
230
        ret += "\n%s" % key_value_pair("profile", self.policy_data.profile_id)
231
232
        if self.policy_data.fingerprint:
233
            ret += "\n%s" % key_value_pair("fingerprint", self.policy_data.fingerprint)
234
235
        if self.policy_data.certificates:
236
            ret += "\n%s" % key_value_pair("certificates", self.policy_data.certificates)
237
238
        ret += "\n%end\n\n"
239
        return ret
240
241
242
def get_oscap_kickstart_data(name):
243
    class NamedOSCAPKickstartData(OSCAPKickstartData):
244
        def __init__(self):
245
            super().__init__()
246
            self.name = name
247
248
    return NamedOSCAPKickstartData
249
250
251
class OSCAPKickstartSpecification(KickstartSpecification):
252
    """The kickstart specification of the OSCAP service."""
253
254
    addons = {
255
        name: get_oscap_kickstart_data(name) for name in common.ADDON_NAMES
256
    }
257