Completed
Push — master ( 365ab4...832d7e )
by -
01:39
created

NaxsiRules   F

Complexity

Total Complexity 68

Size/Duplication

Total Lines 248
Duplicated Lines 0 %

Test Coverage

Coverage 82.07%
Metric Value
dl 0
loc 248
ccs 151
cts 184
cp 0.8207
rs 2.9411
wmc 68

13 Methods

Rating   Name   Duplication   Size   Complexity  
A fullstr() 0 4 1
A __str__() 0 4 2
A __init__() 0 14 2
F explain() 0 39 11
B __validate_detection_rx() 0 16 5
B __validate_score() 0 12 6
F __validate_matchzone() 0 33 13
A splitter() 0 5 1
F parse_rule() 0 52 15
B validate() 0 15 5
A __validate_detection_str() 0 4 2
A __fail() 0 3 1
A __validate_id() 0 11 4

How to fix   Complexity   

Complex Class

Complex classes like NaxsiRules often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
from time import strftime, localtime
2
3 1
from spike.model import db
4 1
from shlex import shlex
5
6
7 1
class NaxsiRules(db.Model):
8 1
    __bind_key__ = 'rules'
9 1
    __tablename__ = 'naxsi_rules'
10
11 1
    id = db.Column(db.Integer, primary_key=True)
12 1
    msg = db.Column(db.String(), nullable=False)
13 1
    detection = db.Column(db.String(1024), nullable=False)
14 1
    mz = db.Column(db.String(1024), nullable=False)
15 1
    score = db.Column(db.String(1024), nullable=False)
16 1
    sid = db.Column(db.Integer, nullable=False, unique=True)
17 1
    ruleset = db.Column(db.String(1024), nullable=False)
18 1
    rmks = db.Column(db.Text, nullable=True, server_default="")
19 1
    active = db.Column(db.Integer, nullable=False, server_default="1")
20 1
    negative = db.Column(db.Integer, nullable=False, server_default='0')
21 1
    timestamp = db.Column(db.Integer, nullable=False)
22
23 1
    mr_kw = ["MainRule", "BasicRule", "main_rule", "basic_rule"]
24 1
    static_mz = {"$ARGS_VAR", "$BODY_VAR", "$URL", "$HEADERS_VAR"}
25 1
    full_zones = {"ARGS", "BODY", "URL", "HEADERS", "FILE_EXT", "RAW_BODY"}
26 1
    rx_mz = {"$ARGS_VAR_X", "$BODY_VAR_X", "$URL_X", "$HEADERS_VAR_X"}
27 1
    sub_mz = list(static_mz) + list(full_zones) + list(rx_mz)
28
29 1
    def __init__(self, msg="", detection="", mz="", score="", sid=42000, ruleset="", rmks="", active=0, negative=0,
30
                 timestamp=0):
31 1
        self.msg = msg
32 1
        self.detection = detection
33 1
        self.mz = mz
34 1
        self.score = score
35 1
        self.sid = sid
36 1
        self.ruleset = ruleset
37 1
        self.rmks = rmks
38 1
        self.active = active
39 1
        self.negative = 1 if negative == 'checked' else 0
40 1
        self.timestamp = timestamp
41 1
        self.warnings = []
42 1
        self.error = []
43
44 1
    def fullstr(self):
45 1
        rdate = strftime("%F - %H:%M", localtime(float(str(self.timestamp))))
46 1
        rmks = "# ".join(self.rmks.strip().split("\n"))
47 1
        return "#\n# sid: {0} | date: {1}\n#\n# {2}\n#\n{3}".format(self.sid, rdate, rmks, self.__str__())
48
49 1
    def __str__(self):
50 1
        negate = 'negative' if self.negative == 1 else ''
51 1
        return 'MainRule {} "{}" "msg:{}" "mz:{}" "s:{}" id:{} ;'.format(
52
            negate, self.detection, self.msg, self.mz, self.score, self.sid)
53
54 1
    def explain(self):
55
        """ Return a string explaining the rule
56
57
         :return str: A textual explanation of the rule
58
         """
59 1
        translation = {'ARGS': 'argument', 'BODY': 'body', 'URL': 'url', 'HEADER': 'header'}
60 1
        explanation = 'The rule number <strong>%d</strong> is ' % self.sid
61 1
        if self.negative:
62
            explanation += '<strong>not</strong> '
63 1
        explanation += 'setting the '
64
65 1
        scores = []
66 1
        for score in self.score.split(','):
67 1
            scores.append('<strong>{0}</strong> to <strong>{1}</strong> '.format(*score.split(':', 1)))
68 1
        explanation += ', '.join(scores) + 'when it '
69 1
        if self.detection.startswith('str:'):
70 1
            explanation += 'finds the string <strong>{}</strong> '.format(self.detection[4:])
71
        else:
72
            explanation += 'matches the regexp <strong>{}</strong> '.format(self.detection[3:])
73
74 1
        zones = []
75 1
        for mz in self.mz.split('|'):
76 1
            if mz.startswith('$'):
77
                current_zone, arg = mz.split(":", 1)
78
                zone_name = "?"
79
80
                for translated_name in translation:  # translate zone names
81
                    if translated_name in current_zone:
82
                        zone_name = translation[translated_name]
83
84
                if "$URL" in current_zone:
85
                    regexp = "matching regex" if current_zone == "$URL_X" else ""
86
                    explanation += "on the URL {} '{}' ".format(regexp, arg)
87
                else:
88
                    regexp = "matching regex" if current_zone.endswith("_X") else ""
89
                    explanation += "in the var with name {} '{}' of {} ".format(regexp, arg, zone_name)
90
            else:
91 1
                zones.append('the <strong>{0}</strong>'.format(translation[mz]))
92 1
        return explanation
93
94 1
    def validate(self):
95 1
        self.__validate_matchzone(self.mz)
96 1
        self.__validate_id(self.sid)
97
98 1
        if self.detection.startswith('rx:'):
99
            self.__validate_detection_rx(self.detection)
100 1
        elif self.detection.startswith('str:'):
101 1
            self.__validate_detection_str(self.detection)
102
        else:
103 1
            self.error.append("Your 'detection' string must start with str: or rx:")
104
105 1
        if not self.msg:
106
            self.warnings.append("Rule has no 'msg:'.")
107 1
        if not self.score:
108
            self.error.append("Rule has no score.")
109
110 1
    def __fail(self, msg):
111 1
        self.error.append(msg)
112 1
        return False
113
114
    # Bellow are parsers for specific parts of a rule
115
116 1
    def __validate_detection_str(self, p_str, assign=False):
117 1
        if assign is True:
118
            self.detection += p_str
119 1
        return True
120
121 1
    def __validate_detection_rx(self, p_str, assign=False):
122 1
        if not p_str.islower():
123
            self.warnings.append("detection {} is not lower-case. naxsi is case-insensitive".format(p_str))
124
125 1
        try:  # try to validate the regex with PCRE's python bindings
126 1
            import pcre
127
            try:  # if we can't compile the regex, it's likely invalid
128
                pcre.compile(p_str[3:])
129
            except pcre.PCREError:
130
                return self.__fail("{} is not a valid regex:".format(p_str))
131 1
        except ImportError:  # python-pcre is an optional dependency
0 ignored issues
show
Unused Code introduced by
This except handler seems to be unused and could be removed.

Except handlers which only contain pass and do not have an else clause can usually simply be removed:

try:
    raises_exception()
except:  # Could be removed
    pass
Loading history...
132 1
            pass
133
134 1
        if assign is True:
135 1
            self.detection += p_str
136 1
        return True
137
138 1
    def __validate_score(self, p_str, assign=False):
139 1
        for score in p_str.split(','):
140 1
            if ':' not in score:
141
                self.__fail("You score '{}' has no value or name.".format(score))
142 1
            name, value = score.split(':')
143 1
            if not value.isdigit():
144
                self.__fail("Your value '{}' for your score '{}' is not numeric.".format(value, score))
145 1
            elif not name.startswith('$'):
146
                self.__fail("Your name '{}' for your score '{}' does not start with a '$'.".format(name, score))
147 1
        if assign:
148 1
            self.score = p_str
149 1
        return True
150
151 1
    def __validate_matchzone(self, p_str, assign=False):
152 1
        has_zone = False
153 1
        mz_state = set()
154 1
        for loc in p_str.split('|'):
155 1
            keyword, arg = loc, None
156 1
            if loc.startswith("$"):
157 1
                if loc.find(":") == -1:
158
                    return self.__fail("Missing 2nd part after ':' in {0}".format(loc))
159 1
                keyword, arg = loc.split(":")
160
161 1
            if keyword not in self.sub_mz:  # check if `keyword` is a valid keyword
162
                return self.__fail("'{0}' no a known sub-part of mz : {1}".format(keyword, self.sub_mz))
163
164 1
            mz_state.add(keyword)
165
166
            # verify that the rule doesn't attempt to target REGEX and STATIC _VAR/URL at the same time
167 1
            if len(self.rx_mz & mz_state) and len(self.static_mz & mz_state):
168 1
                return self.__fail("You can't mix static $* with regex $*_X ({})".format(', '.join(mz_state)))
169
170 1
            if arg and not arg.islower():  # just a gentle reminder
171 1
                self.warnings.append("{0} in {1} is not lowercase. naxsi is case-insensitive".format(arg, loc))
172
173
            # the rule targets an actual zone
174 1
            if keyword not in ["$URL", "$URL_X"] and keyword in (self.rx_mz | self.full_zones | self.static_mz):
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after in.
Loading history...
175 1
                has_zone = True
176
177 1
        if has_zone is False:
178
            return self.__fail("The rule/whitelist doesn't target any zone.")
179
180 1
        if assign is True:
181 1
            self.mz = p_str
182
183 1
        return True
184
185 1
    def __validate_id(self, p_str, assign=False):
186 1
        try:
187 1
            num = int(p_str)
188 1
            if num < 10000:
189 1
                self.warnings.append("rule IDs below 10k are reserved ({0})".format(num))
190
        except ValueError:
191
            self.error.append("id:{0} is not numeric".format(p_str))
192
            return False
193 1
        if assign is True:
194 1
            self.sid = num
195 1
        return True
196
197 1
    @staticmethod
198
    def splitter(full_str):
199 1
        lexer = shlex(full_str)
200 1
        lexer.whitespace_split = True
201 1
        return list(iter(lexer.get_token, ''))
202
203 1
    def parse_rule(self, full_str):
204
        """
205
        Parse and validate a full naxsi rule
206
        :param full_str: raw rule
207
        :return: [True|False, dict]
208
        """
209 1
        self.warnings = []
210 1
        self.error = []
211
212 1
        func_map = {"id:": self.__validate_id, "str:": self.__validate_detection_str,
213
                    "rx:": self.__validate_detection_rx, "msg:": lambda p_str, assign=False: True,
214
                    "mz:": self.__validate_matchzone, "negative": lambda p_str, assign=False: True,
215
                    "s:": self.__validate_score}
216
217 1
        split = self.splitter(full_str)  # parse string
218 1
        intersection = set(split).intersection(set(self.mr_kw))
219
220 1
        if not intersection:
221
            return self.__fail("No mainrule/basicrule keyword.")
222 1
        elif len(intersection) > 1:
223
            return self.__fail("Multiple mainrule/basicrule keywords.")
224
225 1
        split.remove(intersection.pop())  # remove the mainrule/basicrule keyword
226
227 1
        if ";" in split:
228 1
            split.remove(";")
229
230 1
        while split:  # iterate while there is data, as handlers can defer
231 1
            for keyword in split:
232 1
                orig_kw = keyword
233 1
                keyword = keyword.strip()
234
235 1
                if keyword.endswith(";"):  # remove semi-colons
236 1
                    keyword = keyword[:-1]
237 1
                if keyword.startswith(('"', "'")) and (keyword[0] == keyword[-1]):  # remove (double-)quotes
238 1
                    keyword = keyword[1:-1]
239
240 1
                parsed = False
241 1
                for frag_kw in func_map:
242 1
                    if keyword.startswith(frag_kw):  # use the right parser
243 1
                        function = func_map[frag_kw]
244 1
                        payload = keyword[len(frag_kw):]
245
246 1
                        if function(payload, assign=True) is True:
247 1
                            parsed = True
248 1
                            split.remove(orig_kw)
249 1
                            break
250 1
                        return self.__fail("parsing of element '{0}' failed.".format(keyword))
251
252 1
                if parsed is False:  # we have an item that wasn't successfully parsed
253
                    return self.__fail("'{}' is an invalid element and thus can not be parsed.".format(keyword))
254
        return True
255