1
|
1 |
|
from spike.model import db |
2
|
|
|
from shlex import shlex |
3
|
|
|
|
4
|
1 |
|
|
5
|
1 |
|
class NaxsiRules(db.Model):
    """A naxsi rule stored in the ``naxsi_rules`` table, with validation/parsing helpers.

    Besides the mapped columns, an instance carries two message lists that the
    ``p_*`` parsers, :meth:`validate` and :meth:`parse_rule` fill in:

    * ``warnings`` -- non-fatal remarks (e.g. upper-case patterns, reserved ids)
    * ``error``    -- fatal problems; every ``False`` returned by a parser has a
      matching entry here
    """
    __bind_key__ = 'rules'
    __tablename__ = 'naxsi_rules'

    id = db.Column(db.Integer, primary_key=True)
    msg = db.Column(db.String(), nullable=False)
    detection = db.Column(db.String(1024), nullable=False)
    mz = db.Column(db.String(1024), nullable=False)
    score = db.Column(db.String(1024), nullable=False)
    sid = db.Column(db.Integer, nullable=False, unique=True)
    ruleset = db.Column(db.String(1024), nullable=False)
    rmks = db.Column(db.Text, nullable=True, server_default="")
    active = db.Column(db.Integer, nullable=False, server_default="1")
    negative = db.Column(db.Integer, nullable=False, server_default='0')
    timestamp = db.Column(db.Integer, nullable=False)

    # Read-only fallbacks so that ``rule.warnings`` / ``rule.error`` always exist.
    # This class never appends to these shared lists: messages are stored on
    # per-instance lists (see __init__ and _messages), otherwise the messages of
    # one rule would show up on every other rule.
    warnings = []
    error = []

    # keywords that introduce a rule
    mr_kw = ["MainRule", "BasicRule", "main_rule", "basic_rule"]
    # match-zone keywords: static "$X:value", whole zones, and regex "$X_X:regex"
    static_mz = {"$ARGS_VAR", "$BODY_VAR", "$URL", "$HEADERS_VAR"}
    full_zones = {"ARGS", "BODY", "URL", "HEADERS", "FILE_EXT", "RAW_BODY"}
    rx_mz = {"$ARGS_VAR_X", "$BODY_VAR_X", "$URL_X", "$HEADERS_VAR_X"}
    sub_mz = list(static_mz) + list(full_zones) + list(rx_mz)

    def __init__(self, msg, detection, mz, score, sid, ruleset, rmks, active, negative, timestamp):
        """
        Build a rule from its individual fields.

        :param negative: ``'checked'`` (form value) or ``1`` (stored column value)
            marks the rule as negative; anything else is stored as ``0``
        """
        self.msg = msg
        self.detection = detection
        self.mz = mz
        self.score = score
        self.sid = sid
        self.ruleset = ruleset
        self.rmks = rmks
        self.active = active
        # Accept the stored integer form as well as the form value, so that a rule
        # re-created from its own ``negative`` column keeps the flag.
        self.negative = 1 if negative in ('checked', 1) else 0
        self.timestamp = timestamp
        # per-instance message lists (never share the class-level ones)
        self.warnings = []
        self.error = []

    def __str__(self):
        """Render the rule as a naxsi ``MainRule`` line."""
        negate = 'negative' if self.negative == 1 else ''
        return 'MainRule {} "{}" "msg:{}" "mz:{}" "s:{}" id:{} ;'.format(
            negate, self.detection, self.msg, self.mz, self.score, self.sid)

    def _messages(self, name):
        """
        Return this instance's own ``warnings`` or ``error`` list.

        The list is created on first use when the instance was built without
        going through __init__ (presumably the case for rows loaded by the ORM),
        so the shared class-level list is never mutated.

        :param name: ``'warnings'`` or ``'error'``
        """
        own = self.__dict__.get(name)
        if own is None:
            own = []
            setattr(self, name, own)
        return own

    def _warn(self, msg):
        """Record a non-fatal remark."""
        self._messages('warnings').append(msg)

    @staticmethod
    def _has_uppercase(text):
        """
        Tell whether `text` contains at least one upper-case character.

        ``str.islower()`` is not usable for this: it is also False for strings
        without any cased character (e.g. ``"1234"``).
        """
        return text != text.lower()

    def validate(self):
        """
        Check the fields currently stored on the rule.

        Findings are appended to ``warnings`` / ``error``; nothing is returned.
        """
        self.p_mz(self.mz)
        self.p_id(self.sid)
        self.p_detection(self.detection)

        # truthiness instead of len(): a missing (None) field is reported, not a crash
        if not self.msg:
            self._warn("Rule has no 'msg:'.")
        if not self.score:
            self.fail("Rule has no score.")

    def fail(self, msg):
        """
        Record a fatal error.

        :param msg: error description, appended to ``error``
        :return: always False, so callers can ``return self.fail(...)``
        """
        self._messages('error').append(msg)
        return False

    # Below are parsers for specific parts of a rule
    def p_dummy(self, s, assign=False):
        """Accept any value (used for fragments that need no validation)."""
        return True

    def p_detection(self, s, assign=False):
        """
        Validate a full detection string, prefix included.

        :param s: detection, must start with ``str:`` or ``rx:``
        :param assign: when True, store `s` in ``self.detection`` on success
        :return: True if valid, False (with an entry in ``error``) otherwise
        """
        if not s.startswith(("str:", "rx:")):
            return self.fail("detection {} is neither rx: or str:".format(s))
        if self._has_uppercase(s):
            self._warn("detection {} is not lower-case. naxsi is case-insensitive".format(s))
        if assign is True:
            self.detection = s
        return True

    def p_genericstr(self, s, assign=False):
        """
        Check a pattern (the part after ``str:`` / ``rx:``); only ever warns.

        :param s: pattern
        :param assign: unused, kept for a uniform parser signature
        :return: always True
        """
        if s and self._has_uppercase(s):
            self._warn("Pattern ({0}) is not lower-case.".format(s))
        return True

    def p_mz(self, s, assign=False):
        """
        Validate a match-zone string such as ``$URL:/foo|ARGS``.

        :param s: match zone, sub-parts separated by ``|``
        :param assign: when True, store `s` in ``self.mz`` on success
        :return: True if valid, False (with an entry in ``error``) otherwise
        """
        has_zone = False
        mz_state = set()
        zone_kw = self.rx_mz | self.full_zones | self.static_mz
        for loc in s.split('|'):
            kw, arg = loc, None
            if loc.startswith("$"):
                # Split on the first ':' only: the argument (often a regex or
                # an url) may itself contain ':'.
                try:
                    kw, arg = loc.split(":", 1)
                except ValueError:
                    return self.fail("Missing 2nd part after ':' in {0}".format(loc))
            # check it is a valid keyword
            if kw not in self.sub_mz:
                return self.fail("'{0}' no a known sub-part of mz : {1}".format(kw, self.sub_mz))
            mz_state.add(kw)
            # verify the rule doesn't attempt to target REGEX and STATIC _VAR/URL at the same time
            if (self.rx_mz & mz_state) and (self.static_mz & mz_state):
                return self.fail("You can't mix static $* with regex $*_X ({})".format(str(mz_state)))
            # just a gentle reminder
            if arg and self._has_uppercase(arg):
                self._warn("{0} in {1} is not lowercase. naxsi is case-insensitive".format(arg, loc))
            # the rule targets an actual zone ($URL / $URL_X only restrict the url)
            if kw not in ("$URL", "$URL_X") and kw in zone_kw:
                has_zone = True
        if has_zone is False:
            return self.fail("The rule/whitelist doesn't target any zone.")
        if assign is True:
            self.mz = s
        return True

    def p_id(self, s, assign=False):
        """
        Validate a rule id.

        :param s: id, anything ``int()`` accepts
        :param assign: when True, store the integer in ``self.sid`` on success
        :return: True if numeric, False (with an entry in ``error``) otherwise
        """
        try:
            x = int(s)
        except (TypeError, ValueError):  # TypeError: e.g. None
            return self.fail("id:{0} is not numeric".format(s))
        if x < 10000:
            self._warn("rule IDs below 10k are reserved ({0})".format(x))
        if assign is True:
            self.sid = x
        return True

    def splitter(self, s):
        """
        Tokenize a raw rule on whitespace, keeping quoted parts together.

        :param s: raw rule
        :return: list of tokens (quotes kept), quoted tokens first
        """
        lexer = shlex(s)
        lexer.quotes = '"\''
        lexer.whitespace_split = True
        items = list(iter(lexer.get_token, ''))
        quoted = [i for i in items if i[0] in "\"'"]
        bare = [i for i in items if i[0] not in "\"'"]
        return quoted + bare

    def parse_rule(self, x):
        """
        Parse and validate a full naxsi rule.

        The rule is only checked: handlers are called without ``assign``, so no
        field of the instance is modified.

        :param x: raw rule
        :return: True if the rule is valid, False otherwise (reason in ``error``)
        """
        xfrag_kw = {"id:": self.p_id, "str:": self.p_genericstr,
                    "rx:": self.p_genericstr, "msg:": self.p_dummy, "mz:": self.p_mz,
                    "negative": self.p_dummy, "s:": self.p_dummy}
        # parse string; shlex raises ValueError on e.g. an unbalanced quote
        try:
            split = self.splitter(x)
        except ValueError as exc:
            return self.fail("unable to split the rule: {0}".format(exc))
        # check if it's a MainRule/BasicRule, store&delete kw
        sect = set(self.mr_kw) & set(split)
        if len(sect) != 1:
            return self.fail("no (or multiple) mainrule/basicrule keyword.")
        split.remove(sect.pop())
        if ";" in split:
            split.remove(";")
        # A repeated element has always made the rule invalid; say so explicitly.
        seen = set()
        for token in split:
            if token in seen:
                return self.fail("element '{0}' appears more than once.".format(token))
            seen.add(token)
        # iterate while there is data, as handlers can defer (by returning
        # something that is neither True nor False)
        pending = split
        while pending:
            deferred = []
            for okw in pending:
                kw = okw.strip()
                # clean-up quotes or semicolon
                if kw.endswith(";"):
                    kw = kw[:-1]
                if kw.startswith(('"', "'")) and (kw[0] == kw[-1]):
                    kw = kw[1:-1]
                frag_kw = next((f for f in xfrag_kw if kw.startswith(f)), None)
                if frag_kw is None:
                    # we have an item no parser knows about
                    return self.fail("'{0}' is not a known rule element.".format(kw))
                # parser funcs returns True/False
                ret = xfrag_kw[frag_kw](kw[len(frag_kw):])
                if ret is False:
                    return self.fail("parsing of element '{0}' failed.".format(kw))
                if ret is not True:
                    deferred.append(okw)
            if len(deferred) == len(pending):
                # nothing was consumed during this pass: retrying would loop forever
                return self.fail("unable to parse: {0}".format(" ".join(deferred)))
            pending = deferred
        return True
174
|
|
|
|