Completed
Push — master ( 77348d...b5200a )
by -
01:45
created

NaxsiRules   F

Complexity

Total Complexity 69

Size/Duplication

Total Lines 255
Duplicated Lines 0 %

Test Coverage

Coverage 88.36%
Metric Value
dl 0
loc 255
ccs 167
cts 189
cp 0.8836
rs 2.8301
wmc 69

13 Methods

Rating   Name   Duplication   Size   Complexity  
A fullstr() 0 4 1
A __str__() 0 4 2
A __init__() 0 14 2
B __validate_detection_rx() 0 16 5
B __validate_score() 0 12 6
F explain() 0 39 11
F __validate_matchzone() 0 33 13
A splitter() 0 5 1
F parse_rule() 0 55 16
B validate() 0 19 5
A __validate_detection_str() 0 4 2
A __fail() 0 3 1
A __validate_id() 0 11 4

How to fix   Complexity   

Complex Class

Complex classes like NaxsiRules often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
from time import strftime, localtime
2
3 1
from spike.model import db
4 1
from shlex import shlex
5
6
7 1
class NaxsiRules(db.Model):
8 1
    # SQLAlchemy bind key and table name used for this model.
    __bind_key__ = 'rules'
    __tablename__ = 'naxsi_rules'

    # Persisted columns; msg, detection, mz, score and sid are the parts
    # rendered by __str__() into a "MainRule" line.
    id = db.Column(db.Integer, primary_key=True)
    msg = db.Column(db.String(), nullable=False)
    detection = db.Column(db.String(1024), nullable=False)
    mz = db.Column(db.String(1024), nullable=False)
    score = db.Column(db.String(1024), nullable=False)
    sid = db.Column(db.Integer, nullable=False, unique=True)
    ruleset = db.Column(db.String(1024), nullable=False)
    rmks = db.Column(db.Text, nullable=True, server_default="")
    active = db.Column(db.Integer, nullable=False, server_default="1")
    negative = db.Column(db.Integer, nullable=False, server_default='0')
    timestamp = db.Column(db.Integer, nullable=False)

    # Keywords accepted by parse_rule() as the leading rule keyword.
    mr_kw = ["MainRule", "BasicRule", "main_rule", "basic_rule"]
    # Match-zone keywords that take an argument (static variant).
    static_mz = {"$ARGS_VAR", "$BODY_VAR", "$URL", "$HEADERS_VAR"}
    # Match-zone keywords that are used without an argument.
    full_zones = {"ARGS", "BODY", "URL", "HEADERS", "FILE_EXT", "RAW_BODY"}
    # Match-zone keywords that take an argument (regex variant).
    rx_mz = {"$ARGS_VAR_X", "$BODY_VAR_X", "$URL_X", "$HEADERS_VAR_X"}
    # Every known match-zone keyword. Sorted so that the error message which
    # prints this list is identical from one run to the next (set iteration
    # order of strings is not stable across interpreter runs).
    sub_mz = sorted(static_mz | full_zones | rx_mz)
28
29 1
    def __init__(self, msg="", detection="", mz="", score="", sid=42000, ruleset="", rmks="", active=0, negative=0,
30
                 timestamp=0):
31 1
        self.msg = msg
32 1
        self.detection = detection
33 1
        self.mz = mz
34 1
        self.score = score
35 1
        self.sid = sid
36 1
        self.ruleset = ruleset
37 1
        self.rmks = rmks
38 1
        self.active = active
39 1
        self.negative = 1 if negative == 'checked' else 0
40 1
        self.timestamp = timestamp
41 1
        self.warnings = []
42 1
        self.error = []
43
44 1
    def fullstr(self):
45 1
        rdate = strftime("%F - %H:%M", localtime(float(str(self.timestamp))))
46 1
        rmks = "# ".join(self.rmks.strip().split("\n"))
47 1
        return "#\n# sid: {0} | date: {1}\n#\n# {2}\n#\n{3}".format(self.sid, rdate, rmks, self.__str__())
48
49 1
    def __str__(self):
50 1
        negate = 'negative' if self.negative == 1 else ''
51 1
        return 'MainRule {} "{}" "msg:{}" "mz:{}" "s:{}" id:{} ;'.format(
52
            negate, self.detection, self.msg, self.mz, self.score, self.sid)
53
54 1
    def explain(self):
55
        """ Return a string explaining the rule
56
57
         :return str: A textual explanation of the rule
58
         """
59 1
        translation = {'ARGS': 'argument', 'BODY': 'body', 'URL': 'url', 'HEADER': 'header'}
60 1
        explanation = 'The rule number <strong>{0}</strong> is '.format(self.sid)
61 1
        if self.negative:
62
            explanation += '<strong>not</strong> '
63 1
        explanation += 'setting the '
64
65 1
        scores = []
66 1
        for score in self.score.split(','):
67 1
            scores.append('<strong>{0}</strong> to <strong>{1}</strong> '.format(*score.split(':', 1)))
68 1
        explanation += ', '.join(scores) + 'when it '
69 1
        if self.detection.startswith('str:'):
70 1
            explanation += 'finds the string <strong>{}</strong> '.format(self.detection[4:])
71
        else:
72 1
            explanation += 'matches the regexp <strong>{}</strong> '.format(self.detection[3:])
73
74 1
        zones = []
75 1
        for mz in self.mz.split('|'):
76 1
            if mz.startswith('$'):
77 1
                current_zone, arg = mz.split(":", 1)
78 1
                zone_name = "?"
79
80 1
                for translated_name in translation:  # translate zone names
81 1
                    if translated_name in current_zone:
82 1
                        zone_name = translation[translated_name]
83
84 1
                if "$URL" in current_zone:
85
                    regexp = "matching regex" if current_zone == "$URL_X" else ""
86
                    explanation += "on the URL {} '{}' ".format(regexp, arg)
87
                else:
88 1
                    regexp = "matching regex" if current_zone.endswith("_X") else ""
89 1
                    explanation += "in the var with name {} '{}' of {} ".format(regexp, arg, zone_name)
90
            else:
91 1
                zones.append('the <strong>{0}</strong>'.format(translation[mz]))
92 1
        return explanation
93
94 1
    def validate(self):
95 1
        self.warnings = list()
96 1
        self.error = list()
97
98 1
        self.__validate_matchzone(self.mz)
99 1
        self.__validate_id(self.sid)
100 1
        self.__validate_score(self.score)
101
102 1
        if self.detection.startswith('rx:'):
103 1
            self.__validate_detection_rx(self.detection)
104 1
        elif self.detection.startswith('str:'):
105 1
            self.__validate_detection_str(self.detection)
106
        else:
107 1
            self.error.append("Your 'detection' string must start with str: or rx:")
108
109 1
        if not self.msg:
110
            self.warnings.append("Rule has no 'msg:'.")
111 1
        if not self.score:
112
            self.error.append("Rule has no score.")
113
114 1
    def __fail(self, msg):
115 1
        self.error.append(msg)
116 1
        return False
117
118
    # Below are parsers for specific parts of a rule
119
120 1
    def __validate_detection_str(self, p_str, assign=False):
121 1
        if assign is True:
122
            self.detection = p_str
123 1
        return True
124
125 1
    def __validate_detection_rx(self, p_str, assign=False):
126 1
        if not p_str.islower():
127 1
            self.warnings.append("detection {} is not lower-case. naxsi is case-insensitive".format(p_str))
128
129 1
        try:  # try to validate the regex with PCRE's python bindings
130 1
            import pcre
131
            try:  # if we can't compile the regex, it's likely invalid
132
                pcre.compile(p_str[3:])
133
            except pcre.PCREError:
134
                return self.__fail("{} is not a valid regex:".format(p_str))
135 1
        except ImportError:  # python-pcre is an optional dependency
0 ignored issues
show
Unused Code introduced by
This except handler seems to be unused and could be removed.

Except handlers which only contain pass and do not have an else clause can usually simply be removed:

try:
    raises_exception()
except:  # Could be removed
    pass
Loading history...
136 1
            pass
137
138 1
        if assign is True:
139 1
            self.detection = p_str
140 1
        return True
141
142 1
    def __validate_score(self, p_str, assign=False):
143 1
        for score in p_str.split(','):
144 1
            if ':' not in score:
145
                self.__fail("You score '{}' has no value or name.".format(score))
146 1
            name, value = score.split(':')
147 1
            if not value.isdigit():
148
                self.__fail("Your value '{}' for your score '{}' is not numeric.".format(value, score))
149 1
            elif not name.startswith('$'):
150
                self.__fail("Your name '{}' for your score '{}' does not start with a '$'.".format(name, score))
151 1
        if assign:
152 1
            self.score = p_str
153 1
        return True
154
155 1
    def __validate_matchzone(self, p_str, assign=False):
156 1
        has_zone = False
157 1
        mz_state = set()
158 1
        for loc in p_str.split('|'):
159 1
            keyword, arg = loc, None
160 1
            if loc.startswith("$"):
161 1
                if loc.find(":") == -1:
162
                    return self.__fail("Missing 2nd part after ':' in {0}".format(loc))
163 1
                keyword, arg = loc.split(":")
164
165 1
            if keyword not in self.sub_mz:  # check if `keyword` is a valid keyword
166
                return self.__fail("'{0}' no a known sub-part of mz : {1}".format(keyword, self.sub_mz))
167
168 1
            mz_state.add(keyword)
169
170
            # verify that the rule doesn't attempt to target REGEX and STATIC _VAR/URL at the same time
171 1
            if len(self.rx_mz & mz_state) and len(self.static_mz & mz_state):
172 1
                return self.__fail("You can't mix static $* with regex $*_X ({})".format(', '.join(mz_state)))
173
174 1
            if arg and not arg.islower():  # just a gentle reminder
175 1
                self.warnings.append("{0} in {1} is not lowercase. naxsi is case-insensitive".format(arg, loc))
176
177
            # the rule targets an actual zone
178 1
            if keyword not in ["$URL", "$URL_X"] and keyword in (self.rx_mz | self.full_zones | self.static_mz):
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after in.
Loading history...
179 1
                has_zone = True
180
181 1
        if has_zone is False:
182
            return self.__fail("The rule/whitelist doesn't target any zone.")
183
184 1
        if assign is True:
185 1
            self.mz = p_str
186
187 1
        return True
188
189 1
    def __validate_id(self, p_str, assign=False):
190 1
        try:
191 1
            num = int(p_str)
192 1
            if num < 10000:
193 1
                self.warnings.append("rule IDs below 10k are reserved ({0})".format(num))
194
        except ValueError:
195
            self.error.append("id:{0} is not numeric".format(p_str))
196
            return False
197 1
        if assign is True:
198 1
            self.sid = num
199 1
        return True
200
201 1
    @staticmethod
202
    def splitter(full_str):
203 1
        lexer = shlex(full_str)
204 1
        lexer.whitespace_split = True
205 1
        return list(iter(lexer.get_token, ''))
206
207 1
    def parse_rule(self, full_str):
208
        """
209
        Parse and validate a full naxsi rule
210
        :param full_str: raw rule
211
        :return: [True|False, dict]
212
        """
213 1
        self.warnings = list()
214 1
        self.error = list()
215
216 1
        func_map = {"id:": self.__validate_id, "str:": self.__validate_detection_str,
217
                    "rx:": self.__validate_detection_rx, "msg:": lambda p_str, assign=False: True,
218
                    "mz:": self.__validate_matchzone, "negative": lambda p_str, assign=False: True,
219
                    "s:": self.__validate_score}
220
221 1
        split = self.splitter(full_str)  # parse string
222 1
        intersection = set(split).intersection(set(self.mr_kw))
223
224 1
        if not intersection:
225
            return self.__fail("No mainrule/basicrule keyword.")
226 1
        elif len(intersection) > 1:
227
            return self.__fail("Multiple mainrule/basicrule keywords.")
228
229 1
        split.remove(intersection.pop())  # remove the mainrule/basicrule keyword
230
231 1
        if ";" in split:
232 1
            split.remove(";")
233
234 1
        while split:  # iterate while there is data, as handlers can defer
235 1
            for keyword in split:
236 1
                orig_kw = keyword
237 1
                keyword = keyword.strip()
238
239 1
                if keyword.endswith(";"):  # remove semi-colons
240 1
                    keyword = keyword[:-1]
241 1
                if keyword.startswith(('"', "'")) and (keyword[0] == keyword[-1]):  # remove (double-)quotes
242 1
                    keyword = keyword[1:-1]
243
244 1
                parsed = False
245 1
                for frag_kw in func_map:
246 1
                    if keyword.startswith(frag_kw):  # use the right parser
247 1
                        function = func_map[frag_kw]
248 1
                        if frag_kw in ('rx:', 'str:'):  # don't remove the leading "str:" or "rx:"
249 1
                            payload = keyword
250
                        else:
251 1
                            payload = keyword[len(frag_kw):]
252
253 1
                        if function(payload, assign=True) is True:
254 1
                            parsed = True
255 1
                            split.remove(orig_kw)
256 1
                            break
257 1
                        return self.__fail("parsing of element '{0}' failed.".format(keyword))
258
259 1
                if parsed is False:  # we have an item that wasn't successfully parsed
260
                    return self.__fail("'{}' is an invalid element and thus can not be parsed.".format(keyword))
261
        return True
262