Completed
Push — master ( 18fabe...7154a7 )
by -
01:30
created

NaxsiRules.__validate_score()   B

Complexity

Conditions 6

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 6.5625
Metric Value
cc 6
dl 0
loc 12
ccs 9
cts 12
cp 0.75
crap 6.5625
rs 8
1 1
from time import strftime, localtime
2
3 1
from spike.model import db
4 1
from shlex import shlex
5
6
7 1
# One naxsi rule, stored in table 'naxsi_rules' (bind key 'rules').
# Besides the stored columns, the class carries the keyword tables that
# parse_rule() and the match-zone validator rely on.
class NaxsiRules(db.Model):
8 1
    __bind_key__ = 'rules'
9 1
    __tablename__ = 'naxsi_rules'
10
11 1
    # Stored fields of a rule.
    id = db.Column(db.Integer, primary_key=True)
12 1
    # Rendered as "msg:<msg>" by __str__().
    msg = db.Column(db.String(), nullable=False)
13 1
    # Expected to start with "str:" or "rx:" (checked by validate()).
    detection = db.Column(db.String(1024), nullable=False)
14 1
    # Match zone: '|'-separated list of zone keywords, see the tables below.
    mz = db.Column(db.String(1024), nullable=False)
15 1
    # Comma-separated "$NAME:value" pairs.
    score = db.Column(db.String(1024), nullable=False)
16 1
    # Rule id as written in the rule ("id:<sid>"); unique per rule.
    sid = db.Column(db.Integer, nullable=False, unique=True)
17 1
    ruleset = db.Column(db.String(1024), nullable=False)
18 1
    # Free-text remarks, emitted as comment lines by fullstr().
    rmks = db.Column(db.Text, nullable=True, server_default="")
19 1
    active = db.Column(db.Integer, nullable=False, server_default="1")
20 1
    # 1 when the rule carries the "negative" keyword, 0 otherwise.
    negative = db.Column(db.Integer, nullable=False, server_default='0')
21 1
    # Passed to time.localtime() by fullstr(), i.e. treated as seconds since the epoch.
    timestamp = db.Column(db.Integer, nullable=False)
22
23 1
    # Keywords that introduce a rule; parse_rule() requires exactly one of them.
    mr_kw = ["MainRule", "BasicRule", "main_rule", "basic_rule"]
24 1
    # Match-zone keywords that take a literal argument ("$KEYWORD:arg").
    static_mz = {"$ARGS_VAR", "$BODY_VAR", "$URL", "$HEADERS_VAR"}
25 1
    # Match-zone keywords that stand on their own.
    full_zones = {"ARGS", "BODY", "URL", "HEADERS", "FILE_EXT", "RAW_BODY"}
26 1
    # "_X" variants of static_mz; the validator refuses to mix them with static_mz.
    rx_mz = {"$ARGS_VAR_X", "$BODY_VAR_X", "$URL_X", "$HEADERS_VAR_X"}
27 1
    # Every keyword accepted in a match zone (a list; its order follows set iteration order).
    sub_mz = list(static_mz) + list(full_zones) + list(rx_mz)
28
29 1
    def __init__(self, msg="", detection="", mz="", score="", sid='42000', ruleset="", rmks="", active=0, negative=0,
                 timestamp=0):
        """Create a rule from raw field values.

        All arguments are stored as given, except ``negative``: it is stored
        as 1 only when it equals the string ``'checked'`` (presumably the
        value of an HTML checkbox -- confirm against the callers) and as 0
        for anything else, including the integer 1.
        """
        # Validation results; reset and filled by validate() and parse_rule().
        self.error = []
        self.warnings = []

        # Book-keeping fields.
        self.sid, self.ruleset, self.timestamp = sid, ruleset, timestamp
        self.active = active
        self.rmks = rmks

        # The rule itself.
        self.msg, self.detection = msg, detection
        self.mz, self.score = mz, score
        self.negative = int(negative == 'checked')
43
44 1
    def fullstr(self):
        """Return the rule preceded by a comment header.

        The header holds the sid, the rule's timestamp rendered in local time
        as ``YYYY-MM-DD - HH:MM``, and the remarks. Every remark line gets its
        own ``# `` prefix so that multi-line remarks stay valid comment lines;
        missing remarks (``None``) are rendered as an empty comment line.

        :return str: the commented header followed by ``str(self)``
        """
        # "%F" is a platform-specific shorthand; "%Y-%m-%d" is its portable spelling.
        rdate = strftime("%Y-%m-%d - %H:%M", localtime(float(str(self.timestamp))))
        # rmks is nullable, and may contain "\n" or "\r\n" line breaks.
        remark_lines = (self.rmks or "").strip().splitlines()
        rmks = "\n# ".join(remark_lines)
        return "#\n# sid: {0} | date: {1}\n#\n# {2}\n#\n{3}".format(self.sid, rdate, rmks, str(self))
48
49 1
    def __str__(self):
        """Render the rule as a single ``MainRule`` line.

        The slot after ``MainRule`` holds the word ``negative`` when
        ``self.negative == 1`` and is left empty otherwise (which leaves two
        consecutive spaces in the output).
        """
        fields = (
            'negative' if self.negative == 1 else '',
            self.detection,
            self.msg,
            self.mz,
            self.score,
            self.sid,
        )
        return 'MainRule {} "{}" "msg:{}" "mz:{}" "s:{}" id:{} ;'.format(*fields)
53
54 1
    def explain(self):
        """ Return a string explaining the rule

        The result is an HTML fragment: fixed wording with ``<strong>`` tags
        around the rule's own values. Those values are HTML-escaped before
        being inserted, so a rule such as ``str:<script>`` cannot inject markup.

        Zones without a dedicated wording (``FILE_EXT``, ``RAW_BODY``) are shown
        under their own name instead of raising ``KeyError``. A score without
        ``:`` is rendered with an empty value, and a ``$`` zone without ``:``
        with an empty argument, instead of raising.

        :return str: A textual explanation of the rule
        """
        try:  # imported here so the method does not depend on a file-level import
            from html import escape
        except ImportError:  # Python 2
            from cgi import escape

        def esc(value):
            # Values end up in element text only, so quotes are left alone.
            return escape('{0}'.format(value), False)

        # 'HEADER' is matched as a substring of "$HEADERS_VAR"/"$HEADERS_VAR_X";
        # 'HEADERS' is the name of the full zone itself.
        translation = {'ARGS': 'argument', 'BODY': 'body', 'URL': 'url', 'HEADER': 'header',
                       'HEADERS': 'header', 'HEADER:Cookie': 'cookies'}

        explanation = 'The rule number <strong>{0}</strong> is '.format(esc(self.sid))
        if self.negative:
            explanation += '<strong>not</strong> '
        explanation += 'setting the '

        scores = []
        for score in self.score.split(','):
            name, _, value = score.partition(':')
            scores.append('<strong>{0}</strong> score to <strong>{1}</strong> '.format(esc(name), esc(value)))
        explanation += ', '.join(scores) + 'when it '

        if self.detection.startswith('str:'):
            explanation += 'finds the string <strong>{}</strong> '.format(esc(self.detection[4:]))
        else:
            explanation += 'matches the regexp <strong>{}</strong> in '.format(esc(self.detection[3:]))

        zones = []
        for mz in self.mz.split('|'):
            if not mz.startswith('$'):  # a full zone such as ARGS or HEADERS
                zones.append('the <strong>{0}</strong>'.format(esc(translation.get(mz, mz))))
                continue

            current_zone, _, arg = mz.partition(":")

            zone_name = "?"
            for translated_name in translation:  # translate zone names
                if translated_name in current_zone:
                    zone_name = translation[translated_name]

            if "$URL" in current_zone:
                regexp = "matching regex" if current_zone == "$URL_X" else ""
                zones.append("on the URL {} '{}' ".format(regexp, esc(arg)))
            elif zone_name == 'header' and arg.lower() == 'cookie':
                zones.append('in the <strong>cookies</strong>')
            else:
                regexp = "matching regex" if current_zone.endswith("_X") else ""
                zones.append("in the var with name {} '{}' of {} ".format(regexp, esc(arg), zone_name))
        return explanation + ' ' + ', '.join(zones) + '.'
97
98 1
    def validate(self):
        """Check the rule's current field values.

        Nothing is returned: ``self.warnings`` and ``self.error`` are reset
        and then filled in by the individual validators and by the final
        emptiness checks.
        """
        self.warnings, self.error = list(), list()

        self.__validate_matchzone(self.mz)
        self.__validate_id(self.sid)
        self.__validate_score(self.score)

        detection = self.detection
        if detection.startswith('rx:'):
            self.__validate_detection_rx(detection)
        elif detection.startswith('str:'):
            self.__validate_detection_str(detection)
        else:
            self.error.append("Your 'detection' string must start with str: or rx:")

        # A missing message is only worth a warning; a missing score or match zone is an error.
        emptiness_checks = (
            (self.msg, self.warnings, "Rule has no 'msg:'."),
            (self.score, self.error, "Rule has no score."),
            (self.mz, self.error, "Rule has no match zone."),
        )
        for value, sink, message in emptiness_checks:
            if not value:
                sink.append(message)
119
120 1
    def __fail(self, msg):
        """Record ``msg`` in ``self.error`` and return False.

        Returning False lets validators write ``return self.__fail(...)``.
        """
        errors = self.error
        errors.append(msg)
        return False
123
124
    # Below are parsers for specific parts of a rule
125
126 1
    def __validate_detection_str(self, p_str, assign=False):
        """Accept any ``str:`` detection; no check is performed.

        :param p_str: the detection, including its ``str:`` prefix
        :param assign: when exactly ``True``, store ``p_str`` in ``self.detection``
        :return: always True
        """
        if assign is not True:
            return True
        self.detection = p_str
        return True
130
131 1
    def __validate_detection_rx(self, p_str, assign=False):
        """Validate an ``rx:`` detection.

        A warning is recorded when ``p_str`` is not lower-case. When the
        optional ``pcre`` module can be imported, the pattern (everything
        after the ``rx:`` prefix) is compiled with it and a compilation
        failure is recorded as an error.

        :param p_str: the detection, including its ``rx:`` prefix
        :param assign: when exactly ``True``, store ``p_str`` in ``self.detection``
        :return: False if the pattern does not compile, True otherwise
        """
        if not p_str.islower():
            self.warnings.append("detection {} is not lower-case. naxsi is case-insensitive".format(p_str))

        # python-pcre is an optional dependency: without it the pattern is
        # accepted unchecked, so the ImportError handler must stay.
        try:
            import pcre
        except ImportError:
            pcre = None

        if pcre is not None:
            try:  # if we can't compile the regex, it's likely invalid
                pcre.compile(p_str[3:])
            except pcre.PCREError:
                return self.__fail("{} is not a valid regex:".format(p_str))

        if assign is True:
            self.detection = p_str
        return True
147
148 1
    def __validate_score(self, p_str, assign=False):
        """Validate a score string made of comma-separated ``$NAME:value`` pairs.

        Every entry is checked and every problem is recorded in ``self.error``:
        an entry without ``:``, a value that is not made of digits only, or a
        name that does not start with ``$``. Malformed entries (no ``:``, or
        more than one) are reported instead of raising ``ValueError``.

        :param p_str: the score string, without its ``s:`` prefix
        :param assign: when true and the string is valid, store it in ``self.score``
        :return: True if every entry is valid, False otherwise
        """
        valid = True
        for score in p_str.split(','):
            # Split on the first ':' only; anything after it is the value.
            name, colon, value = score.partition(':')
            if not colon:
                valid = self.__fail("You score '{}' has no value or name.".format(score))
                continue  # nothing left to check for this entry
            if not value.isdigit():
                valid = self.__fail("Your value '{}' for your score '{}' is not numeric.".format(value, score))
            elif not name.startswith('$'):
                valid = self.__fail("Your name '{}' for your score '{}' does not start with a '$'.".format(name, score))
        if valid and assign:
            self.score = p_str
        return valid
160
161 1
    def __validate_matchzone(self, p_str, assign=False):
        """Validate a match zone: a ``|``-separated list of zone keywords.

        Keywords starting with ``$`` must be followed by ``:`` and an argument.
        Only the first ``:`` separates keyword and argument, so an argument may
        itself contain ``:``. The first problem found is recorded in
        ``self.error`` and ends the validation; an argument that is not
        lower-case only produces a warning.

        :param p_str: the match zone, without its ``mz:`` prefix
        :param assign: when exactly ``True`` and the zone is valid, store it in ``self.mz``
        :return: True if the match zone is valid, False otherwise
        """
        has_zone = False
        mz_state = set()
        for loc in p_str.split('|'):
            keyword, arg = loc, None
            if loc.startswith("$"):
                keyword, colon, arg = loc.partition(":")
                if not colon:
                    return self.__fail("Missing 2nd part after ':' in {0}".format(loc))

            if keyword not in self.sub_mz:  # check if `keyword` is a valid keyword
                return self.__fail("'{0}' is not a known sub-part of mz : {1}".format(keyword, self.sub_mz))

            mz_state.add(keyword)

            # verify that the rule doesn't attempt to target REGEX and STATIC _VAR/URL at the same time
            if self.rx_mz & mz_state and self.static_mz & mz_state:
                # sorted() keeps the message stable from one run to the next
                return self.__fail("You can't mix static $* with regex $*_X ({})".format(', '.join(sorted(mz_state))))

            if arg and not arg.islower():  # just a gentle reminder
                self.warnings.append("{0} in {1} is not lowercase. naxsi is case-insensitive".format(arg, loc))

            # the rule targets an actual zone: `keyword` is known to be valid
            # at this point, and only $URL / $URL_X do not count as one
            if keyword not in ("$URL", "$URL_X"):
                has_zone = True

        if has_zone is False:
            return self.__fail("The rule/whitelist doesn't target any zone.")

        if assign is True:
            self.mz = p_str

        return True
194
195 1
    def __validate_id(self, p_str, assign=False):
        """Validate a rule id.

        A value that ``int()`` rejects with ``ValueError`` is recorded as an
        error; an id below 10000 only produces a warning.

        :param p_str: the id, without its ``id:`` prefix
        :param assign: when exactly ``True`` and the id is valid, store it (as an int) in ``self.sid``
        :return: True if the id is numeric, False otherwise
        """
        try:
            num = int(p_str)
        except ValueError:
            return self.__fail("id:{0} is not numeric".format(p_str))

        if num < 10000:
            self.warnings.append("rule IDs below 10k are reserved ({0})".format(num))
        if assign is True:
            self.sid = num
        return True
206
207 1
    @staticmethod
208
    def splitter(full_str):
209 1
        lexer = shlex(full_str)
210 1
        lexer.whitespace_split = True
211 1
        return list(iter(lexer.get_token, ''))
212
213 1
    def parse_rule(self, full_str):
        """
        Parse and validate a full naxsi rule

        The rule must contain exactly one mainrule/basicrule keyword. Every
        other element is handed to the validator matching its prefix and, when
        valid, stored on the instance (``sid``, ``detection``, ``msg``, ``mz``,
        ``score``, ``negative``). Parsing stops at the first invalid element.
        ``self.error`` and ``self.warnings`` are reset first.

        :param full_str: raw rule
        :return: True if the whole rule was parsed, False otherwise (details in ``self.error``)
        """
        self.warnings = list()
        self.error = list()

        def store_msg(p_str, assign=False):
            # any message is valid; keep it like every other parsed element
            if assign is True:
                self.msg = p_str
            return True

        def store_negative(p_str, assign=False):
            # the bare `negative` keyword leaves an empty payload; 'checked' is still accepted
            if p_str not in ('', 'checked'):
                return False
            if assign is True:
                self.negative = 1
            return True

        func_map = {"id:": self.__validate_id, "str:": self.__validate_detection_str,
                    "rx:": self.__validate_detection_rx, "msg:": store_msg,
                    "mz:": self.__validate_matchzone, "negative": store_negative,
                    "s:": self.__validate_score}

        try:
            split = self.splitter(full_str)  # parse string
        except ValueError as exc:  # raised by shlex, e.g. on an unbalanced quote
            return self.__fail("Unable to split the rule: {0}".format(exc))
        intersection = set(split).intersection(set(self.mr_kw))

        if not intersection:
            return self.__fail("No mainrule/basicrule keyword.")
        elif len(intersection) > 1:
            return self.__fail("Multiple mainrule/basicrule keywords.")

        split.remove(intersection.pop())  # remove the mainrule/basicrule keyword

        if ";" in split:
            split.remove(";")

        for keyword in split:
            keyword = keyword.strip()

            if keyword.endswith(";"):  # remove semi-colons
                keyword = keyword[:-1]
            if keyword.startswith(('"', "'")) and (keyword[0] == keyword[-1]):  # remove (double-)quotes
                keyword = keyword[1:-1]

            for frag_kw, function in func_map.items():
                if not keyword.startswith(frag_kw):
                    continue
                # don't remove the leading "str:" or "rx:"
                payload = keyword if frag_kw in ('rx:', 'str:') else keyword[len(frag_kw):]
                if function(payload, assign=True) is not True:
                    return self.__fail("parsing of element '{0}' failed.".format(keyword))
                break
            else:  # no parser matched this element
                return self.__fail("'{}' is an invalid element and thus can not be parsed.".format(keyword))
        return True
265