WebDAV.assure_folder_exists()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 10
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 10
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
# -*- coding: utf-8 -*-
2
import re
3
import os
4
import pathlib
5
6
import xml.etree.ElementTree as ET
7
8
from datetime import datetime
9
from nextcloud.base import WithRequester
10
11
12
class WebDAV(WithRequester):
13
14
    API_URL = "/remote.php/dav/files"
15
16
    def __init__(self, *args, **kwargs):
17
        super(WebDAV, self).__init__(*args)
18
        self.json_output = kwargs.get('json_output')
19
20
    def list_folders(self, uid, path=None, depth=1, all_properties=False):
21
        """
22
        Get path files list with files properties for given user, with given depth
23
24
        Args:
25
            uid (str): uid of user
26
            path (str/None): files path
27
            depth (int): depth of listing files (directories content for example)
28
            all_properties (bool): list all available file properties in Nextcloud
29
30
        Returns:
31
            list of dicts if json_output
32
            list of File objects if not json_output
33
        """
34
        if all_properties:
35
            data = """<?xml version="1.0"?>
36
                <d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns"
37
                            xmlns:nc="http://nextcloud.org/ns">
38
                  <d:prop>
39
                        <d:getlastmodified />
40
                        <d:getetag />
41
                        <d:getcontenttype />
42
                        <d:resourcetype />
43
                        <oc:fileid />
44
                        <oc:permissions />
45
                        <oc:size />
46
                        <d:getcontentlength />
47
                        <nc:has-preview />
48
                        <oc:favorite />
49
                        <oc:comments-unread />
50
                        <oc:owner-display-name />
51
                        <oc:share-types />
52
                  </d:prop>
53
                </d:propfind>
54
            """
55
        else:
56
            data = None
57
        additional_url = uid
58
        if path:
59
            additional_url = "{}/{}".format(additional_url, path)
60
        resp = self.requester.propfind(additional_url=additional_url,
61
                                       headers={"Depth": str(depth)},
62
                                       data=data)
63
        if not resp.is_ok:
64
            resp.data = None
65
            return resp
66
        response_data = resp.data
67
        response_xml_data = ET.fromstring(response_data)
68
        files_data = [File(single_file) for single_file in response_xml_data]
69
        resp.data = files_data if not self.json_output else [each.as_dict() for each in files_data]
70
        return resp
71
72
    def download_file(self, uid, path):
73
        """
74
        Download file of given user by path
75
        File will be saved to working directory
76
        path argument must be valid file path
77
        Modified time of saved file will be synced with the file properties in Nextcloud
78
79
        Exception will be raised if:
80
            * path doesn't exist,
81
            * path is a directory, or if
82
            * file with same name already exists in working directory
83
84
        Args:
85
            uid (str): uid of user
86
            path (str): file path
87
88
        Returns:
89
            None
90
        """
91
        additional_url = "/".join([uid, path])
92
        filename = path.split('/')[-1] if '/' in path else path
93
        file_data = self.list_folders(uid=uid, path=path, depth=0)
94
        if not file_data:
95
            raise ValueError("Given path doesn't exist")
96
        file_resource_type = (file_data.data[0].get('resource_type')
97
                              if self.json_output
98
                              else file_data.data[0].resource_type)
99
        if file_resource_type == File.COLLECTION_RESOURCE_TYPE:
100
            raise ValueError("This is a collection, please specify file path")
101
        if filename in os.listdir('./'):
102
            raise ValueError("File with such name already exists in this directory")
103
        res = self.requester.download(additional_url)
104
        with open(filename, 'wb') as f:
105
            f.write(res.data)
106
107
        # get timestamp of downloaded file from file property on Nextcloud
108
        # If it succeeded, set the timestamp to saved local file
109
        # If the timestamp string is invalid or broken, the timestamp is downloaded time.
110
        file_timestamp_str = (file_data.data[0].get('last_modified')
111
                              if self.json_output
112
                              else file_data.data[0].last_modified)
113
        file_timestamp = timestamp_to_epoch_time(file_timestamp_str)
114
        if isinstance(file_timestamp, int):
115
            os.utime(filename, (datetime.now().timestamp(), file_timestamp))
116
117
    def upload_file(self, uid, local_filepath, remote_filepath, timestamp=None):
118
        """
119
        Upload file to Nextcloud storage
120
121
        Args:
122
            uid (str): uid of user
123
            local_filepath (str): path to file on local storage
124
            remote_filepath (str): path where to upload file on Nextcloud storage
125
            timestamp (int): timestamp of upload file. If None, get time by local file.
126
        """
127
        with open(local_filepath, 'rb') as f:
128
            file_contents = f.read()
129
        if timestamp is None:
130
            timestamp = int(os.path.getmtime(local_filepath))
131
        return self.upload_file_contents(uid, file_contents, remote_filepath, timestamp)
132
133
    def upload_file_contents(self, uid, file_contents, remote_filepath, timestamp=None):
134
        """
135
        Upload file to Nextcloud storage
136
137
        Args:
138
            uid (str): uid of user
139
            file_contents (bytes): Bytes the file to be uploaded consists of
140
            remote_filepath (str): path where to upload file on Nextcloud storage
141
            timestamp (int):  mtime of upload file
142
        """
143
        additional_url = "/".join([uid, remote_filepath])
144
        return self.requester.put_with_timestamp(additional_url, data=file_contents, timestamp=timestamp)
145
146
    def create_folder(self, uid, folder_path):
147
        """
148
        Create folder on Nextcloud storage
149
150
        Args:
151
            uid (str): uid of user
152
            folder_path (str): folder path
153
        """
154
        return self.requester.make_collection(additional_url="/".join([uid, folder_path]))
155
156
    def assure_folder_exists(self, uid, folder_path):
157
        """
158
        Create folder on Nextcloud storage, don't do anything if the folder already exists.
159
        Args:
160
            uid (str): uid of user
161
            folder_path (str): folder path
162
        Returns:
163
        """
164
        self.create_folder(uid, folder_path)
165
        return True
166
167
    def assure_tree_exists(self, uid, tree_path):
168
        """
169
        Make sure that the folder structure on Nextcloud storage exists
170
        Args:
171
            uid (str): uid of user
172
            folder_path (str): The folder tree
173
        Returns:
174
        """
175
        tree = pathlib.PurePath(tree_path)
176
        parents = list(tree.parents)
177
        ret = True
178
        subfolders = parents[:-1][::-1] + [tree]
179
        for subf in subfolders:
180
            ret = self.assure_folder_exists(uid, str(subf))
181
        return ret
182
183
    def delete_path(self, uid, path):
184
        """
185
        Delete file or folder with all content of given user by path
186
187
        Args:
188
            uid (str): uid of user
189
            path (str): file or folder path to delete
190
        """
191
        url = "/".join([uid, path])
192
        return self.requester.delete(url=url)
193
194
    def move_path(self, uid, path, destination_path, overwrite=False):
195
        """
196
        Move file or folder to destination
197
198
        Args:
199
            uid (str): uid of user
200
            path (str): file or folder path to move
201
            destionation_path (str): destination where to move
202
            overwrite (bool): allow destination path overriding
203
        """
204
        path_url = "/".join([uid, path])
205
        destination_path_url = "/".join([uid, destination_path])
206
        return self.requester.move(url=path_url,
207
                                   destination=destination_path_url, overwrite=overwrite)
208
209
    def copy_path(self, uid, path, destination_path, overwrite=False):
210
        """
211
        Copy file or folder to destination
212
213
        Args:
214
            uid (str): uid of user
215
            path (str): file or folder path to copy
216
            destionation_path (str): destination where to copy
217
            overwrite (bool): allow destination path overriding
218
        """
219
        path_url = "/".join([uid, path])
220
        destination_path_url = "/".join([uid, destination_path])
221
        return self.requester.copy(url=path_url,
222
                                   destination=destination_path_url, overwrite=overwrite)
223
224
    def set_favorites(self, uid, path):
225
        """
226
        Set files of a user favorite
227
228
        Args:
229
            uid (str): uid of user
230
            path (str): file or folder path to make favorite
231
        """
232
        data = """<?xml version="1.0"?>
233
        <d:propertyupdate xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
234
          <d:set>
235
                <d:prop>
236
                  <oc:favorite>1</oc:favorite>
237
                </d:prop>
238
          </d:set>
239
        </d:propertyupdate>
240
        """
241
        url = "/".join([uid, path])
242
        return self.requester.proppatch(additional_url=url, data=data)
243
244
    def list_favorites(self, uid, path=""):
245
        """
246
        Set files of a user favorite
247
248
        Args:
249
            uid (str): uid of user
250
            path (str): file or folder path to make favorite
251
        """
252
        data = """<?xml version="1.0"?>
253
        <oc:filter-files xmlns:d="DAV:"
254
                         xmlns:oc="http://owncloud.org/ns"
255
                         xmlns:nc="http://nextcloud.org/ns">
256
                 <oc:filter-rules>
257
                         <oc:favorite>1</oc:favorite>
258
                 </oc:filter-rules>
259
         </oc:filter-files>
260
        """
261
        url = "/".join([uid, path])
262
        res = self.requester.report(additional_url=url, data=data)
263
        if not res.is_ok:
264
            res.data = None
265
            return res
266
        response_xml_data = ET.fromstring(res.data)
267
        files_data = [File(single_file) for single_file in response_xml_data]
268
        res.data = files_data if not self.json_output else [each.as_dict() for each in files_data]
269
        return res
270
271
272
class File(object):
273
    SUCCESS_STATUS = 'HTTP/1.1 200 OK'
274
275
    # key is NextCloud property, value is python variable name
276
    FILE_PROPERTIES = {
277
        # d:
278
        "getlastmodified": "last_modified",
279
        "getetag": "etag",
280
        "getcontenttype": "content_type",
281
        "resourcetype": "resource_type",
282
        "getcontentlength": "content_length",
283
        # oc:
284
        "id": "id",
285
        "fileid": "file_id",
286
        "favorite": "favorite",
287
        "comments-href": "comments_href",
288
        "comments-count": "comments_count",
289
        "comments-unread": "comments_unread",
290
        "owner-id": "owner_id",
291
        "owner-display-name": "owner_display_name",
292
        "share-types": "share_types",
293
        "checksums": "check_sums",
294
        "size": "size",
295
        "href": "href",
296
        # nc:
297
        "has-preview": "has_preview",
298
    }
299
    xml_namespaces_map = {
300
        "d": "DAV:",
301
        "oc": "http://owncloud.org/ns",
302
        "nc": "http://nextcloud.org/ns"
303
    }
304
    COLLECTION_RESOURCE_TYPE = 'collection'
305
306
    def __init__(self, xml_data):
307
        self.href = xml_data.find('d:href', self.xml_namespaces_map).text
308
        for propstat in xml_data.iter('{DAV:}propstat'):
309
            if propstat.find('d:status', self.xml_namespaces_map).text != self.SUCCESS_STATUS:
310
                continue
311
            for file_property in propstat.find('d:prop', self.xml_namespaces_map):
312
                file_property_name = re.sub("{.*}", "", file_property.tag)
313
                if file_property_name not in self.FILE_PROPERTIES:
314
                    continue
315
                if file_property_name == 'resourcetype':
316
                    value = self._extract_resource_type(file_property)
317
                else:
318
                    value = file_property.text
319
                setattr(self, self.FILE_PROPERTIES[file_property_name], value)
320
321
    def _extract_resource_type(self, file_property):
322
        file_type = list(file_property)
323
        if file_type:
324
            return re.sub("{.*}", "", file_type[0].tag)
325
        return None
326
327
    def as_dict(self):
328
        return {key: value
329
                for key, value in self.__dict__.items()
330
                if key in self.FILE_PROPERTIES.values()}
331
332
333
class WebDAVStatusCodes(object):
334
    CREATED_CODE = 201
335
    NO_CONTENT_CODE = 204
336
    MULTISTATUS_CODE = 207
337
    ALREADY_EXISTS_CODE = 405
338
    PRECONDITION_FAILED_CODE = 412
339
340
341
def timestamp_to_epoch_time(rfc1123_date=""):
342
    """
343
    literal date time string (use in DAV:getlastmodified) to Epoch time
344
345
    No longer, Only rfc1123-date productions are legal as values for DAV:getlastmodified
346
    However, the value may be broken or invalid.
347
348
    Args:
349
        rfc1123_date (str): rfc1123-date (defined in RFC2616)
350
    Return:
351
        int or None : Epoch time, if date string value is invalid return None
352
    """
353
    try:
354
        epoch_time = datetime.strptime(rfc1123_date, '%a, %d %b %Y %H:%M:%S GMT').timestamp()
355
    except ValueError:
356
        # validation error (DAV:getlastmodified property is broken or invalid)
357
        return None
358
    return int(epoch_time)
359