Completed
Push — develop ( ca1745...b1b5c2 )
by Bastien
14s queued 11s
created

tracim_backend.lib.utils.authentification   A

Complexity

Total Complexity 29

Size/Duplication

Total Lines 135
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 29
eloc 83
dl 0
loc 135
rs 10
c 0
b 0
f 0

2 Functions

Rating   Name   Duplication   Size   Complexity  
B basic_auth_check_credentials() 0 22 6
A _get_auth_unsafe_user() 0 17 3

8 Methods

Rating   Name   Duplication   Size   Complexity  
B ApiTokenAuthentificationPolicy.authenticated_userid() 0 13 7
A CookieSessionAuthentificationPolicy.forget() 0 5 2
A ApiTokenAuthentificationPolicy.forget() 0 2 1
A ApiTokenAuthentificationPolicy.__init__() 0 3 1
A ApiTokenAuthentificationPolicy.remember() 0 2 1
A ApiTokenAuthentificationPolicy.unauthenticated_userid() 0 2 1
B CookieSessionAuthentificationPolicy.authenticated_userid() 0 15 6
A CookieSessionAuthentificationPolicy.__init__() 0 3 1
1
import datetime
2
import typing
3
4
from pyramid.authentication import CallbackAuthenticationPolicy
5
from pyramid.authentication import SessionAuthenticationPolicy
6
from pyramid.interfaces import IAuthenticationPolicy
7
from pyramid.request import Request
8
from zope.interface import implementer
9
10
from tracim_backend.exceptions import UserDoesNotExist
11
from tracim_backend.lib.core.user import UserApi
12
from tracim_backend.lib.utils.request import TracimRequest
13
from tracim_backend.models import User
14
15
# Realm name for HTTP Basic auth.
# NOTE(review): not referenced in the visible code; presumably handed to the
# basic auth policy where policies are wired up - confirm at the call site.
BASIC_AUTH_WEBUI_REALM = "tracim"
16
# Name of the HTTP header carrying the API key.
# NOTE(review): presumably passed as ``api_key_header`` to
# ApiTokenAuthentificationPolicy - confirm at the call site.
TRACIM_API_KEY_HEADER = "Tracim-Api-Key"
17
# Name of the HTTP header carrying the email/login of the acting user.
# NOTE(review): presumably passed as ``api_user_email_login_header`` to
# ApiTokenAuthentificationPolicy - confirm at the call site.
TRACIM_API_USER_EMAIL_LOGIN_HEADER = "Tracim-Api-Login"
18
19
###
20
# Pyramid HTTP Basic Auth
21
###
22
23
24
def basic_auth_check_credentials(
        login: str,
        cleartext_password: str,
        request: TracimRequest
) -> typing.Optional[list]:
    """
    Credential checker for pyramid basic_auth.

    The candidate user is resolved from the request through
    ``_get_auth_unsafe_user``. It is accepted only when it exists, its
    email equals ``login``, it is active, it is not deleted and
    ``cleartext_password`` validates against it.

    :param login: login of user
    :param cleartext_password: user password in cleartext
    :param request: Pyramid request
    :return: None if auth failed, (empty) list of permissions if auth
        succeed
    """
    candidate = _get_auth_unsafe_user(request)

    # Guard clauses, evaluated in the same order as a short-circuit chain:
    # the password is only validated once every cheaper check has passed.
    if not candidate:
        return None
    if candidate.email != login:
        return None
    if not candidate.is_active:
        return None
    if candidate.is_deleted:
        return None
    if not candidate.validate_password(cleartext_password):
        return None

    # No specific permission is granted here, only authentication.
    return []
46
47
48
def _get_auth_unsafe_user(
    request: Request,
) -> typing.Optional[User]:
    """
    Look up the user matching the request's unauthenticated userid.

    "Unsafe" because nothing is verified here: the userid is taken as-is
    from ``request.unauthenticated_userid`` and only used as an email to
    fetch the user. Callers remain responsible for checking credentials
    and the user state.

    :param request: pyramid request
    :return: the matching User, or None when the request carries no
        userid or when no user exists for it
    """
    cfg = request.registry.settings['CFG']
    user_api = UserApi(None, session=request.dbsession, config=cfg)
    try:
        email = request.unauthenticated_userid
        # An empty/missing userid never triggers a lookup.
        return user_api.get_one_by_email(email) if email else None
    except UserDoesNotExist:
        return None
65
66
###
67
# Pyramid cookie auth policy
68
###
69
70
71
@implementer(IAuthenticationPolicy)
class CookieSessionAuthentificationPolicy(SessionAuthenticationPolicy):
    """
    Session based authentication policy.

    On top of SessionAuthenticationPolicy, it rejects unknown, inactive or
    deleted users (deleting their session) and regenerates the session id
    when the session has not been accessed for more than ``reissue_time``
    seconds.
    """

    def __init__(self, reissue_time: int, debug: bool = False):
        """
        :param reissue_time: delay in seconds since the last access of the
            session after which the session id is regenerated
        :param debug: forwarded to SessionAuthenticationPolicy
        """
        SessionAuthenticationPolicy.__init__(self, debug=debug, callback=None)
        self._reissue_time = reissue_time

    def authenticated_userid(self, request):
        """
        Return the userid of the request if it belongs to a valid user.

        :param request: pyramid request
        :return: ``request.unauthenticated_userid`` when the user exists,
            is active and is not deleted, None otherwise
        """
        # check if user is correct
        user = _get_auth_unsafe_user(request)
        # do not allow invalid_user + ask for cleanup of session cookie
        if not user or not user.is_active or user.is_deleted:
            request.session.delete()
            return None
        # recreate session if need renew
        if not request.session.new:
            # ``last_accessed`` is handled as a POSIX timestamp. Both sides
            # of the comparison are built as timezone-aware UTC datetimes:
            # mixing a naive local "now" with a naive UTC last-access time
            # would shift the reissue window by the server's UTC offset.
            utc = datetime.timezone.utc
            now = datetime.datetime.now(utc)
            last_access_datetime = datetime.datetime.fromtimestamp(
                request.session.last_accessed,
                utc,
            )
            reissue_limit = last_access_datetime + datetime.timedelta(
                seconds=self._reissue_time
            )
            if now > reissue_limit:
                request.session.regenerate_id()
        return request.unauthenticated_userid

    def forget(self, request):
        """
        Remove the stored userid from the session.

        The whole session is deleted, and only when it holds a userid.

        :param request: pyramid request
        :return: empty list (no header to add to the response)
        """
        if self.userid_key in request.session:
            request.session.delete()
        return []
99
100
101
###
102
# Pyramid API key auth
103
###
104
105
106
@implementer(IAuthenticationPolicy)
class ApiTokenAuthentificationPolicy(CallbackAuthenticationPolicy):
    """
    Authentication policy based on a shared API key.

    A request is authenticated when the header named ``api_key_header``
    holds the API key set in the application config (``API_KEY``) and the
    header named ``api_user_email_login_header`` holds the email of an
    existing, active and not deleted user.
    """

    def __init__(self, api_key_header: str, api_user_email_login_header: str):
        """
        :param api_key_header: name of the HTTP header holding the API key
        :param api_user_email_login_header: name of the HTTP header holding
            the email of the user to act as
        """
        self.api_key_header = api_key_header
        self.api_user_email_login_header = api_user_email_login_header

    def authenticated_userid(self, request):
        """
        Return the userid of the request if the API key and user are valid.

        :param request: pyramid request
        :return: ``request.unauthenticated_userid`` on success, None when
            the API key is missing, not configured or wrong, or when the
            user is unknown, inactive or deleted
        """
        # Local import: keeps this block self-contained.
        import hmac

        # Application config object stored in the registry settings.
        app_config = request.registry.settings['CFG']
        valid_api_key = app_config.API_KEY
        api_key = request.headers.get(self.api_key_header)
        # No key given, or API key auth not configured: never authenticate.
        if not api_key or not valid_api_key:
            return None
        # Constant-time comparison, so that response timing does not leak
        # how much of the submitted key is right. Compared as bytes because
        # compare_digest() only accepts str arguments when they are ASCII.
        if not hmac.compare_digest(
            api_key.encode('utf-8'),
            valid_api_key.encode('utf-8'),
        ):
            return None
        # check if user is correct
        user = _get_auth_unsafe_user(request)
        if not user or not user.is_active or user.is_deleted:
            return None
        return request.unauthenticated_userid

    def unauthenticated_userid(self, request):
        """
        Return the raw, unverified userid given by the client.

        :param request: pyramid request
        :return: value of the login header, None if the header is absent
        """
        return request.headers.get(self.api_user_email_login_header)

    def remember(self, request, userid, **kw):
        """
        Nothing to remember: credentials are sent with every request.

        :return: empty list (no header to add to the response)
        """
        return []

    def forget(self, request):
        """
        Nothing to forget: no state is kept between requests.

        :return: empty list (no header to add to the response)
        """
        return []
135