ospd.vtfilter   A
last analyzed

Complexity

Total Complexity 18

Size/Duplication

Total Lines 149
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 18
eloc 60
dl 0
loc 149
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A VtsFilter.parse_filters() 0 34 5
A VtsFilter.format_filter_value() 0 14 1
A VtsFilter.__init__() 0 11 1
C VtsFilter.get_filtered_vts_list() 0 42 9
A VtsFilter.format_vt_creation_time() 0 5 1
A VtsFilter.format_vt_modification_time() 0 5 1
1
# Copyright (C) 2014-2021 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
""" Vulnerability Test Filter class.
19
"""
20
import re
21
import operator
22
from typing import Dict, List, Optional
23
24
from ospd.errors import OspdCommandError
25
26
from .vts import Vts
27
28
29
class VtsFilter:
30
    """Helper class to filter Vulnerability Tests"""
31
32
    def __init__(self) -> None:
33
        """Initialize filter operator and allowed filters."""
34
        self.filter_operator = {
35
            '<': operator.lt,
36
            '>': operator.gt,
37
            '=': operator.eq,
38
        }
39
40
        self.allowed_filter = {
41
            'creation_time': self.format_vt_creation_time,
42
            'modification_time': self.format_vt_modification_time,
43
        }
44
45
    def parse_filters(self, vt_filter: str) -> List:
46
        """Parse a string containing one or more filters
47
        and return a list of filters
48
49
        Arguments:
50
            vt_filter (string): String containing filters separated with
51
                semicolon.
52
        Return:
53
            List with filters. Each filters is a list with 3 elements
54
            e.g. [arg, operator, value]
55
        """
56
57
        filter_list = vt_filter.split(';')
58
        filters = list()
59
60
        for single_filter in filter_list:
61
            filter_aux = re.split(r'(\W)', single_filter, 1)
62
63
            if len(filter_aux) < 3:
64
                raise OspdCommandError(
65
                    "Invalid number of argument in the filter", "get_vts"
66
                )
67
68
            _element, _oper, _val = filter_aux
69
70
            if _element not in self.allowed_filter:
71
                raise OspdCommandError("Invalid filter element", "get_vts")
72
73
            if _oper not in self.filter_operator:
74
                raise OspdCommandError("Invalid filter operator", "get_vts")
75
76
            filters.append(filter_aux)
77
78
        return filters
79
80
    def format_vt_creation_time(self, value):
81
        """In case the given creationdatetime value must be formatted,
82
        this function must be implemented by the wrapper
83
        """
84
        return value
85
86
    def format_vt_modification_time(self, value):
87
        """In case the given modification datetime value must be formatted,
88
        this function must be implemented by the wrapper
89
        """
90
        return value
91
92
    def format_filter_value(self, element: str, value: Dict):
93
        """Calls the specific function to format value,
94
        depending on the given element.
95
96
        Arguments:
97
            element (string): The element of the VT to be formatted.
98
            value (dictionary): The element value.
99
100
        Returns:
101
            Returns a formatted value.
102
103
        """
104
        format_func = self.allowed_filter.get(element)
105
        return format_func(value)
106
107
    def get_filtered_vts_list(
108
        self, vts: Vts, vt_filter: str
109
    ) -> Optional[List[str]]:
110
        """Gets a collection of vulnerability test from the vts dictionary,
111
        which match the filter.
112
113
        Arguments:
114
            vt_filter: Filter to apply to the vts collection.
115
            vts: The complete vts collection.
116
117
        Returns:
118
            List with filtered vulnerability tests. The list can be empty.
119
            None in case of filter parse failure.
120
        """
121
        if not vt_filter:
122
            raise OspdCommandError('vt_filter: A valid filter is required.')
123
124
        filters = self.parse_filters(vt_filter)
125
        if not filters:
126
            return None
127
128
        vt_oid_list = list(vts)
129
130
        for _element, _oper, _filter_val in filters:
131
            for vt_oid in vts:
132
                if vt_oid not in vt_oid_list:
133
                    continue
134
135
                vt = vts.get(vt_oid)
136
                if vt is None or not vt.get(_element):
137
                    vt_oid_list.remove(vt_oid)
138
                    continue
139
140
                _elem_val = vt.get(_element)
141
                _val = self.format_filter_value(_element, _elem_val)
142
143
                if self.filter_operator[_oper](_val, _filter_val):
144
                    continue
145
                else:
146
                    vt_oid_list.remove(vt_oid)
147
148
        return vt_oid_list
149