Passed
Pull Request — develop (#67)
by inkhey
02:33
created

TracimBasicAuthAuthenticationPolicy.__init__()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 7
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
import datetime
2
import typing
3
4
from pyramid.authentication import BasicAuthAuthenticationPolicy
5
from pyramid.authentication import CallbackAuthenticationPolicy
6
from pyramid.authentication import SessionAuthenticationPolicy
7
from pyramid.authentication import extract_http_basic_credentials
8
from pyramid.interfaces import IAuthenticationPolicy
9
from pyramid.request import Request
10
from zope.interface import implementer
11
12
from tracim_backend.exceptions import UserDoesNotExist
13
from tracim_backend.lib.core.user import UserApi
14
from tracim_backend.models import User
15
16
BASIC_AUTH_WEBUI_REALM = "tracim"
17
TRACIM_API_KEY_HEADER = "Tracim-Api-Key"
18
TRACIM_API_USER_EMAIL_LOGIN_HEADER = "Tracim-Api-Login"
19
20
21
def _get_auth_unsafe_user(
22
    request: Request,
23
    email: str=None,
24
    user_id: int=None,
25
) -> typing.Optional[User]:
26
    """
27
    :param request: pyramid request
28
    :return: User or None
29
    """
30
    app_config = request.registry.settings['CFG']
31
    uapi = UserApi(None, session=request.dbsession, config=app_config)
32
    try:
33
        _, user = uapi.find(user_id=user_id, email=email)
34
        return user
35
    except UserDoesNotExist:
36
        return None
37
38
###
39
# Pyramid HTTP Basic Auth
40
###
41
42
43
@implementer(IAuthenticationPolicy)
44
class TracimBasicAuthAuthenticationPolicy(BasicAuthAuthenticationPolicy):
45
46
    def __init__(self, realm):
47
        BasicAuthAuthenticationPolicy.__init__(self, check=None, realm=realm)
48
        # TODO - G.M - 2018-09-21 - Disable callback is needed to have BasicAuth
49
        # correctly working, if enabled, callback method will try check method
50
        # who is now disabled (uneeded because we use directly
51
        # authenticated_user_id) and failed.
52
        self.callback = None
53
54
    def authenticated_userid(self, request):
55
        # check if user is correct
56
        credentials = extract_http_basic_credentials(request)
57
        user = _get_auth_unsafe_user(
58
            request,
59
            email=request.unauthenticated_userid
60
        )
61
        if not user \
62
                or user.email != request.unauthenticated_userid \
63
                or not user.is_active \
64
                or user.is_deleted \
65
                or not credentials \
66
                or not user.validate_password(credentials.password):
67
            return None
68
        return user.user_id
69
70
71
###
72
# Pyramid cookie auth policy
73
###
74
75
76
@implementer(IAuthenticationPolicy)
77
class CookieSessionAuthentificationPolicy(SessionAuthenticationPolicy):
78
79
    def __init__(self, reissue_time: int, debug: bool = False):
80
        SessionAuthenticationPolicy.__init__(self, debug=debug, callback=None)
81
        self._reissue_time = reissue_time
82
        self.callback = None
83
84
    def authenticated_userid(self, request):
85
        # check if user is correct
86
        user = _get_auth_unsafe_user(request, user_id=request.unauthenticated_userid)  # nopep8
87
        # do not allow invalid_user + ask for cleanup of session cookie
88
        if not user or not user.is_active or user.is_deleted:
89
            request.session.delete()
90
            return None
91
        # recreate session if need renew
92
        if not request.session.new:
93
            now = datetime.datetime.now()
94
            last_access_datetime = datetime.datetime.utcfromtimestamp(request.session.last_accessed)  # nopep8
95
            reissue_limit = last_access_datetime + datetime.timedelta(seconds=self._reissue_time)  # nopep8
96
            if now > reissue_limit:  # nopep8
97
                request.session.regenerate_id()
98
        return user.user_id
99
100
    def forget(self, request):
101
        """ Remove the stored userid from the session."""
102
        if self.userid_key in request.session:
103
            request.session.delete()
104
        return []
105
106
107
###
108
# Pyramid API key auth
109
###
110
111
112
@implementer(IAuthenticationPolicy)
113
class ApiTokenAuthentificationPolicy(CallbackAuthenticationPolicy):
114
115
    def __init__(self, api_key_header: str, api_user_email_login_header: str):
116
        self.api_key_header = api_key_header
117
        self.api_user_email_login_header = api_user_email_login_header
118
        self.callback = None
119
120
    def authenticated_userid(self, request):
121
        app_config = request.registry.settings['CFG']  # type:'CFG'
122
        valid_api_key = app_config.API_KEY
123
        api_key = request.headers.get(self.api_key_header)
124
        if not api_key or not valid_api_key:
125
            return None
126
        if valid_api_key != api_key:
127
            return None
128
        # check if user is correct
129
        user = _get_auth_unsafe_user(request, email=request.unauthenticated_userid)
130
        if not user or not user.is_active or user.is_deleted:
131
            return None
132
        return user.user_id
133
134
    def unauthenticated_userid(self, request):
135
        return request.headers.get(self.api_user_email_login_header)
136
137
    def remember(self, request, userid, **kw):
138
        return []
139
140
    def forget(self, request):
141
        return []
142