Passed
Pull Request — master (#69)
by
unknown
02:32 queued 01:04
created

nextcloud.session.Session._set_credentials()   B

Complexity

Conditions 6

Size

Total Lines 14
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 14
rs 8.6666
c 0
b 0
f 0
cc 6
nop 4
1
# -*- coding: utf-8 -*-
2
from functools import wraps
3
import requests
4
from .compat import encode_requests_password
5
6
import logging
7
# Module-level logger named after this module; Session._check_session uses
# it to emit a warning each time the session check is retried.
_logger = logging.getLogger(__name__)
8
9
10
class NextCloudConnectionError(Exception):
    """Signal that a request to the NextCloud server could not be completed.

    ``catch_connection_error`` raises it in place of any
    ``requests.RequestException``, with the arguments
    ``(message, request url or None, original exception)``.
    """
12
13
class NextCloudLoginError(Exception):
    """Signal that the server did not accept the login.

    ``Session._check_session`` raises it when the user lookup response is
    not ok, with the arguments ``(message, session url, response)``.
    """
15
16
def catch_connection_error(func):
    """Decorate ``func`` so that transport failures surface as NextCloud errors.

    Any ``requests.RequestException`` raised by ``func`` is replaced by a
    ``NextCloudConnectionError`` carrying a fixed message, the url of the
    failed request (``None`` when the exception has no request attached)
    and the original exception. Every other exception passes through.

    :param func: callable to protect
    :returns: wrapper with the same call signature and metadata as ``func``
    """

    @wraps(func)
    def _guarded(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except requests.RequestException as err:
            # err.request may be None, hence getattr with a default.
            failed_url = getattr(err.request, 'url', None)
            raise NextCloudConnectionError(
                'Failed to establish connection to NextCloud', failed_url, err)
        return result

    return _guarded
27
28
29
class Session(object):
    """Holder of the credentials and HTTP session used to query NextCloud.

    Until :meth:`login` is called, :meth:`request` sends every request
    through the stateless ``requests.request`` function. After a login the
    requests go through a persistent ``requests.Session`` until
    :meth:`logout` closes it.
    """

    def __init__(self, url=None, user=None, password=None, auth=None, session_kwargs=None):
        """Store the connection settings; nothing is sent to the server.

        :param url: base url of the server, trailing slashes are stripped
        :param user: user id
        :param password: password of ``user``
        :param auth: ``(user, password)`` tuple or ``requests`` auth object
        :param session_kwargs: default options: keyword arguments of every
            stateless request, attributes of the session created by login
        """
        # Persistent requests.Session, only set between login() and logout().
        self.session = None
        self.auth = None
        self.user = None
        self._set_credentials(user, password, auth)
        # url defaults to None: only strip when there is something to strip.
        self.url = url.rstrip('/') if url else url
        self._session_kwargs = session_kwargs or {}

    def _set_credentials(self, user, password, auth):
        """Derive ``self.user`` and ``self.auth`` from the given credentials.

        A truthy ``auth`` replaces the stored one. Without an explicit
        ``user`` the user id is taken from the stored auth: a tuple is
        unpacked (and rebuilt below with an encoded password), an
        ``AuthBase`` object provides its ``username`` when it has one.

        :param user: user id, takes precedence over the one found in auth
        :param password: password, only used when no auth object remains
        :param auth: ``(user, password)`` tuple or ``requests`` auth object
        """
        if auth:
            self.auth = auth
        if user:
            self.user = user
        elif isinstance(self.auth, tuple):
            self.user, password = self.auth
            # Dropped so that the tuple is rebuilt with the encoded password.
            self.auth = None
        elif isinstance(self.auth, requests.auth.AuthBase):
            # AuthBase only guarantees __call__; custom auth objects (token
            # based for instance) may not expose a username.
            self.user = getattr(self.auth, 'username', None)
        if not self.auth:
            self.auth = (self.user, encode_requests_password(password))

    def _request_kwargs(self, kwargs):
        """Return the keyword arguments of a stateless request.

        The session defaults are copied before the per-call ``kwargs`` are
        applied, so that options given for one request (or the injected
        auth) never leak into ``self._session_kwargs``.

        :param kwargs: keyword arguments given for this request only
        :returns: new dict: defaults, overridden by ``kwargs``, plus auth
        """
        merged = dict(self._session_kwargs)
        merged.update(kwargs)
        if not kwargs.get('auth', False):
            merged['auth'] = self.auth
        return merged

    def request(self, method, url, **kwargs):
        """Send an HTTP request, through the open session when there is one.

        :param method: HTTP method name
        :param url: full url of the request
        :param kwargs: extra options forwarded to ``requests``
        :returns: the ``requests`` response
        """
        if self.session:
            return self.session.request(method=method, url=url, **kwargs)
        return requests.request(method, url, **self._request_kwargs(kwargs))

    def login(self, user=None, password=None, auth=None, client=None):
        """Create a stable session on the server.

        :param user: user id
        :param password: password
        :param auth: object for any auth method
        :param client: optional client object; when given, the new session
            is verified by calling ``get_user()`` on
            ``client.with_attr(json_output=True)``, with up to 3 retries
        :raises NextCloudLoginError: if the verification response is not ok
        """
        if self.session:
            # Do not leak the connection pool of a previous login.
            self.session.close()
        self.session = requests.Session()
        for key, value in self._session_kwargs.items():
            setattr(self.session, key, value)

        self._set_credentials(user, password, auth)
        self.session.auth = self.auth
        if client:
            self._check_session(client.with_attr(json_output=True), retry=3)

    def _check_session(self, client=None, retry=None):
        """Verify the session by asking the server for the current user.

        The lookup is attempted once, then repeated up to ``retry`` times
        while it fails with an SSL error, a connection error or a response
        that is not ok. Any other exception is not retried. When the check
        finally fails the open session is closed before the error is raised.

        :param client: object providing ``get_user()``
        :param retry: number of extra attempts (``None`` means no retry)
        :raises NextCloudLoginError: if the last response is not ok
        """
        # A loop instead of recursion: each failure consumes exactly one
        # attempt and the session is only closed after the last one.
        attempts_left = retry or 0
        while True:
            try:
                resp = client.get_user()
                if resp.is_ok:
                    return
                error = NextCloudLoginError(
                    'Failed to login to NextCloud', self.url, resp)
            except (requests.exceptions.SSLError, NextCloudConnectionError) as e:
                error = e
            except Exception:
                # Unexpected failure: retrying would not help.
                if self.session:
                    self.logout()
                raise

            if attempts_left <= 0:
                if self.session:
                    self.logout()
                raise error
            _logger.warning('Retry session check (%s)', self.url)
            attempts_left -= 1

    def logout(self):
        """Log out the authenticated user and close the session.

        Calling it without an open session is a no-op.

        :returns: True
        """
        if self.session is not None:
            self.session.close()
        self.session = None
        return True
119