Passed
Pull Request — master (#69)
by
unknown
02:32 queued 01:04
created

nextcloud.response   A

Complexity

Total Complexity 16

Size/Duplication

Total Lines 94
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 16
eloc 59
dl 0
loc 94
rs 10
c 0
b 0
f 0

5 Methods

Rating   Name   Duplication   Size   Complexity  
A BaseResponse._compute_is_ok() 0 11 3
A BaseResponse.__repr__() 0 3 2
A WebDAVResponse.__init__() 0 4 1
A BaseResponse.__init__() 0 10 4
B OCSResponse.__init__() 0 29 6
1
# -*- coding: utf-8 -*-
2
"""
3
Define wrappers around ``requests`` responses (each one automatically checks whether the request succeeded)
4
"""
5
try:
6
    from json import JSONDecodeError
7
except ImportError:
8
    JSONDecodeError = ValueError
9
10
11
class BaseResponse(object):
    """
    Base response that wraps an HTTP response and exposes the following attrs
    - raw         : the raw response
    - status_code : the HTTP code (or the explicit code given by the caller)
    - data        : the requested data (json or text value)
    - is_ok       : True if the request was successfully achieved
    """

    def __init__(self, response, data=None, json_output=True,
                 status_code=None, success_code=None, **kwargs):
        """
        Build the response wrapper.

        :param response: response object; must provide ``status_code`` and,
            depending on the other arguments, ``json()``, ``content`` and
            ``request.method``
        :param data: pre-extracted payload; when None the payload is read
            from ``response`` according to ``json_output``
        :param json_output: if True parse the body with ``response.json()``,
            otherwise decode ``response.content`` as UTF-8 text
        :param status_code: explicit status code; a falsy value (None, 0)
            falls back to ``response.status_code``
        :param success_code: accepted status code(s): a single value, a
            list/tuple/set of values, or a dict mapping the HTTP method to
            either of those
        :param kwargs: extra attributes set verbatim on the instance
        """
        self.raw = response
        if data is not None:
            self.data = data
        elif json_output:
            self.data = response.json()
        else:
            self.data = response.content.decode('UTF-8')
        self.status_code = status_code or response.status_code
        # Extra attributes are applied before is_ok is computed, so a kwarg
        # named like a core attribute (e.g. ``status_code``) overrides it.
        for name, value in kwargs.items():
            setattr(self, name, value)
        self._compute_is_ok(success_code)

    def _compute_is_ok(self, success_code):
        """
        Set ``self.is_ok`` to True when ``self.status_code`` is one of the
        accepted codes described by ``success_code``.

        A dict is resolved with the HTTP method of the originating request;
        a method missing from the dict accepts no code at all.
        """
        if isinstance(success_code, dict):
            success_code = success_code.get(self.raw.request.method, [])

        # Accept any plain collection of codes, not only lists; a scalar
        # (including None) is wrapped so the membership test stays valid.
        if isinstance(success_code, (list, tuple, set, frozenset)):
            success_codes = success_code
        else:
            success_codes = [success_code]

        self.is_ok = self.status_code in success_codes

    def __repr__(self):
        is_ok_str = "OK" if self.is_ok else "Failed"
        return "<{}: Status: {}>".format(self.__class__.__name__, is_ok_str)
46
47
48
class OCSResponse(BaseResponse):
    """
    Response class for OCS api methods
    Add some attributes:
    - meta      : ocs json metadata
    - full_data : json data of the ocs response
    """

    def __init__(self, response, json_output=True, success_code=None):
        """
        Parse an OCS response and forward the extracted values to the base
        class.

        :param response: response object providing ``json()``
        :param json_output: if True expose the ``data`` member of the OCS
            envelope as ``data``
        :param success_code: accepted status code(s), forwarded to the base
            class; when set, the envelope is parsed even if ``json_output``
            is False so the OCS ``statuscode`` can be checked
        """
        data = None
        full_data = None
        meta = None
        status_code = None
        if success_code or json_output:
            try:
                full_data = response.json()
            except ValueError:
                # json.JSONDecodeError and simplejson's decode error are both
                # ValueError subclasses, so this covers either JSON backend.
                data = {'message': 'Unable to parse JSON response'}
                meta = data
                status_code = -1
            else:
                # Non-dict JSON (null, number, string) cannot be an OCS
                # envelope; treat it like any other payload without 'ocs'.
                if isinstance(full_data, dict) and 'ocs' in full_data:
                    ocs_data = full_data['ocs']
                    meta = ocs_data['meta']
                    status_code = meta['statuscode']
                    if json_output:
                        data = ocs_data['data']
                else:
                    # No OCS envelope: expose the payload as both data and
                    # meta, and use -1 as the status code.
                    data = full_data
                    meta = data
                    status_code = -1

        super(OCSResponse, self).__init__(response, data=data,
                                          json_output=json_output,
                                          full_data=full_data,
                                          status_code=status_code,
                                          meta=meta,
                                          success_code=success_code)
85
86
87
class WebDAVResponse(BaseResponse):
    """
    Response class for WebDAV api methods

    Same arguments as the base class, except that ``json_output`` defaults
    to False.
    """

    def __init__(self, response, data=None, success_code=None, json_output=False):
        # Collect the forwarded options once, then hand them to the base class.
        base_options = {
            'data': data,
            'json_output': json_output,
            'success_code': success_code,
        }
        super(WebDAVResponse, self).__init__(response, **base_options)
94