Test Failed
Push — master ( 05831d...bcc24a )
by Jan
02:33 queued 12s
created

TestEnv.arf_to_html()   A

Complexity

Conditions 2

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 2
dl 0
loc 9
ccs 0
cts 7
cp 0
crap 6
rs 10
c 0
b 0
f 0
1
from __future__ import print_function
2
3
import contextlib
4
import sys
5
import os
6
import re
7
import time
8
import subprocess
9
import json
10
import logging
11
12
import ssg_test_suite
13
from ssg_test_suite import common
14
15
16
class SavedState(object):
17
    def __init__(self, environment, name):
18
        self.name = name
19
        self.environment = environment
20
        self.initial_running_state = True
21
22
    def map_on_top(self, function, args_list):
23
        if not args_list:
24
            return
25
        current_running_state = self.initial_running_state
26
        function(* args_list[0])
27
        for idx, args in enumerate(args_list[1:], 1):
28
            current_running_state = self.environment.reset_state_to(
29
                self.name, "running_%d" % idx)
30
            function(* args)
31
        current_running_state = self.environment.reset_state_to(
32
            self.name, "running_last")
33
34
    @classmethod
35
    @contextlib.contextmanager
36
    def create_from_environment(cls, environment, state_name):
37
        state = cls(environment, state_name)
38
39
        state_handle = environment.save_state(state_name)
40
        exception_to_reraise = None
41
        try:
42
            yield state
43
        except KeyboardInterrupt as exc:
44
            print("Hang on for a minute, cleaning up the saved state '{0}'."
45
                  .format(state_name), file=sys.stderr)
46
            exception_to_reraise = exc
47
        finally:
48
            try:
49
                environment._delete_saved_state(state_handle)
50
            except KeyboardInterrupt:
51
                print("Hang on for a minute, cleaning up the saved state '{0}'."
52
                      .format(state_name), file=sys.stderr)
53
                environment._delete_saved_state(state_handle)
54
            finally:
55
                if exception_to_reraise:
56
                    raise exception_to_reraise
57
58
59
class TestEnv(object):
60
    def __init__(self, scanning_mode):
61
        self.running_state_base = None
62
        self.running_state = None
63
64
        self.scanning_mode = scanning_mode
65
        self.backend = None
66
        self.ssh_port = None
67
68
        self.domain_ip = None
69
        self.ssh_additional_options = []
70
71
        self.product = None
72
73
        self.have_local_oval_graph = False
74
        p = subprocess.run(['arf-to-graph', '--version'], capture_output=True)
75
        if p.returncode == 0:
76
            self.have_local_oval_graph = True
77
78
    def arf_to_html(self, arf_filename):
79
        if not self.have_local_oval_graph:
80
            return
81
82
        html_filename = re.sub(r"\barf\b", "graph", arf_filename)
83
        html_filename = re.sub(r".xml", ".html", html_filename)
84
85
        cmd = ['arf-to-graph', '--all-in-one', '--output', html_filename, arf_filename, '.']
86
        p = subprocess.run(cmd, capture_output=True)
87
88
    def start(self):
89
        """
90
        Run the environment and
91
        ensure that the environment will not be permanently modified
92
        by subsequent procedures.
93
        """
94
        self.refresh_connection_parameters()
95
96
    def refresh_connection_parameters(self):
97
        self.domain_ip = self.get_ip_address()
98
        self.ssh_port = self.get_ssh_port()
99
        self.ssh_additional_options = self.get_ssh_additional_options()
100
101
    def get_ip_address(self):
102
        raise NotImplementedError()
103
104
    def get_ssh_port(self):
105
        return 22
106
107
    def get_ssh_additional_options(self):
108
        return list(common.SSH_ADDITIONAL_OPTS)
109
110
    def execute_ssh_command(self, command, log_file, error_msg=None):
111
        remote_dest = "root@{ip}".format(ip=self.domain_ip)
112
        if not error_msg:
113
            error_msg = (
114
                "Failed to execute '{command}' on {remote_dest}"
115
                .format(command=command, remote_dest=remote_dest))
116
        try:
117
            stdout = common.run_with_stdout_logging(
118
                "ssh", tuple(self.ssh_additional_options) + (remote_dest, command), log_file)
119
        except Exception as exc:
120
            logging.error(error_msg + ": " + str(exc))
121
            raise RuntimeError(error_msg)
122
        return stdout
123
124
    def scp_download_file(self, source, destination, log_file, error_msg=None):
125
        scp_src = "root@{ip}:{source}".format(ip=self.domain_ip, source=source)
126
        return self.scp_transfer_file(scp_src, destination, log_file, error_msg)
127
128
    def scp_upload_file(self, source, destination, log_file, error_msg=None):
129
        scp_dest = "root@{ip}:{dest}".format(ip=self.domain_ip, dest=destination)
130
        return self.scp_transfer_file(source, scp_dest, log_file, error_msg)
131
132
    def scp_transfer_file(self, source, destination, log_file, error_msg=None):
133
        if not error_msg:
134
            error_msg = (
135
                "Failed to copy {source} to {destination}"
136
                .format(source=source, destination=destination))
137
        try:
138
            common.run_with_stdout_logging(
139
                "scp", tuple(self.ssh_additional_options) + (source, destination), log_file)
140
        except Exception as exc:
141
            error_msg = error_msg + ": " + str(exc)
142
            logging.error(error_msg)
143
            raise RuntimeError(error_msg)
144
145
    def finalize(self):
146
        """
147
        Perform the environment cleanup and shut it down.
148
        """
149
        pass
150
151
    def reset_state_to(self, state_name, new_running_state_name):
152
        raise NotImplementedError()
153
154
    def save_state(self, state_name):
155
        self.running_state_base = state_name
156
        running_state = self.running_state
157
        return self._save_state(state_name)
158
159
    def _delete_saved_state(self, state_name):
160
        raise NotImplementedError()
161
162
    def _stop_state(self, state):
163
        pass
164
165
    def _oscap_ssh_base_arguments(self):
166
        full_hostname = 'root@{}'.format(self.domain_ip)
167
        return ['oscap-ssh', full_hostname, "{}".format(self.ssh_port), 'xccdf', 'eval']
168
169
    def scan(self, args, verbose_path):
170
        if self.scanning_mode == "online":
171
            return self.online_scan(args, verbose_path)
172
        elif self.scanning_mode == "offline":
173
            return self.offline_scan(args, verbose_path)
174
        else:
175
            msg = "Invalid scanning mode {mode}".format(mode=self.scanning_mode)
176
            raise KeyError(msg)
177
178
    def online_scan(self, args, verbose_path):
179
        os.environ["SSH_ADDITIONAL_OPTIONS"] = " ".join(common.SSH_ADDITIONAL_OPTS)
180
        command_list = self._oscap_ssh_base_arguments() + args
181
        return common.run_cmd_local(command_list, verbose_path)
182
183
    def offline_scan(self, args, verbose_path):
184
        raise NotImplementedError()
185
186
187
class VMTestEnv(TestEnv):
188
    name = "libvirt-based"
189
190
    def __init__(self, mode, hypervisor, domain_name):
191
        super(VMTestEnv, self).__init__(mode)
192
193
        try:
194
            import libvirt
195
        except ImportError:
196
            raise RuntimeError("Can't import libvirt module, libvirt backend will "
197
                               "therefore not work.")
198
199
        self.domain = None
200
201
        self.hypervisor = hypervisor
202
        self.domain_name = domain_name
203
        self.snapshot_stack = None
204
205
        self._origin = None
206
207
    def start(self):
208
        from ssg_test_suite import virt
209
210
        self.domain = virt.connect_domain(
211
            self.hypervisor, self.domain_name)
212
213
        self.snapshot_stack = virt.SnapshotStack(self.domain)
214
215
        virt.start_domain(self.domain)
216
217
        self._origin = self._save_state("origin")
218
219
        super().start()
220
221
    def get_ip_address(self):
222
        from ssg_test_suite import virt
223
224
        return virt.determine_ip(self.domain)
225
226
    def reboot(self):
227
        from ssg_test_suite import virt
228
229
        if self.domain is None:
230
            self.domain = virt.connect_domain(
231
                self.hypervisor, self.domain_name)
232
233
        virt.reboot_domain(self.domain, self.domain_ip, self.ssh_port)
234
235
    def finalize(self):
236
        self._delete_saved_state(self._origin)
237
        # self.domain.shutdown()
238
        # logging.debug('Shut the domain off')
239
240
    def reset_state_to(self, state_name, new_running_state_name):
241
        last_snapshot_name = self.snapshot_stack.snapshot_stack[-1].getName()
242
        assert last_snapshot_name == state_name, (
243
            "You can only revert to the last snapshot, which is {0}, not {1}"
244
            .format(last_snapshot_name, state_name))
245
        state = self.snapshot_stack.revert(delete=False)
246
        return state
247
248
    def _save_state(self, state_name):
249
        state = self.snapshot_stack.create(state_name)
250
        return state
251
252
    def _delete_saved_state(self, snapshot):
253
        self.snapshot_stack.revert()
254
255
    def _local_oscap_check_base_arguments(self):
256
        return ['oscap-vm', "domain", self.domain_name, 'xccdf', 'eval']
257
258
    def offline_scan(self, args, verbose_path):
259
        command_list = self._local_oscap_check_base_arguments() + args
260
261
        return common.run_cmd_local(command_list, verbose_path)
262
263
264
class ContainerTestEnv(TestEnv):
265
    def __init__(self, scanning_mode, image_name):
266
        super(ContainerTestEnv, self).__init__(scanning_mode)
267
        self._name_stem = "ssg_test"
268
        self.base_image = image_name
269
        self.created_images = []
270
        self.containers = []
271
        self.domain_ip = None
272
        self.internal_ssh_port = 22222
273
274
    def start(self):
275
        self.run_container(self.base_image)
276
        super().start()
277
278
    def finalize(self):
279
        self._terminate_current_running_container_if_applicable()
280
281
    def image_stem2fqn(self, stem):
282
        image_name = "{0}_{1}".format(self.base_image, stem)
283
        return image_name
284
285
    @property
286
    def current_container(self):
287
        if self.containers:
288
            return self.containers[-1]
289
        return None
290
291
    @property
292
    def current_image(self):
293
        if self.created_images:
294
            return self.created_images[-1]
295
        return self.base_image
296
297
    def _create_new_image(self, from_container, name):
298
        new_image_name = self.image_stem2fqn(name)
299
        if not from_container:
300
            from_container = self.run_container(self.current_image)
301
        self._commit(from_container, new_image_name)
302
        self.created_images.append(new_image_name)
303
        return new_image_name
304
305
    def _save_state(self, state_name):
306
        state = self._create_new_image(self.current_container, state_name)
307
        return state
308
309
    def get_ssh_port(self):
310
        if self.domain_ip == 'localhost':
311
            try:
312
                ports = self._get_container_ports(self.current_container)
313
            except Exception as exc:
314
                msg = (
315
                    "Unable to extract SSH ports from the container. "
316
                    "This usually means that the container backend reported its configuration "
317
                    "in an unexpected format."
318
                )
319
                raise RuntimeError(msg)
320
321
            if self.internal_ssh_port in ports:
322
                ssh_port = ports[self.internal_ssh_port]
323
            else:
324
                msg = "Unable to detect the SSH port for the container."
325
                raise RuntimeError(msg)
326
        else:
327
            ssh_port = self.internal_ssh_port
328
        return ssh_port
329
330
    def get_ssh_additional_options(self):
331
        ssh_additional_options = super().get_ssh_additional_options()
332
333
        # Assure that the -o option is followed by Port=<correct value> argument
334
        # If there is Port, assume that -o precedes it and just set the correct value
335
        port_opt = ['-o', 'Port={}'.format(self.ssh_port)]
336
        for index, opt in enumerate(ssh_additional_options):
337
            if opt.startswith('Port='):
338
                ssh_additional_options[index] = port_opt[1]
339
340
        # Put both arguments to the list of arguments if Port is not there.
341
        if port_opt[1] not in ssh_additional_options:
342
            ssh_additional_options = port_opt + ssh_additional_options
343
        return ssh_additional_options
344
345
    def run_container(self, image_name, container_name="running"):
346
        new_container = self._new_container_from_image(image_name, container_name)
347
        self.containers.append(new_container)
348
        # Get the container time to fully start its service
349
        time.sleep(0.2)
350
351
        self.refresh_connection_parameters()
352
353
        return new_container
354
355
    def reset_state_to(self, state_name, new_running_state_name):
356
        self._terminate_current_running_container_if_applicable()
357
        image_name = self.image_stem2fqn(state_name)
358
359
        new_container = self.run_container(image_name, new_running_state_name)
360
361
        return new_container
362
363
    def _delete_saved_state(self, image):
364
        self._terminate_current_running_container_if_applicable()
365
366
        assert self.created_images
367
368
        associated_image = self.created_images.pop()
369
        assert associated_image == image
370
        self._remove_image(associated_image)
371
372
    def offline_scan(self, args, verbose_path):
373
        command_list = self._local_oscap_check_base_arguments() + args
374
375
        return common.run_cmd_local(command_list, verbose_path)
376
377
    def _commit(self, container, image):
378
        raise NotImplementedError
379
380
    def _new_container_from_image(self, image_name, container_name):
381
        raise NotImplementedError
382
383
    def get_ip_address(self):
384
        raise NotImplementedError
385
386
    def _get_container_ports(self, container):
387
        raise NotImplementedError
388
389
    def _terminate_current_running_container_if_applicable(self):
390
        raise NotImplementedError
391
392
    def _remove_image(self, image):
393
        raise NotImplementedError
394
395
    def _local_oscap_check_base_arguments(self):
396
        raise NotImplementedError
397
398
399
class DockerTestEnv(ContainerTestEnv):
400
    name = "docker-based"
401
402
    def __init__(self, mode, image_name):
403
        super(DockerTestEnv, self).__init__(mode, image_name)
404
        try:
405
            import docker
406
        except ImportError:
407
            raise RuntimeError("Can't import the docker module, Docker backend will not work.")
408
        try:
409
            self.client = docker.from_env(version="auto")
410
            self.client.ping()
411
        except Exception as exc:
412
            msg = (
413
                "{}\n"
414
                "Unable to start the Docker test environment, "
415
                "is the Docker service started "
416
                "and do you have rights to access it?"
417
                .format(str(exc)))
418
            raise RuntimeError(msg)
419
420
    def _commit(self, container, image):
421
        container.commit(repository=image)
422
423
    def _new_container_from_image(self, image_name, container_name):
424
        img = self.client.images.get(image_name)
425
        result = self.client.containers.run(
426
            img, "/usr/sbin/sshd -p {} -D".format(self.internal_ssh_port),
427
            name="{0}_{1}".format(self._name_stem, container_name),
428
            ports={"{}".format(self.internal_ssh_port): None},
429
            detach=True)
430
        return result
431
432
    def get_ip_address(self):
433
        container = self.current_container
434
        container.reload()
435
        container_ip = container.attrs["NetworkSettings"]["Networks"]["bridge"]["IPAddress"]
436
        if not container_ip:
437
            container_ip = 'localhost'
438
        return container_ip
439
440
    def _terminate_current_running_container_if_applicable(self):
441
        if self.containers:
442
            running_state = self.containers.pop()
443
            running_state.stop()
444
            running_state.remove()
445
446
    def _remove_image(self, image):
447
        self.client.images.remove(image)
448
449
    def _local_oscap_check_base_arguments(self):
450
        return ['oscap-docker', "container", self.current_container.id,
451
                'xccdf', 'eval']
452
453
    def _get_container_ports(self, container):
454
        raise NotImplementedError("This method shouldn't be needed.")
455
456
457
class PodmanTestEnv(ContainerTestEnv):
458
    # TODO: Rework this class using Podman Python bindings (python3-podman)
459
    # at the moment when their API will provide methods to run containers,
460
    # commit images and inspect containers
461
    name = "podman-based"
462
463
    def __init__(self, scanning_mode, image_name):
464
        super(PodmanTestEnv, self).__init__(scanning_mode, image_name)
465
466
    def _commit(self, container, image):
467
        podman_cmd = ["podman", "commit", container, image]
468
        try:
469
            subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
470
        except subprocess.CalledProcessError as e:
471
            msg = "Command '{0}' returned {1}:\n{2}".format(
472
                " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
473
            raise RuntimeError(msg)
474
475
    def _new_container_from_image(self, image_name, container_name):
476
        long_name = "{0}_{1}".format(self._name_stem, container_name)
477
        # Podman drops cap_audit_write which causes that it is not possible
478
        # run sshd by default. Therefore, we need to add the capability.
479
        # We also need cap_sys_admin so it can perform mount/umount.
480
        podman_cmd = ["podman", "run", "--name", long_name,
481
                      "--cap-add=cap_audit_write",
482
                      "--cap-add=cap_sys_admin",
483
                    #   "--privileged",
484
                      "--publish", "{}".format(self.internal_ssh_port), "--detach", image_name,
485
                      "/usr/sbin/sshd", "-p", "{}".format(self.internal_ssh_port), "-D"]
486
        try:
487
            podman_output = subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
488
        except subprocess.CalledProcessError as e:
489
            msg = "Command '{0}' returned {1}:\n{2}".format(
490
                " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
491
            raise RuntimeError(msg)
492
        container_id = podman_output.decode("utf-8").strip()
493
        return container_id
494
495
    def get_ip_address(self):
496
        podman_cmd = [
497
                "podman", "inspect", self.current_container,
498
                "--format", "{{.NetworkSettings.IPAddress}}",
499
        ]
500
        try:
501
            podman_output = subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
502
        except subprocess.CalledProcessError as e:
503
            msg = "Command '{0}' returned {1}:\n{2}".format(
504
                " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
505
            raise RuntimeError(msg)
506
        ip_address = podman_output.decode("utf-8").strip()
507
        if not ip_address:
508
            ip_address = "localhost"
509
        return ip_address
510
511
    def _get_container_ports(self, container):
512
        podman_cmd = ["podman", "inspect", container, "--format",
513
                      "{{json .NetworkSettings.Ports}}"]
514
        try:
515
            podman_output = subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
516
        except subprocess.CalledProcessError as e:
517
            msg = "Command '{0}' returned {1}:\n{2}".format(
518
                " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
519
            raise RuntimeError(msg)
520
        return self.extract_port_map(json.loads(podman_output))
521
522
    def extract_port_map(self, podman_network_data):
523
        if 'containerPort' in podman_network_data:
524
            container_port = podman_network_data['containerPort']
525
            host_port = podman_network_data['hostPort']
526
        else:
527
            container_port_with_protocol, host_data = podman_network_data.popitem()
528
            container_port = container_port_with_protocol.split("/")[0]
529
            host_port = host_data[0]['HostPort']
530
        port_map = {int(container_port): int(host_port)}
531
        return port_map
532
533
    def _terminate_current_running_container_if_applicable(self):
534
        if self.containers:
535
            running_state = self.containers.pop()
536
            podman_cmd = ["podman", "stop", running_state]
537
            try:
538
                subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
539
            except subprocess.CalledProcessError as e:
540
                msg = "Command '{0}' returned {1}:\n{2}".format(
541
                    " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
542
                raise RuntimeError(msg)
543
            podman_cmd = ["podman", "rm", running_state]
544
            try:
545
                subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
546
            except subprocess.CalledProcessError as e:
547
                msg = "Command '{0}' returned {1}:\n{2}".format(
548
                    " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
549
                raise RuntimeError(msg)
550
551
    def _remove_image(self, image):
552
        podman_cmd = ["podman", "rmi", image]
553
        try:
554
            subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
555
        except subprocess.CalledProcessError as e:
556
            msg = "Command '{0}' returned {1}:\n{2}".format(
557
                " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
558
            raise RuntimeError(msg)
559
560
    def _local_oscap_check_base_arguments(self):
561
        raise NotImplementedError("OpenSCAP doesn't support offline scanning of Podman Containers")
562