pyUSIrest.auth   A
last analyzed

Complexity

Total Complexity 15

Size/Duplication

Total Lines 182
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 80
dl 0
loc 182
rs 10
c 0
b 0
f 0
wmc 15

7 Methods

Rating   Name   Duplication   Size   Complexity  
B Auth.__init__() 0 49 5
A Auth.__str__() 0 16 2
A Auth.get_duration() 0 33 3
A Auth.is_expired() 0 8 1
A Auth._decode() 0 12 1
A Auth.token() 0 5 1
A Auth.get_domains() 0 8 1
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
"""
4
Created on Thu May 24 15:46:37 2018
5
6
@author: Paolo Cozzi <[email protected]>
7
8
.. _`python_jwt.process_jwt`: https://rawgit.now.sh/davedoesdev/python-jwt/master/docs/_build/html/index.html#python_jwt.process_jwt
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (132/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
9
10
"""  # noqa
11
12
import requests
13
import datetime
14
import logging
15
16
import python_jwt
17
18
from . import settings
19
from .exceptions import USIConnectionError
20
21
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
22
23
24
class Auth():
    """
    Deal with EBI AAP tokens. Starts from a token string or by providing
    user credentials. It parses the token and provides methods like
    checking expiration times.

    Attributes:
        auth_url (str): Default url for EBI AAP.
        expire (datetime.datetime): when token expires
        issued (datetime.datetime): when token was requested
        header (dict): token header read by `python_jwt.process_jwt`_
        claims (dict): token claims read by `python_jwt.process_jwt`_
    """

    auth_url = None

    def __init__(self, user=None, password=None, token=None):
        """
        Instantiate a new python EBI AAP Object. You can generate a new
        object providing both user and password, or by passing a valid
        token string. When user/password and a token are all supplied,
        the credentials take precedence and a new token is requested.

        Args:
            user (str): your aap username
            password (str): your password
            token (str): a valid EBI AAP jwt token

        Raises:
            USIConnectionError: if the authentication endpoint does not
                reply with status 200
            ValueError: if neither user/password nor a token is provided
        """

        self.expire = None
        self.issued = None
        self.header = None
        self.claims = None
        self._token = None

        # load auth url from settings
        self.auth_url = settings.AUTH_URL + "/auth"

        if password and user:
            # request a fresh token using HTTP basic authentication
            logger.debug("Authenticating user %s", user)
            self.response = requests.get(
                self.auth_url, auth=requests.auth.HTTPBasicAuth(
                    user, password))

            # set status code
            self.status_code = self.response.status_code

            if self.status_code != 200:
                logger.error("Got status %s", self.status_code)
                raise USIConnectionError(
                    "Got status %s: '%s'" % (
                        self.status_code, self.response.text))

            logger.debug("Got status %s", self.status_code)

            # Set token with token.setter (this also decodes it)
            self.token = self.response.text

        elif token:
            # Set token with token.setter (this also decodes it)
            self.token = token

        else:
            raise ValueError(
                "You need to provide user/password or a valid token")

    def _decode(self, token=None):
        """Decode JWT token using python_jwt

        Fills ``header`` and ``claims`` from the token, then derives
        ``issued`` and ``expire`` from the ``iat`` and ``exp`` claims as
        naive datetimes expressed in local time.

        Args:
            token (str): the jwt token to process
        """

        # process token
        self.header, self.claims = python_jwt.process_jwt(token)

        # debug
        logger.debug("Decoded tocken with %s", self.header['alg'])

        # record useful values
        self.issued = datetime.datetime.fromtimestamp(self.claims['iat'])
        self.expire = datetime.datetime.fromtimestamp(self.claims['exp'])

    @property
    def token(self):
        """Get/Set token as a string. Setting a token decodes it first."""

        return self._token

    @token.setter
    def token(self, token):
        # decode before storing, so a token which can't be processed
        # never replaces the current one
        self._decode(token)
        self._token = token

    def get_duration(self):
        """Get token remaining time before expiration

        The remaining time is also logged: as a warning when fewer than
        300 seconds are left, as an error when the token is already
        expired, as a debug message otherwise.

        Returns:
            datetime.timedelta: remaining time as
            :py:class:`timedelta <datetime.timedelta>` object (negative
            when the token is expired)
        """

        # both values are naive datetimes in local time
        now = datetime.datetime.now()
        duration = self.expire - now

        seconds = duration.total_seconds()
        user = self.claims['name']

        if 0 < seconds < 300:
            logger.warning(
                "Token for %s will expire in %s seconds", user, seconds)
        elif seconds < 0:
            logger.error("Token for %s is expired", user)
        else:
            logger.debug(
                "Token for %s will expire in %s seconds", user, seconds)

        return duration

    def is_expired(self):
        """Return True if token is expired, False otherwise

        Returns:
            bool: True if token is expired
        """

        return self.get_duration().total_seconds() < 0

    def __str__(self):
        """Describe the token owner and the time left as HH:MM:SS.

        Hours are not wrapped at 24, so a token with more than one day
        left is reported with its full remaining time (e.g. ``25:00:00``).
        """

        duration = self.get_duration()
        user = self.claims['name']

        if duration.total_seconds() < 0:
            return "Token for {user} is expired".format(user=user)

        # timedelta.seconds only holds the part below one day: add the
        # whole days back or they would be silently dropped
        remaining = duration.days * 86400 + duration.seconds
        hours, remainder = divmod(remaining, 3600)
        minutes, seconds = divmod(remainder, 60)

        formatted = "%.2d:%.2d:%.2d" % (hours, minutes, seconds)

        return "Token for {user} will expire in {duration}".format(
            user=user,
            duration=formatted)

    def get_domains(self):
        """Returns a list of domain managed by this object

        Returns:
            list: a list of managed domains, as found in the ``domains``
            claim of the token
        """

        return self.claims['domains']
182