Completed
Push — master ( 1c2da9...58aa39 )
by -
01:41
created

NaxsiRules   F

Complexity

Total Complexity 63

Size/Duplication

Total Lines 222
Duplicated Lines 0 %

Test Coverage

Coverage 83.72%
Metric Value
dl 0
loc 222
ccs 144
cts 172
cp 0.8372
rs 3.6585
wmc 63

12 Methods

Rating   Name   Duplication   Size   Complexity  
A fullstr() 0 4 1
A __str__() 0 4 2
B __validate_detection() 0 10 5
F explain() 0 37 11
F __validate_matchzone() 0 28 13
A splitter() 0 6 1
B __validate_genericstr() 0 12 6
F parse_rule() 0 49 14
A validate() 0 9 3
A __fail() 0 3 1
A __init__() 0 14 2
A __validate_id() 0 11 4

How to fix   Complexity   

Complex Class

Complex classes like NaxsiRules often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
from time import strftime, localtime
2
3 1
from spike.model import db
4 1
from shlex import shlex
5
6
7 1
class NaxsiRules(db.Model):
8 1
    __bind_key__ = 'rules'
9 1
    __tablename__ = 'naxsi_rules'
10
11 1
    id = db.Column(db.Integer, primary_key=True)
12 1
    msg = db.Column(db.String(), nullable=False)
13 1
    detection = db.Column(db.String(1024), nullable=False)
14 1
    mz = db.Column(db.String(1024), nullable=False)
15 1
    score = db.Column(db.String(1024), nullable=False)
16 1
    sid = db.Column(db.Integer, nullable=False, unique=True)
17 1
    ruleset = db.Column(db.String(1024), nullable=False)
18 1
    rmks = db.Column(db.Text, nullable=True, server_default="")
19 1
    active = db.Column(db.Integer, nullable=False, server_default="1")
20 1
    negative = db.Column(db.Integer, nullable=False, server_default='0')
21 1
    timestamp = db.Column(db.Integer, nullable=False)
22
23 1
    mr_kw = ["MainRule", "BasicRule", "main_rule", "basic_rule"]
24 1
    static_mz = {"$ARGS_VAR", "$BODY_VAR", "$URL", "$HEADERS_VAR"}
25 1
    full_zones = {"ARGS", "BODY", "URL", "HEADERS", "FILE_EXT", "RAW_BODY"}
26 1
    rx_mz = {"$ARGS_VAR_X", "$BODY_VAR_X", "$URL_X", "$HEADERS_VAR_X"}
27 1
    sub_mz = list(static_mz) + list(full_zones) + list(rx_mz)
28
29 1
    def __init__(self, msg="", detection="", mz="", score="", sid=42000, ruleset="", rmks="", active=0, negative=0,
30
                 timestamp=0):
31 1
        self.msg = msg
32 1
        self.detection = detection
33 1
        self.mz = mz
34 1
        self.score = score
35 1
        self.sid = sid
36 1
        self.ruleset = ruleset
37 1
        self.rmks = rmks
38 1
        self.active = active
39 1
        self.negative = 1 if negative == 'checked' else 0
40 1
        self.timestamp = timestamp
41 1
        self.warnings = []
42 1
        self.error = []
43
44 1
    def fullstr(self):
45 1
        rdate = strftime("%F - %H:%M", localtime(float(str(self.timestamp))))
46 1
        rmks = "# ".join(self.rmks.strip().split("\n"))
47 1
        return "#\n# sid: {0} | date: {1}\n#\n# {2}\n#\n{3}".format(self.sid, rdate, rmks, self.__str__())
48
49 1
    def __str__(self):
50 1
        negate = 'negative' if self.negative == 1 else ''
51 1
        return 'MainRule {} "{}" "msg:{}" "mz:{}" "s:{}" id:{} ;'.format(
52
            negate, self.detection, self.msg, self.mz, self.score, self.sid)
53
54 1
    def explain(self):
55
        """ Return a string explaining a rule """
56 1
        translation = {'ARGS': 'argument', 'BODY': 'body', 'URL': 'url', 'HEADER': 'header'}
57 1
        explanation = 'The rule number <strong>%d</strong> is ' % self.sid
58 1
        if self.negative:
59
60
            explanation += '<strong>not</strong> '
61 1
        explanation += 'setting the '
62
63 1
        scores = []
64 1
        for score in self.score.split(','):
65 1
            scores.append('<strong>{0}</strong> to <strong>{1}</strong> '.format(*score.split(':', 1)))
66 1
        explanation += ', '.join(scores) + 'when it '
67 1
        if self.detection.startswith('str:'):
68 1
            explanation += 'finds the string <strong>{}</strong> '.format(self.detection[4:])
69
        else:
70
            explanation += 'matches the regexp <strong>{}</strong> '.format(self.detection[3:])
71
72 1
        zones = []
73 1
        for mz in self.mz.split('|'):
74 1
            if mz.startswith('$'):
75
                current_zone, arg = mz.split(":", 1)
76
                zone_name = "?"
77
78
                for translated_name in translation:  # translate zone names
79
                    if translated_name in current_zone:
80
                        zone_name = translation[translated_name]
81
82
                if "$URL" in current_zone:
83
                    regexp = "matching regex" if current_zone == "$URL_X" else ""
84
                    explanation += "on the URL {} '{}' ".format(regexp, arg)
85
                else:
86
                    regexp = "matching regex" if current_zone.endswith("_X") else ""
87
                    explanation += "in the var with name {} '{}' of {} ".format(regexp, arg, zone_name)
88
            else:
89 1
                zones.append('the <strong>{0}</strong>'.format(translation[mz]))
90 1
        return explanation
91
92 1
    def validate(self):
93 1
        self.__validate_matchzone(self.mz)
94 1
        self.__validate_id(self.sid)
95 1
        self.__validate_detection(self.detection)
96
97 1
        if not self.msg:
98
            self.warnings.append("Rule has no 'msg:'.")
99 1
        if not self.score:
100
            self.error.append("Rule has no score.")
101
102 1
    def __fail(self, msg):
103 1
        self.error.append(msg)
104 1
        return False
105
106
    # Bellow are parsers for specific parts of a rule
107
108 1
    def __validate_detection(self, p_str, label="", assign=False):
109 1
        p_str = label + p_str
110 1
        if not p_str.islower():
111 1
            self.warnings.append("detection {} is not lower-case. naxsi is case-insensitive".format(p_str))
112 1
        if p_str.startswith("str:") or p_str.startswith("rx:"):
113 1
            if assign is True:
114 1
                self.detection = p_str
115
        else:
116 1
            return self.__fail("detection {} is neither rx: or str:".format(p_str))
117 1
        return True
118
119 1
    def __validate_genericstr(self, p_str, label="", assign=False):
120 1
        if assign is False:
121
            return True
122 1
        if label == "s:":
123 1
            self.score = p_str
124 1
        elif label == "msg:":
125 1
            self.msg = p_str
126
        elif label == "negative":
127
            self.negative = 1
128
        elif label != "":
129
            return self.__fail("Unknown fragment {}".format(label+p_str))
130 1
        return True
131
132 1
    def __validate_matchzone(self, p_str, label="", assign=False):
0 ignored issues
show
Unused Code introduced by
The argument label seems to be unused.
Loading history...
133 1
        has_zone = False
134 1
        mz_state = set()
135 1
        locs = p_str.split('|')
136 1
        for loc in locs:
137 1
            keyword, arg = loc, None
138 1
            if loc.startswith("$"):
139 1
                if loc.find(":") == -1:
140
                    return self.__fail("Missing 2nd part after ':' in {0}".format(loc))
141 1
                keyword, arg = loc.split(":")
142
            # check it is a valid keyword
143 1
            if keyword not in self.sub_mz:
144
                return self.__fail("'{0}' no a known sub-part of mz : {1}".format(keyword, self.sub_mz))
145 1
            mz_state.add(keyword)
146
            # verify the rule doesn't attempt to target REGEX and STATIC _VAR/URL at the same time
147 1
            if len(self.rx_mz & mz_state) and len(self.static_mz & mz_state):
148 1
                return self.__fail("You can't mix static $* with regex $*_X ({})".format(', '.join(mz_state)))
149
            # just a gentle reminder
150 1
            if arg and arg.islower() is False:
151 1
                self.warnings.append("{0} in {1} is not lowercase. naxsi is case-insensitive".format(arg, loc))
152
            # the rule targets an actual zone
153 1
            if keyword not in ["$URL", "$URL_X"] and keyword in (self.rx_mz | self.full_zones | self.static_mz):
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after in.
Loading history...
154 1
                has_zone = True
155 1
        if has_zone is False:
156
            return self.__fail("The rule/whitelist doesn't target any zone.")
157 1
        if assign is True:
158 1
            self.mz = p_str
159 1
        return True
160
161 1
    def __validate_id(self, p_str, label="", assign=False):
0 ignored issues
show
Unused Code introduced by
The argument label seems to be unused.
Loading history...
162 1
        try:
163 1
            num = int(p_str)
164 1
            if num < 10000:
165 1
                self.warnings.append("rule IDs below 10k are reserved ({0})".format(num))
166
        except ValueError:
167
            self.error.append("id:{0} is not numeric".format(p_str))
168
            return False
169 1
        if assign is True:
170 1
            self.sid = num
171 1
        return True
172
173 1
    @staticmethod
174
    def splitter(s):
0 ignored issues
show
Coding Style Naming introduced by
The name s does not conform to the argument naming conventions ([a-z_][a-z0-9_]{1,30}$).

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
175 1
        lexer = shlex(s)
176 1
        lexer.whitespace_split = True
177 1
        items = list(iter(lexer.get_token, ''))
178 1
        return items
179
180 1
    def parse_rule(self, full_str):
181
        """
182
        Parse and validate a full naxsi rule
183
        :param full_str: raw rule
184
        :return: [True|False, dict]
185
        """
186 1
        self.warnings = []
187 1
        self.error = []
188
189 1
        func_map = {"id:": self.__validate_id, "str:": self.__validate_detection,
190
                    "rx:": self.__validate_detection, "msg:": self.__validate_genericstr, "mz:": self.__validate_matchzone,
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (123/120).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
191
                    "negative": self.__validate_genericstr, "s:": self.__validate_genericstr}
192 1
        ret = False
193 1
        split = self.splitter(full_str)  # parse string
194 1
        intersection = set(split).intersection(set(self.mr_kw))
195
196 1
        if not intersection:
197
            return self.__fail("No mainrule/basicrule keyword.")
198 1
        elif len(intersection) > 1:
199
            return self.__fail("Multiple mainrule/basicrule keywords.")
200
201 1
        split.remove(intersection.pop())  # remove the mainrule/basicrule keyword
202
203 1
        if ";" in split:
204 1
            split.remove(";")
205
206 1
        while split:  # iterate while there is data, as handlers can defer
207 1
            for keyword in split:
208 1
                orig_kw = keyword
209 1
                keyword = keyword.strip()
210
211 1
                if keyword.endswith(";"):  # remove semi-colons
212 1
                    keyword = keyword[:-1]
213 1
                if keyword.startswith(('"', "'")) and (keyword[0] == keyword[-1]):  # remove (double-)quotes
214 1
                    keyword = keyword[1:-1]
215 1
                for frag_kw in func_map:
216 1
                    ret = False
217 1
                    if keyword.startswith(frag_kw):
218
                        # parser funcs returns True/False
219 1
                        ret = func_map[frag_kw](keyword[len(frag_kw):], label=frag_kw, assign=True)
220 1
                        if ret is True:
221 1
                            split.remove(orig_kw)
222
                        else:
223 1
                            return self.__fail("parsing of element '{0}' failed.".format(keyword))
224 1
                        break
225
                # we have an item that wasn't successfully parsed
226 1
                if orig_kw in split and ret is not None:
227
                    return False
228
        return True
229