1
|
1 |
|
from time import strftime, localtime |
2
|
1 |
|
import logging |
|
|
|
|
3
|
|
|
|
4
|
1 |
|
from spike.model import db |
5
|
1 |
|
from shlex import shlex |
6
|
|
|
|
7
|
|
|
|
8
|
1 |
|
class NaxsiRules(db.Model):
    """Database model for a naxsi rule, with parsing and validation helpers.

    Besides the persisted columns, an instance built through ``__init__`` (or
    processed by ``parse_rule``) carries two transient lists: ``warnings``
    (non-fatal remarks) and ``error`` (fatal problems) filled by the
    validators below.
    """
    __bind_key__ = 'rules'
    __tablename__ = 'naxsi_rules'

    id = db.Column(db.Integer, primary_key=True)
    msg = db.Column(db.String(), nullable=False)
    # detection pattern, expected to start with "str:" or "rx:"
    detection = db.Column(db.String(1024), nullable=False)
    # match zones, '|'-separated
    mz = db.Column(db.String(1024), nullable=False)
    # score(s), ','-separated "name:value" fragments
    score = db.Column(db.String(1024), nullable=False)
    # rule ID as written after "id:" in the textual rule
    sid = db.Column(db.Integer, nullable=False, unique=True)
    ruleset = db.Column(db.String(1024), nullable=False)
    # free-form remarks, emitted as a comment by fullstr()
    rmks = db.Column(db.Text, nullable=True, server_default="")
    active = db.Column(db.Integer, nullable=False, server_default="1")
    negative = db.Column(db.Integer, nullable=False, server_default='0')
    # passed to time.localtime() by fullstr(), i.e. seconds since the epoch
    timestamp = db.Column(db.Integer, nullable=False)

    # parse_rule() requires exactly one of these keywords among the tokens
    mr_kw = ["MainRule", "BasicRule", "main_rule", "basic_rule"]
    # "$ZONE:name" targets matched literally
    static_mz = {"$ARGS_VAR", "$BODY_VAR", "$URL", "$HEADERS_VAR"}
    # whole-zone targets (no argument)
    full_zones = {"ARGS", "BODY", "URL", "HEADERS", "FILE_EXT", "RAW_BODY"}
    # "$ZONE_X:regex" targets
    rx_mz = {"$ARGS_VAR_X", "$BODY_VAR_X", "$URL_X", "$HEADERS_VAR_X"}
    # every keyword accepted inside an "mz:" fragment
    sub_mz = list(static_mz) + list(full_zones) + list(rx_mz)

    def __init__(self, msg="", detection="", mz="", score="", sid=42000, ruleset="", rmks="", active=0, negative=0,
                 timestamp=0):
        """Build a rule from raw field values.

        :param negative: the string 'checked' or a value equal to 1 (1, True)
                         marks the rule as negative; anything else is stored as 0
        """
        self.msg = msg
        self.detection = detection
        self.mz = mz
        self.score = score
        self.sid = sid
        self.ruleset = ruleset
        self.rmks = rmks
        self.active = active
        # Accept 1/True as well as 'checked': previously negative=1 was
        # silently stored as 0 although the default is an integer.
        self.negative = 1 if (negative == 'checked' or negative == 1) else 0
        self.timestamp = timestamp
        self.warnings = []
        self.error = []

    def fullstr(self):
        """Return the textual rule preceded by a comment header (sid, date, remarks)."""
        # "%Y-%m-%d" is the portable spelling of the glibc-specific "%F" directive.
        rdate = strftime("%Y-%m-%d - %H:%M", localtime(float(str(self.timestamp))))
        # rmks is a nullable column: treat a missing remark as an empty one.
        # NOTE(review): lines are joined with "# " and no newline, so a multi-line
        # remark ends up on a single comment line - confirm this is intended.
        rmks = "# ".join((self.rmks or "").strip().split("\n"))
        return "#\n# sid: {0} | date: {1}\n#\n# {2}\n#\n{3}".format(self.sid, rdate, rmks, self.__str__())

    def __str__(self):
        """Return the rule in its one-line 'MainRule ...' textual form."""
        negate = 'negative' if self.negative == 1 else ''
        return 'MainRule {} "{}" "msg:{}" "mz:{}" "s:{}" id:{} ;'.format(
            negate, self.detection, self.msg, self.mz, self.score, self.sid)

    def explanation(self):
        """ Return an HTML string explaining a rule """
        assoc = {'ARGS': 'argument', 'BODY': 'body', 'URL': 'url', 'HEADER': 'header'}
        expl = 'The rule number <strong>%d</strong> is ' % self.sid
        if self.negative:
            expl += '<strong>not</strong> '
        expl += 'setting the '

        scores = []
        for score in self.score.split(','):
            # Pad with '' so that a fragment lacking ':' gives an empty value
            # instead of raising IndexError.
            name, value = (score.split(':') + [''])[:2]
            scores.append('<strong>{0}</strong> to <strong>{1}</strong> '.format(name, value))
        expl += ', '.join(scores) + 'when it '

        if self.detection.startswith('str:'):
            expl += 'finds the string <strong>{}</strong> '.format(self.detection[4:])
        else:
            expl += 'matches the regexp <strong>{}</strong> '.format(self.detection[3:])

        for mz in self.mz.split('|'):
            if not mz.startswith('$'):
                # NOTE(review): whole-zone targets (ARGS, BODY, ...) have never been
                # part of the returned text; they used to be collected in a list that
                # was discarded (and raised KeyError for HEADERS, FILE_EXT and
                # RAW_BODY). They are skipped here so the output stays the same.
                continue

            # Split on the first ':' only: the argument itself may contain ':'.
            zone, _, arg = mz.partition(":")

            # Fall back to the raw keyword so zone_name is always bound and never
            # inherited from a previous iteration.
            zone_name = zone
            for tmpzone in assoc:
                if tmpzone in zone:
                    zone_name = assoc[tmpzone]

            if "$URL" in zone:
                expl += "on the URL {} '{}' ".format("matching regex" if zone == "$URL_X" else "", arg)
            else:
                expl += "in the var with name {} '{}' of {} ".format(
                    "matching regex" if zone.endswith("_X") else "", arg, zone_name)
        return expl

    def validate(self):
        """Validate the current field values.

        Nothing is returned: problems are appended to ``self.error`` and
        remarks to ``self.warnings``.
        """
        self.__validate_matchzone(self.mz)
        self.__validate_id(self.sid)
        self.__validate_detection(self.detection)

        if not self.msg:
            self.warnings.append("Rule has no 'msg:'.")
        if not self.score:
            self.error.append("Rule has no score.")

    def __fail(self, msg):
        """Record `msg` as an error and return False, so callers can `return self.__fail(...)`."""
        self.error.append(msg)
        return False

    # Below are parsers for specific parts of a rule.
    # They share the signature (p_str, label, assign): `p_str` is the fragment
    # without its `label` prefix, and the matching attribute is only written
    # when `assign` is True. Each returns True on success, False otherwise.

    def __validate_detection(self, p_str, label="", assign=False):
        """Check that the detection starts with 'str:' or 'rx:'; warn if it is not lower-case."""
        p_str = label + p_str
        # Compare with lower() rather than using islower(), which is also False
        # for strings without any cased character.
        if p_str != p_str.lower():
            self.warnings.append("detection {} is not lower-case. naxsi is case-insensitive".format(p_str))
        # The prefix is checked whatever `assign` is, like the other validators
        # do, so that validate() actually rejects a malformed detection.
        if not p_str.startswith(("str:", "rx:")):
            return self.__fail("detection {} is neither rx: or str:".format(p_str))
        if assign is True:
            self.detection = p_str
        return True

    def __validate_genericstr(self, p_str, label="", assign=False):
        """Store score ('s:'), message ('msg:') or the 'negative' flag; no content check is done."""
        if assign is False:
            return True
        if label == "s:":
            self.score = p_str
        elif label == "msg:":
            self.msg = p_str
        elif label == "negative":
            self.negative = 1
        elif label != "":
            return self.__fail("Unknown fragment {}".format(label+p_str))
        return True

    def __validate_matchzone(self, p_str, label="", assign=False):
        """Check a '|'-separated match zone: known keywords, no static/regex mix, at least one real zone."""
        has_zone = False
        mz_state = set()
        for loc in p_str.split('|'):
            keyword, arg = loc, None
            if loc.startswith("$"):
                # Split on the first ':' only, the argument may itself contain ':'.
                keyword, sep, arg = loc.partition(":")
                if not sep or not arg:
                    return self.__fail("Missing 2nd part after ':' in {0}".format(loc))
            # check it is a valid keyword
            if keyword not in self.sub_mz:
                return self.__fail("'{0}' no a known sub-part of mz : {1}".format(keyword, self.sub_mz))
            mz_state.add(keyword)
            # verify the rule doesn't attempt to target REGEX and STATIC _VAR/URL at the same time
            if (self.rx_mz & mz_state) and (self.static_mz & mz_state):
                # sorted() keeps the message deterministic
                return self.__fail(
                    "You can't mix static $* with regex $*_X ({})".format(', '.join(sorted(mz_state))))
            # just a gentle reminder
            if arg and arg != arg.lower():
                self.warnings.append("{0} in {1} is not lowercase. naxsi is case-insensitive".format(arg, loc))
            # the rule targets an actual zone ($URL/$URL_X alone only restrict the URL)
            if keyword not in ["$URL", "$URL_X"] and keyword in (self.rx_mz | self.full_zones | self.static_mz):
                has_zone = True
        if has_zone is False:
            return self.__fail("The rule/whitelist doesn't target any zone.")
        if assign is True:
            self.mz = p_str
        return True

    def __validate_id(self, p_str, label="", assign=False):
        """Check that the rule ID is an integer; warn when it is below 10000."""
        try:
            num = int(p_str)
        except (TypeError, ValueError):  # TypeError: int(None) and other non-string, non-numeric values
            self.error.append("id:{0} is not numeric".format(p_str))
            return False
        if num < 10000:
            self.warnings.append("rule IDs below 10k are reserved ({0})".format(num))
        if assign is True:
            self.sid = num
        return True

    @staticmethod
    def splitter(s):
        """Split `s` on whitespace with shlex, keeping quoted parts (quotes included) as single tokens."""
        lexer = shlex(s)
        lexer.whitespace_split = True
        items = list(iter(lexer.get_token, ''))
        return items

    def parse_rule(self, full_str):
        """
        Parse and validate a full naxsi rule

        On success the parsed fragments are stored on the instance; on failure
        the reason is appended to ``self.error``.

        :param full_str: raw rule
        :return: True if every fragment was parsed and validated, False otherwise
        """
        self.warnings = []
        self.error = []

        func_map = {"id:": self.__validate_id, "str:": self.__validate_detection,
                    "rx:": self.__validate_detection, "msg:": self.__validate_genericstr,
                    "mz:": self.__validate_matchzone,
                    "negative": self.__validate_genericstr, "s:": self.__validate_genericstr}

        try:
            split = self.splitter(full_str)  # parse string
        except ValueError as exc:  # shlex raises ValueError on an unbalanced quote
            return self.__fail("Unable to tokenize the rule: {0}".format(exc))

        intersection = set(split).intersection(self.mr_kw)
        if not intersection:
            return self.__fail("No mainrule/basicrule keyword.")
        elif len(intersection) > 1:
            return self.__fail("Multiple mainrule/basicrule keywords.")

        split.remove(intersection.pop())  # remove the mainrule/basicrule keyword

        if ";" in split:
            split.remove(";")

        # Every token is handled exactly once, in order; the list is never
        # modified while it is being iterated.
        for orig_kw in split:
            keyword = orig_kw.strip()

            if keyword.endswith(";"):  # remove semi-colons
                keyword = keyword[:-1]
            if keyword.startswith(('"', "'")) and (keyword[0] == keyword[-1]):  # remove (double-)quotes
                keyword = keyword[1:-1]

            for frag_kw, handler in func_map.items():
                if keyword.startswith(frag_kw):
                    # parser funcs return True/False
                    if handler(keyword[len(frag_kw):], label=frag_kw, assign=True) is not True:
                        return self.__fail("parsing of element '{0}' failed.".format(keyword))
                    break
            else:
                # no parser knows this fragment: report it instead of failing silently
                return self.__fail("Unknown fragment '{0}'".format(keyword))
        return True
226
|
|
|
|