tests.ssg_test_suite.test_env   F
last analyzed

Complexity

Total Complexity 119

Size/Duplication

Total Lines 602
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 442
dl 0
loc 602
rs 2
c 0
b 0
f 0
wmc 119

74 Methods

Rating   Name   Duplication   Size   Complexity  
A PodmanTestEnv.extract_port_map() 0 10 2
A PodmanTestEnv._get_container_ports() 0 10 2
A ContainerTestEnv.get_ssh_port() 0 20 4
A TestEnv.__init__() 0 23 3
A DockerTestEnv._local_oscap_check_base_arguments() 0 3 1
A ContainerTestEnv.finalize() 0 2 1
A TestEnv._delete_saved_state() 0 2 1
A VMTestEnv.__init__() 0 17 2
A TestEnv.scp_upload_file() 0 3 1
A PodmanTestEnv._remove_image() 0 8 2
A ContainerTestEnv.current_image() 0 5 2
A VMTestEnv.get_ip_address() 0 4 1
A TestEnv.scp_transfer_file() 0 12 3
A ContainerTestEnv.offline_scan() 0 4 1
A VMTestEnv.snapshot_lookup() 0 2 1
A SavedState.map_on_top() 0 11 3
A DockerTestEnv._terminate_current_running_container_if_applicable() 0 5 2
A TestEnv.save_state() 0 4 1
A ContainerTestEnv.start() 0 3 1
A TestEnv._oscap_ssh_base_arguments() 0 3 1
A ContainerTestEnv._get_container_ports() 0 2 1
A TestEnv.finalize() 0 5 1
A TestEnv.scp_download_file() 0 3 1
A VMTestEnv._delete_saved_state() 0 2 1
A TestEnv.get_ssh_port() 0 2 1
A VMTestEnv.offline_scan() 0 4 1
A ContainerTestEnv.get_ssh_additional_options() 0 14 4
A ContainerTestEnv.image_stem2fqn() 0 3 1
A ContainerTestEnv._terminate_current_running_container_if_applicable() 0 2 1
A TestEnv.start() 0 7 1
A DockerTestEnv._new_container_from_image() 0 8 1
A DockerTestEnv.get_ip_address() 0 7 2
A TestEnv.scan() 0 8 3
A PodmanTestEnv._local_oscap_check_base_arguments() 0 2 1
A DockerTestEnv._commit() 0 2 1
A VMTestEnv.has_test_suite_prefix() 0 4 2
A ContainerTestEnv.get_ip_address() 0 2 1
A DockerTestEnv.__init__() 0 17 3
A PodmanTestEnv.__init__() 0 2 1
A ContainerTestEnv._create_new_image() 0 7 2
A VMTestEnv.reset_state_to() 0 7 1
A ContainerTestEnv.current_container() 0 5 2
A TestEnv._stop_state() 0 2 1
A ContainerTestEnv.__init__() 0 8 1
A TestEnv.get_ip_address() 0 2 1
A SavedState.__init__() 0 4 1
A SavedState.create_from_environment() 0 23 4
A ContainerTestEnv.run_container() 0 9 1
A DockerTestEnv._get_container_ports() 0 2 1
A DockerTestEnv._remove_image() 0 2 1
A VMTestEnv._local_oscap_check_base_arguments() 0 2 1
A PodmanTestEnv.get_ip_address() 0 15 3
A PodmanTestEnv._commit() 0 8 2
A VMTestEnv.start() 0 19 3
A PodmanTestEnv._new_container_from_image() 0 20 2
A VMTestEnv._save_state() 0 4 1
A VMTestEnv.snapshots_cleanup() 0 6 3
A PodmanTestEnv._terminate_current_running_container_if_applicable() 0 17 4
A TestEnv.online_scan() 0 4 1
A ContainerTestEnv._remove_image() 0 2 1
A TestEnv.arf_to_html() 0 12 3
A VMTestEnv.reboot() 0 8 2
A ContainerTestEnv._new_container_from_image() 0 2 1
A TestEnv.refresh_connection_parameters() 0 4 1
A ContainerTestEnv.reset_state_to() 0 7 1
A ContainerTestEnv._delete_saved_state() 0 8 1
A ContainerTestEnv._local_oscap_check_base_arguments() 0 2 1
A TestEnv.offline_scan() 0 2 1
A ContainerTestEnv._save_state() 0 4 1
A TestEnv.reset_state_to() 0 2 1
A ContainerTestEnv._commit() 0 2 1
A TestEnv.get_ssh_additional_options() 0 2 1
A TestEnv.execute_ssh_command() 0 19 3
A VMTestEnv.finalize() 0 2 1

How to fix   Complexity   

Complexity

Complex classes like tests.ssg_test_suite.test_env often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from __future__ import print_function
2
3
import contextlib
4
import json
5
import logging
6
import os
7
import re
8
import subprocess
9
import sys
10
import time
11
12
import ssg_test_suite
13
from ssg_test_suite import common
14
15
16
class SavedState(object):
    """A named, saved state of a test environment.

    Instances are normally obtained through :meth:`create_from_environment`,
    which saves the state on entry and deletes it again on exit.
    """

    def __init__(self, environment, name):
        """
        Args:
            - environment: The test environment the state belongs to.
            - name: State name; it is stored in its prefixed form as
              returned by ``common.get_prefixed_name``.
        """
        self.name = common.get_prefixed_name(name)
        self.environment = environment
        self.initial_running_state = True

    def map_on_top(self, function, args_list):
        """
        Call ``function`` once for every argument tuple in ``args_list``.

        The first call is made against the environment as it is. Before every
        following call, and once more after the last one, the environment is
        asked to reset itself to this saved state.

        Args:
            - function: Callable invoked as ``function(*args)``.
            - args_list: Sequence of argument tuples; nothing happens
              if it is empty.
        """
        if not args_list:
            return
        function(*args_list[0])
        for idx, args in enumerate(args_list[1:], 1):
            self.environment.reset_state_to(self.name, "running_%d" % idx)
            function(*args)
        self.environment.reset_state_to(self.name, "running_last")

    @classmethod
    @contextlib.contextmanager
    def create_from_environment(cls, environment, state_name):
        """
        Context manager that saves the environment state and yields
        the corresponding ``SavedState``.

        The saved state is deleted when the context is left. A
        ``KeyboardInterrupt`` raised inside the context is held back until
        the cleanup is finished and is re-raised afterwards. If the cleanup
        itself is interrupted from the keyboard, it is attempted once more.
        """
        state = cls(environment, state_name)

        state_handle = environment.save_state(state_name)
        exception_to_reraise = None
        try:
            yield state
        except KeyboardInterrupt as exc:
            print("Hang on for a minute, cleaning up the saved state '{0}'."
                  .format(state_name), file=sys.stderr)
            exception_to_reraise = exc
        finally:
            try:
                environment._delete_saved_state(state_handle)
            except KeyboardInterrupt:
                print("Hang on for a minute, cleaning up the saved state '{0}'."
                      .format(state_name), file=sys.stderr)
                environment._delete_saved_state(state_handle)
            finally:
                if exception_to_reraise is not None:
                    raise exception_to_reraise
57
58
59
class TestEnv(object):
    """
    Base class of the environments that scans are executed against.

    It keeps the connection parameters (IP address, SSH port, additional
    SSH options) and provides the SSH/SCP helpers and the scan dispatch.
    Subclasses implement the backend-specific parts, which raise
    ``NotImplementedError`` here.
    """

    def __init__(self, scanning_mode):
        """
        Args:
            - scanning_mode: ``"online"`` or ``"offline"``, see :meth:`scan`.
        """
        self.running_state_base = None
        self.running_state = None

        self.scanning_mode = scanning_mode
        self.backend = None
        self.ssh_port = None

        self.domain_ip = None
        self.ssh_additional_options = []

        self.product = None

        # Probe whether the arf-to-graph tool can be executed locally.
        self.have_local_oval_graph = False
        try:
            p = subprocess.run(['arf-to-graph', '--version'],
                               stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            if p.returncode == 0:
                self.have_local_oval_graph = True
        except FileNotFoundError:
            # There is no arf-to-graph - in that case the process can't even be started,
            # not to mention return codes.
            pass

    @staticmethod
    def _graph_html_filename(arf_filename):
        """Derive the name of the HTML graph file from the ARF file name."""
        html_filename = re.sub(r"\barf\b", "graph", arf_filename)
        # The dot has to be escaped, otherwise any character followed by "xml"
        # (e.g. the "_xml" in "my_xml-arf.xml") would be rewritten as well.
        return re.sub(r"\.xml", ".html", html_filename)

    def arf_to_html(self, arf_filename):
        """
        Generate an HTML summary of OVAL checks from an ARF file
        using arf-to-graph.

        Nothing is done if arf-to-graph was not found when the environment
        was created. A failure of the tool is reported on stderr,
        no exception is raised because of it.
        """
        if not self.have_local_oval_graph:
            return

        html_filename = self._graph_html_filename(arf_filename)

        cmd = ['arf-to-graph', '--all-in-one', '--output', html_filename, arf_filename, '.']
        p = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
        if p.returncode != 0:
            print("Error generating OVAL check summaries: {stderr}".format(stderr=p.stderr),
                  file=sys.stderr)

    def start(self):
        """
        Run the environment and
        ensure that the environment will not be permanently modified
        by subsequent procedures.
        """
        self.refresh_connection_parameters()

    def refresh_connection_parameters(self):
        """
        Re-read the IP address, SSH port and additional SSH options.

        The order matters for subclasses whose port or options
        depend on the values determined before them.
        """
        self.domain_ip = self.get_ip_address()
        self.ssh_port = self.get_ssh_port()
        self.ssh_additional_options = self.get_ssh_additional_options()

    def get_ip_address(self):
        """Return the address of the environment; backend-specific."""
        raise NotImplementedError()

    def get_ssh_port(self):
        """Return the port that SSH connections should go to."""
        return 22

    def get_ssh_additional_options(self):
        """Return a fresh list of the common additional SSH options."""
        return list(common.SSH_ADDITIONAL_OPTS)

    def execute_ssh_command(self, command, log_file, error_msg_template=None):
        """
        Execute a command in the environment over SSH as root.

        Args:
            - command: Command to execute remotely as a single string
            - log_file
            - error_msg_template: A string that can contain references to:
                ``command``, ``remote_dest``, ``rc``, and ``stderr``

        Returns:
            The standard output of the command.

        Raises:
            RuntimeError: If the return code is not zero.
        """
        if not error_msg_template:
            error_msg_template = "Return code of '{command}' on {remote_dest} is {rc}: {stderr}"
        remote_dest = "root@{ip}".format(ip=self.domain_ip)
        result = common.retry_with_stdout_logging(
            "ssh", tuple(self.ssh_additional_options) + (remote_dest, command), log_file)
        if result.returncode:
            error_msg = error_msg_template.format(
                command=command, remote_dest=remote_dest,
                rc=result.returncode, stderr=result.stderr)
            raise RuntimeError(error_msg)
        return result.stdout

    def scp_download_file(self, source, destination, log_file, error_msg=None):
        """Copy the remote ``source`` to the local ``destination`` using scp."""
        scp_src = "root@{ip}:{source}".format(ip=self.domain_ip, source=source)
        return self.scp_transfer_file(scp_src, destination, log_file, error_msg)

    def scp_upload_file(self, source, destination, log_file, error_msg=None):
        """Copy the local ``source`` to the remote ``destination`` using scp."""
        scp_dest = "root@{ip}:{dest}".format(ip=self.domain_ip, dest=destination)
        return self.scp_transfer_file(source, scp_dest, log_file, error_msg)

    def scp_transfer_file(self, source, destination, log_file, error_msg=None):
        """
        Run scp with the additional SSH options of the environment.

        Args:
            - source, destination: Passed to scp as they are.
            - log_file
            - error_msg: Message used when the transfer raises; a default one
              mentioning the source and the destination is used if not given.

        Raises:
            RuntimeError: If running scp raised an exception.
        """
        if not error_msg:
            error_msg = (
                "Failed to copy {source} to {destination}"
                .format(source=source, destination=destination))
        try:
            # NOTE(review): the result is not inspected here - whether a failed
            # scp raises depends on common.run_with_stdout_logging; confirm.
            common.run_with_stdout_logging(
                "scp", tuple(self.ssh_additional_options) + (source, destination), log_file)
        except Exception as exc:
            error_msg = error_msg + ": " + str(exc)
            logging.error(error_msg)
            raise RuntimeError(error_msg) from exc

    def finalize(self):
        """
        Perform the environment cleanup and shut it down.
        """
        pass

    def reset_state_to(self, state_name, new_running_state_name):
        """Bring the environment back to a saved state; backend-specific."""
        raise NotImplementedError()

    def save_state(self, state_name):
        """
        Save the current state under ``state_name`` and return whatever
        the backend-specific ``_save_state`` returns.
        """
        self.running_state_base = common.get_prefixed_name(state_name)
        return self._save_state(state_name)

    def _delete_saved_state(self, state_name):
        """Remove a previously saved state; backend-specific."""
        raise NotImplementedError()

    def _stop_state(self, state):
        pass

    def _oscap_ssh_base_arguments(self):
        """Return the beginning of the oscap-ssh command line for this environment."""
        full_hostname = 'root@{}'.format(self.domain_ip)
        return ['oscap-ssh', full_hostname, "{}".format(self.ssh_port), 'xccdf', 'eval']

    def scan(self, args, verbose_path):
        """
        Run the scan according to the scanning mode of the environment.

        Raises:
            KeyError: If the scanning mode is neither "online" nor "offline".
        """
        if self.scanning_mode == "online":
            return self.online_scan(args, verbose_path)
        elif self.scanning_mode == "offline":
            return self.offline_scan(args, verbose_path)
        else:
            msg = "Invalid scanning mode {mode}".format(mode=self.scanning_mode)
            raise KeyError(msg)

    def online_scan(self, args, verbose_path):
        """
        Run the scan by means of oscap-ssh.

        The common additional SSH options are exported in the
        ``SSH_ADDITIONAL_OPTIONS`` environment variable of this process.
        """
        os.environ["SSH_ADDITIONAL_OPTIONS"] = " ".join(common.SSH_ADDITIONAL_OPTS)
        command_list = self._oscap_ssh_base_arguments() + args
        return common.run_cmd_local(command_list, verbose_path)

    def offline_scan(self, args, verbose_path):
        """Run the scan from the outside of the environment; backend-specific."""
        raise NotImplementedError()
200
201
202
class VMTestEnv(TestEnv):
    """
    Test environment backed by a libvirt domain.

    States are saved as snapshots of the domain that are kept
    in a snapshot stack.
    """

    name = "libvirt-based"

    def __init__(self, mode, hypervisor, domain_name, keep_snapshots):
        """
        Args:
            - mode: Scanning mode handed over to ``TestEnv``.
            - hypervisor, domain_name: Used to connect to the domain in ``start``.
            - keep_snapshots: If false, test suite snapshots of the domain
              are deleted when the environment is started.

        Raises:
            RuntimeError: If the libvirt module can't be imported.
        """
        super().__init__(mode)

        # The module is not used here, the import only checks its presence.
        try:
            import libvirt
        except ImportError:
            raise RuntimeError("Can't import libvirt module, libvirt backend will "
                               "therefore not work.")

        self.hypervisor = hypervisor
        self.domain_name = domain_name
        self.keep_snapshots = keep_snapshots

        # These are populated when the environment is started.
        self.domain = None
        self.snapshot_stack = None
        self._origin = None

    def has_test_suite_prefix(self, snapshot_name):
        """Tell whether the snapshot name begins with the test suite prefix."""
        return str(snapshot_name).startswith(common.TEST_SUITE_PREFIX)

    def snapshot_lookup(self, snapshot_name):
        """Return the snapshot of the domain that has the given name."""
        return self.domain.snapshotLookupByName(snapshot_name)

    def snapshots_cleanup(self):
        """Delete all snapshots of the domain that carry the test suite prefix."""
        for candidate in self.domain.snapshotListNames():
            if not self.has_test_suite_prefix(candidate):
                continue
            self.snapshot_lookup(candidate).delete()

    def start(self):
        """
        Connect to the domain, start it and save its state as "origin".

        The process exits with status 1 if the connection yields no domain.
        """
        from ssg_test_suite import virt

        connected_domain = virt.connect_domain(self.hypervisor, self.domain_name)
        self.domain = connected_domain
        if connected_domain is None:
            sys.exit(1)

        if not self.keep_snapshots:
            self.snapshots_cleanup()

        self.snapshot_stack = virt.SnapshotStack(self.domain)
        virt.start_domain(self.domain)
        self._origin = self._save_state("origin")

        super(VMTestEnv, self).start()

    def get_ip_address(self):
        """Return the IP address as determined by ``virt.determine_ip``."""
        from ssg_test_suite import virt

        return virt.determine_ip(self.domain)

    def reboot(self):
        """Reboot the domain, connecting to it first if that hasn't happened yet."""
        from ssg_test_suite import virt

        if self.domain is None:
            self.domain = virt.connect_domain(self.hypervisor, self.domain_name)

        virt.reboot_domain(self.domain, self.domain_ip, self.ssh_port)

    def finalize(self):
        """
        Drop the "origin" state saved by ``start``.

        The domain itself is not shut down here.
        """
        self._delete_saved_state(self._origin)

    def reset_state_to(self, state_name, new_running_state_name):
        """
        Revert to the snapshot on top of the stack without deleting it.

        ``state_name`` has to be the name of that snapshot;
        ``new_running_state_name`` is not used by this backend.
        """
        top_snapshot_name = self.snapshot_stack.snapshot_stack[-1].getName()
        assert top_snapshot_name == state_name, (
            "You can only revert to the last snapshot, which is {0}, not {1}"
            .format(top_snapshot_name, state_name))
        return self.snapshot_stack.revert(delete=False)

    def _save_state(self, state_name):
        """Create a snapshot named by the prefixed state name and return it."""
        return self.snapshot_stack.create(common.get_prefixed_name(state_name))

    def _delete_saved_state(self, snapshot):
        """Revert the snapshot stack; the ``snapshot`` argument is not used."""
        self.snapshot_stack.revert()

    def _local_oscap_check_base_arguments(self):
        """Return the beginning of the oscap-vm command line for the domain."""
        return ['oscap-vm', "domain", self.domain_name, 'xccdf', 'eval']

    def offline_scan(self, args, verbose_path):
        """Run the scan by means of oscap-vm."""
        base_arguments = self._local_oscap_check_base_arguments()
        return common.run_cmd_local(base_arguments + args, verbose_path)
300
301
302
class ContainerTestEnv(TestEnv):
    """
    Common base of the container-based test environments.

    States are saved as images committed from containers. The images and
    containers created along the way are tracked in lists whose last
    element is the current one. Operations that depend on the container
    engine raise ``NotImplementedError`` here.
    """

    def __init__(self, scanning_mode, image_name):
        """
        Args:
            - scanning_mode: Scanning mode handed over to ``TestEnv``.
            - image_name: Name of the image that containers are started from.
        """
        super().__init__(scanning_mode)
        self._name_stem = "ssg_test"
        self.base_image = image_name
        self.created_images = []
        self.containers = []
        self.domain_ip = None
        # Port that sshd listens on inside of containers.
        self.internal_ssh_port = 22222

    def start(self):
        """Run a container from the base image and refresh connection parameters."""
        self.run_container(self.base_image)
        super(ContainerTestEnv, self).start()

    def finalize(self):
        """Terminate the container that is currently running, if there is one."""
        self._terminate_current_running_container_if_applicable()

    def image_stem2fqn(self, stem):
        """Return the image name made of the base image name and the stem."""
        return "{0}_{1}".format(self.base_image, stem)

    @property
    def current_container(self):
        """The most recently run container, or None if there is none."""
        return self.containers[-1] if self.containers else None

    @property
    def current_image(self):
        """The most recently created image, or the base image if there is none."""
        return self.created_images[-1] if self.created_images else self.base_image

    def _create_new_image(self, from_container, name):
        """
        Commit a container to a new image and return the name of the image.

        If no container is supplied, one is run from the current image first.
        """
        image_fqn = self.image_stem2fqn(name)
        source_container = from_container or self.run_container(self.current_image)
        self._commit(source_container, image_fqn)
        self.created_images.append(image_fqn)
        return image_fqn

    def _save_state(self, state_name):
        """Save the current container as an image named by the prefixed state name."""
        return self._create_new_image(
            self.current_container, common.get_prefixed_name(state_name))

    def get_ssh_port(self):
        """
        Return the port that SSH connections to the container should go to.

        That is the internal SSH port unless the container is reached via
        localhost - then it is the port that the internal one is mapped to.

        Raises:
            RuntimeError: If the port mapping can't be obtained,
                or if it lacks the internal SSH port.
        """
        if self.domain_ip != 'localhost':
            return self.internal_ssh_port

        try:
            port_map = self._get_container_ports(self.current_container)
        except Exception:
            raise RuntimeError(
                "Unable to extract SSH ports from the container. "
                "This usually means that the container backend reported its configuration "
                "in an unexpected format."
            )

        if self.internal_ssh_port not in port_map:
            raise RuntimeError("Unable to detect the SSH port for the container.")
        return port_map[self.internal_ssh_port]

    def get_ssh_additional_options(self):
        """
        Return the common additional SSH options with the port set
        to the current SSH port of the environment.
        """
        port_setting = 'Port={}'.format(self.ssh_port)

        # An existing Port setting is supposed to follow its -o already,
        # so only the value gets replaced.
        options = [
            port_setting if opt.startswith('Port=') else opt
            for opt in super().get_ssh_additional_options()
        ]

        # Without any Port setting, the option and its value go to the front.
        if port_setting not in options:
            options = ['-o', port_setting] + options
        return options

    def run_container(self, image_name, container_name="running"):
        """
        Run a new container from the image, make it the current one,
        refresh the connection parameters and return the container.
        """
        container = self._new_container_from_image(image_name, container_name)
        self.containers.append(container)

        # Let the service in the container start up before it gets queried.
        time.sleep(0.2)
        self.refresh_connection_parameters()

        return container

    def reset_state_to(self, state_name, new_running_state_name):
        """
        Replace the running container by a new one, named
        ``new_running_state_name``, that is run from the image of the state.
        """
        self._terminate_current_running_container_if_applicable()
        return self.run_container(
            self.image_stem2fqn(state_name), new_running_state_name)

    def _delete_saved_state(self, image):
        """
        Terminate the running container and remove the most recently
        created image, which has to be the one that is passed.
        """
        self._terminate_current_running_container_if_applicable()

        assert self.created_images

        last_created = self.created_images.pop()
        assert last_created == image
        self._remove_image(last_created)

    def offline_scan(self, args, verbose_path):
        """Run the scan with the local oscap wrapper of the container engine."""
        base_arguments = self._local_oscap_check_base_arguments()
        return common.run_cmd_local(base_arguments + args, verbose_path)

    # What follows has to be supplied by container engine-specific subclasses.

    def _commit(self, container, image):
        raise NotImplementedError

    def _new_container_from_image(self, image_name, container_name):
        raise NotImplementedError

    def get_ip_address(self):
        raise NotImplementedError

    def _get_container_ports(self, container):
        raise NotImplementedError

    def _terminate_current_running_container_if_applicable(self):
        raise NotImplementedError

    def _remove_image(self, image):
        raise NotImplementedError

    def _local_oscap_check_base_arguments(self):
        raise NotImplementedError
436
437
438
class DockerTestEnv(ContainerTestEnv):
    """Container test environment that talks to Docker through the docker module."""

    name = "docker-based"

    def __init__(self, mode, image_name):
        """
        Create the Docker client and check that the service responds.

        Raises:
            RuntimeError: If the docker module can't be imported, or if
                creating the client or pinging the service fails.
        """
        super().__init__(mode, image_name)
        try:
            import docker
        except ImportError:
            raise RuntimeError("Can't import the docker module, Docker backend will not work.")
        try:
            self.client = docker.from_env(version="auto")
            self.client.ping()
        except Exception as exc:
            raise RuntimeError(
                "{}\n"
                "Unable to start the Docker test environment, "
                "is the Docker service started "
                "and do you have rights to access it?"
                .format(str(exc)))

    def _commit(self, container, image):
        """Commit the container to the repository named ``image``."""
        container.commit(repository=image)

    def _new_container_from_image(self, image_name, container_name):
        """
        Run a detached container with sshd listening on the internal SSH
        port and return the container object.
        """
        sshd_command = "/usr/sbin/sshd -p {} -D".format(self.internal_ssh_port)
        full_name = "{0}_{1}".format(self._name_stem, container_name)
        port_spec = {"{}".format(self.internal_ssh_port): None}
        return self.client.containers.run(
            self.client.images.get(image_name), sshd_command,
            name=full_name,
            ports=port_spec,
            detach=True)

    def get_ip_address(self):
        """
        Return the IP address of the current container on the bridge
        network, or 'localhost' if the address is empty.
        """
        container = self.current_container
        # Refresh the attributes before they are read.
        container.reload()
        networks = container.attrs["NetworkSettings"]["Networks"]
        return networks["bridge"]["IPAddress"] or 'localhost'

    def _terminate_current_running_container_if_applicable(self):
        """Stop and remove the most recent container, if there is any."""
        if not self.containers:
            return
        container = self.containers.pop()
        container.stop()
        container.remove()

    def _remove_image(self, image):
        """Remove the image by means of the Docker client."""
        self.client.images.remove(image)

    def _local_oscap_check_base_arguments(self):
        """Return the beginning of the oscap-docker command line for the current container."""
        container_id = self.current_container.id
        return ['oscap-docker', "container", container_id, 'xccdf', 'eval']

    def _get_container_ports(self, container):
        raise NotImplementedError("This method shouldn't be needed.")
494
495
496
class PodmanTestEnv(ContainerTestEnv):
    """
    Container test environment that drives Podman through its
    command-line interface; containers are referred to by their IDs.
    """
    # TODO: Rework this class using Podman Python bindings (python3-podman)
    # at the moment when their API will provide methods to run containers,
    # commit images and inspect containers
    name = "podman-based"

    def __init__(self, scanning_mode, image_name):
        super(PodmanTestEnv, self).__init__(scanning_mode, image_name)

    @staticmethod
    def _podman_output(podman_cmd):
        """
        Run a podman command and return its output.

        Args:
            - podman_cmd: The whole command line as a list of strings.

        Returns:
            Standard output of the command, with standard error merged in,
            as bytes.

        Raises:
            RuntimeError: If the command finishes with a non-zero return code;
                the message contains the command, the code and the output.
        """
        try:
            return subprocess.check_output(podman_cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            msg = "Command '{0}' returned {1}:\n{2}".format(
                " ".join(e.cmd), e.returncode, e.output.decode("utf-8"))
            raise RuntimeError(msg) from e

    def _commit(self, container, image):
        """Commit the container to an image of the given name."""
        self._podman_output(["podman", "commit", container, image])

    def _new_container_from_image(self, image_name, container_name):
        """
        Run a detached container with sshd listening on the internal SSH
        port, and return the container ID that podman prints.
        """
        long_name = "{0}_{1}".format(self._name_stem, container_name)
        # Podman drops cap_audit_write which causes that it is not possible
        # run sshd by default. Therefore, we need to add the capability.
        # We also need cap_sys_admin so it can perform mount/umount.
        podman_cmd = ["podman", "run", "--name", long_name,
                      "--cap-add=cap_audit_write",
                      "--cap-add=cap_sys_admin",
                      "--cap-add=cap_sys_chroot",
                      # "--privileged",
                      "--publish", "{}".format(self.internal_ssh_port), "--detach", image_name,
                      "/usr/sbin/sshd", "-p", "{}".format(self.internal_ssh_port), "-D"]
        podman_output = self._podman_output(podman_cmd)
        container_id = podman_output.decode("utf-8").strip()
        return container_id

    def get_ip_address(self):
        """
        Return the IP address that podman reports for the current
        container, or "localhost" if it reports none.
        """
        podman_cmd = [
            "podman", "inspect", self.current_container,
            "--format", "{{.NetworkSettings.IPAddress}}",
        ]
        ip_address = self._podman_output(podman_cmd).decode("utf-8").strip()
        if not ip_address:
            ip_address = "localhost"
        return ip_address

    def _get_container_ports(self, container):
        """Return the port map extracted from the inspection of the container."""
        podman_cmd = ["podman", "inspect", container, "--format",
                      "{{json .NetworkSettings.Ports}}"]
        return self.extract_port_map(json.loads(self._podman_output(podman_cmd)))

    def extract_port_map(self, podman_network_data):
        """
        Extract a container port to host port mapping from the port data.

        Two layouts of the data are understood: a dictionary that has the
        ``containerPort`` and ``hostPort`` keys, or a dictionary that maps
        "<port>/<protocol>" strings to lists of dictionaries that have
        the ``HostPort`` key. In the latter case only the last item of the
        dictionary is considered.

        Returns:
            A dictionary with one item, both the key and the value are integers.
        """
        if 'containerPort' in podman_network_data:
            container_port = podman_network_data['containerPort']
            host_port = podman_network_data['hostPort']
        else:
            # The item is taken out of a copy, so the data that
            # belong to the caller are not modified.
            container_port_with_protocol, host_data = dict(podman_network_data).popitem()
            container_port = container_port_with_protocol.split("/")[0]
            host_port = host_data[0]['HostPort']
        port_map = {int(container_port): int(host_port)}
        return port_map

    def _terminate_current_running_container_if_applicable(self):
        """Stop and remove the most recent container, if there is any."""
        if self.containers:
            running_state = self.containers.pop()
            self._podman_output(["podman", "stop", running_state])
            self._podman_output(["podman", "rm", running_state])

    def _remove_image(self, image):
        """Remove the image by means of ``podman rmi``."""
        self._podman_output(["podman", "rmi", image])

    def _local_oscap_check_base_arguments(self):
        raise NotImplementedError("OpenSCAP doesn't support offline scanning of Podman Containers")
602