Passed
Pull Request — master (#63)
by
unknown
01:27
created

timestamp_to_epoch_time()   A

Complexity

Conditions 2

Size

Total Lines 18
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 18
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
# -*- coding: utf-8 -*-
2
import re
3
import os
4
import pathlib
5
6
import xml.etree.ElementTree as ET
7
8
from datetime import datetime
9
from nextcloud.base import WithRequester
10
11
12
class WebDAV(WithRequester):
    """
    Wrapper around the WebDAV file endpoints of a Nextcloud server.

    Every method builds a ``<uid>/<path>`` URL fragment and hands it to
    ``self.requester`` (provided by ``WithRequester``), which performs the
    actual HTTP call relative to ``API_URL``.
    """

    API_URL = "/remote.php/dav/files"

    def __init__(self, *args, **kwargs):
        """
        Args:
            *args: positional arguments forwarded unchanged to ``WithRequester``
            **kwargs: only ``json_output`` is read here; when truthy, listing
                methods return plain dicts instead of ``File`` objects
        """
        super(WebDAV, self).__init__(*args)
        self.json_output = kwargs.get('json_output')

    def _parse_multistatus(self, resp):
        """
        Replace the raw XML payload of a multistatus response with parsed entries.

        Args:
            resp: response object returned by the requester; must expose
                ``is_ok`` and a writable ``data`` attribute

        Returns:
            the same response object; ``data`` is None when the request failed,
            otherwise a list of dicts (json_output) or of File objects
        """
        if not resp.is_ok:
            resp.data = None
            return resp
        files_data = [File(single_file) for single_file in ET.fromstring(resp.data)]
        if self.json_output:
            resp.data = [each.as_dict() for each in files_data]
        else:
            resp.data = files_data
        return resp

    def list_folders(self, uid, path=None, depth=1, all_properties=False):
        """
        Get path files list with files properties for given user, with given depth

        Args:
            uid (str): uid of user
            path (str/None): files path
            depth (int): depth of listing files (directories content for example)
            all_properties (bool): list all available file properties in Nextcloud

        Returns:
            response object whose ``data`` is
            a list of dicts if json_output,
            a list of File objects if not json_output,
            or None if the request failed
        """
        if all_properties:
            data = """<?xml version="1.0"?>
                <d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns"
                            xmlns:nc="http://nextcloud.org/ns">
                  <d:prop>
                        <d:getlastmodified />
                        <d:getetag />
                        <d:getcontenttype />
                        <d:resourcetype />
                        <oc:fileid />
                        <oc:permissions />
                        <oc:size />
                        <d:getcontentlength />
                        <nc:has-preview />
                        <oc:favorite />
                        <oc:comments-unread />
                        <oc:owner-display-name />
                        <oc:share-types />
                  </d:prop>
                </d:propfind>
            """
        else:
            # No request body: the server decides which properties to return.
            data = None
        additional_url = uid
        if path:
            additional_url = "{}/{}".format(additional_url, path)
        resp = self.requester.propfind(additional_url=additional_url,
                                       headers={"Depth": str(depth)},
                                       data=data)
        return self._parse_multistatus(resp)

    def download_file(self, uid, path, download_path, overwrite=False):
        """
        Download file of given user by path
        File will be saved into ``download_path`` under its remote file name
        path argument must be valid file path
        Modified time of saved file will be synced with the file properties in Nextcloud

        Exception will be raised if:
            * path doesn't exist,
            * path is a directory, or if
            * file with same name already exists in ``download_path``
              and ``overwrite`` is False

        Args:
            uid (str): uid of user
            path (str): file path
            download_path (str): directory where the files will be downloaded
            overwrite (bool): overwrite files if the name is the same

        Returns:
            None

        Raises:
            ValueError: in any of the three situations listed above
        """
        additional_url = "/".join([uid, path])
        filename = path.split('/')[-1]
        file_data = self.list_folders(uid=uid, path=path, depth=0)

        # list_folders() sets data to None when the PROPFIND failed, so the
        # payload has to be checked as well as the response object itself.
        if not file_data or not file_data.data:
            raise ValueError("Given path doesn't exist")

        entry = file_data.data[0]
        if self.json_output:
            file_resource_type = entry.get('resource_type')
            file_timestamp_str = entry.get('last_modified')
        else:
            # File only sets the attributes the server actually returned.
            file_resource_type = getattr(entry, 'resource_type', None)
            file_timestamp_str = getattr(entry, 'last_modified', None)

        if file_resource_type == File.COLLECTION_RESOURCE_TYPE:
            raise ValueError("This is a collection, please specify file path")

        if filename in os.listdir(download_path) and not overwrite:
            raise ValueError("File with such name already exists in this directory")

        # NOTE(review): the download response is written to disk without
        # checking whether the request succeeded - confirm the requester API.
        res = self.requester.download(additional_url)

        file_path = os.path.join(download_path, filename)
        with open(file_path, 'wb') as f:
            if isinstance(res.data, str):
                f.write(res.data.encode())
            else:
                f.write(res.data)

        # get timestamp of downloaded file from file property on Nextcloud
        # If it succeeded, set the timestamp to saved local file
        # If the timestamp string is missing, invalid or broken, the local
        # file keeps the time of the download.
        file_timestamp = (timestamp_to_epoch_time(file_timestamp_str)
                          if file_timestamp_str else None)
        if isinstance(file_timestamp, int):
            # (atime, mtime): access time is "now", mtime mirrors the server
            os.utime(file_path, (datetime.now().timestamp(), file_timestamp))

    def upload_file(self, uid, local_filepath, remote_filepath, timestamp=None):
        """
        Upload file to Nextcloud storage

        Args:
            uid (str): uid of user
            local_filepath (str): path to file on local storage
            remote_filepath (str): path where to upload file on Nextcloud storage
            timestamp (int): timestamp of upload file. If None, get time by local file.

        Returns:
            whatever ``upload_file_contents`` returns
        """
        with open(local_filepath, 'rb') as f:
            file_contents = f.read()
        if timestamp is None:
            timestamp = int(os.path.getmtime(local_filepath))
        return self.upload_file_contents(uid, file_contents, remote_filepath, timestamp)

    def upload_file_contents(self, uid, file_contents, remote_filepath, timestamp=None):
        """
        Upload in-memory contents to Nextcloud storage

        Args:
            uid (str): uid of user
            file_contents (bytes): Bytes the file to be uploaded consists of
            remote_filepath (str): path where to upload file on Nextcloud storage
            timestamp (int):  mtime of upload file

        Returns:
            the requester's response to the upload
        """
        additional_url = "/".join([uid, remote_filepath])
        return self.requester.put_with_timestamp(additional_url, data=file_contents, timestamp=timestamp)

    def create_folder(self, uid, folder_path):
        """
        Create folder on Nextcloud storage

        Args:
            uid (str): uid of user
            folder_path (str): folder path

        Returns:
            the requester's response to the folder creation
        """
        return self.requester.make_collection(additional_url="/".join([uid, folder_path]))

    def assure_folder_exists(self, uid, folder_path):
        """
        Request creation of a folder on Nextcloud storage and ignore the outcome.

        The result of the creation request is discarded, so an already
        existing folder is not treated as an error here.

        Args:
            uid (str): uid of user
            folder_path (str): folder path

        Returns:
            bool: always True
        """
        self.create_folder(uid, folder_path)
        return True

    def assure_tree_exists(self, uid, tree_path):
        """
        Make sure that the folder structure on Nextcloud storage exists

        Each ancestor folder is requested from the outermost inwards, followed
        by the target folder itself.

        Args:
            uid (str): uid of user
            tree_path (str): The folder tree

        Returns:
            bool: result of the last ``assure_folder_exists`` call
        """
        tree = pathlib.PurePath(tree_path)
        parents = list(tree.parents)
        ret = True
        # parents runs from the nearest ancestor to the root; drop the root
        # entry and reverse so folders are created top-down.
        subfolders = parents[:-1][::-1] + [tree]
        for subf in subfolders:
            ret = self.assure_folder_exists(uid, str(subf))
        return ret

    def delete_path(self, uid, path):
        """
        Delete file or folder with all content of given user by path

        Args:
            uid (str): uid of user
            path (str): file or folder path to delete

        Returns:
            the requester's response to the delete request
        """
        url = "/".join([uid, path])
        return self.requester.delete(url=url)

    def move_path(self, uid, path, destination_path, overwrite=False):
        """
        Move file or folder to destination

        Args:
            uid (str): uid of user
            path (str): file or folder path to move
            destination_path (str): destination where to move
            overwrite (bool): allow destination path overriding

        Returns:
            the requester's response to the move request
        """
        path_url = "/".join([uid, path])
        destination_path_url = "/".join([uid, destination_path])
        return self.requester.move(url=path_url,
                                   destination=destination_path_url, overwrite=overwrite)

    def copy_path(self, uid, path, destination_path, overwrite=False):
        """
        Copy file or folder to destination

        Args:
            uid (str): uid of user
            path (str): file or folder path to copy
            destination_path (str): destination where to copy
            overwrite (bool): allow destination path overriding

        Returns:
            the requester's response to the copy request
        """
        path_url = "/".join([uid, path])
        destination_path_url = "/".join([uid, destination_path])
        return self.requester.copy(url=path_url,
                                   destination=destination_path_url, overwrite=overwrite)

    def set_favorites(self, uid, path):
        """
        Set files of a user favorite

        Args:
            uid (str): uid of user
            path (str): file or folder path to make favorite

        Returns:
            the requester's response to the property update
        """
        data = """<?xml version="1.0"?>
        <d:propertyupdate xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns">
          <d:set>
                <d:prop>
                  <oc:favorite>1</oc:favorite>
                </d:prop>
          </d:set>
        </d:propertyupdate>
        """
        url = "/".join([uid, path])
        return self.requester.proppatch(additional_url=url, data=data)

    def list_favorites(self, uid, path=""):
        """
        List the favorite files of a user

        Args:
            uid (str): uid of user
            path (str): file or folder path to search for favorites

        Returns:
            response object whose ``data`` is
            a list of dicts if json_output,
            a list of File objects if not json_output,
            or None if the request failed
        """
        data = """<?xml version="1.0"?>
        <oc:filter-files xmlns:d="DAV:"
                         xmlns:oc="http://owncloud.org/ns"
                         xmlns:nc="http://nextcloud.org/ns">
                 <oc:filter-rules>
                         <oc:favorite>1</oc:favorite>
                 </oc:filter-rules>
         </oc:filter-files>
        """
        url = "/".join([uid, path])
        res = self.requester.report(additional_url=url, data=data)
        return self._parse_multistatus(res)
282
283
284
class File(object):
    """
    One entry of a WebDAV multistatus answer.

    The constructor receives a single response element and copies every
    property listed in ``FILE_PROPERTIES`` that was reported with a success
    status onto the instance, using the mapped Python attribute name.
    Properties the server did not report are simply not set.
    """

    # Only properties inside a propstat carrying exactly this status are read.
    SUCCESS_STATUS = 'HTTP/1.1 200 OK'

    # key is NextCloud property, value is python variable name
    FILE_PROPERTIES = {
        # d:
        "getlastmodified": "last_modified",
        "getetag": "etag",
        "getcontenttype": "content_type",
        "resourcetype": "resource_type",
        "getcontentlength": "content_length",
        # oc:
        "id": "id",
        "fileid": "file_id",
        "favorite": "favorite",
        "comments-href": "comments_href",
        "comments-count": "comments_count",
        "comments-unread": "comments_unread",
        "owner-id": "owner_id",
        "owner-display-name": "owner_display_name",
        "share-types": "share_types",
        "checksums": "check_sums",
        "size": "size",
        "href": "href",
        # nc:
        "has-preview": "has_preview",
    }
    # Prefix -> namespace URI mapping used for the ElementTree lookups below.
    xml_namespaces_map = {
        "d": "DAV:",
        "oc": "http://owncloud.org/ns",
        "nc": "http://nextcloud.org/ns"
    }
    # Value of ``resource_type`` for folders.
    COLLECTION_RESOURCE_TYPE = 'collection'

    # Strips the "{namespace-uri}" prefix ElementTree puts in front of tag names.
    _NAMESPACE_PREFIX = re.compile(r"{.*}")

    def __init__(self, xml_data):
        """
        Args:
            xml_data (xml.etree.ElementTree.Element): a single response
                element containing a ``d:href`` child and propstat children
        """
        namespaces = self.xml_namespaces_map
        self.href = xml_data.find('d:href', namespaces).text
        for propstat in xml_data.iter('{DAV:}propstat'):
            # Skip property groups the server could not deliver.
            if propstat.find('d:status', namespaces).text != self.SUCCESS_STATUS:
                continue
            for file_property in propstat.find('d:prop', namespaces):
                file_property_name = self._NAMESPACE_PREFIX.sub("", file_property.tag)
                if file_property_name not in self.FILE_PROPERTIES:
                    continue
                if file_property_name == 'resourcetype':
                    # The type is encoded as a child element, not as text.
                    value = self._extract_resource_type(file_property)
                else:
                    value = file_property.text
                setattr(self, self.FILE_PROPERTIES[file_property_name], value)

    def __repr__(self):
        return "{}(href={!r})".format(type(self).__name__, self.href)

    def _extract_resource_type(self, file_property):
        """
        Return the tag name (without namespace) of the first child of the
        resourcetype element, or None when the element has no children.
        """
        file_type = list(file_property)
        if file_type:
            return self._NAMESPACE_PREFIX.sub("", file_type[0].tag)
        return None

    def as_dict(self):
        """
        Return the instance attributes that correspond to known file
        properties as a plain dict keyed by Python attribute name.
        """
        known_names = set(self.FILE_PROPERTIES.values())
        return {key: value
                for key, value in self.__dict__.items()
                if key in known_names}
343
344
345
class WebDAVStatusCodes(object):
    """Named HTTP status codes for checking WebDAV responses."""

    CREATED_CODE = 201
    NO_CONTENT_CODE = 204
    MULTISTATUS_CODE = 207
    # 405 is "Method Not Allowed" in HTTP; the name suggests it is treated
    # here as "resource already exists" - presumably for folder creation.
    ALREADY_EXISTS_CODE = 405
    PRECONDITION_FAILED_CODE = 412
351
352
353
def timestamp_to_epoch_time(rfc1123_date=""):
    """
    Convert a literal date time string (as used in DAV:getlastmodified) to Epoch time

    Only rfc1123-date productions are legal as values for DAV:getlastmodified.
    However, the value may be missing, broken or invalid.

    Args:
        rfc1123_date (str): rfc1123-date (defined in RFC2616),
            e.g. ``"Wed, 21 Oct 2015 07:28:00 GMT"``
    Return:
        int or None : Epoch time, if date string value is invalid return None
    """
    # Imported locally because the module only imports ``datetime`` itself.
    from datetime import timezone

    try:
        parsed = datetime.strptime(rfc1123_date, '%a, %d %b %Y %H:%M:%S GMT')
    except (ValueError, TypeError):
        # ValueError: the string does not match the rfc1123-date layout.
        # TypeError: the property was absent and None (or a non-string) came in.
        return None
    # strptime yields a naive datetime, and naive .timestamp() would read it
    # as *local* time. The literal is GMT, so attach UTC before converting.
    return int(parsed.replace(tzinfo=timezone.utc).timestamp())
371