Passed
Push — master ( c31780...4a3629 )
by Matěj
01:09 queued 13s
created

DockerTestEnv._get_container_ip()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 2
dl 0
loc 4
ccs 0
cts 4
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
from __future__ import print_function
2
3
import contextlib
4
import sys
5
import os
6
import time
7
import subprocess
8
9
import ssg_test_suite
10
from ssg_test_suite.virt import SnapshotStack
11
from ssg_test_suite import common
12
13
14
class SavedState(object):
    """
    A named, saved state of a test environment.

    Instances are meant to be obtained through the
    ``create_from_environment`` context manager, which saves the state
    when the ``with`` block is entered and deletes it when the block is left.
    """

    def __init__(self, environment, name):
        """
        Args:
            environment: Object providing ``save_state``, ``reset_state_to``
                and ``_delete_saved_state``.
            name: Name under which the state is saved.
        """
        self.name = name
        self.environment = environment
        self.initial_running_state = True

    def map_on_top(self, function, args_list):
        """
        Call ``function`` once for every argument tuple in ``args_list``.

        The first call is made without resetting the environment. Before
        every following call, and once more after the last call, the
        environment is reset to this saved state. Nothing happens when
        ``args_list`` is empty.

        Args:
            function: Callable invoked as ``function(*args)``.
            args_list: Sequence of argument tuples, one per call.
        """
        if not args_list:
            return
        function(* args_list[0])
        for idx, args in enumerate(args_list[1:], 1):
            self.environment.reset_state_to(self.name, "running_%d" % idx)
            function(* args)
        self.environment.reset_state_to(self.name, "running_last")

    @classmethod
    @contextlib.contextmanager
    def create_from_environment(cls, environment, state_name):
        """
        Context manager that saves ``state_name`` and yields a ``SavedState``.

        The saved state is deleted when the ``with`` block is left.
        A ``KeyboardInterrupt`` raised inside the block is held back until
        the cleanup has been attempted and is re-raised afterwards.
        If the cleanup itself is interrupted from the keyboard,
        the deletion is attempted one more time.
        """
        state = cls(environment, state_name)

        state_handle = environment.save_state(state_name)
        exception_to_reraise = None
        try:
            yield state
        except KeyboardInterrupt as exc:
            print("Hang on for a minute, cleaning up the saved state '{0}'."
                  .format(state_name), file=sys.stderr)
            # Remember the interrupt so it can be re-raised after the cleanup.
            exception_to_reraise = exc
        finally:
            try:
                environment._delete_saved_state(state_handle)
            except KeyboardInterrupt:
                print("Hang on for a minute, cleaning up the saved state '{0}'."
                      .format(state_name), file=sys.stderr)
                environment._delete_saved_state(state_handle)
            finally:
                if exception_to_reraise:
                    raise exception_to_reraise
55
56
57
class TestEnv(object):
    """
    Base class of the test environments.

    Subclasses supply the state handling hooks (``_save_state``,
    ``_delete_saved_state``, ``reset_state_to``) and ``offline_scan``.
    """

    def __init__(self, scanning_mode):
        """
        Args:
            scanning_mode: ``"online"`` or ``"offline"``; selects which scan
                method ``scan`` dispatches to.
        """
        self.running_state_base = None
        self.running_state = None

        self.scanning_mode = scanning_mode
        self.backend = None
        # Read by _oscap_ssh_base_arguments; subclasses assign the real
        # address once the environment is running.
        self.domain_ip = None

    def start(self):
        """
        Run the environment and
        ensure that the environment will not be permanently modified
        by subsequent procedures.
        """
        pass

    def finalize(self):
        """
        Perform the environment cleanup and shut it down.
        """
        pass

    def reset_state_to(self, state_name, new_running_state_name):
        """
        Bring the environment back to a saved state; supplied by subclasses.
        """
        raise NotImplementedError()

    def save_state(self, state_name):
        """
        Record ``state_name`` as the running state base and save the state.

        Returns:
            Whatever the subclass ``_save_state`` hook returns.
        """
        self.running_state_base = state_name
        return self._save_state(state_name)

    def _save_state(self, state_name):
        """
        Save the current state under ``state_name``; supplied by subclasses.
        """
        raise NotImplementedError()

    def _delete_saved_state(self, state_name):
        """
        Delete a previously saved state; supplied by subclasses.
        """
        raise NotImplementedError()

    def _stop_state(self, state):
        pass

    def _oscap_ssh_base_arguments(self):
        """
        Return the leading ``oscap-ssh`` arguments for ``root@<domain_ip>``.
        """
        full_hostname = 'root@{}'.format(self.domain_ip)
        return ['oscap-ssh', full_hostname, '22', 'xccdf', 'eval']

    def scan(self, args, verbose_path):
        """
        Dispatch to ``online_scan`` or ``offline_scan`` by ``scanning_mode``.

        Raises:
            KeyError: If ``scanning_mode`` is neither ``"online"``
                nor ``"offline"``.
        """
        if self.scanning_mode == "online":
            return self.online_scan(args, verbose_path)
        elif self.scanning_mode == "offline":
            return self.offline_scan(args, verbose_path)
        else:
            msg = "Invalid scanning mode {mode}".format(mode=self.scanning_mode)
            raise KeyError(msg)

    def online_scan(self, args, verbose_path):
        """
        Run the ``oscap-ssh`` command with ``args`` appended.

        Returns:
            The result of ``common.run_cmd_local``.
        """
        command_list = self._oscap_ssh_base_arguments() + args

        env = dict(SSH_ADDITIONAL_OPTIONS=" ".join(common.IGNORE_KNOWN_HOSTS_OPTIONS))
        # The update comes second, so a SSH_ADDITIONAL_OPTIONS value that is
        # already present in os.environ replaces the default set above.
        env.update(os.environ)

        return common.run_cmd_local(command_list, verbose_path, env=env)

    def offline_scan(self, args, verbose_path):
        """
        Scan without connecting into the environment; supplied by subclasses.
        """
        raise NotImplementedError()
116
117
118
class VMTestEnv(TestEnv):
    """
    Test environment backed by a libvirt domain and a stack of its snapshots.
    """
    name = "libvirt-based"

    def __init__(self, mode, hypervisor, domain_name):
        """
        Args:
            mode: Scanning mode passed on to ``TestEnv``.
            hypervisor: Passed to ``ssg_test_suite.virt.connect_domain``.
            domain_name: Name of the domain to connect to and to scan.
        """
        super(VMTestEnv, self).__init__(mode)
        self.domain = None
        # Assigned in start(); declared here so the attribute always exists.
        self.domain_ip = None

        self.hypervisor = hypervisor
        self.domain_name = domain_name
        self.snapshot_stack = None

        self._origin = None

    def start(self):
        """
        Connect to the domain, start it, find its IP address
        and save the "origin" state.
        """
        self.domain = ssg_test_suite.virt.connect_domain(
            self.hypervisor, self.domain_name)
        self.snapshot_stack = SnapshotStack(self.domain)

        ssg_test_suite.virt.start_domain(self.domain)
        self.domain_ip = ssg_test_suite.virt.determine_ip(self.domain)

        self._origin = self._save_state("origin")

    def finalize(self):
        """
        Delete the "origin" state saved by ``start``.
        """
        self._delete_saved_state(self._origin)
        # self.domain.shutdown()
        # logging.debug('Shut the domain off')

    def reset_state_to(self, state_name, new_running_state_name):
        """
        Revert to the topmost snapshot without deleting it.

        Args:
            state_name: Has to be the name of the topmost snapshot.
            new_running_state_name: Not used by this environment.

        Returns:
            The result of ``SnapshotStack.revert(delete=False)``.

        Raises:
            AssertionError: If ``state_name`` is not the topmost snapshot.
        """
        last_snapshot_name = self.snapshot_stack.snapshot_stack[-1].getName()
        # Raised explicitly so the check is not stripped when Python runs
        # with -O; the exception type is the one "assert" would produce.
        if last_snapshot_name != state_name:
            raise AssertionError(
                "You can only revert to the last snapshot, which is {0}, not {1}"
                .format(last_snapshot_name, state_name))
        return self.snapshot_stack.revert(delete=False)

    def _save_state(self, state_name):
        """
        Create a snapshot called ``state_name`` and return it.
        """
        return self.snapshot_stack.create(state_name)

    def _delete_saved_state(self, snapshot):
        """
        Revert the snapshot stack; the ``snapshot`` argument is not consulted.
        """
        self.snapshot_stack.revert()

    def _local_oscap_check_base_arguments(self):
        """
        Return the leading ``oscap-vm`` arguments for this domain.
        """
        return ['oscap-vm', "domain", self.domain_name, 'xccdf', 'eval']

    def offline_scan(self, args, verbose_path):
        """
        Run the ``oscap-vm`` command with ``args`` appended.
        """
        command_list = self._local_oscap_check_base_arguments() + args

        return common.run_cmd_local(command_list, verbose_path)
168
169
170
class ContainerTestEnv(TestEnv):
    """
    Common logic of container-based test environments.

    Saved states are images committed from containers. The class keeps
    a list of the images it created and a list of the containers it started;
    the engine-specific operations are left to subclasses.
    """

    def __init__(self, scanning_mode, image_name):
        super(ContainerTestEnv, self).__init__(scanning_mode)
        self._name_stem = "ssg_test"
        self.base_image = image_name
        self.created_images = []
        self.containers = []
        self.domain_ip = None

    def start(self):
        """
        Start a container from the base image.
        """
        self.run_container(self.base_image)

    def finalize(self):
        """
        Terminate the most recently started container, if there is one.
        """
        self._terminate_current_running_container_if_applicable()

    def image_stem2fqn(self, stem):
        """
        Return the full image name: the base image name, "_" and ``stem``.
        """
        return "{0}_{1}".format(self.base_image, stem)

    @property
    def current_container(self):
        """
        The most recently started container, or None if there is none.
        """
        return self.containers[-1] if self.containers else None

    @property
    def current_image(self):
        """
        The most recently created image, or the base image if none was created.
        """
        return self.created_images[-1] if self.created_images else self.base_image

    def _create_new_image(self, from_container, name):
        """
        Commit a container into a new image and remember the image.

        When ``from_container`` is falsy, a container is started
        from the current image first and that one is committed.

        Returns:
            The full name of the new image.
        """
        fqn = self.image_stem2fqn(name)
        source = from_container or self.run_container(self.current_image)
        self._commit(source, fqn)
        self.created_images.append(fqn)
        return fqn

    def _save_state(self, state_name):
        """
        Save the current container as the image for ``state_name``.
        """
        return self._create_new_image(self.current_container, state_name)

    def run_container(self, image_name, container_name="running"):
        """
        Start a container from ``image_name`` and record its IP address.

        Returns:
            The new container, as returned by ``_new_container_from_image``.
        """
        started = self._new_container_from_image(image_name, container_name)
        self.containers.append(started)
        # Get the container time to fully start its service
        time.sleep(0.2)

        self.domain_ip = self._get_container_ip(started)
        return started

    def reset_state_to(self, state_name, new_running_state_name):
        """
        Replace the current container with a fresh one started
        from the image that belongs to ``state_name``.
        """
        self._terminate_current_running_container_if_applicable()
        return self.run_container(
            self.image_stem2fqn(state_name), new_running_state_name)

    def _delete_saved_state(self, image):
        """
        Terminate the current container and remove the newest created image,
        which has to be ``image``.
        """
        self._terminate_current_running_container_if_applicable()

        assert self.created_images

        newest = self.created_images.pop()
        assert newest == image
        self._remove_image(newest)

    def offline_scan(self, args, verbose_path):
        """
        Run the engine-specific local scan command with ``args`` appended.
        """
        full_command = self._local_oscap_check_base_arguments() + args

        return common.run_cmd_local(full_command, verbose_path)

    # Engine-specific hooks that subclasses have to provide.

    def _commit(self, container, image):
        raise NotImplementedError

    def _new_container_from_image(self, image_name, container_name):
        raise NotImplementedError

    def _get_container_ip(self, container):
        raise NotImplementedError

    def _terminate_current_running_container_if_applicable(self):
        raise NotImplementedError

    def _remove_image(self, image):
        raise NotImplementedError

    def _local_oscap_check_base_arguments(self):
        raise NotImplementedError
261
262
263
class DockerTestEnv(ContainerTestEnv):
    """
    Container test environment driven through the Docker Python client.
    """
    name = "docker-based"

    def __init__(self, mode, image_name):
        """
        Create the Docker client and check that the service responds.

        Raises:
            RuntimeError: If the ``docker`` module can't be imported,
                or if creating or pinging the client fails.
        """
        super(DockerTestEnv, self).__init__(mode, image_name)
        try:
            import docker
        except ImportError:
            raise RuntimeError("Can't import Docker, Docker backend will not work.")
        try:
            self.client = docker.from_env(version="auto")
            self.client.ping()
        except Exception as exc:
            # The placeholder makes the underlying error part of the message;
            # without it, the format() argument was silently discarded.
            msg = (
                "{0}\n"
                "Unable to start the Docker test environment, "
                "is the Docker service started "
                "and do you have rights to access it?"
                .format(str(exc)))
            raise RuntimeError(msg)

    def _commit(self, container, image):
        """
        Commit ``container`` into the repository named ``image``.
        """
        container.commit(repository=image)

    def _new_container_from_image(self, image_name, container_name):
        """
        Start a detached container running ``sshd`` and return it.

        The container is named ``<name stem>_<container_name>``.
        """
        img = self.client.images.get(image_name)
        # NOTE(review): per the Docker SDK documentation, a None host port
        # should make Docker pick a random one - confirm.
        return self.client.containers.run(
            img, "/usr/sbin/sshd -D",
            name="{0}_{1}".format(self._name_stem, container_name), ports={"22": None},
            detach=True)

    def _get_container_ip(self, container):
        """
        Return the container's address on the "bridge" network.
        """
        # Refresh the attributes before reading the network settings.
        container.reload()
        return container.attrs["NetworkSettings"]["Networks"]["bridge"]["IPAddress"]

    def _terminate_current_running_container_if_applicable(self):
        """
        Stop and remove the most recently started container, if there is one.
        """
        if self.containers:
            running_state = self.containers.pop()
            running_state.stop()
            running_state.remove()

    def _remove_image(self, image):
        """
        Remove ``image`` through the Docker client.
        """
        self.client.images.remove(image)

    def _local_oscap_check_base_arguments(self):
        """
        Return the leading ``oscap-docker`` arguments for the current container.
        """
        return ['oscap-docker', "container", self.current_container.id,
                'xccdf', 'eval']
311
312
313
class PodmanTestEnv(ContainerTestEnv):
    """
    Container test environment driven through the ``podman`` command.

    Containers are represented by the ID string that ``podman run`` prints.
    """
    # TODO: Rework this class using Podman Python bindings (python3-podman)
    # at the moment when their API will provide methods to run containers,
    # commit images and inspect containers
    name = "podman-based"

    def __init__(self, scanning_mode, image_name):
        super(PodmanTestEnv, self).__init__(scanning_mode, image_name)

    @staticmethod
    def _run_podman(podman_cmd):
        """
        Run a podman command and return its output as bytes.

        The standard error stream is merged into the returned output.

        Raises:
            RuntimeError: If the command exits with a non-zero status;
                the message holds the command, its exit status and its output.
        """
        try:
            return subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            msg = "Command '{0}' returned {1}:\n{2}".format(
                " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
            raise RuntimeError(msg)

    def _commit(self, container, image):
        """
        Commit ``container`` into ``image``.
        """
        self._run_podman(["podman", "commit", container, image])

    def _new_container_from_image(self, image_name, container_name):
        """
        Start a detached container running ``sshd`` and return its ID.

        The container is named ``<name stem>_<container_name>``.
        """
        long_name = "{0}_{1}".format(self._name_stem, container_name)
        podman_cmd = ["podman", "run", "--name", long_name,
                      "--publish", "22", "--detach", image_name,
                      "/usr/sbin/sshd", "-D"]
        podman_output = self._run_podman(podman_cmd)
        return podman_output.decode("utf-8").strip()

    def _get_container_ip(self, container):
        """
        Return the IP address that ``podman inspect`` reports for ``container``.
        """
        podman_cmd = ["podman", "inspect", container, "--format", "{{.NetworkSettings.IPAddress}}"]
        podman_output = self._run_podman(podman_cmd)
        # Strip the surrounding whitespace (the output ends with a newline),
        # because the address becomes a part of the "root@<ip>" host argument.
        return podman_output.decode("utf-8").strip()

    def _terminate_current_running_container_if_applicable(self):
        """
        Stop and remove the most recently started container, if there is one.
        """
        if self.containers:
            running_state = self.containers.pop()
            self._run_podman(["podman", "stop", running_state])
            self._run_podman(["podman", "rm", running_state])

    def _remove_image(self, image):
        """
        Remove ``image`` using ``podman rmi``.
        """
        self._run_podman(["podman", "rmi", image])

    def _local_oscap_check_base_arguments(self):
        raise NotImplementedError("OpenSCAP doesn't support offline scanning of Podman Containers")
379