Passed
Push — master ( 42467a...9d011f )
by Matěj
03:19 queued 11s
created

ContainerTestEnv.current_container()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 5
nop 1
dl 0
loc 5
ccs 0
cts 4
cp 0
crap 6
rs 10
c 0
b 0
f 0
1
from __future__ import print_function
2
3
import contextlib
4
import sys
5
import os
6
import time
7
import subprocess
8
import json
9
import logging
10
11
import ssg_test_suite
12
from ssg_test_suite import common
13
14
15
class SavedState(object):
16
    def __init__(self, environment, name):
17
        self.name = name
18
        self.environment = environment
19
        self.initial_running_state = True
20
21
    def map_on_top(self, function, args_list):
22
        if not args_list:
23
            return
24
        current_running_state = self.initial_running_state
25
        function(* args_list[0])
26
        for idx, args in enumerate(args_list[1:], 1):
27
            current_running_state = self.environment.reset_state_to(
28
                self.name, "running_%d" % idx)
29
            function(* args)
30
        current_running_state = self.environment.reset_state_to(
31
            self.name, "running_last")
32
33
    @classmethod
34
    @contextlib.contextmanager
35
    def create_from_environment(cls, environment, state_name):
36
        state = cls(environment, state_name)
37
38
        state_handle = environment.save_state(state_name)
39
        exception_to_reraise = None
40
        try:
41
            yield state
42
        except KeyboardInterrupt as exc:
43
            print("Hang on for a minute, cleaning up the saved state '{0}'."
44
                  .format(state_name), file=sys.stderr)
45
            exception_to_reraise = exc
46
        finally:
47
            try:
48
                environment._delete_saved_state(state_handle)
49
            except KeyboardInterrupt:
50
                print("Hang on for a minute, cleaning up the saved state '{0}'."
51
                      .format(state_name), file=sys.stderr)
52
                environment._delete_saved_state(state_handle)
53
            finally:
54
                if exception_to_reraise:
55
                    raise exception_to_reraise
56
57
58
class TestEnv(object):
59
    def __init__(self, scanning_mode):
60
        self.running_state_base = None
61
        self.running_state = None
62
63
        self.scanning_mode = scanning_mode
64
        self.backend = None
65
        self.ssh_port = None
66
67
        self.domain_ip = None
68
        self.ssh_additional_options = []
69
70
    def start(self):
71
        """
72
        Run the environment and
73
        ensure that the environment will not be permanently modified
74
        by subsequent procedures.
75
        """
76
        self.refresh_connection_parameters()
77
78
    def refresh_connection_parameters(self):
79
        self.domain_ip = self.get_ip_address()
80
        self.ssh_port = self.get_ssh_port()
81
        self.ssh_additional_options = self.get_ssh_additional_options()
82
83
    def get_ip_address(self):
84
        raise NotImplementedError()
85
86
    def get_ssh_port(self):
87
        return 22
88
89
    def get_ssh_additional_options(self):
90
        return list(common.SSH_ADDITIONAL_OPTS)
91
92
    def execute_ssh_command(self, command, log_file, error_msg=None):
93
        remote_dest = "root@{ip}".format(ip=self.domain_ip)
94
        if not error_msg:
95
            error_msg = (
96
                "Failed to execute '{command}' on {remote_dest}"
97
                .format(command=command, remote_dest=remote_dest))
98
        try:
99
            stdout = common.run_with_stdout_logging(
100
                "ssh", tuple(self.ssh_additional_options) + (remote_dest, command), log_file)
101
        except Exception as exc:
102
            logging.error(error_msg + ": " + str(exc))
103
            raise RuntimeError(error_msg)
104
        return stdout
105
106
    def scp_download_file(self, source, destination, log_file, error_msg=None):
107
        scp_src = "root@{ip}:{source}".format(ip=self.domain_ip, source=source)
108
        return self.scp_transfer_file(scp_src, destination, log_file, error_msg)
109
110
    def scp_upload_file(self, source, destination, log_file, error_msg=None):
111
        scp_dest = "root@{ip}:{dest}".format(ip=self.domain_ip, dest=destination)
112
        return self.scp_transfer_file(source, scp_dest, log_file, error_msg)
113
114
    def scp_transfer_file(self, source, destination, log_file, error_msg=None):
115
        if not error_msg:
116
            error_msg = (
117
                "Failed to copy {source} to {destination}"
118
                .format(source=source, destination=destination))
119
        try:
120
            common.run_with_stdout_logging(
121
                "scp", tuple(self.ssh_additional_options) + (source, destination), log_file)
122
        except Exception as exc:
123
            error_msg = error_msg + ": " + str(exc)
124
            logging.error(error_msg)
125
            raise RuntimeError(error_msg)
126
127
    def finalize(self):
128
        """
129
        Perform the environment cleanup and shut it down.
130
        """
131
        pass
132
133
    def reset_state_to(self, state_name, new_running_state_name):
134
        raise NotImplementedError()
135
136
    def save_state(self, state_name):
137
        self.running_state_base = state_name
138
        running_state = self.running_state
139
        return self._save_state(state_name)
140
141
    def _delete_saved_state(self, state_name):
142
        raise NotImplementedError()
143
144
    def _stop_state(self, state):
145
        pass
146
147
    def _oscap_ssh_base_arguments(self):
148
        full_hostname = 'root@{}'.format(self.domain_ip)
149
        return ['oscap-ssh', full_hostname, "{}".format(self.ssh_port), 'xccdf', 'eval']
150
151
    def scan(self, args, verbose_path):
152
        if self.scanning_mode == "online":
153
            return self.online_scan(args, verbose_path)
154
        elif self.scanning_mode == "offline":
155
            return self.offline_scan(args, verbose_path)
156
        else:
157
            msg = "Invalid scanning mode {mode}".format(mode=self.scanning_mode)
158
            raise KeyError(msg)
159
160
    def online_scan(self, args, verbose_path):
161
        os.environ["SSH_ADDITIONAL_OPTIONS"] = " ".join(common.SSH_ADDITIONAL_OPTS)
162
        command_list = self._oscap_ssh_base_arguments() + args
163
        return common.run_cmd_local(command_list, verbose_path)
164
165
    def offline_scan(self, args, verbose_path):
166
        raise NotImplementedError()
167
168
169
class VMTestEnv(TestEnv):
170
    name = "libvirt-based"
171
172
    def __init__(self, mode, hypervisor, domain_name):
173
        super(VMTestEnv, self).__init__(mode)
174
175
        try:
176
            import libvirt
177
        except ImportError:
178
            raise RuntimeError("Can't import libvirt module, libvirt backend will "
179
                               "therefore not work.")
180
181
        self.domain = None
182
183
        self.hypervisor = hypervisor
184
        self.domain_name = domain_name
185
        self.snapshot_stack = None
186
187
        self._origin = None
188
189
    def start(self):
190
        from ssg_test_suite import virt
191
192
        self.domain = virt.connect_domain(
193
            self.hypervisor, self.domain_name)
194
195
        self.snapshot_stack = virt.SnapshotStack(self.domain)
196
197
        virt.start_domain(self.domain)
198
199
        self._origin = self._save_state("origin")
200
201
        super().start()
202
203
    def get_ip_address(self):
204
        from ssg_test_suite import virt
205
206
        return virt.determine_ip(self.domain)
207
208
    def reboot(self):
209
        from ssg_test_suite import virt
210
211
        if self.domain is None:
212
            self.domain = virt.connect_domain(
213
                self.hypervisor, self.domain_name)
214
215
        virt.reboot_domain(self.domain, self.domain_ip, self.ssh_port)
216
217
    def finalize(self):
218
        self._delete_saved_state(self._origin)
219
        # self.domain.shutdown()
220
        # logging.debug('Shut the domain off')
221
222
    def reset_state_to(self, state_name, new_running_state_name):
223
        last_snapshot_name = self.snapshot_stack.snapshot_stack[-1].getName()
224
        assert last_snapshot_name == state_name, (
225
            "You can only revert to the last snapshot, which is {0}, not {1}"
226
            .format(last_snapshot_name, state_name))
227
        state = self.snapshot_stack.revert(delete=False)
228
        return state
229
230
    def _save_state(self, state_name):
231
        state = self.snapshot_stack.create(state_name)
232
        return state
233
234
    def _delete_saved_state(self, snapshot):
235
        self.snapshot_stack.revert()
236
237
    def _local_oscap_check_base_arguments(self):
238
        return ['oscap-vm', "domain", self.domain_name, 'xccdf', 'eval']
239
240
    def offline_scan(self, args, verbose_path):
241
        command_list = self._local_oscap_check_base_arguments() + args
242
243
        return common.run_cmd_local(command_list, verbose_path)
244
245
246
class ContainerTestEnv(TestEnv):
247
    def __init__(self, scanning_mode, image_name):
248
        super(ContainerTestEnv, self).__init__(scanning_mode)
249
        self._name_stem = "ssg_test"
250
        self.base_image = image_name
251
        self.created_images = []
252
        self.containers = []
253
        self.domain_ip = None
254
        self.internal_ssh_port = 22222
255
256
    def start(self):
257
        self.run_container(self.base_image)
258
        super().start()
259
260
    def finalize(self):
261
        self._terminate_current_running_container_if_applicable()
262
263
    def image_stem2fqn(self, stem):
264
        image_name = "{0}_{1}".format(self.base_image, stem)
265
        return image_name
266
267
    @property
268
    def current_container(self):
269
        if self.containers:
270
            return self.containers[-1]
271
        return None
272
273
    @property
274
    def current_image(self):
275
        if self.created_images:
276
            return self.created_images[-1]
277
        return self.base_image
278
279
    def _create_new_image(self, from_container, name):
280
        new_image_name = self.image_stem2fqn(name)
281
        if not from_container:
282
            from_container = self.run_container(self.current_image)
283
        self._commit(from_container, new_image_name)
284
        self.created_images.append(new_image_name)
285
        return new_image_name
286
287
    def _save_state(self, state_name):
288
        state = self._create_new_image(self.current_container, state_name)
289
        return state
290
291
    def get_ssh_port(self):
292
        if self.domain_ip == 'localhost':
293
            ports = self._get_container_ports(self.current_container)
294
            if self.internal_ssh_port in ports:
295
                ssh_port = ports[self.internal_ssh_port]
296
            else:
297
                msg = "Unable to detect the SSH port for the container."
298
                raise RuntimeError(msg)
299
        else:
300
            ssh_port = self.internal_ssh_port
301
        return ssh_port
302
303
    def get_ssh_additional_options(self):
304
        ssh_additional_options = super().get_ssh_additional_options()
305
306
        # Assure that the -o option is followed by Port=<correct value> argument
307
        # If there is Port, assume that -o precedes it and just set the correct value
308
        port_opt = ['-o', 'Port={}'.format(self.ssh_port)]
309
        for index, opt in enumerate(ssh_additional_options):
310
            if opt.startswith('Port='):
311
                ssh_additional_options[index] = port_opt[1]
312
313
        # Put both arguments to the list of arguments if Port is not there.
314
        if port_opt[1] not in ssh_additional_options:
315
            ssh_additional_options = port_opt + ssh_additional_options
316
        return ssh_additional_options
317
318
    def run_container(self, image_name, container_name="running"):
319
        new_container = self._new_container_from_image(image_name, container_name)
320
        self.containers.append(new_container)
321
        # Get the container time to fully start its service
322
        time.sleep(0.2)
323
324
        self.refresh_connection_parameters()
325
326
        return new_container
327
328
    def reset_state_to(self, state_name, new_running_state_name):
329
        self._terminate_current_running_container_if_applicable()
330
        image_name = self.image_stem2fqn(state_name)
331
332
        new_container = self.run_container(image_name, new_running_state_name)
333
334
        return new_container
335
336
    def _delete_saved_state(self, image):
337
        self._terminate_current_running_container_if_applicable()
338
339
        assert self.created_images
340
341
        associated_image = self.created_images.pop()
342
        assert associated_image == image
343
        self._remove_image(associated_image)
344
345
    def offline_scan(self, args, verbose_path):
346
        command_list = self._local_oscap_check_base_arguments() + args
347
348
        return common.run_cmd_local(command_list, verbose_path)
349
350
    def _commit(self, container, image):
351
        raise NotImplementedError
352
353
    def _new_container_from_image(self, image_name, container_name):
354
        raise NotImplementedError
355
356
    def get_ip_address(self):
357
        raise NotImplementedError
358
359
    def _get_container_ports(self, container):
360
        raise NotImplementedError
361
362
    def _terminate_current_running_container_if_applicable(self):
363
        raise NotImplementedError
364
365
    def _remove_image(self, image):
366
        raise NotImplementedError
367
368
    def _local_oscap_check_base_arguments(self):
369
        raise NotImplementedError
370
371
372
class DockerTestEnv(ContainerTestEnv):
373
    name = "docker-based"
374
375
    def __init__(self, mode, image_name):
376
        super(DockerTestEnv, self).__init__(mode, image_name)
377
        try:
378
            import docker
379
        except ImportError:
380
            raise RuntimeError("Can't import the docker module, Docker backend will not work.")
381
        try:
382
            self.client = docker.from_env(version="auto")
383
            self.client.ping()
384
        except Exception as exc:
385
            msg = (
386
                "{}\n"
387
                "Unable to start the Docker test environment, "
388
                "is the Docker service started "
389
                "and do you have rights to access it?"
390
                .format(str(exc)))
391
            raise RuntimeError(msg)
392
393
    def _commit(self, container, image):
394
        container.commit(repository=image)
395
396
    def _new_container_from_image(self, image_name, container_name):
397
        img = self.client.images.get(image_name)
398
        result = self.client.containers.run(
399
            img, "/usr/sbin/sshd -p {} -D".format(self.internal_ssh_port),
400
            name="{0}_{1}".format(self._name_stem, container_name),
401
            ports={"{}".format(self.internal_ssh_port): None},
402
            detach=True)
403
        return result
404
405
    def get_ip_address(self):
406
        container = self.current_container
407
        container.reload()
408
        container_ip = container.attrs["NetworkSettings"]["Networks"]["bridge"]["IPAddress"]
409
        if not container_ip:
410
            container_ip = 'localhost'
411
        return container_ip
412
413
    def _terminate_current_running_container_if_applicable(self):
414
        if self.containers:
415
            running_state = self.containers.pop()
416
            running_state.stop()
417
            running_state.remove()
418
419
    def _remove_image(self, image):
420
        self.client.images.remove(image)
421
422
    def _local_oscap_check_base_arguments(self):
423
        return ['oscap-docker', "container", self.current_container.id,
424
                'xccdf', 'eval']
425
426
    def _get_container_ports(self, container):
427
        raise NotImplementedError("This method shouldn't be needed.")
428
429
430
class PodmanTestEnv(ContainerTestEnv):
431
    # TODO: Rework this class using Podman Python bindings (python3-podman)
432
    # at the moment when their API will provide methods to run containers,
433
    # commit images and inspect containers
434
    name = "podman-based"
435
436
    def __init__(self, scanning_mode, image_name):
437
        super(PodmanTestEnv, self).__init__(scanning_mode, image_name)
438
439
    def _commit(self, container, image):
440
        podman_cmd = ["podman", "commit", container, image]
441
        try:
442
            subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
443
        except subprocess.CalledProcessError as e:
444
            msg = "Command '{0}' returned {1}:\n{2}".format(
445
                " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
446
            raise RuntimeError(msg)
447
448
    def _new_container_from_image(self, image_name, container_name):
449
        long_name = "{0}_{1}".format(self._name_stem, container_name)
450
        podman_cmd = ["podman", "run", "--name", long_name,
451
                      "--publish", "{}".format(self.internal_ssh_port), "--detach", image_name,
452
                      "/usr/sbin/sshd", "-p", "{}".format(self.internal_ssh_port), "-D"]
453
        try:
454
            podman_output = subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
455
        except subprocess.CalledProcessError as e:
456
            msg = "Command '{0}' returned {1}:\n{2}".format(
457
                " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
458
            raise RuntimeError(msg)
459
        container_id = podman_output.decode("utf-8").strip()
460
        return container_id
461
462
    def get_ip_address(self):
463
        podman_cmd = [
464
                "podman", "inspect", self.current_container,
465
                "--format", "{{.NetworkSettings.IPAddress}}",
466
        ]
467
        try:
468
            podman_output = subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
469
        except subprocess.CalledProcessError as e:
470
            msg = "Command '{0}' returned {1}:\n{2}".format(
471
                " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
472
            raise RuntimeError(msg)
473
        ip_address = podman_output.decode("utf-8").strip()
474
        if not ip_address:
475
            ip_address = "localhost"
476
        return ip_address
477
478
    def _get_container_ports(self, container):
479
        podman_cmd = ["podman", "inspect", container, "--format",
480
                      "{{json .NetworkSettings.Ports}}"]
481
        try:
482
            podman_output = subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
483
        except subprocess.CalledProcessError as e:
484
            msg = "Command '{0}' returned {1}:\n{2}".format(
485
                " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
486
            raise RuntimeError(msg)
487
        return self.extract_port_map(json.loads(podman_output))
488
489
    def extract_port_map(self, podman_network_data):
490
        if 'containerPort' in podman_network_data:
491
            container_port = podman_network_data['containerPort']
492
            host_port = podman_network_data['hostPort']
493
        else:
494
            container_port_with_protocol, host_data = podman_network_data.popitem()
495
            container_port = container_port_with_protocol.split("/")[0]
496
            host_port = host_data[0]['HostPort']
497
        port_map = {int(container_port): int(host_port)}
498
        return port_map
499
500
    def _terminate_current_running_container_if_applicable(self):
501
        if self.containers:
502
            running_state = self.containers.pop()
503
            podman_cmd = ["podman", "stop", running_state]
504
            try:
505
                subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
506
            except subprocess.CalledProcessError as e:
507
                msg = "Command '{0}' returned {1}:\n{2}".format(
508
                    " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
509
                raise RuntimeError(msg)
510
            podman_cmd = ["podman", "rm", running_state]
511
            try:
512
                subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
513
            except subprocess.CalledProcessError as e:
514
                msg = "Command '{0}' returned {1}:\n{2}".format(
515
                    " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
516
                raise RuntimeError(msg)
517
518
    def _remove_image(self, image):
519
        podman_cmd = ["podman", "rmi", image]
520
        try:
521
            subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
522
        except subprocess.CalledProcessError as e:
523
            msg = "Command '{0}' returned {1}:\n{2}".format(
524
                " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
525
            raise RuntimeError(msg)
526
527
    def _local_oscap_check_base_arguments(self):
528
        raise NotImplementedError("OpenSCAP doesn't support offline scanning of Podman Containers")
529