|
1
|
1 |
|
from time import strftime, localtime |
|
2
|
|
|
|
|
3
|
1 |
|
from spike.model import db |
|
4
|
1 |
|
from shlex import shlex |
|
5
|
|
|
|
|
6
|
|
|
|
|
7
|
1 |
|
class NaxsiRules(db.Model):
    """A naxsi rule stored in the ``naxsi_rules`` table of the ``rules`` bind.

    Besides the persisted columns, the class offers helpers to render the rule
    (``__str__``, ``fullstr``), describe it as an HTML fragment (``explain``),
    check the current field values (``validate``) and populate the fields from
    a raw rule string (``parse_rule``).

    Validation results are accumulated in ``self.warnings`` and ``self.error``
    (lists of strings); both are reset by ``validate`` and ``parse_rule``.
    """
    __bind_key__ = 'rules'
    __tablename__ = 'naxsi_rules'

    id = db.Column(db.Integer, primary_key=True)
    msg = db.Column(db.String(), nullable=False)
    detection = db.Column(db.String(1024), nullable=False)
    mz = db.Column(db.String(1024), nullable=False)
    score = db.Column(db.String(1024), nullable=False)
    sid = db.Column(db.Integer, nullable=False, unique=True)
    ruleset = db.Column(db.String(1024), nullable=False)
    rmks = db.Column(db.Text, nullable=True, server_default="")
    active = db.Column(db.Integer, nullable=False, server_default="1")
    negative = db.Column(db.Integer, nullable=False, server_default='0')
    timestamp = db.Column(db.Integer, nullable=False)

    # Keywords that introduce a rule in its textual form.
    mr_kw = ["MainRule", "BasicRule", "main_rule", "basic_rule"]
    # Match-zone keywords, grouped by kind; `sub_mz` is every accepted keyword.
    static_mz = {"$ARGS_VAR", "$BODY_VAR", "$URL", "$HEADERS_VAR"}
    full_zones = {"ARGS", "BODY", "URL", "HEADERS", "FILE_EXT", "RAW_BODY"}
    rx_mz = {"$ARGS_VAR_X", "$BODY_VAR_X", "$URL_X", "$HEADERS_VAR_X"}
    sub_mz = list(static_mz) + list(full_zones) + list(rx_mz)

    def __init__(self, msg="", detection="str:a", mz="ARGS", score="$None:0", sid='42000', ruleset="", rmks="",
                 active=0, negative=False, timestamp=0):
        """ Store the given field values and start with empty warning/error lists """
        self.msg = msg
        self.detection = detection
        self.mz = mz
        self.score = score
        self.sid = sid
        self.ruleset = ruleset
        self.rmks = rmks
        self.active = active
        self.negative = negative
        self.timestamp = timestamp
        self.warnings = []
        self.error = []

    def fullstr(self):
        """ Return the rule preceded by a comment header (sid, date, remarks)

        :return str: the commented header followed by the output of ``__str__``
        """
        # "%Y-%m-%d" is the portable spelling of the platform-specific "%F".
        rdate = strftime("%Y-%m-%d - %H:%M", localtime(float(str(self.timestamp))))
        # `rmks` is nullable; every line of a multi-line remark gets its own
        # comment line instead of being glued to the previous one.
        rmks = "\n# ".join((self.rmks or "").strip().splitlines())
        return "#\n# sid: {0} | date: {1}\n#\n# {2}\n#\n{3}".format(self.sid, rdate, rmks, self.__str__())

    def __str__(self):
        """ Return the rule in its textual ``MainRule ... ;`` form """
        return 'MainRule {} "{}" "msg:{}" "mz:{}" "s:{}" id:{} ;'.format(
            'negative' if self.negative else '', self.detection, self.msg, self.mz, self.score, self.sid)

    def explain(self):
        """ Return a string explaining the rule

        The result is an HTML fragment (it contains ``<strong>`` tags).

        :return str: A textual explanation of the rule
        """
        # NOTE(review): sid, score, detection and match-zone arguments are
        # interpolated unescaped into this HTML; confirm that the caller
        # escapes or sanitises them before rendering.

        # Substring -> wording table for the `$..._VAR[_X]` zones.
        translation = {'ARGS': 'argument', 'BODY': 'body', 'URL': 'url', 'HEADER': 'header',
                       'HEADER:Cookie': 'cookies'}
        # Wording for zones given without '$'. Kept separate so that the
        # 'HEADERS' entry cannot shadow 'HEADER' in the substring search below.
        full_zone_names = {'ARGS': 'argument', 'BODY': 'body', 'URL': 'url', 'HEADERS': 'headers',
                           'FILE_EXT': 'file extension', 'RAW_BODY': 'raw body'}

        explanation = 'The rule number <strong>{0}</strong> is '.format(self.sid)
        if self.negative:
            explanation += '<strong>not</strong> '
        explanation += 'setting the '

        scores = []
        for score in self.score.split(','):
            # partition() tolerates a score without ':' (empty value).
            name, _, value = score.partition(':')
            scores.append('<strong>{0}</strong> score to <strong>{1}</strong> '.format(name, value))
        explanation += ', '.join(scores) + 'when it '

        # Every zone description below brings its own preposition, so neither
        # detection sentence ends with "in".
        if self.detection.startswith('str:'):
            explanation += 'finds the string <strong>{}</strong> '.format(self.detection[4:])
        else:
            explanation += 'matches the regexp <strong>{}</strong> '.format(self.detection[3:])

        zones = []
        for mz in self.mz.split('|'):
            if not mz.startswith('$'):
                # Unknown zones are shown verbatim rather than raising KeyError.
                zones.append('in the <strong>{0}</strong>'.format(full_zone_names.get(mz, mz)))
                continue

            # partition() tolerates a '$' zone without ':' (empty argument).
            current_zone, _, arg = mz.partition(":")
            zone_name = "?"
            for translated_name in translation:  # translate zone names
                if translated_name in current_zone:
                    zone_name = translation[translated_name]

            if "$URL" in current_zone:
                regexp = "matching regex" if current_zone == "$URL_X" else ""
                zones.append("on the URL {} '{}' ".format(regexp, arg))
            elif zone_name == 'header' and arg.lower() == 'cookie':
                zones.append('in the <strong>cookies</strong>')
            else:
                regexp = "matching regex" if current_zone.endswith("_X") else ""
                zones.append("in the var with name {} '{}' of {} ".format(regexp, arg, zone_name))
        return explanation + ' ' + ', '.join(zones) + '.'

    def validate(self):
        """ Check the current field values

        Resets and then fills ``self.warnings`` and ``self.error``; the rule is
        acceptable when ``self.error`` is empty afterwards. Nothing is returned.
        """
        self.warnings = list()
        self.error = list()

        # Missing (None) fields are validated as empty strings so that they are
        # reported below instead of raising.
        self.__validate_matchzone(self.mz or '')
        self.__validate_id(self.sid)
        self.__validate_score(self.score or '')

        detection = self.detection or ''
        if detection.startswith('rx:'):
            self.__validate_detection_rx(detection)
        elif detection.startswith('str:'):
            self.__validate_detection_str(detection)
        else:
            self.error.append("Your 'detection' string must start with str: or rx:")

        if not self.msg:
            self.warnings.append("Rule has no 'msg:'.")
        # Independent checks: every missing field is reported, not only the first.
        if not self.score:
            self.error.append("Rule has no score.")
        if not self.mz:
            self.error.append("Rule has no match zone.")
        if not self.sid:
            self.error.append("Rule has no sid.")

    def __fail(self, msg):
        """ Record `msg` as an error and return False, so callers can ``return self.__fail(...)`` """
        self.error.append(msg)
        return False

    # Below are parsers for specific parts of a rule.
    # Each takes the raw fragment `p_str`, records problems in
    # self.warnings / self.error, stores the value on the instance when
    # `assign` is True and returns True when the fragment is accepted.

    def __validate_detection_str(self, p_str, assign=False):
        """ Accept any ``str:`` detection as-is """
        if assign is True:
            self.detection = p_str
        return True

    def __validate_msg(self, p_str, assign=False):
        """ Accept any message as-is """
        if assign is True:
            self.msg = p_str
        return True

    def __validate_negative(self, p_str, assign=False):
        """ Handle the ``negative`` keyword

        `p_str` is whatever follows ``negative`` in the token: empty for the
        bare keyword emitted by ``__str__``. The payload ``checked`` is the only
        one the previous implementation accepted and is kept for backward
        compatibility.
        """
        if p_str not in ('', 'checked'):
            return False
        if assign is True:
            self.negative = True
        return True

    def __validate_detection_rx(self, p_str, assign=False):
        """ Validate an ``rx:`` detection; the regexp is compiled when python-pcre is available """
        if not p_str.islower():
            self.warnings.append("detection {} is not lower-case. naxsi is case-insensitive".format(p_str))

        try:  # python-pcre is an optional dependency
            import pcre
        except ImportError:
            pcre = None

        if pcre is not None:
            try:  # if we can't compile the regex, it's likely invalid
                pcre.compile(p_str[3:])
            except pcre.PCREError:
                return self.__fail("{} is not a valid regex:".format(p_str))

        if assign is True:
            self.detection = p_str
        return True

    def __validate_score(self, p_str, assign=False):
        """ Validate a comma-separated list of ``$NAME:value`` scores

        Every malformed score is reported; the value is only assigned, and True
        only returned, when all of them are well-formed.
        """
        valid = True
        for score in p_str.split(','):
            # partition() never raises, whatever the number of ':' in `score`.
            name, separator, value = score.partition(':')
            if not separator:
                valid = self.__fail("You score '{}' has no value or name.".format(score))
            elif not value.isdigit():
                valid = self.__fail("Your value '{}' for your score '{}' is not numeric.".format(value, score))
            elif not name.startswith('$'):
                valid = self.__fail("Your name '{}' for your score '{}' does not start with a '$'.".format(name, score))

        if not valid:
            return False
        if assign:
            self.score = p_str
        return True

    def __validate_matchzone(self, p_str, assign=False):
        """ Validate a ``|``-separated match zone """
        has_zone = False
        mz_state = set()
        for loc in p_str.split('|'):
            keyword, arg = loc, None
            if loc.startswith("$"):
                if loc.find(":") == -1:
                    return self.__fail("Missing 2nd part after ':' in {0}".format(loc))
                # Split on the first ':' only: the argument may contain ':' itself.
                keyword, arg = loc.split(":", 1)

            if keyword not in self.sub_mz:  # check if `keyword` is a valid keyword
                return self.__fail("'{0}' is not a known sub-part of mz : {1}".format(keyword, self.sub_mz))

            mz_state.add(keyword)

            # verify that the rule doesn't attempt to target REGEX and STATIC _VAR/URL at the same time
            if (self.rx_mz & mz_state) and (self.static_mz & mz_state):
                return self.__fail(
                    "You can't mix static $* with regex $*_X ({})".format(', '.join(sorted(mz_state))))

            # just a gentle reminder; comparing with lower() (rather than
            # islower()) does not warn about arguments without cased characters
            if arg and arg != arg.lower():
                self.warnings.append("{0} in {1} is not lowercase. naxsi is case-insensitive".format(arg, loc))

            # the rule targets an actual zone
            if keyword not in ["$URL", "$URL_X"] and keyword in (self.rx_mz | self.full_zones | self.static_mz):
                has_zone = True

        if has_zone is False:
            return self.__fail("The rule/whitelist doesn't target any zone.")

        if assign is True:
            self.mz = p_str

        return True

    def __validate_id(self, p_str, assign=False):
        """ Validate a rule id: it must be convertible to an integer """
        try:
            num = int(p_str)
        except (TypeError, ValueError):  # TypeError: e.g. an unset (None) sid
            return self.__fail("id:{0} is not numeric".format(p_str))

        if num < 10000:
            self.warnings.append("rule IDs below 10k are reserved ({0})".format(num))
        if assign is True:
            self.sid = num
        return True

    def parse_rule(self, full_str):
        """
        Parse and validate a full naxsi rule

        The recognised elements are stored on the instance as they are parsed;
        problems are recorded in ``self.warnings`` and ``self.error``.

        :param full_str: raw rule
        :return bool: True if every element was parsed successfully, else False
        """
        self.warnings = list()
        self.error = list()

        func_map = {"id:": self.__validate_id, "str:": self.__validate_detection_str,
                    "rx:": self.__validate_detection_rx, "msg:": self.__validate_msg,
                    "mz:": self.__validate_matchzone, "negative": self.__validate_negative,
                    "s:": self.__validate_score}

        lexer = shlex(full_str)
        lexer.whitespace_split = True
        try:
            split = list(iter(lexer.get_token, ''))
        except ValueError as exc:  # raised by shlex, e.g. on an unbalanced quote
            return self.__fail("Unable to tokenize the rule: {0}".format(exc))

        intersection = set(split).intersection(set(self.mr_kw))
        if not intersection:
            return self.__fail("No mainrule/basicrule keyword.")
        elif len(intersection) > 1:
            return self.__fail("Multiple mainrule/basicrule keywords.")

        split.remove(intersection.pop())  # remove the mainrule/basicrule keyword

        if ";" in split:
            split.remove(";")

        for keyword in split:
            keyword = keyword.strip()

            if keyword.endswith(";"):  # remove semi-colons
                keyword = keyword[:-1]
            if keyword.startswith(('"', "'")) and (keyword[0] == keyword[-1]):  # remove (double-)quotes
                keyword = keyword[1:-1]

            for frag_kw, function in func_map.items():
                if not keyword.startswith(frag_kw):
                    continue
                # use the right parser; don't remove the leading "str:" or "rx:"
                payload = keyword if frag_kw in ('rx:', 'str:') else keyword[len(frag_kw):]
                if function(payload, assign=True) is not True:
                    return self.__fail("parsing of element '{0}' failed.".format(keyword))
                break
            else:  # no parser matched: we have an item that can't be parsed
                return self.__fail("'{}' is an invalid element and thus can not be parsed.".format(keyword))
        return True
|
267
|
|
|
|
Except handlers which only contain `pass` and do not have an `else` clause can usually simply be removed: