|
1
|
|
|
import requests |
|
2
|
|
|
|
|
3
|
|
|
from .response import WebDAVResponse |
|
4
|
|
|
|
|
5
|
|
|
|
|
6
|
|
|
class Requester(object): |
|
7
|
|
|
def __init__(self, endpoint, user, passwd, json_output=False): |
|
8
|
|
|
self.query_components = [] |
|
9
|
|
|
|
|
10
|
|
|
self.json_output = json_output |
|
11
|
|
|
|
|
12
|
|
|
self.base_url = endpoint |
|
13
|
|
|
# GroupFolders.url = endpoint + "/ocs/v2.php/apps/groupfolders/folders" |
|
14
|
|
|
|
|
15
|
|
|
self.h_get = {"OCS-APIRequest": "true"} |
|
16
|
|
|
self.h_post = {"OCS-APIRequest": "true", |
|
17
|
|
|
"Content-Type": "application/x-www-form-urlencoded"} |
|
18
|
|
|
self.auth_pk = (user, passwd) |
|
19
|
|
|
self.API_URL = None |
|
20
|
|
|
|
|
21
|
|
|
def rtn(self, resp): |
|
22
|
|
|
if self.json_output: |
|
23
|
|
|
return resp.json() |
|
24
|
|
|
else: |
|
25
|
|
|
return resp.content.decode("UTF-8") |
|
26
|
|
|
|
|
27
|
|
|
def get(self, url="", params=None): |
|
28
|
|
|
url = self.get_full_url(url) |
|
29
|
|
|
res = requests.get(url, auth=self.auth_pk, headers=self.h_get, params=params) |
|
30
|
|
|
return self.rtn(res) |
|
31
|
|
|
|
|
32
|
|
|
def post(self, url="", data=None): |
|
33
|
|
|
url = self.get_full_url(url) |
|
34
|
|
|
res = requests.post(url, auth=self.auth_pk, data=data, headers=self.h_post) |
|
35
|
|
|
return self.rtn(res) |
|
36
|
|
|
|
|
37
|
|
|
def put(self, url="", data=None): |
|
38
|
|
|
url = self.get_full_url(url) |
|
39
|
|
|
res = requests.put(url, auth=self.auth_pk, data=data, headers=self.h_post) |
|
40
|
|
|
return self.rtn(res) |
|
41
|
|
|
|
|
42
|
|
|
def delete(self, url="", data=None): |
|
43
|
|
|
url = self.get_full_url(url) |
|
44
|
|
|
res = requests.delete(url, auth=self.auth_pk, data=data, headers=self.h_post) |
|
45
|
|
|
return self.rtn(res) |
|
46
|
|
|
|
|
47
|
|
|
def get_full_url(self, additional_url=""): |
|
48
|
|
|
""" |
|
49
|
|
|
Build full url for request to NextCloud api |
|
50
|
|
|
|
|
51
|
|
|
Construct url from self.base_url, self.API_URL, additional_url (if given), add format=json param if self.json |
|
52
|
|
|
|
|
53
|
|
|
:param additional_url: str |
|
54
|
|
|
add to url after api_url |
|
55
|
|
|
:return: str |
|
56
|
|
|
""" |
|
57
|
|
|
if additional_url and not str(additional_url).startswith("/"): |
|
58
|
|
|
additional_url = "/{}".format(additional_url) |
|
59
|
|
|
|
|
60
|
|
|
if self.json_output: |
|
61
|
|
|
self.query_components.append("format=json") |
|
62
|
|
|
|
|
63
|
|
|
ret = "{base_url}{api_url}{additional_url}".format( |
|
64
|
|
|
base_url=self.base_url, api_url=self.API_URL, additional_url=additional_url) |
|
65
|
|
|
|
|
66
|
|
|
if self.json_output: |
|
67
|
|
|
ret += "?format=json" |
|
68
|
|
|
return ret |
|
69
|
|
|
|
|
70
|
|
|
|
|
71
|
|
|
class WebDAVRequester(Requester): |
|
72
|
|
|
|
|
73
|
|
|
def __init__(self, *args, **kwargs): |
|
74
|
|
|
super(WebDAVRequester, self).__init__(*args, **kwargs) |
|
75
|
|
|
|
|
76
|
|
|
def rtn(self, resp, data=None): |
|
77
|
|
|
return WebDAVResponse(response=resp, data=data) |
|
78
|
|
|
|
|
79
|
|
|
def propfind(self, additional_url="", headers=None, data=None): |
|
80
|
|
|
url = self.get_full_url(additional_url=additional_url) |
|
81
|
|
|
res = requests.request('PROPFIND', url, auth=self.auth_pk, headers=headers, data=data) |
|
82
|
|
|
return self.rtn(res) |
|
83
|
|
|
|
|
84
|
|
|
def proppatch(self, additional_url="", data=None): |
|
85
|
|
|
url = self.get_full_url(additional_url=additional_url) |
|
86
|
|
|
res = requests.request('PROPPATCH', url, auth=self.auth_pk, data=data) |
|
87
|
|
|
return self.rtn(resp=res) |
|
88
|
|
|
|
|
89
|
|
|
def report(self, additional_url="", data=None): |
|
90
|
|
|
url = self.get_full_url(additional_url=additional_url) |
|
91
|
|
|
res = requests.request('REPORT', url, auth=self.auth_pk, data=data) |
|
92
|
|
|
return self.rtn(resp=res) |
|
93
|
|
|
|
|
94
|
|
|
def download(self, url="", params=None): |
|
95
|
|
|
url = self.get_full_url(url) |
|
96
|
|
|
res = requests.get(url, auth=self.auth_pk, headers=self.h_get, params=params) |
|
97
|
|
|
return self.rtn(resp=res, data=res.content) |
|
98
|
|
|
|
|
99
|
|
|
def make_collection(self, additional_url=""): |
|
100
|
|
|
url = self.get_full_url(additional_url=additional_url) |
|
101
|
|
|
res = requests.request("MKCOL", url=url, auth=self.auth_pk) |
|
102
|
|
|
return self.rtn(resp=res) |
|
103
|
|
|
|
|
104
|
|
|
def move(self, url, destination, overwrite=False): |
|
105
|
|
|
url = self.get_full_url(additional_url=url) |
|
106
|
|
|
destionation_url = self.get_full_url(additional_url=destination) |
|
107
|
|
|
headers = { |
|
108
|
|
|
"Destination": destionation_url, |
|
109
|
|
|
"Overwrite": "T" if overwrite else "F" |
|
110
|
|
|
} |
|
111
|
|
|
res = requests.request("MOVE", url=url, auth=self.auth_pk, headers=headers) |
|
112
|
|
|
return self.rtn(resp=res) |
|
113
|
|
|
|
|
114
|
|
|
def copy(self, url, destination, overwrite=False): |
|
115
|
|
|
url = self.get_full_url(additional_url=url) |
|
116
|
|
|
destionation_url = self.get_full_url(additional_url=destination) |
|
117
|
|
|
headers = { |
|
118
|
|
|
"Destination": destionation_url, |
|
119
|
|
|
"Overwrite": "T" if overwrite else "F" |
|
120
|
|
|
} |
|
121
|
|
|
res = requests.request("COPY", url=url, auth=self.auth_pk, headers=headers) |
|
122
|
|
|
return self.rtn(resp=res) |
|
123
|
|
|
|