Completed
Push — master ( c97ece...6b05e2 )
by Jan
27s queued 17s
created

org_fedora_oscap.service.kickstart   B

Complexity

Total Complexity 46

Size/Duplication

Total Lines 264
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 153
dl 0
loc 264
rs 8.72
c 0
b 0
f 0
wmc 46

16 Methods

Rating   Name   Duplication   Size   Complexity  
A OSCAPKickstartData._parse_cpe_path() 0 3 1
A OSCAPKickstartData._parse_datastream_id() 0 3 1
A OSCAPKickstartData._parse_fingerprint() 0 10 3
A OSCAPKickstartData._parse_content_type() 0 8 2
A OSCAPKickstartData._parse_certificates() 0 2 1
A OSCAPKickstartData._parse_profile_id() 0 3 1
A OSCAPKickstartData._parse_tailoring_path() 0 3 1
A OSCAPKickstartData._parse_content_path() 0 3 1
A OSCAPKickstartData._parse_content_url() 0 7 2
A OSCAPKickstartData._parse_xccdf_id() 0 3 1
A OSCAPKickstartData.handle_header() 0 8 1
A OSCAPKickstartData.__init__() 0 7 1
D OSCAPKickstartData.handle_end() 0 47 13
D OSCAPKickstartData.__str__() 0 47 12
A OSCAPKickstartData.handle_line() 0 33 2
A OSCAPKickstartData._parse_remediate() 0 3 1

2 Functions

Rating   Name   Duplication   Size   Complexity  
A key_value_pair() 0 2 1
A get_oscap_kickstart_data() 0 7 1

How to fix   Complexity   

Complexity

Complex classes like org_fedora_oscap.service.kickstart often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import re
20
21
from pyanaconda.core.kickstart import KickstartSpecification
22
from pyanaconda.core.kickstart.addon import AddonData
23
from pykickstart.errors import KickstartValueError, KickstartParseError
24
25
from org_fedora_oscap import common, utils
26
from org_fedora_oscap.structures import PolicyData
27
28
# Module-level logger; records are emitted under the "anaconda" logger name.
log = logging.getLogger("anaconda")
29
30
# Public API of this module: only the kickstart specification class is
# exported by a star-import.
__all__ = ["OSCAPKickstartSpecification"]
31
32
33
# A fingerprint must consist solely of lowercase ASCII letters and digits
# (at least one character).
FINGERPRINT_REGEX = re.compile(r'^[a-z0-9]+$')
34
35
36
def key_value_pair(key, value, indent=4):
    """Render one ``key = value`` line, prefixed by ``indent`` spaces.

    :param key: the item name placed left of the equals sign
    :param value: the item value placed right of the equals sign
    :param indent: the number of leading spaces
    :return: the formatted line (without a trailing newline)
    """
    padding = " " * indent
    return "{}{!s} = {!s}".format(padding, key, value)
38
39
40
class OSCAPKickstartData(AddonData):
    """The kickstart data for the add-on.

    Parses the ``key = value`` lines of the %addon section into
    ``self.policy_data``, validates the collected data when the section
    ends and renders the stored data back into kickstart syntax.
    """

    def __init__(self):
        super().__init__()
        self.policy_data = PolicyData()

        # The name of the %addon section.
        self.name = common.ADDON_NAMES[0]
        # Flipped to True by handle_header() once the %addon line is seen.
        self.addon_section_present = False

    def handle_header(self, args, line_number=None):
        """Handle the arguments of the %addon line.

        The arguments are not inspected; the method only records that the
        section is present.

        :param args: a list of additional arguments
        :param line_number: a line number
        :raise: KickstartParseError for invalid arguments
        """
        self.addon_section_present = True

    def handle_line(self, line, line_number=None):
        """Handle one line of the section.

        The line is split at the first ``=``; the left part selects the
        parser and the right part (stripped of whitespace and surrounding
        double quotes) is passed to it.

        :param line: a line to parse
        :param line_number: a line number
        :raise: KickstartParseError for invalid lines
        :raise: KickstartValueError for known items with invalid values
        """
        actions = {
            "content-type": self._parse_content_type,
            "content-url": self._parse_content_url,
            "content-path": self._parse_content_path,
            "datastream-id": self._parse_datastream_id,
            "profile": self._parse_profile_id,
            "xccdf-id": self._parse_xccdf_id,
            # "xccdf-path" is accepted as an alias of "content-path"
            "xccdf-path": self._parse_content_path,
            "cpe-path": self._parse_cpe_path,
            "tailoring-path": self._parse_tailoring_path,
            "fingerprint": self._parse_fingerprint,
            "certificates": self._parse_certificates,
            "remediate": self._parse_remediate,
        }

        line = line.strip()
        (pre, _sep, post) = line.partition("=")
        key = pre.strip()
        value = post.strip().strip('"')

        # Resolve the parser separately from calling it, so that a KeyError
        # raised inside a parser is never misreported as an unknown item.
        parse = actions.get(key)
        if parse is None:
            msg = "Unknown item '%s' for %s addon" % (line, self.name)
            raise KickstartParseError(msg)

        parse(value)

    def _parse_content_type(self, value):
        """Store the lower-cased content type if it is a supported one.

        :raise: KickstartValueError for unsupported content types
        """
        value_low = value.lower()
        if value_low in common.SUPPORTED_CONTENT_TYPES:
            self.policy_data.content_type = value_low
        else:
            msg = "Unsupported content type '%s' in the %s addon" % (value,
                                                                     self.name)
            raise KickstartValueError(msg)

    def _parse_content_url(self, value):
        """Store the content URL if it starts with a supported prefix.

        :raise: KickstartValueError for unsupported URLs
        """
        if any(value.startswith(prefix)
               for prefix in common.SUPPORTED_URL_PREFIXES):
            self.policy_data.content_url = value
        else:
            msg = "Unsupported url '%s' in the %s addon" % (value, self.name)
            raise KickstartValueError(msg)

    def _parse_datastream_id(self, value):
        """Store the datastream ID as given."""
        # need to be checked?
        self.policy_data.datastream_id = value

    def _parse_xccdf_id(self, value):
        """Store the XCCDF ID as given."""
        # need to be checked?
        self.policy_data.xccdf_id = value

    def _parse_profile_id(self, value):
        """Store the profile ID as given."""
        # need to be checked?
        self.policy_data.profile_id = value

    def _parse_content_path(self, value):
        """Store the content path as given."""
        # need to be checked?
        self.policy_data.content_path = value

    def _parse_cpe_path(self, value):
        """Store the CPE path as given."""
        # need to be checked?
        self.policy_data.cpe_path = value

    def _parse_tailoring_path(self, value):
        """Store the tailoring path as given."""
        # need to be checked?
        self.policy_data.tailoring_path = value

    def _parse_fingerprint(self, value):
        """Store the fingerprint after validating its format.

        :raise: KickstartValueError if the value contains characters other
                than lowercase letters and digits, or if no hashing
                algorithm is found for it
        """
        if FINGERPRINT_REGEX.match(value) is None:
            msg = "Unsupported or invalid fingerprint"
            raise KickstartValueError(msg)

        if utils.get_hashing_algorithm(value) is None:
            msg = "Unsupported fingerprint"
            raise KickstartValueError(msg)

        self.policy_data.fingerprint = value

    def _parse_certificates(self, value):
        """Store the certificates value as given."""
        self.policy_data.certificates = value

    def _parse_remediate(self, value):
        """Store the remediation mode if it is one of the known modes.

        :raise: KickstartValueError for any value other than "none",
                "post", "firstboot" or "both"
        """
        # Explicit check instead of ``assert``: assertions are stripped when
        # Python runs with -O, and user input errors should surface as
        # kickstart errors like in the other parsers.
        if value not in ("none", "post", "firstboot", "both"):
            msg = "Unsupported remediate value '%s' in the %s addon" % (
                value, self.name)
            raise KickstartValueError(msg)

        self.policy_data.remediate = value

    def handle_end(self):
        """Handle the end of the section.

        Validates the collected data, fills in the default profile and,
        for SCAP Security Guide content, sets the content path.

        :raise: KickstartValueError for missing or inconsistent data
        """
        tmpl = "%s missing for the %s addon"
        data = self.policy_data

        # check provided data
        if not data.content_type:
            raise KickstartValueError(tmpl % ("content-type", self.name))

        uses_ssg = data.content_type == "scap-security-guide"

        # every content type except SSG has to be fetched from a URL
        if not uses_ssg and not data.content_url:
            raise KickstartValueError(tmpl % ("content-url", self.name))

        if not data.profile_id:
            data.profile_id = "default"

        if data.content_type in ("rpm", "archive") and not data.content_path:
            msg = "Path to the XCCDF file has to be given if content in RPM "\
                  "or archive is used"
            raise KickstartValueError(msg)

        # content_url is known to be set here for non-SSG content types
        if data.content_type == "rpm" and not data.content_url.endswith(".rpm"):
            msg = "Content type set to RPM, but the content URL doesn't end "\
                  "with '.rpm'"
            raise KickstartValueError(msg)

        if data.content_type == "archive":
            supported_archive = any(
                data.content_url.endswith(arch_type)
                for arch_type in common.SUPPORTED_ARCHIVES
            )
            if not supported_archive:
                msg = "Unsupported archive type of the content "\
                      "file '%s'" % data.content_url
                raise KickstartValueError(msg)

        # do some initialization magic in case of SSG
        if uses_ssg:
            if not common.ssg_available():
                msg = "SCAP Security Guide not found on the system"
                raise KickstartValueError(msg)

            data.content_path = common.SSG_DIR + common.SSG_CONTENT

    def __str__(self):
        """Generate the kickstart representation.

        What should end up in the resulting kickstart file,
        i.e. string representation of the stored data.

        :return: a string; empty if no profile is set
        """
        data = self.policy_data
        if not data.profile_id:
            return ""

        # For SSG the content path is filled in by handle_end(), so it is
        # not written back to the kickstart file.
        content_path = data.content_path
        if data.content_type == "scap-security-guide":
            content_path = None

        # Items in output order; each one is emitted only when its value is
        # truthy ("profile" always is, thanks to the guard above).
        optional_items = (
            ("content-url", data.content_url),
            ("datastream-id", data.datastream_id),
            ("xccdf-id", data.xccdf_id),
            ("content-path", content_path),
            ("cpe-path", data.cpe_path),
            ("tailoring-path", data.tailoring_path),
            ("profile", data.profile_id),
            ("fingerprint", data.fingerprint),
            ("certificates", data.certificates),
            ("remediate", data.remediate),
        )

        lines = ["%%addon %s" % self.name]
        # content-type is emitted unconditionally
        lines.append(key_value_pair("content-type", data.content_type))
        lines.extend(
            key_value_pair(key, value)
            for key, value in optional_items
            if value
        )
        lines.append("%end")

        return "\n".join(lines) + "\n\n"
248
249
250
def get_oscap_kickstart_data(name):
    """Create a kickstart data class bound to the given section name.

    :param name: the %addon section name the instances should use
    :return: a subclass of OSCAPKickstartData whose instances have
             their ``name`` attribute set to ``name``
    """
    class NamedOSCAPKickstartData(OSCAPKickstartData):
        """OSCAPKickstartData that uses the section name of the factory call."""

        def __init__(self):
            super().__init__()
            # replace the default name assigned by the parent initializer
            self.name = name

    return NamedOSCAPKickstartData
257
258
259
class OSCAPKickstartSpecification(KickstartSpecification):
    """The kickstart specification of the OSCAP service."""

    # One data class per name in common.ADDON_NAMES, keyed by that name.
    addons = {
        addon_name: get_oscap_kickstart_data(addon_name)
        for addon_name in common.ADDON_NAMES
    }
265