Completed
Push — master ( 58aa39...1597ed )
by -
01:31
created

NaxsiRules.parse_rule()   F

Complexity

Conditions 14

Size

Total Lines 51

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 30
CRAP Score 14.1472
Metric Value
cc 14
dl 0
loc 51
ccs 30
cts 33
cp 0.9091
crap 14.1472
rs 2.8289

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like NaxsiRules.parse_rule() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
from time import strftime, localtime
2
3 1
from spike.model import db
4 1
from shlex import shlex
5
6
7 1
class NaxsiRules(db.Model):
8 1
    __bind_key__ = 'rules'
9 1
    __tablename__ = 'naxsi_rules'
10
11 1
    id = db.Column(db.Integer, primary_key=True)
12 1
    msg = db.Column(db.String(), nullable=False)
13 1
    detection = db.Column(db.String(1024), nullable=False)
14 1
    mz = db.Column(db.String(1024), nullable=False)
15 1
    score = db.Column(db.String(1024), nullable=False)
16 1
    sid = db.Column(db.Integer, nullable=False, unique=True)
17 1
    ruleset = db.Column(db.String(1024), nullable=False)
18 1
    rmks = db.Column(db.Text, nullable=True, server_default="")
19 1
    active = db.Column(db.Integer, nullable=False, server_default="1")
20 1
    negative = db.Column(db.Integer, nullable=False, server_default='0')
21 1
    timestamp = db.Column(db.Integer, nullable=False)
22
23 1
    mr_kw = ["MainRule", "BasicRule", "main_rule", "basic_rule"]
24 1
    static_mz = {"$ARGS_VAR", "$BODY_VAR", "$URL", "$HEADERS_VAR"}
25 1
    full_zones = {"ARGS", "BODY", "URL", "HEADERS", "FILE_EXT", "RAW_BODY"}
26 1
    rx_mz = {"$ARGS_VAR_X", "$BODY_VAR_X", "$URL_X", "$HEADERS_VAR_X"}
27 1
    sub_mz = list(static_mz) + list(full_zones) + list(rx_mz)
28
29 1
    def __init__(self, msg="", detection="", mz="", score="", sid=42000, ruleset="", rmks="", active=0, negative=0,
30
                 timestamp=0):
31 1
        self.msg = msg
32 1
        self.detection = detection
33 1
        self.mz = mz
34 1
        self.score = score
35 1
        self.sid = sid
36 1
        self.ruleset = ruleset
37 1
        self.rmks = rmks
38 1
        self.active = active
39 1
        self.negative = 1 if negative == 'checked' else 0
40 1
        self.timestamp = timestamp
41 1
        self.warnings = []
42 1
        self.error = []
43
44 1
    def fullstr(self):
45 1
        rdate = strftime("%F - %H:%M", localtime(float(str(self.timestamp))))
46 1
        rmks = "# ".join(self.rmks.strip().split("\n"))
47 1
        return "#\n# sid: {0} | date: {1}\n#\n# {2}\n#\n{3}".format(self.sid, rdate, rmks, self.__str__())
48
49 1
    def __str__(self):
50 1
        negate = 'negative' if self.negative == 1 else ''
51 1
        return 'MainRule {} "{}" "msg:{}" "mz:{}" "s:{}" id:{} ;'.format(
52
            negate, self.detection, self.msg, self.mz, self.score, self.sid)
53
54 1
    def explain(self):
55
        """ Return a string explaining the rule
56
57
         :return str: A textual explaination of the rule
58
         """
59 1
        translation = {'ARGS': 'argument', 'BODY': 'body', 'URL': 'url', 'HEADER': 'header'}
60 1
        explanation = 'The rule number <strong>%d</strong> is ' % self.sid
61 1
        if self.negative:
62
63
            explanation += '<strong>not</strong> '
64 1
        explanation += 'setting the '
65
66 1
        scores = []
67 1
        for score in self.score.split(','):
68 1
            scores.append('<strong>{0}</strong> to <strong>{1}</strong> '.format(*score.split(':', 1)))
69 1
        explanation += ', '.join(scores) + 'when it '
70 1
        if self.detection.startswith('str:'):
71 1
            explanation += 'finds the string <strong>{}</strong> '.format(self.detection[4:])
72
        else:
73
            explanation += 'matches the regexp <strong>{}</strong> '.format(self.detection[3:])
74
75 1
        zones = []
76 1
        for mz in self.mz.split('|'):
77 1
            if mz.startswith('$'):
78
                current_zone, arg = mz.split(":", 1)
79
                zone_name = "?"
80
81
                for translated_name in translation:  # translate zone names
82
                    if translated_name in current_zone:
83
                        zone_name = translation[translated_name]
84
85
                if "$URL" in current_zone:
86
                    regexp = "matching regex" if current_zone == "$URL_X" else ""
87
                    explanation += "on the URL {} '{}' ".format(regexp, arg)
88
                else:
89
                    regexp = "matching regex" if current_zone.endswith("_X") else ""
90
                    explanation += "in the var with name {} '{}' of {} ".format(regexp, arg, zone_name)
91
            else:
92 1
                zones.append('the <strong>{0}</strong>'.format(translation[mz]))
93 1
        return explanation
94
95 1
    def validate(self):
96 1
        self.__validate_matchzone(self.mz)
97 1
        self.__validate_id(self.sid)
98 1
        self.__validate_detection(self.detection)
99
100 1
        if not self.msg:
101
            self.warnings.append("Rule has no 'msg:'.")
102 1
        if not self.score:
103
            self.error.append("Rule has no score.")
104
105 1
    def __fail(self, msg):
106 1
        self.error.append(msg)
107 1
        return False
108
109
    # Bellow are parsers for specific parts of a rule
110
111 1
    def __validate_detection(self, p_str, label="", assign=False):
112 1
        p_str = label + p_str
113 1
        if not p_str.islower():
114 1
            self.warnings.append("detection {} is not lower-case. naxsi is case-insensitive".format(p_str))
115 1
        if p_str.startswith("str:") or p_str.startswith("rx:"):
116 1
            if assign is True:
117 1
                self.detection = p_str
118
        else:
119 1
            return self.__fail("detection {} is neither rx: or str:".format(p_str))
120 1
        return True
121
122 1
    def __validate_genericstr(self, p_str, label="", assign=False):
123 1
        if assign is False:
124
            return True
125 1
        if label == "s:":
126 1
            self.score = p_str
127 1
        elif label == "msg:":
128 1
            self.msg = p_str
129
        elif label == "negative":
130
            self.negative = 1
131
        elif label != "":
132
            return self.__fail("Unknown fragment {}".format(label+p_str))
133 1
        return True
134
135 1
    def __validate_matchzone(self, p_str, label="", assign=False):
0 ignored issues
show
Unused Code introduced by
The argument label seems to be unused.
Loading history...
136 1
        has_zone = False
137 1
        mz_state = set()
138 1
        for loc in p_str.split('|'):
139 1
            keyword, arg = loc, None
140 1
            if loc.startswith("$"):
141 1
                if loc.find(":") == -1:
142
                    return self.__fail("Missing 2nd part after ':' in {0}".format(loc))
143 1
                keyword, arg = loc.split(":")
144
145 1
            if keyword not in self.sub_mz:  # check if `keyword` is a valid keyword
146
                return self.__fail("'{0}' no a known sub-part of mz : {1}".format(keyword, self.sub_mz))
147
148 1
            mz_state.add(keyword)
149
150
            # verify that the rule doesn't attempt to target REGEX and STATIC _VAR/URL at the same time
151 1
            if len(self.rx_mz & mz_state) and len(self.static_mz & mz_state):
152 1
                return self.__fail("You can't mix static $* with regex $*_X ({})".format(', '.join(mz_state)))
153
154 1
            if arg and not arg.islower(): # just a gentle reminder
155 1
                self.warnings.append("{0} in {1} is not lowercase. naxsi is case-insensitive".format(arg, loc))
156
157
            # the rule targets an actual zone
158 1
            if keyword not in ["$URL", "$URL_X"] and keyword in (self.rx_mz | self.full_zones | self.static_mz):
0 ignored issues
show
Unused Code Coding Style introduced by
There is an unnecessary parenthesis after in.
Loading history...
159 1
                has_zone = True
160
161 1
        if has_zone is False:
162
            return self.__fail("The rule/whitelist doesn't target any zone.")
163
164 1
        if assign is True:
165 1
            self.mz = p_str
166
167 1
        return True
168
169 1
    def __validate_id(self, p_str, label="", assign=False):
0 ignored issues
show
Unused Code introduced by
The argument label seems to be unused.
Loading history...
170 1
        try:
171 1
            num = int(p_str)
172 1
            if num < 10000:
173 1
                self.warnings.append("rule IDs below 10k are reserved ({0})".format(num))
174
        except ValueError:
175
            self.error.append("id:{0} is not numeric".format(p_str))
176
            return False
177 1
        if assign is True:
178 1
            self.sid = num
179 1
        return True
180
181 1
    @staticmethod
182
    def splitter(s):
0 ignored issues
show
Coding Style Naming introduced by
The name s does not conform to the argument naming conventions ([a-z_][a-z0-9_]{1,30}$).

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
183 1
        lexer = shlex(s)
184 1
        lexer.whitespace_split = True
185 1
        return list(iter(lexer.get_token, ''))
186
187 1
    def parse_rule(self, full_str):
188
        """
189
        Parse and validate a full naxsi rule
190
        :param full_str: raw rule
191
        :return: [True|False, dict]
192
        """
193 1
        self.warnings = []
194 1
        self.error = []
195
196 1
        func_map = {"id:": self.__validate_id, "str:": self.__validate_detection,
197
                    "rx:": self.__validate_detection, "msg:": self.__validate_genericstr,
198
                    "mz:": self.__validate_matchzone, "negative": self.__validate_genericstr,
199
                    "s:": self.__validate_genericstr}
200 1
        ret = False
201 1
        split = self.splitter(full_str)  # parse string
202 1
        intersection = set(split).intersection(set(self.mr_kw))
203
204 1
        if not intersection:
205
            return self.__fail("No mainrule/basicrule keyword.")
206 1
        elif len(intersection) > 1:
207
            return self.__fail("Multiple mainrule/basicrule keywords.")
208
209 1
        split.remove(intersection.pop())  # remove the mainrule/basicrule keyword
210
211 1
        if ";" in split:
212 1
            split.remove(";")
213
214 1
        while split:  # iterate while there is data, as handlers can defer
215 1
            for keyword in split:
216 1
                orig_kw = keyword
217 1
                keyword = keyword.strip()
218
219 1
                if keyword.endswith(";"):  # remove semi-colons
220 1
                    keyword = keyword[:-1]
221 1
                if keyword.startswith(('"', "'")) and (keyword[0] == keyword[-1]):  # remove (double-)quotes
222 1
                    keyword = keyword[1:-1]
223
224 1
                for frag_kw in func_map:
225 1
                    ret = False
226 1
                    if keyword.startswith(frag_kw):
227
                        # parser funcs returns True/False
228 1
                        ret = func_map[frag_kw](keyword[len(frag_kw):], label=frag_kw, assign=True)
229 1
                        if ret is True:
230 1
                            split.remove(orig_kw)
231
                        else:
232 1
                            return self.__fail("parsing of element '{0}' failed.".format(keyword))
233 1
                        break
234
235 1
                if orig_kw in split and ret is not None:  # we have an item that wasn't successfully parsed
236
                    return False
237
        return True
238