Issues (5)

traceroutedb/server.py (1 issue)

1
#!/usr/bin/env python
2
3
from __future__ import print_function
4
from flask import Flask, request, abort
5
from werkzeug.contrib.cache import SimpleCache
6
import sys
7
import json
8
import geoip2.database
9
from geoip2.errors import AddressNotFoundError
10
from tabulate import tabulate
11
from traceroutedb.log import logger
12
13
try:
14
    import psycopg2
15
    from psycopg2 import extras as pgextras
16
except ImportError:
17
    print("psycopg2 needed to run server")
18
    sys.exit(1)
19
20
# Flask application object; the route decorators below register views on it.
app = Flask(__name__)
21
# Cache instance consulted by the `cached` decorator (get/set keyed by request path).
cache = SimpleCache()
22
23
24
# Fallback timeout that `cached` hands to cache.set() when it is constructed
# without an explicit timeout. Presumably seconds -- confirm against SimpleCache.
CACHE_TIMEOUT = 300
25
26
27
class cached(object):
    """Decorator factory that caches a view's return value per request path.

    Usage: ``@cached()`` or ``@cached(timeout=60)`` directly above the view
    function (below ``@app.route``). The cache key is ``request.path`` only,
    so query-string variations of the same path share one cached response.
    """

    def __init__(self, timeout=None):
        # A falsy timeout (None or 0) falls back to the module default.
        self.timeout = timeout or CACHE_TIMEOUT

    def __call__(self, f):
        """Wrap view function *f* so its result is served from the cache."""
        import functools  # local import keeps this block self-contained

        # functools.wraps preserves f.__name__; Flask derives the endpoint
        # name from it, so without this every cached view would be called
        # "decorator" and a second cached route would collide with the first.
        @functools.wraps(f)
        def decorator(*args, **kwargs):
            response = cache.get(request.path)
            if response is None:
                response = f(*args, **kwargs)
                cache.set(request.path, response, self.timeout)
            return response
        return decorator
40
41
42
def dictToHstore(in_dict):
    """Render *in_dict* as a single-quoted SQL literal in hstore text form.

    Entries whose value is falsy (None, "", 0, ...) are omitted. Each
    remaining entry becomes ``key=>"value"``; entries are joined with ", "
    and the whole string is wrapped in single quotes so it can be followed
    by ``::HSTORE`` in a statement.

    Values are escaped in two layers: backslashes and double quotes are
    backslash-escaped for the hstore parser, then single quotes are doubled
    for the surrounding SQL literal. Keys are emitted as given.
    NOTE(review): the SQL-literal layer assumes the server runs with
    standard_conforming_strings on -- confirm for the target database.

    :param in_dict: mapping of hstore keys to values (values are str()-ed).
    :returns: the quoted literal, ``"''"`` for an empty/all-falsy mapping.
    """
    single_kvs = []
    # .items() works on both Python 2 and 3 (iterkeys() is Python 2 only).
    for key, value in in_dict.items():
        if not value:
            continue
        escaped = "{0}".format(value).replace("\\", "\\\\").replace('"', '\\"')
        single_kvs.append('{0}=>"{1}"'.format(key, escaped))
    hstore_body = ", ".join(single_kvs).replace("'", "''")
    return "'{0}'".format(hstore_body)
49
50
51
@app.route("/rules", methods=["GET"])
@cached()
def get_rules():
    """Return every row of the ``endpoints`` table as a JSON array.

    Opens a dedicated connection for the request. Responds 503 when the
    database cannot be reached. The response is cached by ``@cached()``.
    """
    try:
        connv = psycopg2.connect("dbname=trdb user=postgres host=localhost")
    except psycopg2.OperationalError as e:
        logger.error(str(e))
        abort(503)
    # Close the per-request connection even if the query itself fails,
    # otherwise each failing request would leak one server connection.
    try:
        cur = connv.cursor()
        cur.execute("SELECT * FROM endpoints;")
        rows = cur.fetchall()
    finally:
        connv.close()
    return json.dumps(rows)
64
65
66
def _hstore_text(pairs):
    """Return the truthy entries of *pairs* in hstore text form (unquoted).

    The result is meant to be passed as a bound query parameter, so only
    hstore-level escaping (backslash and double quote) is applied.
    """
    rendered = []
    for key, value in pairs.items():
        if value:
            text = "{0}".format(value).replace("\\", "\\\\").replace('"', '\\"')
            rendered.append('{0}=>"{1}"'.format(key, text))
    return ", ".join(rendered)


def _execute_or_abort(cur, sql, params, debug):
    """Execute *sql* with *params*, or just print both when *debug* is set.

    Any database error is logged and turned into a 503 response; the caller
    is responsible for rolling back the shared connection.
    """
    if debug:
        print(sql, params)
        return
    try:
        cur.execute(sql, params)
    except psycopg2.Error as e:
        logger.error(str(e))
        abort(503)


@app.route("/trace", methods=["POST"])
def receive_traces():
    """Store one submitted traceroute and its per-hop probes.

    Expects a JSON body with ``data`` (holding ``src_ip``, ``dst_ip`` and
    ``hops``: a mapping of hop number to a list of probe dicts) and
    ``reporter``; ``note`` and ``ext_ip`` are optional. Probes without an
    ``ip`` or ``rtt`` are skipped. When a GeoIP reader is configured under
    ``app.config["trdb"]["mmdb"]`` the probe's ASN is looked up and stored.

    In debug mode (``config.debug``) statements are printed instead of
    executed and the trace id is the placeholder ``"DEBUG_ID"``.

    Responses: ``'OK'`` on success, 400 for a structurally invalid payload,
    503 when the database rejects a statement. On any failure the shared
    connection is rolled back so a half-written trace is never committed
    by a later request.
    """
    config = app.config["trdb"]
    # The reader is optional; without it no ISP/ASN annotation is attempted.
    reader = config["mmdb"] if config.get("mmdb", False) else None

    data = request.get_json(force=True)
    try:
        trace = data["data"]
        reporter = data["reporter"]
        src_ip = trace["src_ip"]
        dst_ip = trace["dst_ip"]
        hop_items = list(trace["hops"].items())
        trace_kvs = {"note": data.get("note"), "ext_ip": data.get("ext_ip")}
    except (KeyError, TypeError, AttributeError):
        abort(400)

    cur = conn.cursor()
    try:
        if config.debug:
            print("SELECT nextval('traceroute_id_seq');")
            trace_id = "DEBUG_ID"
        else:
            _execute_or_abort(cur, "SELECT nextval('traceroute_id_seq');", None, False)
            trace_id = cur.fetchone()[0]

        # All client-supplied values travel as bound parameters, never as
        # part of the SQL text.
        _execute_or_abort(
            cur,
            "INSERT INTO traceroute VALUES (%s, %s, %s, now(), %s, %s::HSTORE);",
            (trace_id, src_ip, dst_ip, reporter, _hstore_text(trace_kvs)),
            config.debug,
        )

        for key, probes in hop_items:
            try:
                hop_number = int(key)
            except (TypeError, ValueError):
                abort(400)

            for probe in probes:
                ip = probe.get("ip")
                rtt = probe.get("rtt")
                if ip is None or rtt is None:
                    continue

                isp = None
                if reader is not None:
                    try:
                        isp = reader.isp(ip)
                    except (AddressNotFoundError, ValueError):
                        # Unknown or unparsable address: store the probe
                        # without an ASN rather than failing the request.
                        isp = None

                hop_kvs = {}
                if rtt != "None":
                    hop_kvs["time"] = rtt
                anno = probe.get("anno")
                if anno:
                    hop_kvs["anno"] = anno
                asn = isp.raw.get("autonomous_system_number") if isp else None
                if asn:
                    hop_kvs["asn"] = asn

                _execute_or_abort(
                    cur,
                    "INSERT INTO hop VALUES (nextval('probe_id_seq'), %s, %s, %s, %s, now());",
                    (trace_id, hop_number, _hstore_text(hop_kvs), ip),
                    config.debug,
                )

        conn.commit()
    except Exception:
        # Covers abort() as well as unexpected errors: never leave partial
        # inserts pending on the connection shared with other requests.
        conn.rollback()
        raise
    finally:
        cur.close()

    print(request.remote_addr, "submitted result for:", src_ip, ">", dst_ip, "trace_id:", trace_id)
    return 'OK'
143
144
145
@app.route("/trace/<trace_id>", methods=["GET"])
def lookup_trace(trace_id):
    """Return the rows of ``trv_trace`` belonging to one traceroute.

    :param trace_id: traceroute id taken from the URL; must parse as an
        integer, otherwise the request is answered with 404.
    :returns: a psql-style text table when the ``table`` query argument is
        present, otherwise one JSON-encoded row per line.

    Responds 503 (after rolling back the shared connection) when the
    database reports an error.
    """
    try:
        trace_key = int(trace_id)
    except ValueError:
        abort(404)

    cur = conn.cursor(cursor_factory=pgextras.DictCursor)
    try:
        pgextras.register_hstore(cur)
        # The id is passed as a bound parameter instead of being formatted
        # into the statement text.
        cur.execute(
            "SELECT * FROM trv_trace WHERE traceroute_id = %s ORDER BY traceroute_id,hop_number;",
            (trace_key,),
        )
        rows = cur.fetchall()
    except psycopg2.Error as e:
        logger.error(str(e))
        # Clear the failed transaction so later requests on the shared
        # connection are not rejected.
        conn.rollback()
        abort(503)
    finally:
        cur.close()

    if "table" in request.args:
        headers = ["reporter", "traceroute_id", "origin_ip", "dest_ip", "probe_id",
                   "hop_number", "host", "hop_kvs"]
        return tabulate(rows, headers, tablefmt="psql")
    return "\n".join(json.dumps(row) for row in rows)
158
159
160
# Module-level database connection shared by receive_traces() and
# lookup_trace(). It is opened when this module is imported; if the
# connection cannot be established the error is logged and the process
# exits with status 1.
try:
161
    conn = psycopg2.connect("dbname=trdb user=postgres host=localhost")
162
except psycopg2.OperationalError as e:
163
    logger.error(str(e))
164
    sys.exit(1)
165
166
167
# Entry point: publish `config` to the request handlers, optionally open the
# GeoIP database, and start the Flask server on port 9001.
#   config -- read via attribute access here (config.mmdb, config.debug) and
#             via item assignment once stored (app.config["trdb"]["mmdb"]);
#             presumably an attribute-style dict -- confirm against the caller.
#   app    -- Flask app to configure and run; defaults to the module-level
#             `app`. The parameter deliberately keeps that name (it is part
#             of the call signature), which is what the shadowing report
#             below refers to.
def run_server(config, app=app):
0 ignored issues
show
Comprehensibility Bug introduced by
app is re-defining a name which is already available in the outer-scope (previously defined on line 20).

It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior:

param = 5

class Foo:
    def __init__(self, param):   # "param" would be flagged here
        self.param = param
Loading history...
168
    # Handlers read the configuration back from app.config["trdb"].
    app.config["trdb"] = config
169
    if config.mmdb:
170
        # config.mmdb is handed to geoip2.database.Reader; the opened reader
        # then replaces it under the "mmdb" key for receive_traces() to use.
        reader = geoip2.database.Reader(config.mmdb)
171
        app.config["trdb"]["mmdb"] = reader
172
173
    # NOTE(review): "sever" in the message looks like a typo for "server".
    # NOTE(review): if `logger` is a stdlib logging.Logger, warn() is a
    # deprecated alias of warning() -- confirm before changing.
    logger.warn("Starting sever")
174
    # host='::' is the IPv6 unspecified address (listen on all interfaces).
    app.run(port=9001, debug=config.debug, host='::')
175