Passed
Branch master (53139b)
by Matěj
02:25
created

ssg_test_suite.common._get_platform_cpes()   B

Complexity

Conditions 5

Size

Total Lines 24
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 5

Importance

Changes 0
Metric Value
cc 5
eloc 22
nop 1
dl 0
loc 24
ccs 18
cts 18
cp 1
crap 5
rs 8.8853
c 0
b 0
f 0
1 1
import logging
2 1
import subprocess
3 1
from collections import namedtuple
4 1
import enum
5 1
import functools
6 1
from ssg.constants import MULTI_PLATFORM_MAPPING
7 1
from ssg.constants import PRODUCT_TO_CPE_MAPPING
8 1
from ssg.constants import FULL_NAME_TO_PRODUCT_MAPPING
9
10 1
# Identifies one scenario execution: the rule being tested and the
# scenario script that was run for it.
Scenario_run = namedtuple("Scenario_run", ["rule_id", "script"])

# The conditions under which a scenario was executed.
Scenario_conditions = namedtuple(
    "Scenario_conditions",
    ["backend", "scanning_mode", "remediated_by", "datastream"],
)


# Extra ssh command-line options: disable strict host key checking and
# send the known-hosts file to /dev/null.
IGNORE_KNOWN_HOSTS_OPTIONS = (
    "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null",
)
22
23
24 1
class Stage(enum.IntEnum):
    """Stages of a scenario run, numbered in ascending order from NONE."""

    # Integer values make the members comparable with <, >, etc.
    NONE = 0
    PREPARATION = 1
    INITIAL_SCAN = 2
    REMEDIATION = 3
    FINAL_SCAN = 4
30
31
32 1
@functools.total_ordering
class RuleResult(object):
    """
    Result of a test suite testing rule under a scenario.

    Supports ordering by success - the most successful run orders first.
    """

    # Names of the stages whose outcome may be recorded; they double as
    # keys of the dictionary form handled by load_from_dict / save_to_dict.
    STAGE_STRINGS = {
        "preparation",
        "initial_scan",
        "remediation",
        "final_scan",
    }

    def __init__(self, result_dict=None):
        """
        Create an empty result, optionally populated from ``result_dict``.

        A falsy ``result_dict`` (None or an empty dict) leaves the
        defaults in place.
        """
        self.scenario = Scenario_run("", "")
        self.conditions = Scenario_conditions("", "", "", "")
        self.when = ""
        self.passed_stages = dict()
        self.passed_stages_count = 0
        self.success = False

        if result_dict:
            self.load_from_dict(result_dict)

    def load_from_dict(self, data):
        """
        Populate this result from its dictionary form.

        The keys "rule_id", "scenario_script", "backend", "scanning_mode",
        "remediated_by", "datastream" and "run_timestamp" are required
        (a missing one raises KeyError); stage keys are optional.
        """
        self.scenario = Scenario_run(data["rule_id"], data["scenario_script"])
        self.conditions = Scenario_conditions(
            data["backend"], data["scanning_mode"],
            data["remediated_by"], data["datastream"])
        self.when = data["run_timestamp"]

        self.passed_stages = {key: data[key] for key in self.STAGE_STRINGS if key in data}
        self.passed_stages_count = sum(self.passed_stages.values())

        # A run succeeded if the final scan passed, or - when no remediation
        # stage was recorded at all - if the initial scan passed.
        self.success = data.get("final_scan", False)
        if not self.success:
            self.success = (
                "remediation" not in data
                and data.get("initial_scan", False))

    def save_to_dict(self):
        """Return the dictionary form accepted by load_from_dict."""
        data = dict()
        data["rule_id"] = self.scenario.rule_id
        data["scenario_script"] = self.scenario.script

        data["backend"] = self.conditions.backend
        data["scanning_mode"] = self.conditions.scanning_mode
        data["remediated_by"] = self.conditions.remediated_by
        data["datastream"] = self.conditions.datastream

        data["run_timestamp"] = self.when

        # Only stages that were actually recorded are written out.
        data.update(self.passed_stages)

        return data

    def record_stage_result(self, stage, successful):
        """
        Record whether ``stage`` (one of STAGE_STRINGS) was successful.

        The count of passed stages used for ordering is refreshed so that
        it always reflects the recorded stages.
        """
        assert stage in self.STAGE_STRINGS, (
            "Stage name {name} is invalid, choose one from {choices}"
            # Sorted so that the message is deterministic.
            .format(name=stage, choices=", ".join(sorted(self.STAGE_STRINGS)))
        )
        self.passed_stages[stage] = successful
        self.passed_stages_count = sum(self.passed_stages.values())

    def relative_conditions_to(self, other):
        """
        Return a pair describing how this result differs from ``other``.

        When both ran under the same conditions the two timestamps are
        returned, otherwise the two condition tuples.
        """
        if self.conditions == other.conditions:
            return self.when, other.when
        else:
            return tuple(self.conditions), tuple(other.conditions)

    def __eq__(self, other):
        """Equal when the success flag and the recorded stage outcomes match."""
        if not isinstance(other, RuleResult):
            return NotImplemented
        # Compare against the *other* object's stages; comparing the dicts
        # themselves is independent of key order.
        return (self.success == other.success
                and self.passed_stages == other.passed_stages)

    def __lt__(self, other):
        """Order results with more passed stages first."""
        if not isinstance(other, RuleResult):
            return NotImplemented
        return self.passed_stages_count > other.passed_stages_count
109
110
111 1
def run_cmd_local(command, verbose_path, env=None):
    """
    Run ``command`` (a sequence of argument strings) via ``_run_cmd``.

    The space-joined command is logged at debug level first.
    Returns the ``(returncode, output)`` pair produced by ``_run_cmd``.
    """
    logging.debug('Running {}'.format(' '.join(command)))
    return _run_cmd(command, verbose_path, env)
116
117
118 1
def run_cmd_remote(command_string, domain_ip, verbose_path, env=None):
    """
    Run ``command_string`` over ssh as root on the host ``domain_ip``.

    The ssh invocation carries IGNORE_KNOWN_HOSTS_OPTIONS and is executed
    via ``_run_cmd``, whose ``(returncode, output)`` pair is returned.
    """
    ssh_invocation = ['ssh']
    ssh_invocation.extend(IGNORE_KNOWN_HOSTS_OPTIONS)
    ssh_invocation.append('root@{0}'.format(domain_ip))
    ssh_invocation.append(command_string)

    # Only the remote command itself is logged, not the ssh wrapper.
    logging.debug('Running {}'.format(command_string))
    return _run_cmd(ssh_invocation, verbose_path, env)
124
125
126 1
def _run_cmd(command_list, verbose_path, env=None):
127
    returncode = 0
128
    output = b""
129
    try:
130
        with open(verbose_path, 'w') as verbose_file:
131
            output = subprocess.check_output(
132
                command_list, stderr=verbose_file, env=env)
133
    except subprocess.CalledProcessError as e:
134
        returncode = e.returncode
135
        output = e.output
136
    return returncode, output.decode('utf-8')
137
138
139 1
def _get_platform_cpes(platform):
    """
    Return the set of CPEs that correspond to a scenario platform.

    ``platform`` is either a ``multi_platform_*`` specifier, looked up in
    MULTI_PLATFORM_MAPPING and expanded to the CPEs of all its products,
    or a full product name looked up in FULL_NAME_TO_PRODUCT_MAPPING.

    Raises ValueError (after logging the error) when the platform is not
    found in the respective mapping.
    """
    if platform.startswith("multi_platform_"):
        try:
            products = MULTI_PLATFORM_MAPPING[platform]
        except KeyError:
            message = (
                "Unknown multi_platform specifier: %s is not from %s"
                % (platform, ", ".join(MULTI_PLATFORM_MAPPING.keys())))
            logging.error(message)
            # Carry the reason in the exception too, not only in the log.
            raise ValueError(message)
        platform_cpes = set()
        for p in products:
            platform_cpes |= set(PRODUCT_TO_CPE_MAPPING[p])
        return platform_cpes
    else:
        # scenario platform is specified by a full product name
        try:
            product = FULL_NAME_TO_PRODUCT_MAPPING[platform]
        except KeyError:
            message = (
                "Unknown product name: %s is not from %s"
                % (platform, ", ".join(FULL_NAME_TO_PRODUCT_MAPPING.keys())))
            logging.error(message)
            raise ValueError(message)
        platform_cpes = set(PRODUCT_TO_CPE_MAPPING[product])
        return platform_cpes
163
164
165 1
def matches_platform(scenario_platforms, benchmark_cpes):
    """
    Tell whether a scenario applies to a benchmark.

    A scenario listing "multi_platform_all" always matches. Otherwise the
    CPEs of every listed platform are collected via ``_get_platform_cpes``
    and the scenario matches when they share at least one CPE with
    ``benchmark_cpes``.
    """
    if "multi_platform_all" in scenario_platforms:
        return True

    # Every platform is resolved, so an unknown one is always reported.
    collected = set()
    for platform_name in scenario_platforms:
        collected.update(_get_platform_cpes(platform_name))

    common = collected & benchmark_cpes
    return bool(common)
172