Passed
Pull Request — master (#63)
by
unknown
03:49
created

WebDAV.list_favorites()   A

Complexity

Conditions 3

Size

Total Lines 26
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 26
rs 9.85
c 0
b 0
f 0
cc 3
nop 3
1
# -*- coding: utf-8 -*-
2
import re
3
import os
4
import pathlib
5
from urllib.parse import unquote
6
7
import xml.etree.ElementTree as ET
8
9
from datetime import datetime
10
from nextcloud.base import WithRequester
11
12
13
class WebDAV(WithRequester):
14
15
    API_URL = "/remote.php/dav/files"
16
17
    def __init__(self, *args, **kwargs):
18
        super(WebDAV, self).__init__(*args)
19
        self.json_output = kwargs.get('json_output')
20
21
    def list_folders(self, uid, path=None, depth=1, all_properties=False):
22
        """
23
        Get path files list with files properties for given user, with given depth
24
25
        Args:
26
            uid (str): uid of user
27
            path (str/None): files path
28
            depth (int): depth of listing files (directories content for example)
29
            all_properties (bool): list all available file properties in Nextcloud
30
31
        Returns:
32
            list of dicts if json_output
33
            list of File objects if not json_output
34
        """
35
        if all_properties:
36
            data = """<?xml version="1.0"?>
37
                <d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns"
38
                            xmlns:nc="http://nextcloud.org/ns">
39
                  <d:prop>
40
                        <d:getlastmodified />
41
                        <d:getetag />
42
                        <d:getcontenttype />
43
                        <d:resourcetype />
44
                        <oc:fileid />
45
                        <oc:permissions />
46
                        <oc:size />
47
                        <d:getcontentlength />
48
                        <nc:has-preview />
49
                        <oc:favorite />
50
                        <oc:comments-unread />
51
                        <oc:owner-display-name />
52
                        <oc:share-types />
53
                  </d:prop>
54
                </d:propfind>
55
            """
56
        else:
57
            data = None
58
        additional_url = uid
59
        if path:
60
            additional_url = "{}/{}".format(additional_url, path)
61
        resp = self.requester.propfind(additional_url=additional_url,
62
                                       headers={"Depth": str(depth)},
63
                                       data=data)
64
        if not resp.is_ok:
65
            resp.data = None
66
            return resp
67
        response_data = resp.data
68
        response_xml_data = ET.fromstring(response_data)
69
        files_data = [File(single_file) for single_file in response_xml_data]
70
        resp.data = files_data if not self.json_output else [each.as_dict() for each in files_data]
71
        return resp
72
73
    def download_file(self, uid, path, download_path, overwrite=False):
74
        """
75
        Download file of given user by path
76
        File will be saved to working directory
77
        path argument must be valid file path
78
        Modified time of saved file will be synced with the file properties in Nextcloud
79
80
        Exception will be raised if:
81
            * path doesn't exist,
82
            * path is a directory, or if
83
            * file with same name already exists in working directory
84
85
        Args:
86
            uid (str): uid of user
87
            path (str): file path
88
            download_path (str): directory where the files will be downloaded
89
            overwrite (bool): overwrite files if the name is the same
90
91
        Returns:
92
            None
93
        """
94
        additional_url = "/".join([uid, path])
95
        filename = path.split('/')[-1] if '/' in path else path
96
        filename = unquote(filename)
97
        file_data = self.list_folders(uid=uid, path=path, depth=0)
98
99
        if not file_data:
100
            raise ValueError("Given path doesn't exist")
101
102
        file_resource_type = (file_data.data[0].get('resource_type')
103
                              if self.json_output
104
                              else file_data.data[0].resource_type)
105
106
        if file_resource_type == File.COLLECTION_RESOURCE_TYPE:
107
            raise ValueError("This is a collection, please specify file path")
108
109
        if filename in os.listdir(download_path) and not overwrite:
110
            raise ValueError("File with such name already exists in this directory")
111
112
        res = self.requester.download(additional_url)
113
114
        file_path = os.path.join(download_path, filename)
115
        with open(file_path, 'wb') as f:
116
            if type(res.data) == str:
117
                f.write(res.data.encode())
118
            else:
119
                f.write(res.data)
120
121
        # get timestamp of downloaded file from file property on Nextcloud
122
        # If it succeeded, set the timestamp to saved local file
123
        # If the timestamp string is invalid or broken, the timestamp is downloaded time.
124
        file_timestamp_str = (file_data.data[0].get('last_modified')
125
                              if self.json_output
126
                              else file_data.data[0].last_modified)
127
        file_timestamp = timestamp_to_epoch_time(file_timestamp_str)
128
        if isinstance(file_timestamp, int):
129
            os.utime(file_path, (datetime.now().timestamp(), file_timestamp))
130
131
    def upload_file(self, uid, local_filepath, remote_filepath, timestamp=None):
132
        """
133
        Upload file to Nextcloud storage
134
135
        Args:
136
            uid (str): uid of user
137
            local_filepath (str): path to file on local storage
138
            remote_filepath (str): path where to upload file on Nextcloud storage
139
            timestamp (int): timestamp of upload file. If None, get time by local file.
140
        """
141
        with open(local_filepath, 'rb') as f:
142
            file_contents = f.read()
143
        if timestamp is None:
144
            timestamp = int(os.path.getmtime(local_filepath))
145
        return self.upload_file_contents(uid, file_contents, remote_filepath, timestamp)
146
147
    def upload_file_contents(self, uid, file_contents, remote_filepath, timestamp=None):
148
        """
149
        Upload file to Nextcloud storage
150
151
        Args:
152
            uid (str): uid of user
153
            file_contents (bytes): Bytes the file to be uploaded consists of
154
            remote_filepath (str): path where to upload file on Nextcloud storage
155
            timestamp (int):  mtime of upload file
156
        """
157
        additional_url = "/".join([uid, remote_filepath])
158
        return self.requester.put_with_timestamp(additional_url, data=file_contents, timestamp=timestamp)
159
160
    def create_folder(self, uid, folder_path):
161
        """
162
        Create folder on Nextcloud storage
163
164
        Args:
165
            uid (str): uid of user
166
            folder_path (str): folder path
167
        """
168
        return self.requester.make_collection(additional_url="/".join([uid, folder_path]))
169
170
    def assure_folder_exists(self, uid, folder_path):
171
        """
172
        Create folder on Nextcloud storage, don't do anything if the folder already exists.
173
        Args:
174
            uid (str): uid of user
175
            folder_path (str): folder path
176
        Returns:
177
        """
178
        self.create_folder(uid, folder_path)
179
        return True
180
181
    def assure_tree_exists(self, uid, tree_path):
182
        """
183
        Make sure that the folder structure on Nextcloud storage exists
184
        Args:
185
            uid (str): uid of user
186
            folder_path (str): The folder tree
187
        Returns:
188
        """
189
        tree = pathlib.PurePath(tree_path)
190
        parents = list(tree.parents)
191
        ret = True
192
        subfolders = parents[:-1][::-1] + [tree]
193
        for subf in subfolders:
194
            ret = self.assure_folder_exists(uid, str(subf))
195
        return ret
196
197
    def delete_path(self, uid, path):
198
        """
199
        Delete file or folder with all content of given user by path
200
201
        Args:
202
            uid (str): uid of user
203
            path (str): file or folder path to delete
204
        """
205
        url = "/".join([uid, path])
206
        return self.requester.delete(url=url)
207
208
    def move_path(self, uid, path, destination_path, overwrite=False):
209
        """
210
        Move file or folder to destination
211
212
        Args:
213
            uid (str): uid of user
214
            path (str): file or folder path to move
215
            destionation_path (str): destination where to move
216
            overwrite (bool): allow destination path overriding
217
        """
218
        path_url = "/".join([uid, path])
219
        destination_path_url = "/".join([uid, destination_path])
220
        return self.requester.move(url=path_url,
221
                                   destination=destination_path_url, overwrite=overwrite)
222
223
    def copy_path(self, uid, path, destination_path, overwrite=False):
224
        """
225
        Copy file or folder to destination
226
227
        Args:
228
            uid (str): uid of user
229
            path (str): file or folder path to copy
230
            destionation_path (str): destination where to copy
231
            overwrite (bool): allow destination path overriding
232
        """
233
        path_url = "/".join([uid, path])
234
        destination_path_url = "/".join([uid, destination_path])
235
        return self.requester.copy(url=path_url,
236
                                   destination=destination_path_url, overwrite=overwrite)
237
238
    def set_favorites(self, uid, path):
239
        """
240
        Set files of a user favorite
241
242
        Args:
243
            uid (str): uid of user
244
            path (str): file or folder path to make favorite
245
        """
246
        data = """<?xml version="1.0"?>
247
        <d:propertyupdate xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
248
          <d:set>
249
                <d:prop>
250
                  <oc:favorite>1</oc:favorite>
251
                </d:prop>
252
          </d:set>
253
        </d:propertyupdate>
254
        """
255
        url = "/".join([uid, path])
256
        return self.requester.proppatch(additional_url=url, data=data)
257
258
    def list_favorites(self, uid, path=""):
259
        """
260
        Set files of a user favorite
261
262
        Args:
263
            uid (str): uid of user
264
            path (str): file or folder path to make favorite
265
        """
266
        data = """<?xml version="1.0"?>
267
        <oc:filter-files xmlns:d="DAV:"
268
                         xmlns:oc="http://owncloud.org/ns"
269
                         xmlns:nc="http://nextcloud.org/ns">
270
                 <oc:filter-rules>
271
                         <oc:favorite>1</oc:favorite>
272
                 </oc:filter-rules>
273
         </oc:filter-files>
274
        """
275
        url = "/".join([uid, path])
276
        res = self.requester.report(additional_url=url, data=data)
277
        if not res.is_ok:
278
            res.data = None
279
            return res
280
        response_xml_data = ET.fromstring(res.data)
281
        files_data = [File(single_file) for single_file in response_xml_data]
282
        res.data = files_data if not self.json_output else [each.as_dict() for each in files_data]
283
        return res
284
285
286
class File(object):
287
    SUCCESS_STATUS = 'HTTP/1.1 200 OK'
288
289
    # key is NextCloud property, value is python variable name
290
    FILE_PROPERTIES = {
291
        # d:
292
        "getlastmodified": "last_modified",
293
        "getetag": "etag",
294
        "getcontenttype": "content_type",
295
        "resourcetype": "resource_type",
296
        "getcontentlength": "content_length",
297
        # oc:
298
        "id": "id",
299
        "fileid": "file_id",
300
        "favorite": "favorite",
301
        "comments-href": "comments_href",
302
        "comments-count": "comments_count",
303
        "comments-unread": "comments_unread",
304
        "owner-id": "owner_id",
305
        "owner-display-name": "owner_display_name",
306
        "share-types": "share_types",
307
        "checksums": "check_sums",
308
        "size": "size",
309
        "href": "href",
310
        # nc:
311
        "has-preview": "has_preview",
312
    }
313
    xml_namespaces_map = {
314
        "d": "DAV:",
315
        "oc": "http://owncloud.org/ns",
316
        "nc": "http://nextcloud.org/ns"
317
    }
318
    COLLECTION_RESOURCE_TYPE = 'collection'
319
320
    def __init__(self, xml_data):
321
        self.href = xml_data.find('d:href', self.xml_namespaces_map).text
322
        for propstat in xml_data.iter('{DAV:}propstat'):
323
            if propstat.find('d:status', self.xml_namespaces_map).text != self.SUCCESS_STATUS:
324
                continue
325
            for file_property in propstat.find('d:prop', self.xml_namespaces_map):
326
                file_property_name = re.sub("{.*}", "", file_property.tag)
327
                if file_property_name not in self.FILE_PROPERTIES:
328
                    continue
329
                if file_property_name == 'resourcetype':
330
                    value = self._extract_resource_type(file_property)
331
                else:
332
                    value = file_property.text
333
                setattr(self, self.FILE_PROPERTIES[file_property_name], value)
334
335
    def _extract_resource_type(self, file_property):
336
        file_type = list(file_property)
337
        if file_type:
338
            return re.sub("{.*}", "", file_type[0].tag)
339
        return None
340
341
    def as_dict(self):
342
        return {key: value
343
                for key, value in self.__dict__.items()
344
                if key in self.FILE_PROPERTIES.values()}
345
346
347
class WebDAVStatusCodes(object):
348
    CREATED_CODE = 201
349
    NO_CONTENT_CODE = 204
350
    MULTISTATUS_CODE = 207
351
    ALREADY_EXISTS_CODE = 405
352
    PRECONDITION_FAILED_CODE = 412
353
354
355
def timestamp_to_epoch_time(rfc1123_date=""):
356
    """
357
    literal date time string (use in DAV:getlastmodified) to Epoch time
358
359
    No longer, Only rfc1123-date productions are legal as values for DAV:getlastmodified
360
    However, the value may be broken or invalid.
361
362
    Args:
363
        rfc1123_date (str): rfc1123-date (defined in RFC2616)
364
    Return:
365
        int or None : Epoch time, if date string value is invalid return None
366
    """
367
    try:
368
        epoch_time = datetime.strptime(rfc1123_date, '%a, %d %b %Y %H:%M:%S GMT').timestamp()
369
    except ValueError:
370
        # validation error (DAV:getlastmodified property is broken or invalid)
371
        return None
372
    return int(epoch_time)
373