Completed
Push — master ( 739b7a...711b3d )
by Glenn
01:21
created

skf/api/security.py (1 issue)

Severity
1
import jwt, datetime, re
2
3
from skf import settings
4
from flask import request, abort
5
from skf.api.restplus import api
0 ignored issues
show
Unused api imported from skf.api.restplus
Loading history...
6
from skf.database import db
7
from skf.database.logs import logs
8
9
10
def security_headers():
    """Return the fixed set of security-related HTTP headers as a dict.

    Despite what earlier documentation said, this is a plain function, not
    a decorator: it takes no arguments and builds a new dictionary mapping
    header names to header values on every call.
    NOTE(review): presumably callers attach the result to API responses;
    confirm against the call sites.
    """
    header_pairs = (
        ('X-Frame-Options', 'deny'),
        ('X-XSS-Protection', '1'),
        ('X-Content-Type-Options', 'nosniff'),
        ('Cache-Control', 'no-store, no-cache'),
        ('Strict-Transport-Security', 'max-age=16070400; includeSubDomains'),
        ('Server', 'Security Knowledge Framework API'),
    )
    return dict(header_pairs)
18
19
20
def log(message, threat, status):
    """Record a security event triggered by the user.

    The event is stored through the ``logs`` model together with the
    current date and time, the client IP, and the user id taken from the
    ``UserId`` claim of the JWT in the ``Authorization`` header ("0" when
    no header is present).

    Logging is best-effort: if anything goes wrong while building or
    storing the record (no request in scope, undecodable token, database
    failure), the event is written to the standard ``logging`` module
    instead, with placeholder IP and user id, and no exception is raised.

    Args:
        message: Human-readable description of the event.
        threat: Threat level label; the file uses LOW, MEDIUM and HIGH.
        status: Outcome label; the file uses FAIL and SUCCESS.

    Returns:
        None.
    """
    # Local import so this block does not depend on the file-level imports.
    import logging

    now = datetime.datetime.now()
    dateLog = now.strftime("%Y-%m-%d")
    dateTime = now.strftime("%H:%M")
    try:
        ip = request.remote_addr
        auth_header = request.headers.get('Authorization')
        if auth_header:
            token = auth_header.split()[0]
            # PyJWT documents ``algorithms`` as a list; a bare string is
            # only matched by substring containment.
            checkClaims = jwt.decode(token, settings.JWT_SECRET, algorithms=['HS256'])
            userID = checkClaims['UserId']
        else:
            userID = "0"
        event = logs(dateLog, dateTime, threat, ip, userID, status, message)
        db.session.add(event)
        db.session.commit()
    except Exception:
        # ``Exception`` rather than a bare ``except`` so SystemExit and
        # KeyboardInterrupt still propagate.  The fallback record was
        # previously built and then discarded; emit it so the event is not
        # lost when the database write fails.
        userID = "0"
        ip = "0.0.0.0"
        logging.getLogger(__name__).warning(
            "Datelog: %s Datetime: %s Threat: %s IP: %s UserId: %s Status: %s Message: %s",
            dateLog, dateTime, threat, ip, userID, status, message,
        )
40
41
42
def val_alpha(value):
    """Validate that ``value`` contains only alphabetic-style characters.

    ``value`` is converted with ``str()`` and rejected if it contains any
    digit or any character outside the regex ``\\w`` class.  Note that
    ``\\w`` also includes the underscore, so "_" is accepted; under
    Python 3 ``\\w`` additionally matches non-ASCII letters.

    Returns True when the value is acceptable.  Otherwise the failure is
    logged and the request is aborted with HTTP 400.
    """
    offending = re.search(r"[^\w]|[\d]", str(value))
    if offending is None:
        return True
    log("User supplied not an a-zA-Z", "MEDIUM", "FAIL")
    abort(400, "Validation Error")
50
51
52
def val_alpha_num(value):
    """Validate that ``value`` uses only letters, digits, ``_``, ``.``, ``-`` and spaces.

    ``value`` must be a string (it is passed to the regex engine as-is).
    Any character outside the allowed set causes the failure to be logged
    and the request to be aborted with HTTP 400; otherwise True is
    returned.
    """
    offending = re.search(r"[^\ \w\.-]", value)
    if offending is None:
        return True
    log("User supplied not an a-z A-Z 0-9 _ . - value", "MEDIUM", "FAIL")
    abort(400, "Validation Error")
60
61
62
def val_alpha_num_special(value):
    """Validate ``value`` against the wider "alphanumeric plus specials" set.

    Allowed characters are letters, digits, space and
    ``_ . - ' " , + ( ) / : @ ? & = %``.  ``value`` must be a string (it is
    passed to the regex engine as-is).  Any other character causes the
    failure to be logged and the request to be aborted with HTTP 400;
    otherwise True is returned.
    """
    offending = re.search(r"[^\ \w_\.\-\'\",\+\(\)\/\:@\?\&\=\%]", value)
    if offending is None:
        return True
    log("User supplied not an a-z A-Z 0-9 _ . - +' \" , value", "MEDIUM", "FAIL")
    abort(400, "Validation Error")
70
71
72
73
def val_num(value):
    """Validate that ``value`` is an ``int`` instance.

    This is a type check, not a digit check: numeric strings such as "5"
    are rejected, while negative integers and booleans (``bool`` is a
    subclass of ``int``) are accepted.

    Returns True when the check passes.  Otherwise the failure is logged
    and the request is aborted with HTTP 400.
    """
    if isinstance(value, int):
        return True
    log("User supplied not an 0-9", "MEDIUM", "FAIL")
    abort(400, "Validation Error")
80
81
82
def val_float(value):
    """Validate that ``value`` is a ``float`` instance.

    This is a type check: integers and numeric strings are rejected, and
    any float object is accepted.

    Returns True when the check passes.  Otherwise the failure is logged
    and the request is aborted with HTTP 400.
    """
    if isinstance(value, float):
        return True
    log("User supplied not a float value.", "MEDIUM", "FAIL")
    abort(400, "Validation Error")
89
90
91
def validate_privilege(self, privilege):
    """Check that the request's JWT grants ``privilege``.

    The decoded token's ``privilege`` claim is treated as a
    colon-separated list of privilege names; the check passes when
    ``privilege`` equals one of them.

    Args:
        self: Passed through unchanged to ``select_privilege_jwt``.
        privilege: The privilege name the caller requires.

    Returns:
        True when the privilege is present.

    Raises:
        The HTTP 403 error produced by ``abort`` when the Authorization
        header is missing, the token cannot be decoded or has expired, or
        the required privilege is not in the token.
    """
    if not request.headers.get('Authorization'):
        log("Request sent with missing JWT header", "HIGH", "FAIL")
        abort(403, 'JWT missing authorization header')
    try:
        check_privilege = select_privilege_jwt(self)
    except jwt.exceptions.DecodeError:
        log("User JWT header could not be decoded", "HIGH", "FAIL")
        abort(403, 'JWT decode error')
    # ExpiredSignatureError is the canonical name; the old ExpiredSignature
    # alias no longer exists in PyJWT 2.x, where looking it up here would
    # raise AttributeError and turn an expired token into a 500.
    except jwt.exceptions.ExpiredSignatureError:
        log("User JWT header is expired", "HIGH", "FAIL")
        abort(403, 'JWT token expired')
    if privilege in check_privilege['privilege'].split(':'):
        return True
    log("User JWT header contains wrong privilege", "HIGH", "FAIL")
    return abort(403, 'JWT wrong privileges')
110
111
112
def select_userid_jwt(self):
    """Return the ``UserId`` claim from the request's JWT.

    The token is the first whitespace-separated part of the
    ``Authorization`` header and is verified with ``settings.JWT_SECRET``
    using HS256.

    Args:
        self: Unused; kept so existing callers keep working.

    Returns:
        The value of the ``UserId`` claim.

    Raises:
        The HTTP 403 error produced by ``abort`` when the header is
        missing or blank, or the token cannot be decoded or has expired.
    """
    auth_header = request.headers.get('Authorization')
    parts = auth_header.split() if auth_header else []
    if not parts:
        # Previously a missing/blank header crashed with AttributeError or
        # IndexError; reject it the same way validate_privilege does.
        log("Request sent with missing JWT header", "HIGH", "FAIL")
        abort(403, 'JWT missing authorization header')
    token = parts[0]
    try:
        # PyJWT documents ``algorithms`` as a list of allowed algorithms.
        checkClaims = jwt.decode(token, settings.JWT_SECRET, algorithms=['HS256'])
    except jwt.exceptions.DecodeError:
        log("User JWT header could not be decoded", "HIGH", "FAIL")
        abort(403, 'JWT decode error')
    # The ExpiredSignature alias no longer exists in PyJWT 2.x; use the
    # canonical exception name so expired tokens yield 403, not a 500.
    except jwt.exceptions.ExpiredSignatureError:
        log("User JWT header is expired", "HIGH", "FAIL")
        abort(403, 'JWT token expired')
    return checkClaims['UserId']
124
125
126
def select_privilege_jwt(self):
    """Return the decoded claims of the request's JWT.

    The token is the first whitespace-separated part of the
    ``Authorization`` header and is verified with ``settings.JWT_SECRET``
    using HS256.  The whole claims mapping is returned; callers in this
    file read its ``privilege`` entry.

    Args:
        self: Unused; kept so existing callers keep working.

    Returns:
        The decoded claims as returned by ``jwt.decode``.

    Raises:
        The HTTP 403 error produced by ``abort`` when the header is
        missing or blank, or the token cannot be decoded or has expired.
    """
    auth_header = request.headers.get('Authorization')
    parts = auth_header.split() if auth_header else []
    if not parts:
        # Previously a missing/blank header crashed with AttributeError or
        # IndexError; reject it the same way validate_privilege does.
        log("Request sent with missing JWT header", "HIGH", "FAIL")
        abort(403, 'JWT missing authorization header')
    token = parts[0]
    try:
        # PyJWT documents ``algorithms`` as a list of allowed algorithms.
        check_privilege = jwt.decode(token, settings.JWT_SECRET, algorithms=['HS256'])
    except jwt.exceptions.DecodeError:
        log("User JWT header could not be decoded", "HIGH", "FAIL")
        abort(403, 'JWT decode error')
    # The ExpiredSignature alias no longer exists in PyJWT 2.x; use the
    # canonical exception name so expired tokens yield 403, not a 500.
    except jwt.exceptions.ExpiredSignatureError:
        log("User JWT header is expired", "HIGH", "FAIL")
        abort(403, 'JWT token expired')
    return check_privilege
138