Passed
Pull Request — master (#3212)
by Matěj
02:53
created

DockerTestEnv._create_new_image()   A

Complexity

Conditions 2

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 3
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
from __future__ import print_function
2
3
import contextlib
4
import sys
5
import os
6
import time
7
8
import docker
9
10
import ssg_test_suite
11
from ssg_test_suite.virt import SnapshotStack
12
from ssg_test_suite import common
13
14
15
class SavedState(object):
    """Handle to a named state that was saved in a test environment.

    Instances are normally obtained through :meth:`create_from_environment`,
    which saves the state on entry and deletes it again on exit.
    """

    def __init__(self, environment, name):
        """Remember the environment and the name of its saved state.

        Args:
            environment: Object that provides ``reset_state_to``.
            name: Name under which the state has been saved.
        """
        self.name = name
        self.environment = environment
        self.initial_running_state = True

    def map_on_top(self, function, args_list):
        """Call ``function`` once per argument tuple, each time on the saved state.

        The first call is made right away; before every following call
        the environment is reset to the saved state, and it is reset
        once more after the last call.

        Args:
            function: Callable invoked as ``function(*args)``.
            args_list: Sequence of argument tuples. When it is empty,
                nothing is called and the environment is left untouched.
        """
        if not args_list:
            # Nothing to run, so there is no state to restore either.
            return

        function(*args_list[0])
        for idx, args in enumerate(args_list[1:], 1):
            self.environment.reset_state_to(self.name, "running_%d" % idx)
            function(*args)
        self.environment.reset_state_to(self.name, "running_last")

    @classmethod
    @contextlib.contextmanager
    def create_from_environment(cls, environment, state_name):
        """Context manager that saves a state and deletes it on exit.

        Args:
            environment: Object that provides ``_save_state`` and
                ``_delete_saved_state``.
            state_name: Name of the state to save.

        Yields:
            The :class:`SavedState` bound to ``environment`` and
            ``state_name``.

        Raises:
            KeyboardInterrupt: Re-raised after the cleanup if the body
                of the ``with`` statement has been interrupted.
        """
        state = cls(environment, state_name)

        state_handle = environment._save_state(state_name)
        exception_to_reraise = None
        try:
            yield state
        except KeyboardInterrupt as exc:
            print("Hang on for a minute, cleaning up the saved state '{0}'."
                  .format(state_name), file=sys.stderr)
            # Postpone the interrupt until the saved state has been deleted.
            exception_to_reraise = exc
        finally:
            try:
                environment._delete_saved_state(state_handle)
            except KeyboardInterrupt as exc:
                # The cleanup itself got interrupted; attempt it once more.
                print("Hang on for a minute, cleaning up the saved state '{0}'."
                      .format(state_name), file=sys.stderr)
                environment._delete_saved_state(state_handle)
            finally:
                if exception_to_reraise:
                    raise exception_to_reraise
54
55
56
class TestEnv(object):
    """Base class of the environments in which scans are performed.

    Subclasses are expected to implement the state handling
    (``reset_state_to``, ``_delete_saved_state``) and ``offline_scan``,
    and to assign ``domain_ip`` once the environment is running.
    """

    def __init__(self, scanning_mode):
        """Set up the bookkeeping attributes.

        Args:
            scanning_mode: ``"online"`` or ``"offline"``; selects the
                method that :meth:`scan` dispatches to.
        """
        self.running_state_base = None
        self.running_state = None

        self.scanning_mode = scanning_mode

        # Defined up front so that the attribute always exists; None
        # means that the address of the environment is not known yet.
        self.domain_ip = None

    def start(self):
        """
        Run the environment and
        ensure that the environment will not be permanently modified
        by subsequent procedures.
        """
        pass

    def finalize(self):
        """
        Perform the environment cleanup and shut it down.
        """
        pass

    def reset_state_to(self, state_name, new_running_state_name):
        """Reset the environment to a saved state; subclasses implement this."""
        raise NotImplementedError()

    def _save_state(self, state_name):
        """Record ``state_name`` as the base of the running state.

        Returns:
            None. The base class has no state handle to hand out.
        """
        self.running_state_base = state_name
        return None

    def _delete_saved_state(self, state_name):
        """Delete a saved state; subclasses implement this."""
        raise NotImplementedError()

    def _stop_state(self, state):
        """Do nothing in the base class."""
        pass

    def _oscap_ssh_base_arguments(self):
        """Return the leading arguments of an ``oscap-ssh`` evaluation.

        Returns:
            List of command-line arguments targeting root at
            ``domain_ip``, port 22.

        Raises:
            RuntimeError: If ``domain_ip`` has not been assigned yet.
        """
        if self.domain_ip is None:
            raise RuntimeError(
                "The address of the test environment is not known, "
                "has the environment been started?")
        full_hostname = 'root@{}'.format(self.domain_ip)
        return ['oscap-ssh', full_hostname, '22', 'xccdf', 'eval']

    def scan(self, args, verbose_path):
        """Run a scan in the mode that the environment was created with.

        Args:
            args: Arguments appended to the base scanner command.
            verbose_path: Passed through to the scan method.

        Returns:
            Whatever the selected scan method returns.

        Raises:
            KeyError: If the scanning mode is neither ``"online"``
                nor ``"offline"``.
        """
        if self.scanning_mode == "online":
            return self.online_scan(args, verbose_path)
        elif self.scanning_mode == "offline":
            return self.offline_scan(args, verbose_path)
        else:
            msg = "Invalid scanning mode {mode}".format(mode=self.scanning_mode)
            raise KeyError(msg)

    def online_scan(self, args, verbose_path):
        """Run the scan over SSH by means of ``oscap-ssh``.

        Returns:
            The result of ``common.run_cmd_local``.
        """
        command_list = self._oscap_ssh_base_arguments() + args

        # The process environment is applied last, so a value of
        # SSH_ADDITIONAL_OPTIONS that is already set there takes precedence.
        env = dict(SSH_ADDITIONAL_OPTIONS=" ".join(common.IGNORE_KNOWN_HOSTS_OPTIONS))
        env.update(os.environ)

        return common.run_cmd_local(command_list, verbose_path, env=env)

    def offline_scan(self, args, verbose_path):
        """Run the scan without connecting into the environment; subclasses implement this."""
        raise NotImplementedError()
118
119
120
class VMTestEnv(TestEnv):
    """Test environment that runs scans against a libvirt domain.

    States are kept as snapshots managed by a ``SnapshotStack``.
    """

    name = "libvirt-based"

    def __init__(self, mode, hypervisor, domain_name):
        """Store the connection parameters; nothing is connected yet.

        Args:
            mode: Scanning mode handed over to the base class.
            hypervisor: Hypervisor passed to ``connect_domain`` on start.
            domain_name: Name of the domain to connect to and to scan.
        """
        super(VMTestEnv, self).__init__(mode)

        self.hypervisor = hypervisor
        self.domain_name = domain_name

        # These are populated by start().
        self.domain = None
        self.snapshot_stack = None
        self._origin = None

    def start(self):
        """Connect to the domain, start it and save the "origin" state."""
        virt = ssg_test_suite.virt

        self.domain = virt.connect_domain(self.hypervisor, self.domain_name)
        self.snapshot_stack = SnapshotStack(self.domain)

        virt.start_domain(self.domain)
        self.domain_ip = virt.determine_ip(self.domain)

        self._origin = self._save_state("origin")

    def finalize(self):
        """Delete the state that was saved by start().

        The domain itself is not shut down here; the shutdown call
        is disabled:
        """
        self._delete_saved_state(self._origin)
        # self.domain.shutdown()
        # logging.debug('Shut the domain off')

    def reset_state_to(self, state_name, new_running_state_name):
        """Revert the domain to the snapshot on top of the stack.

        Args:
            state_name: Has to be the name of the topmost snapshot.
            new_running_state_name: Not used by this environment.

        Returns:
            The result of ``snapshot_stack.revert(delete=False)``.
        """
        top_snapshot = self.snapshot_stack.snapshot_stack[-1]
        last_snapshot_name = top_snapshot.getName()
        assert last_snapshot_name == state_name, (
            "You can only revert to the last snapshot, which is {0}, not {1}"
            .format(last_snapshot_name, state_name))
        return self.snapshot_stack.revert(delete=False)

    def discard_running_state(self, state_handle):
        """Do nothing; this environment has no running state to discard."""

    def _save_state(self, state_name):
        """Create a snapshot called ``state_name`` and return it."""
        super(VMTestEnv, self)._save_state(state_name)
        return self.snapshot_stack.create(state_name)

    def _delete_saved_state(self, snapshot):
        """Revert the snapshot stack; the ``snapshot`` argument is not used."""
        self.snapshot_stack.revert()

    def _local_oscap_check_base_arguments(self):
        """Return the leading arguments of an ``oscap-vm`` evaluation."""
        return ['oscap-vm', "domain", self.domain_name, 'xccdf', 'eval']

    def offline_scan(self, args, verbose_path):
        """Scan the domain by means of ``oscap-vm``.

        Returns:
            The result of ``common.run_cmd_local``.
        """
        full_command = self._local_oscap_check_base_arguments() + args
        return common.run_cmd_local(full_command, verbose_path)
178
179
180
class DockerTestEnv(TestEnv):
    """Test environment that runs scans against Docker containers.

    Saved states are images committed from containers; the images and
    containers created by the instance are tracked in ``created_images``
    and ``containers``, both used as stacks.
    """

    name = "container-based"

    def __init__(self, mode, image_name):
        """Connect to the Docker service and remember the base image.

        Args:
            mode: Scanning mode handed over to the base class.
            image_name: Name of the image that containers are started from.

        Raises:
            RuntimeError: If the Docker client can't be created or the
                service doesn't respond to a ping.
        """
        super(DockerTestEnv, self).__init__(mode)

        self._name_stem = "ssg_test"

        try:
            self.client = docker.from_env(version="auto")
            self.client.ping()
        except Exception as exc:
            # The placeholder makes the underlying error part of the
            # message, it used to be silently dropped by format().
            msg = (
                "Unable to start the Docker test environment, "
                "is the Docker service started "
                "and do you have rights to access it? ({0})"
                .format(str(exc)))
            raise RuntimeError(msg)

        self.base_image = image_name
        self.created_images = []
        self.containers = []

    def start(self):
        """Run a container from the base image."""
        self.run_container(self.base_image)

    def finalize(self):
        """Stop and remove the most recently started container, if any."""
        self._terminate_current_running_container_if_applicable()

    def image_stem2fqn(self, stem):
        """Return the image name made of the base image name and ``stem``."""
        image_name = "{0}_{1}".format(self.base_image, stem)
        return image_name

    @property
    def current_container(self):
        """The most recently started container, or None if there is none."""
        if self.containers:
            return self.containers[-1]
        return None

    @property
    def current_image(self):
        """The most recently created image, or the base image if there is none."""
        if self.created_images:
            return self.created_images[-1]
        return self.base_image

    def _create_new_image(self, from_container, name):
        """Commit a container into a new image and record the image.

        Args:
            from_container: Container to commit. When it is falsy,
                a new container is run from the current image first.
            name: Stem of the new image name.

        Returns:
            The full name of the new image.
        """
        new_image_name = self.image_stem2fqn(name)
        if not from_container:
            from_container = self.run_container(self.current_image)
        from_container.commit(repository=new_image_name)
        self.created_images.append(new_image_name)
        return new_image_name

    def _save_state(self, state_name):
        """Save the current container as an image and return the image name."""
        super(DockerTestEnv, self)._save_state(state_name)
        state = self._create_new_image(self.current_container, state_name)
        return state

    def run_container(self, image_name, container_name="running"):
        """Run a container from an image and make it the current one.

        Also updates ``domain_ip`` with the address of the container
        on the "bridge" network.

        Args:
            image_name: Name of the image to run.
            container_name: Suffix of the container name.

        Returns:
            The new container.
        """
        new_container = self._new_container_from_image(image_name, container_name)
        self.containers.append(new_container)

        # Give the container time to fully start its service
        time.sleep(0.2)

        # Refresh the attributes so that the network settings are available.
        new_container.reload()
        self.domain_ip = new_container.attrs["NetworkSettings"]["Networks"]["bridge"]["IPAddress"]

        return new_container

    def reset_state_to(self, state_name, new_running_state_name):
        """Replace the current container by one run from a saved state.

        Args:
            state_name: Stem of the image to run the new container from.
            new_running_state_name: Suffix of the new container name.

        Returns:
            The new container.
        """
        self._terminate_current_running_container_if_applicable()
        image_name = self.image_stem2fqn(state_name)

        new_container = self.run_container(image_name, new_running_state_name)

        return new_container

    def _find_image_by_name(self, image_name):
        """Look the image up by means of the Docker client."""
        return self.client.images.get(image_name)

    def _new_container_from_image(self, image_name, container_name):
        """Run a detached container that executes the SSH daemon.

        The container is named ``<name stem>_<container_name>``
        and its port 22 is published.

        Returns:
            The container returned by the Docker client.
        """
        img = self._find_image_by_name(image_name)
        result = self.client.containers.run(
            img, "/usr/sbin/sshd -D",
            name="{0}_{1}".format(self._name_stem, container_name), ports={"22": None},
            detach=True)
        return result

    def _terminate_current_running_container_if_applicable(self):
        """Stop and remove the most recently started container, if any."""
        if self.containers:
            running_state = self.containers.pop()
            running_state.stop()
            running_state.remove()

    def _delete_saved_state(self, image):
        """Remove the most recently created image.

        The current container is terminated first.

        Args:
            image: Name of the image to remove; it has to be the one
                that was created most recently.
        """
        self._terminate_current_running_container_if_applicable()

        assert self.created_images

        associated_image = self.created_images.pop()
        assert associated_image == image
        self.client.images.remove(associated_image)

    def discard_running_state(self, state_handle):
        """Terminate the current container; ``state_handle`` is not used."""
        self._terminate_current_running_container_if_applicable()

    def _local_oscap_check_base_arguments(self):
        """Return the leading arguments of an ``oscap-docker`` evaluation
        of the current container."""
        command_base = []
        command_base.extend(
            ['oscap-docker', "container", self.current_container.id,
             'xccdf', 'eval'])
        return command_base

    def offline_scan(self, args, verbose_path):
        """Scan the current container by means of ``oscap-docker``.

        Returns:
            The result of ``common.run_cmd_local``.
        """
        command_list = self._local_oscap_check_base_arguments() + args

        return common.run_cmd_local(command_list, verbose_path)
298