Completed
Push — master ( f4924a...c72d79 )
by Matěj
16s queued 13s
created

OSCAPService.configure_with_tasks()   A

Complexity

Conditions 2

Size

Total Lines 28
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 18
nop 1
dl 0
loc 28
rs 9.5
c 0
b 0
f 0
#
# Copyright (C) 2020 Red Hat, Inc.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions of
# the GNU General Public License v.2, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY expressed or implied, including the implied warranties of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.  You should have received a copy of the
# GNU General Public License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
# source code or documentation are not subject to the GNU General Public
# License and may only be used or replicated with the express permission of
# Red Hat, Inc.
#
18
import logging

from pyanaconda.core.configuration.anaconda import conf
from pyanaconda.core.dbus import DBus
from pyanaconda.core.signal import Signal
from pyanaconda.modules.common.base import KickstartService
from pyanaconda.modules.common.containers import TaskContainer
from pyanaconda.modules.common.structures.requirement import Requirement

from org_fedora_oscap import common
from org_fedora_oscap.constants import OSCAP
from org_fedora_oscap.service.installation import (
    FetchContentTask,
    CheckFingerprintTask,
    EvaluateRulesTask,
    InstallContentTask,
    RemediateSystemTask,
)
from org_fedora_oscap.service.kickstart import OSCAPKickstartSpecification
from org_fedora_oscap.service.oscap_interface import OSCAPInterface
from org_fedora_oscap.structures import PolicyData

# Module-level logger named after this module.
log = logging.getLogger(__name__)

# Only the service class is exported by a star-import of this module.
__all__ = ["OSCAPService"]
38
39
40
class OSCAPService(KickstartService):
    """The implementation of the OSCAP service.

    The service holds the security policy settings (an enabled flag and
    a PolicyData instance), copies them from and to the kickstart data
    of the org_fedora_oscap add-on, and provides the package requirements
    and the configuration and installation tasks for the policy.
    """

    # Attributes that are copied one-to-one between PolicyData and the
    # kickstart data of the add-on. Keeping a single list guarantees that
    # process_kickstart and setup_kickstart never get out of sync.
    _POLICY_FIELDS = (
        "content_type",
        "content_url",
        "datastream_id",
        "xccdf_id",
        "profile_id",
        "content_path",
        "cpe_path",
        "tailoring_path",
        "fingerprint",
        "certificates",
    )

    def __init__(self):
        """Create a service."""
        super().__init__()
        self._policy_enabled = True
        self.policy_enabled_changed = Signal()

        self._policy_data = PolicyData()
        self.policy_data_changed = Signal()

        # Emitted when one of the tasks fails; see _cancel_tasks_on_error.
        self.installation_canceled = Signal()

    @property
    def policy_enabled(self):
        """Is the security policy enabled?

        :return: True or False
        """
        return self._policy_enabled

    @policy_enabled.setter
    def policy_enabled(self, value):
        """Should be the security policy enabled?

        Emits the policy_enabled_changed signal after storing the value.

        :param value: True or False
        """
        self._policy_enabled = value
        self.policy_enabled_changed.emit()
        log.debug("Policy enabled is set to '%s'.", value)

    @property
    def policy_data(self):
        """The security policy data.

        :return: an instance of PolicyData
        """
        return self._policy_data

    @policy_data.setter
    def policy_data(self, value):
        """Set the security policy data.

        Emits the policy_data_changed signal after storing the value.

        :param value: an instance of PolicyData
        """
        self._policy_data = value
        self.policy_data_changed.emit()
        log.debug("Policy data is set to '%s'.", value)

    @property
    def installation_enabled(self):
        """Is the installation enabled?

        The installation is enabled only if the policy is enabled
        and the policy data have a non-empty profile id.

        :return: True or False
        """
        # Without bool() the expression would evaluate to the profile id
        # itself instead of the documented True or False.
        return bool(self.policy_enabled and self.policy_data.profile_id)

    def publish(self):
        """Publish the DBus objects."""
        TaskContainer.set_namespace(OSCAP.namespace)
        DBus.publish_object(OSCAP.object_path, OSCAPInterface(self))
        DBus.register_service(OSCAP.service_name)

    @property
    def kickstart_specification(self):
        """Return the kickstart specification."""
        return OSCAPKickstartSpecification

    def process_kickstart(self, data):
        """Process the kickstart data.

        Create new policy data from the add-on section of the given
        kickstart data and set them as the current policy data.

        :param data: kickstart data with the org_fedora_oscap add-on
        """
        addon_data = data.addons.org_fedora_oscap
        policy_data = PolicyData()

        for name in self._POLICY_FIELDS:
            setattr(policy_data, name, getattr(addon_data, name))

        # Go through the property setter to emit the change signal.
        self.policy_data = policy_data

    def setup_kickstart(self, data):
        """Set the given kickstart data.

        Copy the current policy data to the add-on section of the
        given kickstart data.

        :param data: kickstart data with the org_fedora_oscap add-on
        """
        policy_data = self.policy_data
        addon_data = data.addons.org_fedora_oscap

        for name in self._POLICY_FIELDS:
            setattr(addon_data, name, getattr(policy_data, name))

    def collect_requirements(self):
        """Return installation requirements.

        :return: a list of requirements
        """
        if not self.installation_enabled:
            log.debug("The installation is disabled. Skip the requirements.")
            return []

        package_names = ["openscap", "openscap-scanner"]

        # The content package is required only for this content type.
        if self.policy_data.content_type == "scap-security-guide":
            package_names.append("scap-security-guide")

        return [
            Requirement.for_package(
                package_name=name,
                reason="Required by oscap add-on."
            )
            for name in package_names
        ]

    def configure_with_tasks(self):
        """Return configuration tasks.

        The returned tasks are connected to the installation_canceled
        signal, so a failure of one of them cancels the others.

        :return: a list of tasks
        """
        if not self.installation_enabled:
            log.debug("The installation is disabled. Skip the configuration.")
            return []

        policy_data = self.policy_data

        tasks = [
            FetchContentTask(
                policy_data=policy_data,
                file_path=common.get_raw_preinst_content_path(policy_data),
                content_path=common.get_preinst_content_path(policy_data),
            ),
            CheckFingerprintTask(
                policy_data=policy_data,
                file_path=common.get_raw_preinst_content_path(policy_data),
            ),
            EvaluateRulesTask(
                policy_data=policy_data,
                content_path=common.get_preinst_content_path(policy_data),
                tailoring_path=common.get_preinst_tailoring_path(policy_data),
            ),
        ]

        self._cancel_tasks_on_error(tasks)
        return tasks

    def install_with_tasks(self):
        """Return installation tasks.

        The returned tasks are connected to the installation_canceled
        signal, so a failure of one of them cancels the others.

        :return: a list of tasks
        """
        if not self.installation_enabled:
            log.debug("The installation is disabled. Skip the installation.")
            return []

        policy_data = self.policy_data
        sysroot = conf.target.system_root

        tasks = [
            InstallContentTask(
                sysroot=sysroot,
                policy_data=policy_data,
                file_path=common.get_raw_preinst_content_path(policy_data),
                content_path=common.get_preinst_content_path(policy_data),
                tailoring_path=common.get_preinst_tailoring_path(policy_data),
                target_directory=common.TARGET_CONTENT_DIR,
            ),
            RemediateSystemTask(
                sysroot=sysroot,
                policy_data=policy_data,
                target_content_path=common.get_postinst_content_path(policy_data),
                # NOTE(review): the target content path uses the post-installation
                # helper, but the target tailoring path uses the pre-installation
                # one. Confirm that this asymmetry is intended.
                target_tailoring_path=common.get_preinst_tailoring_path(policy_data)
            )
        ]

        self._cancel_tasks_on_error(tasks)
        return tasks

    def _cancel_tasks_on_error(self, tasks):
        """Cancel all tasks on error.

        If one of the installation tasks fails, we will emit the
        installation_canceled signal that will cancel all scheduled
        installation tasks.

        This signal allows to cancel tasks from the install_with_tasks
        method based on a failure of a task from the configure_with_tasks
        method. All these tasks are created and scheduled before Anaconda
        starts to execute them.

        :param tasks: a list of tasks
        """
        for task in tasks:
            # Cancel the installation if the task fails.
            task.failed_signal.connect(self.installation_canceled.emit)

            # Cancel the task if the installation was canceled.
            self.installation_canceled.connect(task.cancel)
251