kytos.utils.decorators.kytos_auth.authenticate()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 29
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 22
nop 1
dl 0
loc 29
rs 9.352
c 0
b 0
f 0
1
"""Decorators for Kytos-utils."""
2
import logging
3
import os
4
from getpass import getpass
5
6
import requests
7
8
from kytos.utils.config import KytosConfig
9
10
LOG = logging.getLogger(__name__)
11
12
13
# This class is used as decorator, so this class name is lowercase and the
14
# invalid-name warning from pylint is disabled below.
15
class kytos_auth:  # pylint: disable=invalid-name
16
    """Class to be used as decorator to require authentication."""
17
18
    def __init__(self, func):
19
        """Init method.
20
21
        Save the function on the func attribute and bootstrap a new config.
22
        """
23
        self.func = func
24
        self.config = KytosConfig().config
25
        self.cls = None
26
        self.obj = None
27
28
    def __call__(self, *args, **kwargs):
29
        """Code run when func is called."""
30
        if not (self.config.has_option('napps', 'api') and
31
                self.config.has_option('napps', 'repo')):
32
            uri = input("Enter the kytos napps server address: ")
33
            self.config.set('napps', 'api', os.path.join(uri, 'api', ''))
34
            self.config.set('napps', 'repo', os.path.join(uri, 'repo', ''))
35
36
        if not self.config.has_option('auth', 'user'):
37
            user = input("Enter the username: ")
38
            self.config.set('auth', 'user', user)
39
        else:
40
            user = self.config.get('auth', 'user')
41
42
        if not self.config.has_option('auth', 'token'):
43
            token = self.authenticate()
44
        else:
45
            token = self.config.get('auth', 'token')
46
47
        # Ignore private attribute warning. We don't wanna make it public only
48
        # because of a decorator.
49
        config = self.obj._config  # pylint: disable=protected-access
50
        config.set('auth', 'user', user)
51
        config.set('auth', 'token', token)
52
        self.func.__call__(self.obj, *args, **kwargs)
53
54
    def __get__(self, instance, owner):
55
        """Deal with owner class."""
56
        self.cls = owner
57
        self.obj = instance
58
59
        return self.__call__
60
61
    # pylint: disable=inconsistent-return-statements
62
    def authenticate(self):
63
        """Check the user authentication."""
64
        endpoint = os.path.join(self.config.get('napps', 'api'), 'auth', '')
65
        username = self.config.get('auth', 'user')
66
        password = getpass('Enter the password for {}: '.format(username))
67
        try:
68
            response = requests.get(endpoint, auth=(username, password),
69
                                    timeout=20)
70
        except (requests.exceptions.ConnectionError,
71
                requests.exceptions.Timeout):
72
            print(f"It couldn't connect to {endpoint}. Try again later.")
73
74
        # Check if it is unauthorized
75
        if response.status_code == 401:
76
            print(f'Error with status code: {response.status_code}.\n'
77
                  'Possible causes: incorrect credentials, the token was '
78
                  'not set or was expired.')
79
80
        if response.status_code != 201:
81
            LOG.error(response.content)
82
            LOG.error('ERROR: %s: %s', response.status_code, response.reason)
83
            print('Press Ctrl+C or CTRL+Z to stop the process.')
84
            user = input('Enter the username: ')
85
            self.config.set('auth', 'user', user)
86
            self.authenticate()
87
        else:
88
            data = response.json()
89
            KytosConfig().save_token(username, data.get('hash'))
90
            return data.get('hash')
91