Passed
Pull Request — master (#178)
by Matěj
01:01
created

OSCAPService.process_kickstart()   B

Complexity

Conditions 5

Size

Total Lines 26
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 19
nop 2
dl 0
loc 26
rs 8.9833
c 0
b 0
f 0
1
#
2
# Copyright (C) 2020 Red Hat, Inc.
3
#
4
# This copyrighted material is made available to anyone wishing to use,
5
# modify, copy, or redistribute it subject to the terms and conditions of
6
# the GNU General Public License v.2, or (at your option) any later version.
7
# This program is distributed in the hope that it will be useful, but WITHOUT
8
# ANY WARRANTY expressed or implied, including the implied warranties of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
10
# Public License for more details.  You should have received a copy of the
11
# GNU General Public License along with this program; if not, write to the
12
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
13
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
14
# source code or documentation are not subject to the GNU General Public
15
# License and may only be used or replicated with the express permission of
16
# Red Hat, Inc.
17
#
18
import logging
19
import warnings
20
21
from pykickstart.errors import KickstartDeprecationWarning
22
from pyanaconda.core.configuration.anaconda import conf
23
from pyanaconda.core.dbus import DBus
24
from pyanaconda.core.signal import Signal
25
from pyanaconda.modules.common.base import KickstartService
26
from pyanaconda.modules.common.containers import TaskContainer
27
from pyanaconda.modules.common.structures.requirement import Requirement
28
29
from org_fedora_oscap import common
30
from org_fedora_oscap.constants import OSCAP
31
from org_fedora_oscap.service.installation import PrepareValidContent, \
32
    EvaluateRulesTask, InstallContentTask, RemediateSystemTask
33
from org_fedora_oscap.service.kickstart import OSCAPKickstartSpecification, KickstartParseError
34
from org_fedora_oscap.service.oscap_interface import OSCAPInterface
35
from org_fedora_oscap.structures import PolicyData
36
37
# Module-level logger; all messages of this module go to the "anaconda" logger.
log = logging.getLogger("anaconda")
38
39
# Names exported by a star-import of this module.
__all__ = ["OSCAPService"]
40
41
42
class OSCAPService(KickstartService):
    """The implementation of the OSCAP service.

    The service keeps the security policy state (an enabled flag and
    a PolicyData instance), reads and writes that state from and to the
    kickstart addon data, and builds the lists of requirements and tasks
    for the installation.
    """

    def __init__(self):
        """Create a service."""
        super().__init__()
        self._policy_enabled = True
        self.policy_enabled_changed = Signal()

        self._policy_data = PolicyData()
        self.policy_data_changed = Signal()

        # Emitted when one of the scheduled tasks fails; every scheduled
        # task is connected to it, see _cancel_tasks_on_error.
        self.installation_canceled = Signal()

        # The first known addon name is treated as the canonical one.
        self.canonical_addon_name = common.ADDON_NAMES[0]

    @property
    def policy_enabled(self):
        """Is the security policy enabled?

        :return: True or False
        """
        return self._policy_enabled

    @policy_enabled.setter
    def policy_enabled(self, value):
        """Should be the security policy enabled?

        Emits the policy_enabled_changed signal after the value is stored.

        :param value: True or False
        """
        self._policy_enabled = value
        self.policy_enabled_changed.emit()
        log.debug("OSCAP Addon: Policy enabled is set to '%s'.", value)

    @property
    def policy_data(self):
        """The security policy data.

        :return: an instance of PolicyData
        """
        return self._policy_data

    @policy_data.setter
    def policy_data(self, value):
        """Set the security policy data.

        Emits the policy_data_changed signal after the value is stored.

        :param value: an instance of PolicyData
        """
        self._policy_data = value
        self.policy_data_changed.emit()
        log.debug("OSCAP Addon: Policy data is set to '%s'.", value)

    @property
    def installation_enabled(self):
        """Is the installation enabled?

        The installation is enabled when the policy is enabled and
        the policy data have a non-empty profile id.

        :return: True or False
        """
        # Coerce to bool so that the profile id itself never leaks out
        # of a property that is documented to return True or False.
        return bool(self.policy_enabled and self.policy_data.profile_id)

    def publish(self):
        """Publish the DBus objects."""
        TaskContainer.set_namespace(OSCAP.namespace)
        DBus.publish_object(OSCAP.object_path, OSCAPInterface(self))
        DBus.register_service(OSCAP.service_name)

    @property
    def kickstart_specification(self):
        """Return the kickstart specification."""
        return OSCAPKickstartSpecification

    def process_kickstart(self, data):
        """Process the kickstart data.

        Picks the addon section that is present in the kickstart and
        takes over its policy data. If no section is present, the data
        of the first known addon name are used.

        :param data: the parsed kickstart data; its ``addons`` attribute
            is expected to provide one attribute per known addon name
        :raise KickstartParseError: if more than one of the known addon
            sections is present
        """
        preferred_section_header = f"%addon {self.canonical_addon_name}"
        all_addon_data = [
            getattr(data.addons, name) for name in common.ADDON_NAMES
        ]
        relevant_data = [d for d in all_addon_data if d.addon_section_present]

        if len(relevant_data) > 1:
            # common._ is presumably the gettext translation function, so
            # it has to receive a constant template; the values are filled
            # in only after the lookup.
            msg = common._(
                "You have used more than one oscap addon sections in the kickstart. "
                "Please use only one, preferably '{}'."
            ).format(preferred_section_header)
            raise KickstartParseError(msg)

        addon_data = relevant_data[0] if relevant_data else all_addon_data[0]
        self.policy_data = addon_data.policy_data

        if (common.COMPLAIN_ABOUT_NON_CANONICAL_NAMES
                and addon_data.name != self.canonical_addon_name):
            used_section_header = f"%addon {addon_data.name}"
            # Same as above: translate the template first, format later.
            msg = common._(
                "You have configured the oscap addon using '{used}' section. "
                "Please update your configuration and use '{preferred}'. "
                "Support for legacy sections will be removed in the future major version."
            ).format(
                used=used_section_header,
                preferred=preferred_section_header,
            )
            warnings.warn(msg, KickstartDeprecationWarning)

    def setup_kickstart(self, data):
        """Set the given kickstart data.

        The current policy data are always stored under the canonical
        addon name.

        :param data: the kickstart data to update
        """
        policy_data = self.policy_data
        addon_data = getattr(data.addons, self.canonical_addon_name)

        addon_data.policy_data = policy_data

    def collect_requirements(self):
        """Return installation requirements.

        :return: a list of requirements
        """
        if not self.installation_enabled:
            log.debug("OSCAP Addon: The installation is disabled. Skip the requirements.")
            return []

        requirements = [
            Requirement.for_package(
                package_name="openscap",
                reason="Required by oscap add-on."
            ),
            Requirement.for_package(
                package_name="openscap-scanner",
                reason="Required by oscap add-on."
            )
        ]

        # The guide package is requested only for content of this type.
        if self.policy_data.content_type == "scap-security-guide":
            requirements.append(
                Requirement.for_package(
                    package_name="scap-security-guide",
                    reason="Required by oscap add-on."
                )
            )

        return requirements

    def configure_with_tasks(self):
        """Return configuration tasks.

        :return: a list of tasks
        """
        if not self.installation_enabled:
            log.debug("OSCAP Addon: The installation is disabled. Skip the configuration.")
            return []

        tasks = [
            PrepareValidContent(
                policy_data=self.policy_data,
                file_path=common.get_raw_preinst_content_path(self.policy_data),
                content_path=common.get_preinst_content_path(self.policy_data),
            ),
            EvaluateRulesTask(
                policy_data=self.policy_data,
                content_path=common.get_preinst_content_path(self.policy_data),
                tailoring_path=common.get_preinst_tailoring_path(self.policy_data),
            ),
        ]

        self._cancel_tasks_on_error(tasks)
        return tasks

    def install_with_tasks(self):
        """Return installation tasks.

        :return: a list of tasks
        """
        if not self.installation_enabled:
            log.debug("OSCAP Addon: The installation is disabled. Skip the installation.")
            return []

        tasks = [
            InstallContentTask(
                sysroot=conf.target.system_root,
                policy_data=self.policy_data,
                file_path=common.get_raw_preinst_content_path(self.policy_data),
                content_path=common.get_preinst_content_path(self.policy_data),
                tailoring_path=common.get_preinst_tailoring_path(self.policy_data),
                target_directory=common.TARGET_CONTENT_DIR,
            ),
            # NOTE(review): the target content path is the post-installation
            # one, while the target tailoring path is the pre-installation
            # one. Confirm that this asymmetry is intended.
            RemediateSystemTask(
                sysroot=conf.target.system_root,
                policy_data=self.policy_data,
                target_content_path=common.get_postinst_content_path(self.policy_data),
                target_tailoring_path=common.get_preinst_tailoring_path(self.policy_data)
            )
        ]

        self._cancel_tasks_on_error(tasks)
        return tasks

    def _cancel_tasks_on_error(self, tasks):
        """Cancel all tasks on error.

        If one of the installation tasks fails, we will emit the
        installation_canceled signal that will cancel all scheduled
        installation tasks.

        This signal allows to cancel tasks from the install_with_tasks
        method based on a failure of a task from the configure_with_tasks
        method. All these tasks are created and scheduled before Anaconda
        starts to execute them.

        :param tasks: a list of tasks
        """
        for task in tasks:
            # Cancel the installation if the task fails.
            task.failed_signal.connect(self.installation_canceled.emit)

            # Cancel the task if the installation was canceled.
            self.installation_canceled.connect(task.cancel)
            msg = common._(
121
                "You have used more than one oscap addon sections in the kickstart. "
122
                f"Please use only one, preferably '{preferred_section_header}'.")
123
            raise KickstartParseError(msg)
124
        if len(relevant_data) == 0:
125
            addon_data = all_addon_data[0]
126
        else:
127
            addon_data = relevant_data[0]
128
129
        self.policy_data = addon_data.policy_data
130
131
        if (common.COMPLAIN_ABOUT_NON_CANONICAL_NAMES
132
                and addon_data.name != self.canonical_addon_name):
133
            used_section_header = f"%addon {addon_data.name}"
134
            msg = common._(
135
                f"You have configured the oscap addon using '{used_section_header}' section. "
136
                f"Please update your configuration and use '{preferred_section_header}'. "
137
                "Support for legacy sections will be removed in the future major version.")
138
            warnings.warn(msg, KickstartDeprecationWarning)
139
140
    def setup_kickstart(self, data):
141
        """Set the given kickstart data."""
142
        policy_data = self.policy_data
143
        addon_data = getattr(data.addons, self.canonical_addon_name)
144
145
        addon_data.policy_data = policy_data
146
147
    def collect_requirements(self):
148
        """Return installation requirements.
149
150
        :return: a list of requirements
151
        """
152
        if not self.installation_enabled:
153
            log.debug("OSCAP Addon: The installation is disabled. Skip the requirements.")
154
            return []
155
156
        requirements = [
157
            Requirement.for_package(
158
                package_name="openscap",
159
                reason="Required by oscap add-on."
160
            ),
161
            Requirement.for_package(
162
                package_name="openscap-scanner",
163
                reason="Required by oscap add-on."
164
            )
165
        ]
166
167
        if self.policy_data.content_type == "scap-security-guide":
168
            requirements.append(
169
                Requirement.for_package(
170
                    package_name="scap-security-guide",
171
                    reason="Required by oscap add-on."
172
                )
173
            )
174
175
        return requirements
176
177
    def configure_with_tasks(self):
178
        """Return configuration tasks.
179
180
        :return: a list of tasks
181
        """
182
        if not self.installation_enabled:
183
            log.debug("OSCAP Addon: The installation is disabled. Skip the configuration.")
184
            return []
185
186
        tasks = [
187
            PrepareValidContent(
188
                policy_data=self.policy_data,
189
                file_path=common.get_raw_preinst_content_path(self.policy_data),
190
                content_path=common.get_preinst_content_path(self.policy_data),
191
            ),
192
            EvaluateRulesTask(
193
                policy_data=self.policy_data,
194
                content_path=common.get_preinst_content_path(self.policy_data),
195
                tailoring_path=common.get_preinst_tailoring_path(self.policy_data),
196
            ),
197
        ]
198
199
        self._cancel_tasks_on_error(tasks)
200
        return tasks
201
202
    def install_with_tasks(self):
203
        """Return installation tasks.
204
205
        :return: a list of tasks
206
        """
207
        if not self.installation_enabled:
208
            log.debug("OSCAP Addon: The installation is disabled. Skip the installation.")
209
            return []
210
211
        tasks = [
212
            InstallContentTask(
213
                sysroot=conf.target.system_root,
214
                policy_data=self.policy_data,
215
                file_path=common.get_raw_preinst_content_path(self.policy_data),
216
                content_path=common.get_preinst_content_path(self.policy_data),
217
                tailoring_path=common.get_preinst_tailoring_path(self.policy_data),
218
                target_directory=common.TARGET_CONTENT_DIR,
219
            ),
220
            RemediateSystemTask(
221
                sysroot=conf.target.system_root,
222
                policy_data=self.policy_data,
223
                target_content_path=common.get_postinst_content_path(self.policy_data),
224
                target_tailoring_path=common.get_preinst_tailoring_path(self.policy_data)
225
            )
226
        ]
227
228
        self._cancel_tasks_on_error(tasks)
229
        return tasks
230
231
    def _cancel_tasks_on_error(self, tasks):
232
        """Cancel all tasks on error.
233
234
        If one of the installation tasks fails, we will emit the
235
        installation_canceled signal that will cancel all scheduled
236
        installation tasks.
237
238
        This signal allows to cancel tasks from the install_with_tasks
239
        method based on a failure of a task from the configure_with_tasks
240
        method. All these tasks are created and scheduled before Anaconda
241
        starts to execute them.
242
243
        :param tasks: a list of tasks
244
        """
245
        for task in tasks:
246
            # Cancel the installation if the task fails.
247
            task.failed_signal.connect(self.installation_canceled.emit)
248
249
            # Cancel the task if the installation was canceled.
250
            self.installation_canceled.connect(task.cancel)
251