Completed
Push — master ( 832d7e...77348d )
by -
01:42
created

NaxsiRules   F

Complexity

Total Complexity 68

Size/Duplication

Total Lines 249
Duplicated Lines 0 %

Test Coverage

Coverage 82.16%
Metric Value
dl 0
loc 249
ccs 152
cts 185
cp 0.8216
rs 2.9411
wmc 68

13 Methods

Rating   Name   Duplication   Size   Complexity  
A fullstr() 0 4 1
A __str__() 0 4 2
A __init__() 0 14 2
F explain() 0 39 11
B __validate_detection_rx() 0 16 5
B __validate_score() 0 12 6
F __validate_matchzone() 0 33 13
A splitter() 0 5 1
F parse_rule() 0 52 15
B validate() 0 16 5
A __validate_detection_str() 0 4 2
A __fail() 0 3 1
A __validate_id() 0 11 4

How to fix   Complexity   

Complex Class

Complex classes like NaxsiRules often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
from time import strftime, localtime
2
3 1
from spike.model import db
4 1
from shlex import shlex
5
6
7 1
class NaxsiRules(db.Model):
8 1
    __bind_key__ = 'rules'
9 1
    __tablename__ = 'naxsi_rules'
10
11 1
    id = db.Column(db.Integer, primary_key=True)
12 1
    msg = db.Column(db.String(), nullable=False)
13 1
    detection = db.Column(db.String(1024), nullable=False)
14 1
    mz = db.Column(db.String(1024), nullable=False)
15 1
    score = db.Column(db.String(1024), nullable=False)
16 1
    sid = db.Column(db.Integer, nullable=False, unique=True)
17 1
    ruleset = db.Column(db.String(1024), nullable=False)
18 1
    rmks = db.Column(db.Text, nullable=True, server_default="")
19 1
    active = db.Column(db.Integer, nullable=False, server_default="1")
20 1
    negative = db.Column(db.Integer, nullable=False, server_default='0')
21 1
    timestamp = db.Column(db.Integer, nullable=False)
22
23 1
    mr_kw = ["MainRule", "BasicRule", "main_rule", "basic_rule"]
24 1
    static_mz = {"$ARGS_VAR", "$BODY_VAR", "$URL", "$HEADERS_VAR"}
25 1
    full_zones = {"ARGS", "BODY", "URL", "HEADERS", "FILE_EXT", "RAW_BODY"}
26 1
    rx_mz = {"$ARGS_VAR_X", "$BODY_VAR_X", "$URL_X", "$HEADERS_VAR_X"}
27 1
    sub_mz = list(static_mz) + list(full_zones) + list(rx_mz)
28
29 1
    def __init__(self, msg="", detection="", mz="", score="", sid=42000, ruleset="", rmks="", active=0, negative=0,
30
                 timestamp=0):
31 1
        self.msg = msg
32 1
        self.detection = detection
33 1
        self.mz = mz
34 1
        self.score = score
35 1
        self.sid = sid
36 1
        self.ruleset = ruleset
37 1
        self.rmks = rmks
38 1
        self.active = active
39 1
        self.negative = 1 if negative == 'checked' else 0
40 1
        self.timestamp = timestamp
41 1
        self.warnings = []
42 1
        self.error = []
43
44 1
    def fullstr(self):
45 1
        rdate = strftime("%F - %H:%M", localtime(float(str(self.timestamp))))
46 1
        rmks = "# ".join(self.rmks.strip().split("\n"))
47 1
        return "#\n# sid: {0} | date: {1}\n#\n# {2}\n#\n{3}".format(self.sid, rdate, rmks, self.__str__())
48
49 1
    def __str__(self):
50 1
        negate = 'negative' if self.negative == 1 else ''
51 1
        return 'MainRule {} "{}" "msg:{}" "mz:{}" "s:{}" id:{} ;'.format(
52
            negate, self.detection, self.msg, self.mz, self.score, self.sid)
53
54 1
    def explain(self):
55
        """ Return a string explaining the rule
56
57
         :return str: A textual explanation of the rule
58
         """
59 1
        translation = {'ARGS': 'argument', 'BODY': 'body', 'URL': 'url', 'HEADER': 'header'}
60 1
        explanation = 'The rule number <strong>%d</strong> is ' % self.sid
61 1
        if self.negative:
62
            explanation += '<strong>not</strong> '
63 1
        explanation += 'setting the '
64
65 1
        scores = []
66 1
        for score in self.score.split(','):
67 1
            scores.append('<strong>{0}</strong> to <strong>{1}</strong> '.format(*score.split(':', 1)))
68 1
        explanation += ', '.join(scores) + 'when it '
69 1
        if self.detection.startswith('str:'):
70 1
            explanation += 'finds the string <strong>{}</strong> '.format(self.detection[4:])
71
        else:
72
            explanation += 'matches the regexp <strong>{}</strong> '.format(self.detection[3:])
73
74 1
        zones = []
75 1
        for mz in self.mz.split('|'):
76 1
            if mz.startswith('$'):
77
                current_zone, arg = mz.split(":", 1)
78
                zone_name = "?"
79
80
                for translated_name in translation:  # translate zone names
81
                    if translated_name in current_zone:
82
                        zone_name = translation[translated_name]
83
84
                if "$URL" in current_zone:
85
                    regexp = "matching regex" if current_zone == "$URL_X" else ""
86
                    explanation += "on the URL {} '{}' ".format(regexp, arg)
87
                else:
88
                    regexp = "matching regex" if current_zone.endswith("_X") else ""
89
                    explanation += "in the var with name {} '{}' of {} ".format(regexp, arg, zone_name)
90
            else:
91 1
                zones.append('the <strong>{0}</strong>'.format(translation[mz]))
92 1
        return explanation
93
94 1
    def validate(self):
95 1
        self.__validate_matchzone(self.mz)
96 1
        self.__validate_id(self.sid)
97 1
        self.__validate_score(self.score)
98
99 1
        if self.detection.startswith('rx:'):
100
            self.__validate_detection_rx(self.detection)
101 1
        elif self.detection.startswith('str:'):
102 1
            self.__validate_detection_str(self.detection)
103
        else:
104 1
            self.error.append("Your 'detection' string must start with str: or rx:")
105
106 1
        if not self.msg:
107
            self.warnings.append("Rule has no 'msg:'.")
108 1
        if not self.score:
109
            self.error.append("Rule has no score.")
110
111 1
    def __fail(self, msg):
112 1
        self.error.append(msg)
113 1
        return False
114
115
    # Bellow are parsers for specific parts of a rule
116
117 1
    def __validate_detection_str(self, p_str, assign=False):
118 1
        if assign is True:
119
            self.detection += p_str
120 1
        return True
121
122 1
    def __validate_detection_rx(self, p_str, assign=False):
123 1
        if not p_str.islower():
124
            self.warnings.append("detection {} is not lower-case. naxsi is case-insensitive".format(p_str))
125
126 1
        try:  # try to validate the regex with PCRE's python bindings
127 1
            import pcre
128
            try:  # if we can't compile the regex, it's likely invalid
129
                pcre.compile(p_str[3:])
130
            except pcre.PCREError:
131
                return self.__fail("{} is not a valid regex:".format(p_str))
132 1
        except ImportError:  # python-pcre is an optional dependency
0 ignored issues
show
Unused Code introduced by
This except handler seems to be unused and could be removed.

Except handlers which only contain pass and do not have an else clause can usually simply be removed:

try:
    raises_exception()
except:  # Could be removed
    pass
Loading history...
133 1
            pass
134
135 1
        if assign is True:
136 1
            self.detection += p_str
137 1
        return True
138
139 1
    def __validate_score(self, p_str, assign=False):
140 1
        for score in p_str.split(','):
141 1
            if ':' not in score:
142
                self.__fail("You score '{}' has no value or name.".format(score))
143 1
            name, value = score.split(':')
144 1
            if not value.isdigit():
145
                self.__fail("Your value '{}' for your score '{}' is not numeric.".format(value, score))
146 1
            elif not name.startswith('$'):
147
                self.__fail("Your name '{}' for your score '{}' does not start with a '$'.".format(name, score))
148 1
        if assign:
149 1
            self.score = p_str
150 1
        return True
151
152 1
    def __validate_matchzone(self, p_str, assign=False):
153 1
        has_zone = False
154 1
        mz_state = set()
155 1
        for loc in p_str.split('|'):
156 1
            keyword, arg = loc, None
157 1
            if loc.startswith("$"):
158 1
                if loc.find(":") == -1:
159
                    return self.__fail("Missing 2nd part after ':' in {0}".format(loc))
160 1
                keyword, arg = loc.split(":")
161
162 1
            if keyword not in self.sub_mz:  # check if `keyword` is a valid keyword
163
                return self.__fail("'{0}' no a known sub-part of mz : {1}".format(keyword, self.sub_mz))
164
165 1
            mz_state.add(keyword)
166
167
            # verify that the rule doesn't attempt to target REGEX and STATIC _VAR/URL at the same time
168 1
            if len(self.rx_mz & mz_state) and len(self.static_mz & mz_state):
169 1
                return self.__fail("You can't mix static $* with regex $*_X ({})".format(', '.join(mz_state)))
170
171 1
            if arg and not arg.islower():  # just a gentle reminder
172 1
                self.warnings.append("{0} in {1} is not lowercase. naxsi is case-insensitive".format(arg, loc))
173
174
            # the rule targets an actual zone
175 1
            if keyword not in ["$URL", "$URL_X"] and keyword in (self.rx_mz | self.full_zones | self.static_mz):
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after in.
Loading history...
176 1
                has_zone = True
177
178 1
        if has_zone is False:
179
            return self.__fail("The rule/whitelist doesn't target any zone.")
180
181 1
        if assign is True:
182 1
            self.mz = p_str
183
184 1
        return True
185
186 1
    def __validate_id(self, p_str, assign=False):
187 1
        try:
188 1
            num = int(p_str)
189 1
            if num < 10000:
190 1
                self.warnings.append("rule IDs below 10k are reserved ({0})".format(num))
191
        except ValueError:
192
            self.error.append("id:{0} is not numeric".format(p_str))
193
            return False
194 1
        if assign is True:
195 1
            self.sid = num
196 1
        return True
197
198 1
    @staticmethod
199
    def splitter(full_str):
200 1
        lexer = shlex(full_str)
201 1
        lexer.whitespace_split = True
202 1
        return list(iter(lexer.get_token, ''))
203
204 1
    def parse_rule(self, full_str):
205
        """
206
        Parse and validate a full naxsi rule
207
        :param full_str: raw rule
208
        :return: [True|False, dict]
209
        """
210 1
        self.warnings = []
211 1
        self.error = []
212
213 1
        func_map = {"id:": self.__validate_id, "str:": self.__validate_detection_str,
214
                    "rx:": self.__validate_detection_rx, "msg:": lambda p_str, assign=False: True,
215
                    "mz:": self.__validate_matchzone, "negative": lambda p_str, assign=False: True,
216
                    "s:": self.__validate_score}
217
218 1
        split = self.splitter(full_str)  # parse string
219 1
        intersection = set(split).intersection(set(self.mr_kw))
220
221 1
        if not intersection:
222
            return self.__fail("No mainrule/basicrule keyword.")
223 1
        elif len(intersection) > 1:
224
            return self.__fail("Multiple mainrule/basicrule keywords.")
225
226 1
        split.remove(intersection.pop())  # remove the mainrule/basicrule keyword
227
228 1
        if ";" in split:
229 1
            split.remove(";")
230
231 1
        while split:  # iterate while there is data, as handlers can defer
232 1
            for keyword in split:
233 1
                orig_kw = keyword
234 1
                keyword = keyword.strip()
235
236 1
                if keyword.endswith(";"):  # remove semi-colons
237 1
                    keyword = keyword[:-1]
238 1
                if keyword.startswith(('"', "'")) and (keyword[0] == keyword[-1]):  # remove (double-)quotes
239 1
                    keyword = keyword[1:-1]
240
241 1
                parsed = False
242 1
                for frag_kw in func_map:
243 1
                    if keyword.startswith(frag_kw):  # use the right parser
244 1
                        function = func_map[frag_kw]
245 1
                        payload = keyword[len(frag_kw):]
246
247 1
                        if function(payload, assign=True) is True:
248 1
                            parsed = True
249 1
                            split.remove(orig_kw)
250 1
                            break
251 1
                        return self.__fail("parsing of element '{0}' failed.".format(keyword))
252
253 1
                if parsed is False:  # we have an item that wasn't successfully parsed
254
                    return self.__fail("'{}' is an invalid element and thus can not be parsed.".format(keyword))
255
        return True
256