Completed
Push — master ( 41374a...ac8e27 )
by -
01:29
created

NaxsiRules.validate()   C

Complexity

Conditions 7

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 7.457
Metric Value
cc 7
dl 0
loc 23
ccs 15
cts 19
cp 0.7895
crap 7.457
rs 5.5
1 1
from time import strftime, localtime
2
3 1
from spike.model import db
4 1
from shlex import shlex
5
6
7 1
class NaxsiRules(db.Model):
8 1
    __bind_key__ = 'rules'
9 1
    __tablename__ = 'naxsi_rules'
10
11 1
    id = db.Column(db.Integer, primary_key=True)
12 1
    msg = db.Column(db.String(), nullable=False)
13 1
    detection = db.Column(db.String(1024), nullable=False)
14 1
    mz = db.Column(db.String(1024), nullable=False)
15 1
    score = db.Column(db.String(1024), nullable=False)
16 1
    sid = db.Column(db.Integer, nullable=False, unique=True)
17 1
    ruleset = db.Column(db.String(1024), nullable=False)
18 1
    rmks = db.Column(db.Text, nullable=True, server_default="")
19 1
    active = db.Column(db.Integer, nullable=False, server_default="1")
20 1
    negative = db.Column(db.Integer, nullable=False, server_default='0')
21 1
    timestamp = db.Column(db.Integer, nullable=False)
22
23 1
    mr_kw = ["MainRule", "BasicRule", "main_rule", "basic_rule"]
24 1
    static_mz = {"$ARGS_VAR", "$BODY_VAR", "$URL", "$HEADERS_VAR"}
25 1
    full_zones = {"ARGS", "BODY", "URL", "HEADERS", "FILE_EXT", "RAW_BODY"}
26 1
    rx_mz = {"$ARGS_VAR_X", "$BODY_VAR_X", "$URL_X", "$HEADERS_VAR_X"}
27 1
    sub_mz = list(static_mz) + list(full_zones) + list(rx_mz)
28
29 1
    def __init__(self, msg="", detection="str:a", mz="ARGS", score="$None:0", sid='42000', ruleset="", rmks="",
30
                 active=0, negative=False, timestamp=0):
31 1
        self.msg = msg
32 1
        self.detection = detection
33 1
        self.mz = mz
34 1
        self.score = score
35 1
        self.sid = sid
36 1
        self.ruleset = ruleset
37 1
        self.rmks = rmks
38 1
        self.active = active
39 1
        self.negative = negative
40 1
        self.timestamp = timestamp
41 1
        self.warnings = []
42 1
        self.error = []
43
44 1
    def fullstr(self):
45 1
        rdate = strftime("%F - %H:%M", localtime(float(str(self.timestamp))))
46 1
        rmks = "# ".join(self.rmks.strip().split("\n"))
47 1
        return "#\n# sid: {0} | date: {1}\n#\n# {2}\n#\n{3}".format(self.sid, rdate, rmks, self.__str__())
48
49 1
    def __str__(self):
50 1
        return 'MainRule {} "{}" "msg:{}" "mz:{}" "s:{}" id:{} ;'.format(
51
            'negative' if self.negative else '', self.detection, self.msg, self.mz, self.score, self.sid)
52
53 1
    def explain(self):
54
        """ Return a string explaining the rule
55
56
         :return str: A textual explanation of the rule
57
         """
58 1
        translation = {'ARGS': 'argument', 'BODY': 'body', 'URL': 'url', 'HEADER': 'header',
59
                       'HEADER:Cookie': 'cookies'}
60 1
        explanation = 'The rule number <strong>{0}</strong> is '.format(self.sid)
61 1
        if self.negative:
62 1
            explanation += '<strong>not</strong> '
63 1
        explanation += 'setting the '
64
65 1
        scores = []
66 1
        for score in self.score.split(','):
67 1
            scores.append('<strong>{0}</strong> score to <strong>{1}</strong> '.format(*score.split(':', 1)))
68 1
        explanation += ', '.join(scores) + 'when it '
69 1
        if self.detection.startswith('str:'):
70 1
            explanation += 'finds the string <strong>{}</strong> '.format(self.detection[4:])
71
        else:
72 1
            explanation += 'matches the regexp <strong>{}</strong> in '.format(self.detection[3:])
73
74 1
        zones = []
75 1
        for mz in self.mz.split('|'):
76 1
            if mz.startswith('$'):
77 1
                current_zone, arg = mz.split(":", 1)
78 1
                zone_name = "?"
79
80 1
                for translated_name in translation:  # translate zone names
81 1
                    if translated_name in current_zone:
82 1
                        zone_name = translation[translated_name]
83
84 1
                if "$URL" in current_zone:
85
                    regexp = "matching regex" if current_zone == "$URL_X" else ""
86
                    zones.append("on the URL {} '{}' ".format(regexp, arg))
87
                else:
88 1
                    regexp = "matching regex" if current_zone.endswith("_X") else ""
89 1
                    if zone_name == 'header' and arg.lower() == 'cookie':
90 1
                        zones.append('in the <strong>cookies</strong>')
91
                    else:
92
                        zones.append("in the var with name {} '{}' of {} ".format(regexp, arg, zone_name))
93
            else:
94 1
                zones.append('the <strong>{0}</strong>'.format(translation[mz]))
95 1
        return explanation + ' ' + ', '.join(zones) + '.'
96
97 1
    def validate(self):
98 1
        self.warnings = list()
99 1
        self.error = list()
100
101 1
        self.__validate_matchzone(self.mz)
102 1
        self.__validate_id(self.sid)
103 1
        self.__validate_score(self.score)
104
105 1
        if self.detection.startswith('rx:'):
106 1
            self.__validate_detection_rx(self.detection)
107 1
        elif self.detection.startswith('str:'):
108 1
            self.__validate_detection_str(self.detection)
109
        else:
110 1
            self.error.append("Your 'detection' string must start with str: or rx:")
111
112 1
        if not self.msg:
113
            self.warnings.append("Rule has no 'msg:'.")
114 1
        if not self.score:
115
            self.error.append("Rule has no score.")
116 1
        elif not self.mz:
117
            self.error.append("Rule has no match zone.")
118 1
        elif not self.sid:
119
            self.error.append("Rule has no sid.")
120
121 1
    def __fail(self, msg):
122 1
        self.error.append(msg)
123 1
        return False
124
125
    # Bellow are parsers for specific parts of a rule
126
127 1
    def __validate_detection_str(self, p_str, assign=False):
128 1
        if assign is True:
129
            self.detection = p_str
130 1
        return True
131
132 1
    def __validate_detection_rx(self, p_str, assign=False):
133 1
        if not p_str.islower():
134 1
            self.warnings.append("detection {} is not lower-case. naxsi is case-insensitive".format(p_str))
135
136 1
        try:  # try to validate the regex with PCRE's python bindings
137 1
            import pcre
138
            try:  # if we can't compile the regex, it's likely invalid
139
                pcre.compile(p_str[3:])
140
            except pcre.PCREError:
141
                return self.__fail("{} is not a valid regex:".format(p_str))
142 1
        except ImportError:  # python-pcre is an optional dependency
0 ignored issues
show
Unused Code introduced by
This except handler seems to be unused and could be removed.

Except handlers which only contain pass and do not have an else clause can usually simply be removed:

try:
    raises_exception()
except:  # Could be removed
    pass
Loading history...
143 1
            pass
144
145 1
        if assign is True:
146 1
            self.detection = p_str
147 1
        return True
148
149 1
    def __validate_score(self, p_str, assign=False):
150 1
        for score in p_str.split(','):
151 1
            if ':' not in score:
152
                self.__fail("You score '{}' has no value or name.".format(score))
153 1
            name, value = score.split(':')
154 1
            if not value.isdigit():
155
                self.__fail("Your value '{}' for your score '{}' is not numeric.".format(value, score))
156 1
            elif not name.startswith('$'):
157
                self.__fail("Your name '{}' for your score '{}' does not start with a '$'.".format(name, score))
158 1
        if assign:
159 1
            self.score = p_str
160 1
        return True
161
162 1
    def __validate_matchzone(self, p_str, assign=False):
163 1
        has_zone = False
164 1
        mz_state = set()
165 1
        for loc in p_str.split('|'):
166 1
            keyword, arg = loc, None
167 1
            if loc.startswith("$"):
168 1
                if loc.find(":") == -1:
169
                    return self.__fail("Missing 2nd part after ':' in {0}".format(loc))
170 1
                keyword, arg = loc.split(":")
171
172 1
            if keyword not in self.sub_mz:  # check if `keyword` is a valid keyword
173 1
                return self.__fail("'{0}' is not a known sub-part of mz : {1}".format(keyword, self.sub_mz))
174
175 1
            mz_state.add(keyword)
176
177
            # verify that the rule doesn't attempt to target REGEX and STATIC _VAR/URL at the same time
178 1
            if len(self.rx_mz & mz_state) and len(self.static_mz & mz_state):
179 1
                return self.__fail("You can't mix static $* with regex $*_X ({})".format(', '.join(mz_state)))
180
181 1
            if arg and not arg.islower():  # just a gentle reminder
182 1
                self.warnings.append("{0} in {1} is not lowercase. naxsi is case-insensitive".format(arg, loc))
183
184
            # the rule targets an actual zone
185 1
            if keyword not in ["$URL", "$URL_X"] and keyword in (self.rx_mz | self.full_zones | self.static_mz):
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after in.
Loading history...
186 1
                has_zone = True
187
188 1
        if has_zone is False:
189
            return self.__fail("The rule/whitelist doesn't target any zone.")
190
191 1
        if assign is True:
192 1
            self.mz = p_str
193
194 1
        return True
195
196 1
    def __validate_id(self, p_str, assign=False):
197 1
        try:
198 1
            num = int(p_str)
199 1
            if num < 10000:
200 1
                self.warnings.append("rule IDs below 10k are reserved ({0})".format(num))
201 1
        except ValueError:
202 1
            self.error.append("id:{0} is not numeric".format(p_str))
203 1
            return False
204 1
        if assign is True:
205 1
            self.sid = num
206 1
        return True
207
208 1
    @staticmethod
209
    def splitter(full_str):
210 1
        lexer = shlex(full_str)
211 1
        lexer.whitespace_split = True
212 1
        return list(iter(lexer.get_token, ''))
213
214 1
    def parse_rule(self, full_str):
215
        """
216
        Parse and validate a full naxsi rule
217
        :param full_str: raw rule
218
        :return: [True|False, dict]
219
        """
220 1
        self.warnings = list()
221 1
        self.error = list()
222
223 1
        func_map = {"id:": self.__validate_id, "str:": self.__validate_detection_str,
224
                    "rx:": self.__validate_detection_rx, "msg:": lambda p_str, assign=False: True,
225
                    "mz:": self.__validate_matchzone, "negative": lambda p_str, assign=False: p_str == 'checked',
226
                    "s:": self.__validate_score}
227
228 1
        split = self.splitter(full_str)  # parse string
229 1
        intersection = set(split).intersection(set(self.mr_kw))
230
231 1
        if not intersection:
232 1
            return self.__fail("No mainrule/basicrule keyword.")
233 1
        elif len(intersection) > 1:
234 1
            return self.__fail("Multiple mainrule/basicrule keywords.")
235
236 1
        split.remove(intersection.pop())  # remove the mainrule/basicrule keyword
237
238 1
        if ";" in split:
239 1
            split.remove(";")
240
241 1
        for keyword in split:
242 1
            keyword = keyword.strip()
243
244 1
            if keyword.endswith(";"):  # remove semi-colons
245 1
                keyword = keyword[:-1]
246 1
            if keyword.startswith(('"', "'")) and (keyword[0] == keyword[-1]):  # remove (double-)quotes
247 1
                keyword = keyword[1:-1]
248
249 1
            parsed = False
250 1
            for frag_kw in func_map:
251 1
                if keyword.startswith(frag_kw):  # use the right parser
252 1
                    if frag_kw in ('rx:', 'str:'):  # don't remove the leading "str:" or "rx:"
253 1
                        payload = keyword
254
                    else:
255 1
                        payload = keyword[len(frag_kw):]
256
257 1
                    function = func_map[frag_kw]  # we're using an array of functions, C style!
258 1
                    if function(payload, assign=True) is True:
259 1
                        parsed = True
260 1
                        break
261 1
                    return self.__fail("parsing of element '{0}' failed.".format(keyword))
262
263 1
            if parsed is False:  # we have an item that wasn't successfully parsed
264 1
                return self.__fail("'{}' is an invalid element and thus can not be parsed.".format(keyword))
265
        return True
266