Test Failed
Push — master ( 220a8c...f5a3e8 )
by Jan
01:14 queued 12s
created

ContainerTestEnv.run_container()   B

Complexity

Conditions 6

Size

Total Lines 20
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 42

Importance

Changes 0
Metric Value
cc 6
eloc 17
nop 3
dl 0
loc 20
ccs 0
cts 16
cp 0
crap 42
rs 8.6166
c 0
b 0
f 0
1
from __future__ import print_function
2
3
import contextlib
4
import sys
5
import os
6
import time
7
import subprocess
8
import json
9
10
import ssg_test_suite
11
from ssg_test_suite import common
12
13
14
class SavedState(object):
    """A named state saved in a test environment.

    Instances are normally obtained through ``create_from_environment``,
    which saves the state on entry and deletes it again on exit.
    """

    def __init__(self, environment, name):
        self.name = name
        self.environment = environment
        self.initial_running_state = True

    def map_on_top(self, function, args_list):
        """
        Call ``function(*args)`` for every ``args`` in ``args_list``.

        The first call is made without resetting the environment.
        Before every following call, the environment is reset to this saved
        state (the new running state is named ``running_<index>``), and after
        the last call it is reset once more (named ``running_last``).
        Nothing happens when ``args_list`` is empty.
        """
        if not args_list:
            return
        function(*args_list[0])
        for idx, args in enumerate(args_list[1:], 1):
            self.environment.reset_state_to(self.name, "running_%d" % idx)
            function(*args)
        self.environment.reset_state_to(self.name, "running_last")

    @classmethod
    @contextlib.contextmanager
    def create_from_environment(cls, environment, state_name):
        """
        Context manager that saves ``state_name`` in ``environment``,
        yields the corresponding ``SavedState`` and deletes the saved state
        when the ``with`` block is left.

        A ``KeyboardInterrupt`` raised inside the ``with`` block is held back
        until the saved state has been deleted, and it is re-raised afterwards.
        A ``KeyboardInterrupt`` raised during the deletion itself causes
        the deletion to be attempted one more time.
        """
        state = cls(environment, state_name)

        state_handle = environment.save_state(state_name)
        exception_to_reraise = None
        try:
            yield state
        except KeyboardInterrupt as exc:
            print("Hang on for a minute, cleaning up the saved state '{0}'."
                  .format(state_name), file=sys.stderr)
            # Remember the interrupt, it is re-raised once the cleanup is over.
            exception_to_reraise = exc
        finally:
            try:
                environment._delete_saved_state(state_handle)
            except KeyboardInterrupt:
                # The cleanup got interrupted - give it a second chance.
                print("Hang on for a minute, cleaning up the saved state '{0}'."
                      .format(state_name), file=sys.stderr)
                environment._delete_saved_state(state_handle)
            finally:
                if exception_to_reraise:
                    raise exception_to_reraise
55
56
57
class TestEnv(object):
    """
    Base class of test environments.

    Subclasses are expected to provide ``_save_state`` (called by
    ``save_state``) and to set ``domain_ip`` before an online scan is run -
    neither of them is defined by this class.
    """

    def __init__(self, scanning_mode):
        self.running_state_base = None
        self.running_state = None

        self.scanning_mode = scanning_mode
        self.backend = None
        self.ssh_port = 22

    def start(self):
        """
        Run the environment and
        ensure that the environment will not be permanently modified
        by subsequent procedures.
        """
        pass

    def finalize(self):
        """
        Perform the environment cleanup and shut it down.
        """
        pass

    def reset_state_to(self, state_name, new_running_state_name):
        """
        Reset the environment to a saved state. To be implemented by subclasses.
        """
        raise NotImplementedError()

    def save_state(self, state_name):
        """
        Record ``state_name`` as the running state base and return
        whatever the subclass-provided ``_save_state(state_name)`` returns.
        """
        self.running_state_base = state_name
        return self._save_state(state_name)

    def _delete_saved_state(self, state_name):
        """
        Delete a previously saved state. To be implemented by subclasses.
        """
        raise NotImplementedError()

    def _stop_state(self, state):
        pass

    def _oscap_ssh_base_arguments(self):
        """
        Return the leading part of the ``oscap-ssh ... xccdf eval`` command line
        that targets root at ``self.domain_ip``, port ``self.ssh_port``.
        """
        full_hostname = 'root@{}'.format(self.domain_ip)
        return ['oscap-ssh', full_hostname, "{}".format(self.ssh_port), 'xccdf', 'eval']

    def scan(self, args, verbose_path):
        """
        Dispatch to ``online_scan`` or ``offline_scan`` according to
        ``self.scanning_mode`` and return the result.

        Raises ``KeyError`` if the scanning mode is neither
        "online" nor "offline".
        """
        if self.scanning_mode == "online":
            return self.online_scan(args, verbose_path)
        elif self.scanning_mode == "offline":
            return self.offline_scan(args, verbose_path)
        else:
            msg = "Invalid scanning mode {mode}".format(mode=self.scanning_mode)
            raise KeyError(msg)

    def online_scan(self, args, verbose_path):
        """
        Run ``oscap-ssh`` with ``args`` appended by means of
        ``common.run_cmd_local`` and return its result.

        As a side effect, the ``SSH_ADDITIONAL_OPTIONS`` environment variable
        of this process is set from ``common.SSH_ADDITIONAL_OPTS``.
        """
        os.environ["SSH_ADDITIONAL_OPTIONS"] = " ".join(common.SSH_ADDITIONAL_OPTS)
        command_list = self._oscap_ssh_base_arguments() + args
        return common.run_cmd_local(command_list, verbose_path)

    def offline_scan(self, args, verbose_path):
        """
        Scan the environment without connecting to it.
        To be implemented by subclasses.
        """
        raise NotImplementedError()
114
115
116
class VMTestEnv(TestEnv):
    """
    Test environment that uses a libvirt domain,
    states are kept as snapshots on a snapshot stack.
    """
    name = "libvirt-based"

    def __init__(self, mode, hypervisor, domain_name):
        super(VMTestEnv, self).__init__(mode)

        # The module is imported only to find out early whether it is available.
        try:
            import libvirt
        except ImportError:
            raise RuntimeError("Can't import libvirt module, libvirt backend will therefore not work.")

        self.hypervisor = hypervisor
        self.domain_name = domain_name

        # Those are populated by start()
        self.domain = None
        self.snapshot_stack = None
        self._origin = None

    def start(self):
        """
        Connect to the domain, start it, find out its IP
        and save the "origin" state.
        """
        from ssg_test_suite import virt

        domain = virt.connect_domain(self.hypervisor, self.domain_name)
        self.domain = domain
        self.snapshot_stack = virt.SnapshotStack(domain)

        virt.start_domain(domain)
        self.domain_ip = virt.determine_ip(domain)

        self._origin = self._save_state("origin")

    def finalize(self):
        """
        Delete the "origin" state saved by start().
        """
        self._delete_saved_state(self._origin)
        # The domain is not shut down, the respective code is disabled:
        # self.domain.shutdown()
        # logging.debug('Shut the domain off')

    def reset_state_to(self, state_name, new_running_state_name):
        """
        Revert to the topmost snapshot without deleting it,
        and return the result of the revert.

        ``state_name`` has to be the name of the topmost snapshot,
        ``new_running_state_name`` is not used by this backend.
        """
        top_snapshot_name = self.snapshot_stack.snapshot_stack[-1].getName()
        assert top_snapshot_name == state_name, (
            "You can only revert to the last snapshot, which is {0}, not {1}"
            .format(top_snapshot_name, state_name))
        return self.snapshot_stack.revert(delete=False)

    def _save_state(self, state_name):
        """
        Create a snapshot named ``state_name`` on the snapshot stack and return it.
        """
        return self.snapshot_stack.create(state_name)

    def _delete_saved_state(self, snapshot):
        """
        Revert the snapshot stack; the ``snapshot`` argument itself is not used.
        """
        self.snapshot_stack.revert()

    def _local_oscap_check_base_arguments(self):
        """
        Return the leading part of the ``oscap-vm`` command line for this domain.
        """
        return ['oscap-vm', "domain", self.domain_name, 'xccdf', 'eval']

    def offline_scan(self, args, verbose_path):
        """
        Run ``oscap-vm`` with ``args`` appended by means of
        ``common.run_cmd_local`` and return its result.
        """
        oscap_command = self._local_oscap_check_base_arguments() + args
        return common.run_cmd_local(oscap_command, verbose_path)
175
176
177
class ContainerTestEnv(TestEnv):
    """
    Common logic of container-based test environments.

    Saved states are images committed from containers, running states
    are containers started from those images. The backend-specific operations
    are the methods at the end of the class that raise NotImplementedError.
    """

    def __init__(self, scanning_mode, image_name):
        super(ContainerTestEnv, self).__init__(scanning_mode)
        self._name_stem = "ssg_test"
        self.base_image = image_name
        self.domain_ip = None
        self.internal_ssh_port = 22222
        # Both lists are used as stacks, the last item is the current one.
        self.created_images = []
        self.containers = []

    def start(self):
        """Run a container from the base image."""
        self.run_container(self.base_image)

    def finalize(self):
        """Terminate the current container, if there is one."""
        self._terminate_current_running_container_if_applicable()

    def image_stem2fqn(self, stem):
        """Return the full image name: the base image name, underscore, ``stem``."""
        return "{0}_{1}".format(self.base_image, stem)

    @property
    def current_container(self):
        """The most recently started container, or None if there is none."""
        return self.containers[-1] if self.containers else None

    @property
    def current_image(self):
        """The most recently created image, or the base image if none was created."""
        return self.created_images[-1] if self.created_images else self.base_image

    def _create_new_image(self, from_container, name):
        """
        Commit ``from_container`` as a new image and return the image name.

        If ``from_container`` is not supplied, a container is started
        from the current image, and that one is committed.
        """
        image_fqn = self.image_stem2fqn(name)
        source_container = from_container or self.run_container(self.current_image)
        self._commit(source_container, image_fqn)
        self.created_images.append(image_fqn)
        return image_fqn

    def _save_state(self, state_name):
        """Save the current container as an image and return the image name."""
        return self._create_new_image(self.current_container, state_name)

    def run_container(self, image_name, container_name="running"):
        """
        Start a container from ``image_name``, make it the current container,
        and point ``domain_ip``, ``ssh_port`` and the shared SSH options to it.

        Returns the new container.
        """
        container = self._new_container_from_image(image_name, container_name)
        self.containers.append(container)
        # Get the container time to fully start its service
        time.sleep(0.2)

        container_ip = self._get_container_ip(container)
        if container_ip:
            self.domain_ip = container_ip
            self.ssh_port = self.internal_ssh_port
        else:
            # No container IP - connect to the port exposed on the localhost.
            self.domain_ip = 'localhost'
            port_mapping = self._get_container_ports(container)
            if self.internal_ssh_port in port_mapping:
                self.ssh_port = port_mapping[self.internal_ssh_port]

        self._sync_ssh_port_option()
        return container

    def _sync_ssh_port_option(self):
        """
        Make ``common.SSH_ADDITIONAL_OPTS`` carry ``Port=<self.ssh_port>``.

        Items that start with ``Port=`` are replaced, and if the option
        is still absent afterwards, ``-o Port=...`` is prepended.
        """
        port_setting = 'Port={}'.format(self.ssh_port)
        common.SSH_ADDITIONAL_OPTS = tuple(
            port_setting if option.startswith('Port=') else option
            for option in common.SSH_ADDITIONAL_OPTS)
        if port_setting not in common.SSH_ADDITIONAL_OPTS:
            common.SSH_ADDITIONAL_OPTS = ('-o', port_setting) + common.SSH_ADDITIONAL_OPTS

    def reset_state_to(self, state_name, new_running_state_name):
        """
        Replace the current container with a new one started from
        the image saved as ``state_name``, and return the new container.
        """
        self._terminate_current_running_container_if_applicable()
        saved_image = self.image_stem2fqn(state_name)
        return self.run_container(saved_image, new_running_state_name)

    def _delete_saved_state(self, image):
        """
        Terminate the current container and remove the most recently
        created image, which has to be ``image``.
        """
        self._terminate_current_running_container_if_applicable()

        assert self.created_images

        last_image = self.created_images.pop()
        assert last_image == image
        self._remove_image(last_image)

    def offline_scan(self, args, verbose_path):
        """
        Run the backend-specific local oscap command with ``args`` appended
        by means of ``common.run_cmd_local`` and return its result.
        """
        oscap_command = self._local_oscap_check_base_arguments() + args
        return common.run_cmd_local(oscap_command, verbose_path)

    # Backend-specific operations follow.

    def _commit(self, container, image):
        raise NotImplementedError

    def _new_container_from_image(self, image_name, container_name):
        raise NotImplementedError

    def _get_container_ip(self, container):
        raise NotImplementedError

    def _get_container_ports(self, container):
        raise NotImplementedError

    def _terminate_current_running_container_if_applicable(self):
        raise NotImplementedError

    def _remove_image(self, image):
        raise NotImplementedError

    def _local_oscap_check_base_arguments(self):
        raise NotImplementedError
284
285
286
class DockerTestEnv(ContainerTestEnv):
    """
    Container test environment that talks to Docker
    by means of the ``docker`` Python module.
    """
    name = "docker-based"

    def __init__(self, mode, image_name):
        """
        Raises ``RuntimeError`` if the ``docker`` module can't be imported,
        or if the client can't be created or the service can't be pinged.
        """
        super(DockerTestEnv, self).__init__(mode, image_name)
        try:
            import docker
        except ImportError:
            raise RuntimeError("Can't import the docker module, Docker backend will not work.")
        try:
            self.client = docker.from_env(version="auto")
            self.client.ping()
        except Exception as exc:
            # The placeholder makes the underlying error part of the message -
            # without it, the format() argument would be silently discarded.
            msg = (
                "Unable to start the Docker test environment, "
                "is the Docker service started "
                "and do you have rights to access it? "
                "Error: {0}"
                .format(str(exc)))
            raise RuntimeError(msg)

    def _commit(self, container, image):
        """Commit ``container`` into the repository named ``image``."""
        container.commit(repository=image)

    def _new_container_from_image(self, image_name, container_name):
        """
        Run a detached container from ``image_name`` with sshd listening on
        the internal SSH port, which is published, and return the container.
        """
        img = self.client.images.get(image_name)
        result = self.client.containers.run(
            img, "/usr/sbin/sshd -p {} -D".format(self.internal_ssh_port),
            name="{0}_{1}".format(self._name_stem, container_name),
            ports={"{}".format(self.internal_ssh_port): None},
            detach=True)
        return result

    def _get_container_ip(self, container):
        """Return the IP address of ``container`` on the "bridge" network."""
        # The attributes are refreshed first, so the network settings are current.
        container.reload()
        container_ip = container.attrs["NetworkSettings"]["Networks"]["bridge"]["IPAddress"]
        return container_ip

    def _terminate_current_running_container_if_applicable(self):
        """Stop and remove the current container, if there is one."""
        if self.containers:
            running_state = self.containers.pop()
            running_state.stop()
            running_state.remove()

    def _remove_image(self, image):
        """Remove ``image`` by means of the Docker client."""
        self.client.images.remove(image)

    def _local_oscap_check_base_arguments(self):
        """
        Return the leading part of the ``oscap-docker`` command line
        that targets the current container.
        """
        return ['oscap-docker', "container", self.current_container.id,
                'xccdf', 'eval']
335
336
337
class PodmanTestEnv(ContainerTestEnv):
    """
    Container test environment that drives Podman by running
    the ``podman`` command. Containers are referred to by their IDs.
    """
    # TODO: Rework this class using Podman Python bindings (python3-podman)
    # at the moment when their API will provide methods to run containers,
    # commit images and inspect containers
    name = "podman-based"

    def __init__(self, scanning_mode, image_name):
        super(PodmanTestEnv, self).__init__(scanning_mode, image_name)

    def _run_podman(self, podman_cmd):
        """
        Run ``podman_cmd`` (a list of arguments) and return its raw output,
        stderr included.

        Raises ``RuntimeError`` that carries the command, its return code
        and its output if the command exits with a non-zero status.
        """
        try:
            return subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            msg = "Command '{0}' returned {1}:\n{2}".format(" ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
            raise RuntimeError(msg)

    def _commit(self, container, image):
        """Commit ``container`` as ``image``."""
        self._run_podman(["podman", "commit", container, image])

    def _new_container_from_image(self, image_name, container_name):
        """
        Run a detached container from ``image_name`` with sshd listening on
        the internal SSH port, which is published, and return the container ID.
        """
        long_name = "{0}_{1}".format(self._name_stem, container_name)
        podman_cmd = ["podman", "run", "--name", long_name,
                      "--publish", "{}".format(self.internal_ssh_port), "--detach", image_name,
                      "/usr/sbin/sshd", "-p", "{}".format(self.internal_ssh_port), "-D"]
        podman_output = self._run_podman(podman_cmd)
        container_id = podman_output.decode("utf-8").strip()
        return container_id

    def _get_container_ip(self, container):
        """
        Return the IP address that ``podman inspect`` reports for
        ``container``, which may be an empty string.
        """
        podman_cmd = ["podman", "inspect", container, "--format", "{{.NetworkSettings.IPAddress}}"]
        podman_output = self._run_podman(podman_cmd)
        ip_address = podman_output.decode("utf-8").strip()
        return ip_address

    def _get_container_ports(self, container):
        """
        Return a dictionary that maps container ports of ``container``
        to host ports, it is empty if no port bindings are reported.
        """
        podman_cmd = ["podman", "inspect", container, "--format", "{{json .NetworkSettings.Ports}}"]
        podman_output = self._run_podman(podman_cmd)
        # Decode explicitly - json.loads accepts bytes only since Python 3.6.
        # The "or" part covers a JSON null, which is loaded as None.
        port_bindings = json.loads(podman_output.decode("utf-8")) or []
        # NOTE(review): this expects a list of objects with 'containerPort'
        # and 'hostPort' keys - confirm that the Podman version in use
        # reports the ports this way.
        return {pb['containerPort']: pb['hostPort'] for pb in port_bindings}

    def _terminate_current_running_container_if_applicable(self):
        """Stop and remove the current container, if there is one."""
        if self.containers:
            running_state = self.containers.pop()
            self._run_podman(["podman", "stop", running_state])
            self._run_podman(["podman", "rm", running_state])

    def _remove_image(self, image):
        """Remove ``image`` by means of ``podman rmi``."""
        self._run_podman(["podman", "rmi", image])

    def _local_oscap_check_base_arguments(self):
        raise NotImplementedError("OpenSCAP doesn't support offline scanning of Podman Containers")
415