1 | #!/usr/bin/env python3 |
||
2 | # -*- coding: utf-8 -*- |
||
"""
Created on Thu May 24 15:46:37 2018

@author: Paolo Cozzi <[email protected]>

.. _`python_jwt.process_jwt`: https://rawgit.now.sh/davedoesdev/python-jwt/master/docs/_build/html/index.html#python_jwt.process_jwt

"""  # noqa
||
11 | |||
12 | import requests |
||
13 | import datetime |
||
14 | import logging |
||
15 | |||
16 | import python_jwt |
||
17 | |||
18 | from . import settings |
||
19 | from .exceptions import USIConnectionError |
||
20 | |||
21 | logger = logging.getLogger(__name__) |
||
22 | |||
23 | |||
class Auth:
    """
    Deal with EBI AAP tokens. Starts from a token object or by providing
    user credentials. It parses the token and provides methods like checking
    expiration times.

    Attributes:
        auth_url (str): Default url for EBI AAP.
        expire (datetime.datetime): when token expires
        issued (datetime.datetime): when token was requested
        header (dict): token header read by `python_jwt.process_jwt`_
        claims (dict): token claims read by `python_jwt.process_jwt`_
        response: the response returned by ``requests.get`` when the object
            is built from user credentials, ``None`` otherwise
        status_code (int): HTTP status of ``response``, ``None`` when the
            object is built from a token string
    """

    auth_url = None

    def __init__(self, user=None, password=None, token=None):
        """
        Instantiate a new python EBI AAP Object. You can generate a new object
        providing both user and password, or by passing a valid token
        string

        Args:
            user (str): your aap username
            password (str): your password
            token (str): a valid EBI AAP jwt token

        Raises:
            USIConnectionError: if the authentication request does not
                return status 200
            ValueError: if neither user/password nor a token is provided
        """

        self.expire = None
        self.issued = None
        self.header = None
        self.claims = None
        self._token = None

        # Only filled in on the user/password path; defined up front so that
        # instances built from a token string expose the attributes as well
        self.response = None
        self.status_code = None

        # load auth url from settings
        self.auth_url = settings.AUTH_URL + "/auth"

        if password and user:
            logger.debug("Authenticating user %s", user)

            # NOTE(review): no timeout is passed to requests.get, so this
            # call can block indefinitely on an unresponsive server
            self.response = requests.get(
                self.auth_url,
                auth=requests.auth.HTTPBasicAuth(user, password))

            # set status code
            self.status_code = self.response.status_code

            if self.status_code != 200:
                logger.error("Got status %s", self.status_code)
                raise USIConnectionError(
                    "Got status %s: '%s'" % (
                        self.status_code, self.response.text))

            logger.debug("Got status %s", self.status_code)

            # Set token with token.setter (this also decodes it)
            self.token = self.response.text

        elif token:
            # Set token with token.setter (this also decodes it)
            self.token = token

        else:
            raise ValueError(
                "You need to provide user/password or a valid token")

    def _decode(self, token=None):
        """Decode JWT token using python_jwt and record header, claims,
        issue time and expiration time on the instance"""

        # process token
        self.header, self.claims = python_jwt.process_jwt(token)

        # debug
        logger.debug("Decoded tocken with %s", self.header['alg'])

        # record useful values: 'iat' and 'exp' claims are read as POSIX
        # timestamps and converted to naive local datetimes, the same
        # convention used by get_duration (datetime.datetime.now())
        self.issued = datetime.datetime.fromtimestamp(self.claims['iat'])
        self.expire = datetime.datetime.fromtimestamp(self.claims['exp'])

    @property
    def token(self):
        """Get/Set token as a string"""

        return self._token

    @token.setter
    def token(self, token):
        # decode first: if decoding raises, the stored token is left untouched
        self._decode(token)
        self._token = token

    def get_duration(self):
        """Get token remaining time before expiration

        The remaining time is also logged: as a warning when less than 300
        seconds are left, as an error when the token is already expired and
        as a debug message otherwise.

        Returns:
            datetime.timedelta: remaining time as
            :py:class:`timedelta <datetime.timedelta>` object
        """
        now = datetime.datetime.now()
        duration = self.expire - now
        seconds = duration.total_seconds()

        if 0 < seconds < 300:
            logger.warning(
                "Token for %s will expire in %s seconds",
                self.claims['name'], seconds)
        elif seconds < 0:
            logger.error("Token for %s is expired", self.claims['name'])
        else:
            logger.debug(
                "Token for %s will expire in %s seconds",
                self.claims['name'], seconds)

        return duration

    def is_expired(self):
        """Return True if token is expired, False otherwise

        Returns:
            bool: True if token is expired
        """

        return self.get_duration().total_seconds() < 0

    def __str__(self):
        """Describe the token owner and the time left as HH:MM:SS"""

        total_time = self.get_duration().total_seconds()

        if total_time < 0:
            return "Token for {user} is expired".format(
                user=self.claims['name'])

        # Derive the fields from the total number of seconds:
        # timedelta.seconds alone leaves out whole days, which would
        # misreport any token valid for 24 hours or more
        hours, remainder = divmod(int(total_time), 3600)
        minutes, seconds = divmod(remainder, 60)
        formatted = "%.2d:%.2d:%.2d" % (hours, minutes, seconds)

        return "Token for {user} will expire in {duration}".format(
            user=self.claims['name'],
            duration=formatted)

    def get_domains(self):
        """Returns a list of domain managed by this object

        Returns:
            list: a list of managed domains
        """

        return self.claims['domains']
||
182 |
This check looks for lines that are too long. You can specify the maximum line length.