Completed
Push — master ( c63c4e...dfc09d )
by -
01:32
created

NaxsiRules.__validate_detection_str()   A

Complexity

Conditions 2

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2
Metric Value
cc 2
dl 0
loc 4
rs 10
ccs 4
cts 4
cp 1
crap 2
1 1
from time import strftime, localtime
2
3 1
from spike.model import db
4 1
from shlex import shlex
5
6
7 1
class NaxsiRules(db.Model):
    """Database model for a single naxsi rule, with helpers to render, explain and validate it."""

    __bind_key__ = "rules"
    __tablename__ = "naxsi_rules"

    # --- persisted columns ---------------------------------------------------
    id = db.Column(db.Integer, primary_key=True)
    msg = db.Column(db.String(), nullable=False)
    detection = db.Column(db.String(1024), nullable=False)  # "str:..." or "rx:..." (see validate)
    mz = db.Column(db.String(1024), nullable=False)  # match zones, separated by '|'
    score = db.Column(db.String(1024), nullable=False)  # "name:value" pairs, separated by ','
    sid = db.Column(db.Integer, nullable=False, unique=True)
    ruleset = db.Column(db.String(1024), nullable=False)
    rmks = db.Column(db.Text, nullable=True, server_default="")
    active = db.Column(db.Integer, nullable=False, server_default="1")
    negative = db.Column(db.Integer, nullable=False, server_default="0")
    timestamp = db.Column(db.Integer, nullable=False)

    # --- parsing/validation vocabulary --------------------------------------
    # keywords that introduce a rule; parse_rule requires exactly one of them
    mr_kw = ["MainRule", "BasicRule", "main_rule", "basic_rule"]
    # match-zone keywords that take an argument after ':' (static form)
    static_mz = {"$ARGS_VAR", "$BODY_VAR", "$URL", "$HEADERS_VAR"}
    # match-zone keywords used without argument
    full_zones = {"ARGS", "BODY", "URL", "HEADERS", "FILE_EXT", "RAW_BODY"}
    # match-zone keywords that take an argument after ':' (regex form)
    rx_mz = {"$ARGS_VAR_X", "$BODY_VAR_X", "$URL_X", "$HEADERS_VAR_X"}
    # every keyword accepted inside a match zone
    sub_mz = list(static_mz) + list(full_zones) + list(rx_mz)
28
29 1
    def __init__(self, msg="", detection="", mz="", score="", sid='42000', ruleset="", rmks="", active=0, negative=0,
                 timestamp=0):
        """Create a rule from its raw fields.

        :param msg: human readable message of the rule
        :param detection: detection pattern, expected to start with ``str:`` or ``rx:``
        :param mz: match zones, separated by ``|``
        :param score: ``name:value`` pairs, separated by ``,``
        :param sid: rule identifier
        :param ruleset: name of the ruleset the rule belongs to
        :param rmks: free-form remarks
        :param active: stored as given
        :param negative: ``'checked'`` or a value equal to 1 (``1``/``True``) marks the rule as
                         negative; anything else is stored as 0
        :param timestamp: stored as given
        """
        self.msg = msg
        self.detection = detection
        self.mz = mz
        self.score = score
        self.sid = sid
        self.ruleset = ruleset
        self.rmks = rmks
        self.active = active
        # 'checked' is the historical marker; also accept 1/True so that a caller
        # passing an already-normalised flag does not silently get a non-negative rule.
        self.negative = 1 if negative in ('checked', 1) else 0
        self.timestamp = timestamp
        # filled in by validate() / parse_rule()
        self.warnings = []
        self.error = []
43
44 1
    def fullstr(self):
        """Return the rule preceded by a comment header (sid, date and remarks).

        :return str: the commented header followed by the output of ``__str__``
        """
        # "%Y-%m-%d" is the portable spelling of "%F", which is not available on every platform
        rdate = strftime("%Y-%m-%d - %H:%M", localtime(float(str(self.timestamp))))
        # `rmks` is nullable; every remark line gets its own "# " prefix so that
        # multi-line remarks stay readable instead of being glued on a single line
        rmks = "\n# ".join((self.rmks or "").strip().splitlines())
        return "#\n# sid: {0} | date: {1}\n#\n# {2}\n#\n{3}".format(self.sid, rdate, rmks, self.__str__())
48
49 1
    def __str__(self):
        """Render the rule as a single ``MainRule`` line."""
        if self.negative == 1:
            negate = 'negative'
        else:
            negate = ''
        template = 'MainRule {} "{}" "msg:{}" "mz:{}" "s:{}" id:{} ;'
        return template.format(negate, self.detection, self.msg, self.mz, self.score, self.sid)
53
54 1
    def explain(self):
        """ Return a string explaining the rule

         The text contains ``<strong>`` markup and embeds the rule's fields as they are
         (no escaping is done here).

         :return str: A textual explanation of the rule
         """
        translation = {'ARGS': 'argument', 'BODY': 'body', 'URL': 'url', 'HEADER': 'header',
                       'HEADER:Cookie': 'cookies'}
        # Names for zones used without argument. Kept apart from `translation` so that the
        # substring lookup done for `$..._VAR` zones below is not affected by the extra keys.
        full_zone_names = dict(translation)
        full_zone_names.update({'HEADERS': 'headers', 'FILE_EXT': 'file extension', 'RAW_BODY': 'raw body'})

        explanation = 'The rule number <strong>{0}</strong> is '.format(self.sid)
        if self.negative:
            explanation += '<strong>not</strong> '
        explanation += 'setting the '

        scores = []
        for score in self.score.split(','):
            # partition() never fails: a score without ':' simply gets an empty value
            name, _, value = score.partition(':')
            scores.append('<strong>{0}</strong> score to <strong>{1}</strong> '.format(name, value))
        explanation += ', '.join(scores) + 'when it '

        if self.detection.startswith('str:'):
            explanation += 'finds the string <strong>{}</strong> '.format(self.detection[4:])
        else:
            explanation += 'matches the regexp <strong>{}</strong> in '.format(self.detection[3:])

        zones = []
        for mz in self.mz.split('|'):
            if not mz.startswith('$'):
                # unknown zones are shown as they are instead of raising KeyError
                zones.append('the <strong>{0}</strong>'.format(full_zone_names.get(mz, mz)))
                continue

            current_zone, _, arg = mz.partition(":")
            zone_name = "?"
            for translated_name in translation:  # translate zone names
                if translated_name in current_zone:
                    zone_name = translation[translated_name]

            if "$URL" in current_zone:
                regexp = "matching regex" if current_zone == "$URL_X" else ""
                zones.append("on the URL {} '{}' ".format(regexp, arg))
            elif zone_name == 'header' and arg.lower() == 'cookie':
                zones.append('in the <strong>cookies</strong>')
            else:
                regexp = "matching regex" if current_zone.endswith("_X") else ""
                zones.append("in the var with name {} '{}' of {} ".format(regexp, arg, zone_name))
        return explanation + ' ' + ', '.join(zones) + '.'
97
98 1
    def validate(self):
        """Validate the fields currently stored on the rule.

        Resets ``self.warnings`` and ``self.error``, then runs the match zone, id, score
        and detection validators. Problems are reported through those two lists; nothing
        is returned.
        """
        self.warnings = list()
        self.error = list()

        self.__validate_matchzone(self.mz)
        self.__validate_id(self.sid)
        self.__validate_score(self.score)

        detection = self.detection
        if detection.startswith('rx:'):
            self.__validate_detection_rx(detection)
        elif detection.startswith('str:'):
            self.__validate_detection_str(detection)
        else:
            self.error.append("Your 'detection' string must start with str: or rx:")

        # a missing message is only a warning, a missing score is an error
        if not self.msg:
            self.warnings.append("Rule has no 'msg:'.")
        if not self.score:
            self.error.append("Rule has no score.")
117
118 1
    def __fail(self, msg):
        """Record `msg` in ``self.error`` and return False, so callers can ``return self.__fail(...)``.

        :param msg: description of the failure
        :return bool: always False
        """
        self.error.append(msg)
        return False
121
122
    # Below are parsers for specific parts of a rule
123
124 1
    def __validate_detection_str(self, p_str, assign=False):
        """Accept any ``str:`` detection; no check is performed on its content.

        :param p_str: the detection, including its leading ``str:``
        :param assign: when exactly True, store `p_str` in ``self.detection``
        :return bool: always True
        """
        if assign is True:
            self.detection = p_str
        return True
128
129 1
    def __validate_detection_rx(self, p_str, assign=False):
        """Validate an ``rx:`` detection.

        A warning is added when the detection is not lower-case. If the optional
        ``pcre`` module can be imported, the part after ``rx:`` must compile with it.

        :param p_str: the detection, including its leading ``rx:``
        :param assign: when exactly True, store `p_str` in ``self.detection``
        :return bool: False if the regex was rejected by ``pcre``, True otherwise
        """
        if not p_str.islower():
            self.warnings.append("detection {} is not lower-case. naxsi is case-insensitive".format(p_str))

        try:
            # python-pcre is an optional dependency: without it the regex is not checked
            import pcre
            pattern = p_str[3:]  # drop the leading "rx:"
            try:
                pcre.compile(pattern)
            except pcre.PCREError:
                # a regex that can not be compiled is most likely invalid
                return self.__fail("{} is not a valid regex:".format(p_str))
        except ImportError:
            pass

        if assign is True:
            self.detection = p_str
        return True
145
146 1
    def __validate_score(self, p_str, assign=False):
        """Validate a score string made of comma-separated ``name:value`` pairs.

        Each pair must contain a ``:``, have a value made of digits only and a name
        starting with ``$``. Every offending pair adds one message to ``self.error``.

        :param p_str: the score string, without its leading ``s:``
        :param assign: when true and the score is valid, store `p_str` in ``self.score``
        :return bool: True if every pair is valid, False otherwise
        """
        valid = True
        for score in p_str.split(','):
            # partition() copes with a missing ':' as well as with several of them,
            # where unpacking the result of split(':') would raise ValueError
            name, separator, value = score.partition(':')
            if not separator:
                valid = self.__fail("You score '{}' has no value or name.".format(score))
            elif not value.isdigit():
                valid = self.__fail("Your value '{}' for your score '{}' is not numeric.".format(value, score))
            elif not name.startswith('$'):
                valid = self.__fail("Your name '{}' for your score '{}' does not start with a '$'.".format(name, score))

        if not valid:
            return False
        if assign:
            self.score = p_str
        return True
158
159 1
    def __validate_matchzone(self, p_str, assign=False):
        """Validate a match zone string made of ``|``-separated locations.

        Each location is either a bare keyword or ``$KEYWORD:argument``; the keyword must
        be listed in ``self.sub_mz``. Static (``$*``) and regex (``$*_X``) keywords can not be
        mixed, and at least one location other than ``$URL``/``$URL_X`` is required.

        :param p_str: the match zone, without its leading ``mz:``
        :param assign: when exactly True and the match zone is valid, store it in ``self.mz``
        :return bool: True if the match zone is valid, False otherwise (see ``self.error``)
        """
        has_zone = False
        mz_state = set()
        for loc in p_str.split('|'):
            keyword, arg = loc, None
            if loc.startswith("$"):
                if ":" not in loc:
                    return self.__fail("Missing 2nd part after ':' in {0}".format(loc))
                # split on the first ':' only: the argument (e.g. a regex) may contain some
                keyword, arg = loc.split(":", 1)

            if keyword not in self.sub_mz:  # check if `keyword` is a valid keyword
                return self.__fail("'{0}' no a known sub-part of mz : {1}".format(keyword, self.sub_mz))

            mz_state.add(keyword)

            # verify that the rule doesn't attempt to target REGEX and STATIC _VAR/URL at the same time
            if (self.rx_mz & mz_state) and (self.static_mz & mz_state):
                return self.__fail("You can't mix static $* with regex $*_X ({})".format(', '.join(mz_state)))

            if arg and not arg.islower():  # just a gentle reminder
                self.warnings.append("{0} in {1} is not lowercase. naxsi is case-insensitive".format(arg, loc))

            # the rule targets an actual zone
            if keyword not in ("$URL", "$URL_X") and keyword in self.rx_mz | self.full_zones | self.static_mz:
                has_zone = True

        if has_zone is False:
            return self.__fail("The rule/whitelist doesn't target any zone.")

        if assign is True:
            self.mz = p_str

        return True
192
193 1
    def __validate_id(self, p_str, assign=False):
        """Validate a rule id.

        :param p_str: the id, without its leading ``id:``
        :param assign: when exactly True, store the id (as an int) in ``self.sid``
        :return bool: False if ``int()`` rejects `p_str` with a ValueError, True otherwise
        """
        try:
            num = int(p_str)
        except ValueError:
            self.error.append("id:{0} is not numeric".format(p_str))
            return False

        if num < 10000:  # allowed, but worth a warning
            self.warnings.append("rule IDs below 10k are reserved ({0})".format(num))
        if assign is True:
            self.sid = num
        return True
204
205 1
    @staticmethod
    def splitter(full_str):
        """Split a raw rule into tokens with ``shlex``, breaking on whitespace only.

        :param full_str: raw rule
        :return list: the tokens, in order
        """
        lexer = shlex(full_str)
        lexer.whitespace_split = True

        tokens = []
        token = lexer.get_token()
        while token != '':  # an empty token marks the end of the input
            tokens.append(token)
            token = lexer.get_token()
        return tokens
210
211 1
    def parse_rule(self, full_str):
        """
        Parse and validate a full naxsi rule

        Every recognised element (``id:``, ``str:``, ``rx:``, ``msg:``, ``mz:``, ``s:`` and the
        bare ``negative`` flag) is validated and, when valid, stored on this instance.
        ``self.warnings`` and ``self.error`` are reset first; parsing stops at the first error.

        :param full_str: raw rule
        :return bool: True if the whole rule was parsed, False otherwise (see ``self.error``)
        """
        self.warnings = list()
        self.error = list()

        def parse_msg(p_str, assign=False):
            # any message is acceptable, it only has to be stored
            if assign is True:
                self.msg = p_str
            return True

        def parse_negative(p_str, assign=False):
            # `negative` is a bare flag: anything glued to it makes the element invalid
            if p_str:
                return False
            if assign is True:
                self.negative = 1
            return True

        func_map = {"id:": self.__validate_id, "str:": self.__validate_detection_str,
                    "rx:": self.__validate_detection_rx, "msg:": parse_msg,
                    "mz:": self.__validate_matchzone, "negative": parse_negative,
                    "s:": self.__validate_score}

        try:
            split = self.splitter(full_str)  # parse string
        except ValueError as exc:  # raised by shlex, e.g. on an unbalanced quote
            return self.__fail("Unable to split the rule: {}".format(exc))

        intersection = set(split).intersection(set(self.mr_kw))
        if not intersection:
            return self.__fail("No mainrule/basicrule keyword.")
        elif len(intersection) > 1:
            return self.__fail("Multiple mainrule/basicrule keywords.")

        split.remove(intersection.pop())  # remove the mainrule/basicrule keyword

        if ";" in split:
            split.remove(";")

        for keyword in split:
            keyword = keyword.strip()

            if keyword.endswith(";"):  # remove semi-colons
                keyword = keyword[:-1]
            if keyword.startswith(('"', "'")) and (keyword[0] == keyword[-1]):  # remove (double-)quotes
                keyword = keyword[1:-1]

            for frag_kw, function in func_map.items():
                if not keyword.startswith(frag_kw):
                    continue
                # the detection parsers expect the leading "str:" or "rx:" to be kept
                payload = keyword if frag_kw in ('rx:', 'str:') else keyword[len(frag_kw):]
                if function(payload, assign=True) is not True:
                    return self.__fail("parsing of element '{0}' failed.".format(keyword))
                break
            else:  # no parser is registered for this element
                return self.__fail("'{}' is an invalid element and thus can not be parsed.".format(keyword))

        return True
263