Completed
Pull Request — master (#36)
by
unknown
01:40
created

import_rules()   D

Complexity

Conditions 8

Size

Total Lines 45

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 29
CRAP Score 8.017

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 8
c 1
b 0
f 0
dl 0
loc 45
ccs 29
cts 31
cp 0.9355
crap 8.017
rs 4
1 1
import logging
2 1
import re
3 1
import string
4
import os
5 1
6 1
7
from time import time
8 1
from flask import Blueprint, render_template, request, redirect, flash, Response, url_for, current_app
9 1
#from sqlalchemy.exc import IntegrityError
10 1
from pysqlite2.dbapi2 import IntegrityError
11 1
12
from spike.model import db
13 1
from spike.model.naxsi_rules import NaxsiRules
14
from spike.model.naxsi_rulesets import NaxsiRuleSets
15
from spike.model import naxsi_mz, naxsi_score
16 1
17
18 1
19 1
from werkzeug import secure_filename
20
21
rules = Blueprint('rules', __name__)
22 1
23
24
@rules.route("/")
25 1
def index():
26
    _rules = NaxsiRules.query.order_by(NaxsiRules.sid.desc()).all()
27 1
    if not _rules:
28 1
        flash("No rules found, please create one", "success")
29
        return redirect(url_for("rules.new"))
30
    return render_template("rules/index.html", rules=_rules)
31 1
32
33
@rules.route("/plain/<int:sid>", methods=["GET"])
34 1
def plain(sid):
35
    _rule = NaxsiRules.query.filter(NaxsiRules.sid == sid).first()
36 1
    if not _rule:
37 1
        flash("No rules found, please create one", "error")
38 1
        return redirect(url_for("rules.new"))
39 1
    return Response(_rule.fullstr(), mimetype='text/plain')
40 1
41
42
@rules.route("/view/<int:sid>", methods=["GET"])
43 1
def view(sid):
44
    _rule = NaxsiRules.query.filter(NaxsiRules.sid == sid).first()
45 1
    if _rule is None:
46
        flash("no rules found, please create one", "error")
47 1
        return redirect(url_for("rules.index"))
48 1
    return render_template("rules/view.html", rule=_rule, rtext=_rule)
49
50
51 1
@rules.route("/search/", methods=["GET"])
52 1
def search():
53
    terms = request.args.get('s', '')
54 1
55 1
    if len(terms) < 2:
56
        return redirect(url_for("rules.index"))
57 1
58
    # No fancy injections
59 1
    whitelist = set(string.ascii_letters + string.digits + ':-_ ')
60 1
    filtered = ''.join(filter(whitelist.__contains__, terms))
61
62
    if filtered.isdigit():  # get rule by id
63
        _rules = db.session.query(NaxsiRules).filter(NaxsiRules.sid == int(filtered))
64
    else:
65
        cve = re.search('cve:\d{4}-\d{4,}', filtered, re.IGNORECASE)  # search by CVE
66
67 1
        expression = '%' + filtered + '%'
68 1
        _rules = db.session.query(NaxsiRules).filter(
69 1
            db.or_(
70 1
                NaxsiRules.msg.like(expression),
71
                NaxsiRules.rmks.like(expression),
72
                NaxsiRules.detection.like(expression)
73 1
            )
74
        )
75 1
        if cve:
76 1
            _rules.filter(NaxsiRules.msg.like('%' + cve.group() + '%'))
77
    _rules = _rules.order_by(NaxsiRules.sid.desc()).all()
78
    return render_template("rules/index.html", rules=_rules, selection="Search: %s" % filtered, lsearch=terms)
79 1
80
81 1
@rules.route("/import", methods=["POST", "GET"])
82 1
def import_rules():
83 1
    """
84
    Import rules to an existing ruleset from a file.
85
    :return:
86 1
    """
87 1
    _rulesets = NaxsiRuleSets.query.all()
88
89 1
    if request.method == "GET":
90 1
        return render_template("rules/import.html", rulesets=_rulesets)
91 1
    success_imports = 0
92
    potential_imports = 0
93 1
    upfile = request.files['file']
94
    ruleset = request.form.get("ruleset", "")
95
    flash("Importing in ruleset {0}".format(ruleset))
96
    if not ruleset or not upfile:
97 1
        flash("missing rule file and/or ruleset name.")
98
        return redirect(url_for("rules.new"))
99 1
100 1
    filename = secure_filename(upfile.filename)
101 1
    upfile.save(os.path.join(current_app.config['UPLOAD_FOLDER'], filename))
102 1
    raw = open(os.path.join(current_app.config['UPLOAD_FOLDER'], filename), "r")
103 1
    for potential_rule in raw.readlines():
104 1
        potential_rule = potential_rule.strip()
105 1
        # Save ourselves some time by not trying to import comments
106
        if potential_rule.startswith("#"):
107 1
            continue
108 1
        potential_imports += 1
109
        tmp = NaxsiRules(ruleset=ruleset, active=1)
110 1
        if tmp.parse_rule(potential_rule) is False:
111
            print "Parsing failed for '{}'".format(potential_rule)
112
            print "errors : {0}".format(tmp.error)
113 1
114
            continue
115 1
        else:
116 1
            db.session.add(tmp)
117 1
            try:
118
                db.session.commit()
119 1
                success_imports += 1
120 1
            except:
121 1
                #pysqlite2.dbapi2.IntegrityError
122 1
                db.session.rollback()
123 1
                flash("Rule #{0} has no unique ID, skip".format(tmp.sid))
124
    flash("Imported {0} out of {1} rules in ruleset {2}".format(success_imports, potential_imports, ruleset))
125
    return render_template("rules/import.html", rulesets=_rulesets)
126 1
127
128
@rules.route("/new", methods=["GET", "POST"])
129
def new():
130 1
    latest_sid = NaxsiRules.query.order_by(NaxsiRules.sid.desc()).first()
131
    if latest_sid is None:
132 1
        sid = 200001
133 1
    else:
134 1
        sid = latest_sid.sid + 1
135 1
136 1
    if request.method == "GET":
137 1
        _rulesets = NaxsiRuleSets.query.all()
138 1
        return render_template("rules/new.html", mz=naxsi_mz, rulesets=_rulesets, score=naxsi_score, latestn=sid)
139 1
140 1
    # create new rule
141 1
    logging.debug('Posted new request: %s', request.form)
142 1
    mz = "|".join(filter(len, request.form.getlist("mz") + request.form.getlist("custom_mz_val")))
143 1
144 1
    score = request.form.get("score", "")
145
    score += ':'
146 1
    score += request.form.get("score_%s" % request.form.get("score", ""), "")
147 1
148 1
    nrule = NaxsiRules(request.form.get("msg", ""), request.form.get("detection", ""), mz, score, sid,
149 1
                       request.form.get("ruleset", ""), request.form.get("rmks", ""), "1",
150 1
                       request.form.get("negative", "") == 'checked', int(time()))
151
152 1
    nrule.validate()
153 1
154
    if nrule.error:
155 1
        for error in nrule.error:
156
            flash(error, category='error')
157
        return redirect(url_for("rules.new"))
158 1 View Code Duplication
    elif nrule.warnings:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
159 1
        for warning in nrule.warnings:
160 1
            flash(warning, category='warnings')
161 1
162
    db.session.add(nrule)
163
    db.session.commit()
164 1
165 1
    return redirect("/rules/edit/%s" % sid)
166
167 1
168 1
@rules.route("/edit/<int:sid>", methods=["GET", "POST"])
169
def edit(sid):
170
    rinfo = NaxsiRules.query.filter(NaxsiRules.sid == sid).first()
171 1
    if not rinfo:
172
        return redirect(url_for("rules.index"))
173 1
174 1
    _rulesets = NaxsiRuleSets.query.all()
175 1
    rruleset = NaxsiRuleSets.query.filter(NaxsiRuleSets.name == rinfo.ruleset).first()
176
    custom_mz = ""
177 1
    mz_check = rinfo.mz
178 1
    if re.search(r"^\$[A-Z]+:(.*)\|[A-Z]+", mz_check):
179
        custom_mz = mz_check
180 1
        rinfo.mz = "custom"
181 1
    return render_template("rules/edit.html", mz=naxsi_mz, rulesets=_rulesets, score=naxsi_score, rules_info=rinfo,
182
                           rule_ruleset=rruleset, custom_mz=custom_mz)
183 1
184 1
185 1
@rules.route("/save/<int:sid>", methods=["POST"])
186
def save(sid):
187
    mz = "|".join(filter(len, request.form.getlist("mz") + request.form.getlist("custom_mz_val")))
188
    score = "{}:{}".format(request.form.get("score", ""), request.form.get("score_%s" % request.form.get("score", "")))
189
    nrule = NaxsiRules.query.filter(NaxsiRules.sid == sid).first()
190
    nrule.msg = request.form.get("msg", "")
191
    nrule.detection = request.form.get("detection", "")
192
    nrule.mz = mz
193
    nrule.score = score
194
    nrule.ruleset = request.form.get("ruleset", "")
195
    nrule.rmks = request.form.get("rmks", "")
196
    nrule.active = request.form.get("active", "")
197
    nrule.negative = request.form.get("negative", "") == 'checked'
198
    nrule.timestamp = int(time())
199
    nrule.validate()
200
201
    if nrule.error:
202
        flash(",".join(nrule.error), 'error')
203
        return redirect("/rules/edit/%s" % sid)
204
    elif nrule.warnings:
205
        flash(",".join(nrule.warnings), 'warning')
206
207
    db.session.add(nrule)
208
    db.session.commit()
209
210
    return redirect("/rules/edit/%s" % sid)
211
212
213
@rules.route("/del/<int:sid>", methods=["GET"])
214
def del_sid(sid=''):
215
    nrule = NaxsiRules.query.filter(NaxsiRules.sid == sid).first()
216
    if not nrule:
217
        return redirect(url_for("rules.index"))
218
219
    db.session.delete(nrule)
220
    db.session.commit()
221
222
    flash("Successfully deleted %s : %s" % (sid, nrule.msg), "success")
223
    return redirect(url_for("rules.index"))
224
225
226
@rules.route("/deact/<int:sid>", methods=["GET"])
227
def deact(sid):
228
    nrule = NaxsiRules.query.filter(NaxsiRules.sid == sid).first()
229
    if nrule is None:
230
        return redirect(url_for("rules.index"))
231
232
    fm = 'deactivate' if nrule.active else 'reactivate'
233
    nrule.active = not nrule.active
234
235
    db.session.add(nrule)
236
    db.session.commit()
237
238
    flash("Successfully deactivated %s %sd : %s" % (fm, sid, nrule.msg), "success")
239
    _rulesets = NaxsiRuleSets.query.all()
240
    return render_template("rules/edit.html", mz=naxsi_mz, rulesets=_rulesets, score=naxsi_score, rules_info=nrule)
241