Passed
Branch master (53139b)
by Matěj
02:07
created

ssg_test_suite.common._get_platform_cpes()   B

Complexity

Conditions 5

Size

Total Lines 24
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 22
nop 1
dl 0
loc 24
rs 8.8853
c 0
b 0
f 0
1
import logging
2
import subprocess
3
from collections import namedtuple
4
import enum
5
import functools
6
from ssg.constants import MULTI_PLATFORM_MAPPING
7
from ssg.constants import PRODUCT_TO_CPE_MAPPING
8
from ssg.constants import FULL_NAME_TO_PRODUCT_MAPPING
9
10
Scenario_run = namedtuple(
11
    "Scenario_run",
12
    ("rule_id", "script"))
13
Scenario_conditions = namedtuple(
14
    "Scenario_conditions",
15
    ("backend", "scanning_mode", "remediated_by", "datastream"))
16
17
18
IGNORE_KNOWN_HOSTS_OPTIONS = (
19
    "-o", "StrictHostKeyChecking=no",
20
    "-o", "UserKnownHostsFile=/dev/null",
21
)
22
23
24
class Stage(enum.IntEnum):
25
    NONE = 0
26
    PREPARATION = 1
27
    INITIAL_SCAN = 2
28
    REMEDIATION = 3
29
    FINAL_SCAN = 4
30
31
32
@functools.total_ordering
33
class RuleResult(object):
34
    STAGE_STRINGS = {
35
        "preparation",
36
        "initial_scan",
37
        "remediation",
38
        "final_scan",
39
    }
40
41
    """
42
    Result of a test suite testing rule under a scenario.
43
44
    Supports ordering by success - the most successful run orders first.
45
    """
46
    def __init__(self, result_dict=None):
47
        self.scenario = Scenario_run("", "")
48
        self.conditions = Scenario_conditions("", "", "", "")
49
        self.when = ""
50
        self.passed_stages = dict()
51
        self.passed_stages_count = 0
52
        self.success = False
53
54
        if result_dict:
55
            self.load_from_dict(result_dict)
56
57
    def load_from_dict(self, data):
58
        self.scenario = Scenario_run(data["rule_id"], data["scenario_script"])
59
        self.conditions = Scenario_conditions(
60
            data["backend"], data["scanning_mode"],
61
            data["remediated_by"], data["datastream"])
62
        self.when = data["run_timestamp"]
63
64
        self.passed_stages = {key: data[key] for key in self.STAGE_STRINGS if key in data}
65
        self.passed_stages_count = sum(self.passed_stages.values())
66
67
        self.success = data.get("final_scan", False)
68
        if not self.success:
69
            self.success = (
70
                "remediation" not in data
71
                and data.get("initial_scan", False))
72
73
    def save_to_dict(self):
74
        data = dict()
75
        data["rule_id"] = self.scenario.rule_id
76
        data["scenario_script"] = self.scenario.script
77
78
        data["backend"] = self.conditions.backend
79
        data["scanning_mode"] = self.conditions.scanning_mode
80
        data["remediated_by"] = self.conditions.remediated_by
81
        data["datastream"] = self.conditions.datastream
82
83
        data["run_timestamp"] = self.when
84
85
        for stage_str, result in self.passed_stages.items():
86
            data[stage_str] = result
87
88
        return data
89
90
    def record_stage_result(self, stage, successful):
91
        assert stage in self.STAGE_STRINGS, (
92
            "Stage name {name} is invalid, choose one from {choices}"
93
            .format(name=stage, choices=", ".join(self.STAGE_STRINGS))
94
        )
95
        self.passed_stages[stage] = successful
96
97
    def relative_conditions_to(self, other):
98
        if self.conditions == other.conditions:
99
            return self.when, other.when
100
        else:
101
            return tuple(self.conditions), tuple(other.conditions)
102
103
    def __eq__(self, other):
104
        return (self.success == other.success
105
                and tuple(self.passed_stages) == tuple(self.passed_stages))
106
107
    def __lt__(self, other):
108
        return self.passed_stages_count > other.passed_stages_count
109
110
111
def run_cmd_local(command, verbose_path, env=None):
112
    command_string = ' '.join(command)
113
    logging.debug('Running {}'.format(command_string))
114
    returncode, output = _run_cmd(command, verbose_path, env)
115
    return returncode, output
116
117
118
def run_cmd_remote(command_string, domain_ip, verbose_path, env=None):
119
    machine = 'root@{0}'.format(domain_ip)
120
    remote_cmd = ['ssh'] + list(IGNORE_KNOWN_HOSTS_OPTIONS) + [machine, command_string]
121
    logging.debug('Running {}'.format(command_string))
122
    returncode, output = _run_cmd(remote_cmd, verbose_path, env)
123
    return returncode, output
124
125
126
def _run_cmd(command_list, verbose_path, env=None):
127
    returncode = 0
128
    output = b""
129
    try:
130
        with open(verbose_path, 'w') as verbose_file:
131
            output = subprocess.check_output(
132
                command_list, stderr=verbose_file, env=env)
133
    except subprocess.CalledProcessError as e:
134
        returncode = e.returncode
135
        output = e.output
136
    return returncode, output.decode('utf-8')
137
138
139
def _get_platform_cpes(platform):
140
    if platform.startswith("multi_platform_"):
141
        try:
142
            products = MULTI_PLATFORM_MAPPING[platform]
143
        except KeyError:
144
            logging.error(
145
                "Unknown multi_platform specifier: %s is not from %s"
146
                % (platform, ", ".join(MULTI_PLATFORM_MAPPING.keys())))
147
            raise ValueError
148
        platform_cpes = set()
149
        for p in products:
150
            platform_cpes |= set(PRODUCT_TO_CPE_MAPPING[p])
151
        return platform_cpes
152
    else:
153
        # scenario platform is specified by a full product name
154
        try:
155
            product = FULL_NAME_TO_PRODUCT_MAPPING[platform]
156
        except KeyError:
157
            logging.error(
158
                "Unknown product name: %s is not from %s"
159
                % (platform, ", ".join(FULL_NAME_TO_PRODUCT_MAPPING.keys())))
160
            raise ValueError
161
        platform_cpes = set(PRODUCT_TO_CPE_MAPPING[product])
162
        return platform_cpes
163
164
165
def matches_platform(scenario_platforms, benchmark_cpes):
166
    if "multi_platform_all" in scenario_platforms:
167
        return True
168
    scenario_cpes = set()
169
    for p in scenario_platforms:
170
        scenario_cpes |= _get_platform_cpes(p)
171
    return len(scenario_cpes & benchmark_cpes) > 0
172