Passed
Push — master ( efe542...1a3d0a )
by Matěj
01:02 queued 11s
created

ssg_test_suite.common.RuleResult.__eq__()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1 1
import logging
2 1
import subprocess
3 1
from collections import namedtuple
4 1
import functools
5 1
from ssg.constants import MULTI_PLATFORM_MAPPING
6 1
from ssg.constants import PRODUCT_TO_CPE_MAPPING
7 1
from ssg.constants import FULL_NAME_TO_PRODUCT_MAPPING
8
9 1
# Identifies a single test scenario: the rule it exercises and the script
# that sets the scenario up.
Scenario_run = namedtuple("Scenario_run", ["rule_id", "script"])

# The conditions a scenario was executed under.
Scenario_conditions = namedtuple(
    "Scenario_conditions",
    ["backend", "scanning_mode", "remediated_by", "datastream"],
)
15
16
17 1
# Extra ssh command-line options: disable strict host key checking and
# send the known-hosts file to /dev/null.
IGNORE_KNOWN_HOSTS_OPTIONS = tuple(
    token
    for option in ("StrictHostKeyChecking=no", "UserKnownHostsFile=/dev/null")
    for token in ("-o", option)
)
21
22
23 1
class Stage(object):
    """Integer constants naming the stages, numbered in the order listed."""

    (
        NONE,
        PREPARATION,
        INITIAL_SCAN,
        REMEDIATION,
        FINAL_SCAN,
    ) = range(5)
29
30
31 1
@functools.total_ordering
class RuleResult(object):
    """
    Result of a test suite testing rule under a scenario.

    Supports ordering by success - the most successful run orders first.
    """

    # Names of the stages whose outcome may be stored in passed_stages.
    STAGE_STRINGS = {
        "preparation",
        "initial_scan",
        "remediation",
        "final_scan",
    }

    def __init__(self, result_dict=None):
        """
        Create an empty result; if result_dict is truthy, populate the
        instance from it using load_from_dict().
        """
        self.scenario = Scenario_run("", "")
        self.conditions = Scenario_conditions("", "", "", "")
        self.when = ""
        # Maps a stage name from STAGE_STRINGS to that stage's outcome.
        self.passed_stages = {}
        self.passed_stages_count = 0
        self.success = False

        if result_dict:
            self.load_from_dict(result_dict)

    def load_from_dict(self, data):
        """
        Populate this result from a dictionary.

        The keys "rule_id", "scenario_script", "backend", "scanning_mode",
        "remediated_by", "datastream" and "run_timestamp" are required
        (KeyError is raised when one is missing); keys named after a stage
        are optional.
        """
        self.scenario = Scenario_run(data["rule_id"], data["scenario_script"])
        self.conditions = Scenario_conditions(
            data["backend"], data["scanning_mode"],
            data["remediated_by"], data["datastream"])
        self.when = data["run_timestamp"]

        self.passed_stages = {
            key: data[key] for key in self.STAGE_STRINGS if key in data}
        self.passed_stages_count = sum(self.passed_stages.values())

        # A truthy final scan means success. Otherwise the run still counts
        # as successful when no remediation entry is present and the initial
        # scan was truthy.
        self.success = data.get("final_scan", False)
        if not self.success:
            self.success = (
                "remediation" not in data
                and data.get("initial_scan", False))

    def save_to_dict(self):
        """Return a new dictionary in the layout load_from_dict() reads."""
        data = {
            "rule_id": self.scenario.rule_id,
            "scenario_script": self.scenario.script,
            "backend": self.conditions.backend,
            "scanning_mode": self.conditions.scanning_mode,
            "remediated_by": self.conditions.remediated_by,
            "datastream": self.conditions.datastream,
            "run_timestamp": self.when,
        }
        data.update(self.passed_stages)
        return data

    def record_stage_result(self, stage, successful):
        """
        Store the outcome of one stage; stage must be in STAGE_STRINGS.
        """
        assert stage in self.STAGE_STRINGS, (
            "Stage name {name} is invalid, choose one from {choices}"
            .format(name=stage, choices=", ".join(self.STAGE_STRINGS))
        )
        # NOTE(review): passed_stages_count and success are only derived in
        # load_from_dict(); they are not refreshed here - confirm that this
        # is intended before relying on ordering of freshly recorded results.
        self.passed_stages[stage] = successful

    def relative_conditions_to(self, other):
        """
        Return a (mine, theirs) pair describing how the two results differ:
        the timestamps when the conditions are equal, otherwise the
        conditions themselves as plain tuples.
        """
        if self.conditions == other.conditions:
            return self.when, other.when
        else:
            return tuple(self.conditions), tuple(other.conditions)

    def __eq__(self, other):
        """
        Two results are equal when they agree on overall success and on
        the recorded outcome of every stage.
        """
        if not isinstance(other, RuleResult):
            return NotImplemented
        # The original compared self.passed_stages with itself, which made
        # this clause always true; compare against the other operand.
        return (self.success == other.success
                and self.passed_stages == other.passed_stages)

    def __lt__(self, other):
        """
        Order the result with more passed stages first, so that sorting
        puts the most successful run at the beginning.
        """
        if not isinstance(other, RuleResult):
            return NotImplemented
        return self.passed_stages_count > other.passed_stages_count
108
109
110 1
def run_cmd_local(command, verbose_path, env=None):
    """
    Log the command at debug level and hand it to _run_cmd().

    command is a sequence of argument strings; verbose_path and env are
    passed through unchanged. Returns the (returncode, output) pair
    produced by _run_cmd().
    """
    logging.debug('Running {}'.format(' '.join(command)))
    exit_status, captured = _run_cmd(command, verbose_path, env)
    return exit_status, captured
115
116
117 1
def run_cmd_remote(command_string, domain_ip, verbose_path, env=None):
    """
    Run command_string over ssh as root on the machine at domain_ip.

    The ssh invocation carries IGNORE_KNOWN_HOSTS_OPTIONS; verbose_path and
    env are passed through to _run_cmd(). Returns the (returncode, output)
    pair produced by _run_cmd().
    """
    ssh_argv = ['ssh']
    ssh_argv.extend(IGNORE_KNOWN_HOSTS_OPTIONS)
    ssh_argv.append('root@{0}'.format(domain_ip))
    ssh_argv.append(command_string)
    logging.debug('Running {}'.format(command_string))
    exit_status, captured = _run_cmd(ssh_argv, verbose_path, env)
    return exit_status, captured
123
124
125 1
def _run_cmd(command_list, verbose_path, env=None):
    """
    Run a command, capturing its stdout and sending stderr to a file.

    command_list is the argument list given to subprocess.check_output();
    verbose_path is opened for writing (truncating previous content) and
    receives the command's stderr; env is passed on as the environment.

    Returns a (returncode, output) pair where output is the captured
    stdout decoded as UTF-8. A non-zero exit status is reported through
    returncode rather than raised.
    """
    returncode = 0
    output = b""
    try:
        with open(verbose_path, 'w') as verbose_file:
            output = subprocess.check_output(
                command_list, stderr=verbose_file, env=env)
    except subprocess.CalledProcessError as e:
        returncode = e.returncode
        output = e.output
    # Undecodable bytes are replaced instead of raising UnicodeDecodeError,
    # so that a command emitting non-UTF-8 output does not abort the caller.
    return returncode, output.decode('utf-8', 'replace')
136
137
138 1
def _get_platform_cpes(platform):
    """
    Return the set of CPEs that correspond to a platform specifier.

    A specifier starting with "multi_platform_" is looked up in
    MULTI_PLATFORM_MAPPING and yields the union of the CPEs of all its
    products; any other specifier is looked up as a full product name in
    FULL_NAME_TO_PRODUCT_MAPPING. An unknown specifier is logged as an
    error and ValueError is raised.
    """
    if not platform.startswith("multi_platform_"):
        # scenario platform is specified by a full product name
        try:
            product_id = FULL_NAME_TO_PRODUCT_MAPPING[platform]
        except KeyError:
            logging.error(
                "Unknown product name: %s is not from %s"
                % (platform, ", ".join(FULL_NAME_TO_PRODUCT_MAPPING.keys())))
            raise ValueError
        return set(PRODUCT_TO_CPE_MAPPING[product_id])

    try:
        product_ids = MULTI_PLATFORM_MAPPING[platform]
    except KeyError:
        logging.error(
            "Unknown multi_platform specifier: %s is not from %s"
            % (platform, ", ".join(MULTI_PLATFORM_MAPPING.keys())))
        raise ValueError

    cpes = set()
    for product_id in product_ids:
        cpes.update(PRODUCT_TO_CPE_MAPPING[product_id])
    return cpes
162
163
164 1
def matches_platform(scenario_platforms, benchmark_cpes):
    """
    Tell whether a scenario applies to a benchmark.

    Returns True when "multi_platform_all" is among scenario_platforms, or
    when at least one CPE derived from scenario_platforms is also present
    in benchmark_cpes; otherwise returns False.
    """
    if "multi_platform_all" in scenario_platforms:
        return True

    collected = set()
    for specifier in scenario_platforms:
        collected.update(_get_platform_cpes(specifier))
    return bool(collected & benchmark_cpes)
171